出行大数据

需求背景

毕业论文需要用到实际的道路拥堵情况,很幸运的在公众号找到了交通数据实验室分享的城市交通拥堵分析工具,基本可以cover很多人的需求,但是我更擅长于处理json数据而不是表格文件。

此外,我需要拉取高峰时期的道路拥堵情况,数据获取行为放置于服务器且自动进行更符合我的需求。

通过尝试使用城市交通拥堵分析工具后,我决定在Claude的帮助下,一起来完成数据提取,为后面的分析打好基础。

小试牛刀

在阅读实时路况查询说明文档后,便开始了我的尝试。

  1. 输入经纬度及其他参数
    左下->右上,请参考示例代码部分
    没有特殊需求,其他内容默认即可
  2. 输入AK
    复制控制台生成的AK。
# encoding:utf-8
import requests 

# 接口地址
url = "https://api.map.baidu.com/traffic/v1/bound"

# 此处填写你在控制台-应用管理-创建应用后获取的AK
ak = "您的AK"

params = {
    "bounds":    "39.912078,116.464303;39.918276,116.475442",
    "coord_type_input":    "gcj02",
    "coord_type_output":    "gcj02",
    "ak":       ak,

}

response = requests.get(url=url, params=params)
if response:
    print(response.json())

问题来了

经过简单的调用后发现我的边界太大,即对角线超过2公里,无法输出结果。
百度免费提供的api额度为2k/日(实时路况查询),时间间隔过短,做长时间交通路况分析难免不够用。

需求设计

  1. 时间调度管理

    • TimeScheduler类用于管理高峰时段(早高峰和晚高峰),判断当前时间是否在高峰时段内,以及计算距离下一个高峰时段的等待时间。
    • 除高峰时间要求外,本文代码运行时间间隔为10min。
  2. 地理网格划分

    • GeoGrid类将指定的地理区域划分为小网格,以便进行交通状况查询。它计算两点之间的距离,并根据最大对角线长度创建网格。
    • 本文代码已有自动分区能力,注意单次运行消耗即可。
  3. API密钥管理

    • AKManager类管理多个百度地图API密钥,处理密钥的使用和配额控制。它可以重置每日使用次数,获取可用的密钥,并报告使用中的错误。
    • 本文代码预计每次拉取数据消耗100,对于每个API密钥的消耗可自定义设置。
  4. 交通状况查询

    • TrafficMonitor类负责查询每个网格的交通状况。它使用百度地图的交通API,通过query_traffic_for_cell方法查询单个网格的交通数据,并在query_all_traffic方法中查询所有网格。
    • 基础功能。

具体功能总结:

  • 高峰时段判断与等待:在高峰时段内定时查询交通数据,非高峰时段则等待下一个高峰时段。
  • 地理区域网格化:将指定区域划分为多个小网格,便于逐个查询交通状况。
  • API密钥的动态管理:根据使用情况动态选择可用的API密钥,并处理配额超限和错误。
  • 数据存储:将查询结果保存为JSON文件,便于后续分析和使用。

代码分享

import requests
import time
from datetime import datetime, time as dtime
import json
import os
import math
import sys
from typing import List, Tuple, Dict
from collections import defaultdict
import random

class TimeScheduler:
    """时间调度管理器"""
    def __init__(self):
        # 定义早晚高峰时间段
        self.peak_hours = [
            (dtime(7, 0), dtime(9, 0)),   # 早高峰:07:00-09:00
            (dtime(17, 0), dtime(19, 0))  # 晚高峰:17:00-19:00
        ]
    
    def is_peak_hour(self) -> bool:
        """判断当前是否处于高峰时段"""
        current_time = datetime.now().time()
        return any(start <= current_time <= end for start, end in self.peak_hours)
    
    def time_until_next_peak(self) -> int:
        """计算距离下一个高峰时段的等待时间(秒)"""
        current_datetime = datetime.now()
        current_time = current_datetime.time()
        
        current_minutes = current_time.hour * 60 + current_time.minute
        
        peak_minutes = []
        for start, end in self.peak_hours:
            peak_minutes.append((start.hour * 60 + start.minute, 
                               end.hour * 60 + end.minute))
        
        min_wait = 24 * 60
        for start_min, _ in peak_minutes:
            if start_min > current_minutes:
                wait = start_min - current_minutes
                min_wait = min(min_wait, wait)
        
        if min_wait == 24 * 60:
            min_wait = peak_minutes[0][0] + (24 * 60 - current_minutes)
        
        return min_wait * 60

class GeoGrid:
    EARTH_RADIUS = 6371  # 地球半径,单位公里
    
    def __init__(self, min_lng: float, min_lat: float, max_lng: float, max_lat: float, max_diagonal: float = 2.0):
        self.min_lng = min_lng
        self.min_lat = min_lat
        self.max_lng = max_lng
        self.max_lat = max_lat
        self.max_diagonal = max_diagonal
        self.grid_cells = self.create_grid()
    
    def calculate_distance(self, lng1: float, lat1: float, lng2: float, lat2: float) -> float:
        """计算两点间的距离(公里)"""
        rad_lat1 = math.radians(lat1)
        rad_lat2 = math.radians(lat2)
        delta_lat = math.radians(lat2 - lat1)
        delta_lng = math.radians(lng2 - lng1)
        
        a = (math.sin(delta_lat/2) * math.sin(delta_lat/2) +
             math.cos(rad_lat1) * math.cos(rad_lat2) *
             math.sin(delta_lng/2) * math.sin(delta_lng/2))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
        return self.EARTH_RADIUS * c
    
    def create_grid(self) -> List[Tuple[float, float, float, float]]:
        """将大矩形区域划分为小网格"""
        diagonal = self.calculate_distance(self.min_lng, self.min_lat, self.max_lng, self.max_lat)
        n = math.ceil(diagonal / self.max_diagonal)
        
        lng_step = (self.max_lng - self.min_lng) / n
        lat_step = (self.max_lat - self.min_lat) / n
        
        grid_cells = []
        for i in range(n):
            for j in range(n):
                cell_min_lng = self.min_lng + i * lng_step
                cell_min_lat = self.min_lat + j * lat_step
                cell_max_lng = min(cell_min_lng + lng_step, self.max_lng)
                cell_max_lat = min(cell_min_lat + lat_step, self.max_lat)
                grid_cells.append((cell_min_lng, cell_min_lat, cell_max_lng, cell_max_lat))
        
        return grid_cells

class AKManager:
    """AK管理器:处理多个百度地图API密钥的使用和配额控制"""
    def __init__(self, ak_list: List[str]):
        self.ak_list = ak_list
        self.daily_quota = 18  # 每个AK每天的配额次数
        self.usage_count = defaultdict(int)  # 记录每个AK的使用次数
        self.error_count = defaultdict(int)  # 记录每个AK的错误次数
        self.last_reset = datetime.now().date()
        self.current_ak_index = 0
        self.ak_status = {ak: True for ak in ak_list}  # True表示AK可用
        
    def reset_daily_count(self):
        """每天重置使用次数"""
        current_date = datetime.now().date()
        if current_date > self.last_reset:
            self.usage_count.clear()
            self.error_count.clear()
            self.ak_status = {ak: True for ak in self.ak_list}
            self.last_reset = current_date
            print("已重置所有AK的使用计数")

    def get_next_available_ak(self) -> str:
        """获取下一个可用的AK"""
        self.reset_daily_count()
        
        available_aks = [ak for ak in self.ak_list if self.ak_status[ak]]
        if not available_aks:
            raise Exception("所有AK都已达到配额限制或出现错误")
        
        valid_aks = [ak for ak in available_aks if self.usage_count[ak] < self.daily_quota]
        if valid_aks:
            selected_ak = random.choice(valid_aks)
            self.usage_count[selected_ak] += 1
            return selected_ak
            
        selected_ak = min(available_aks, key=lambda x: self.error_count[x])
        self.usage_count[selected_ak] += 1
        return selected_ak

    def report_error(self, ak: str, error_type: str):
        """报告AK使用时的错误"""
        self.error_count[ak] += 1
        print(f"AK {ak[:8]}... 发生错误: {error_type}")
        
        if error_type == "配额超限" or self.error_count[ak] >= 3:
            self.ak_status[ak] = False
            print(f"AK {ak[:8]}... 已被标记为不可用")

    def get_status_report(self) -> Dict:
        """获取所有AK的状态报告"""
        return {
            'total_aks': len(self.ak_list),
            'available_aks': sum(1 for ak in self.ak_status if self.ak_status[ak]),
            'usage_counts': dict(self.usage_count),
            'error_counts': dict(self.error_count),
            'ak_status': self.ak_status
        }

def ensure_directory(directory_path: str) -> bool:
    """确保目录存在,如果不存在则尝试创建"""
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        full_path = os.path.join(script_dir, directory_path)
        
        if not os.path.exists(full_path):
            os.makedirs(full_path)
            print(f"成功创建数据存储目录: {full_path}")
        return True
    except Exception as e:
        print(f"创建目录失败: {str(e)}")
        print(f"尝试使用的路径: {full_path}")
        return False

class TrafficMonitor:
    def __init__(self, ak_list: List[str]):
        self.url = "https://api.map.baidu.com/traffic/v1/bound"
        self.ak_manager = AKManager(ak_list)
        self.grid = GeoGrid(106.4847586, 29.5319042, 106.5991668, 29.5792822)
        self.scheduler = TimeScheduler()
        
        if not ensure_directory('traffic_data'):
            print("错误:无法创建数据存储目录,程序将退出")
            sys.exit(1)
    
    def query_traffic_for_cell(self, cell: Tuple[float, float, float, float]) -> dict:
        """查询单个网格的交通状况"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                ak = self.ak_manager.get_next_available_ak()
                params = {
                    "bounds": f"{cell[1]},{cell[0]};{cell[3]},{cell[2]}",
                    "road_grade": "0",
                    "coord_type_input": "gcj02",
                    "coord_type_output": "gcj02",
                    "ak": ak
                }
                
                response = requests.get(url=self.url, params=params)
                if response.status_code == 200:
                    data = response.json()
                    if data.get('status') == 302:
                        self.ak_manager.report_error(ak, "配额超限")
                        continue
                        
                    data['query_bounds'] = params['bounds']
                    data['query_timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    return data
                else:
                    self.ak_manager.report_error(ak, f"HTTP错误 {response.status_code}")
                    
            except Exception as e:
                self.ak_manager.report_error(ak, str(e))
                
            if attempt == max_retries - 1:
                print(f"网格查询失败,已重试{max_retries}次")
                return None
                
            time.sleep(1)
    
    def query_all_traffic(self) -> List[dict]:
        """查询所有网格的交通状况"""
        all_results = []
        for cell in self.grid.grid_cells:
            result = self.query_traffic_for_cell(cell)
            if result:
                all_results.append(result)
            time.sleep(0.5)
        
        status = self.ak_manager.get_status_report()
        print("\nAK状态报告:")
        print(f"可用AK数量: {status['available_aks']}/{status['total_aks']}")
        print("使用次数:", {k[:8]: v for k, v in status['usage_counts'].items()})
        
        return all_results
    
    def save_results(self, results: List[dict]):
        """保存查询结果"""
        if not results:
            return
            
        try:
            current_time = datetime.now()
            script_dir = os.path.dirname(os.path.abspath(__file__))
            filename = os.path.join(
                script_dir,
                'traffic_data',
                f"traffic_{current_time.strftime('%Y%m%d_%H%M')}.json"
            )
            
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            
            print(f"数据已保存至: {filename}")
            
        except Exception as e:
            print(f"保存数据时发生错误: {str(e)}")

def main():
    # 系统信息输出
    print("系统信息:")
    print(f"当前工作目录: {os.getcwd()}")
    print(f"脚本位置: {os.path.abspath(__file__)}")
    
    # API密钥列表
    ak_list = [
        "申请的AK",
        "申请的AK"
    ]
    
    monitor = TrafficMonitor(ak_list)
    
    print("\n交通监控程序已启动...")
    print(f"总网格数量: {len(monitor.grid.grid_cells)}")
    print(f"可用AK数量: {len(ak_list)}")
    print("监控时段:07:00-09:00 和 17:00-19:00")
    
    try:
        while True:
            current_time = datetime.now()
            
            if monitor.scheduler.is_peak_hour():
                print(f"\n开始新一轮查询: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
                results = monitor.query_all_traffic()
                monitor.save_results(results)
                print("等待600秒后进行下一轮查询...")
                time.sleep(600)
            else:
                wait_time = monitor.scheduler.time_until_next_peak()
                next_peak = current_time.timestamp() + wait_time
                print(f"\n当前不在监控时段,等待至下一个时段: {datetime.fromtimestamp(next_peak).strftime('%Y-%m-%d %H:%M:%S')}")
                time.sleep(wait_time)
                
    except KeyboardInterrupt:
        print("\n程序已停止")

if __name__ == "__main__":
    main()

自定义设置部分

#ctrl+f搜索关键词
self.daily_quota = 输入预估的使用次数  # 每个AK每天的配额次数
self.grid = GeoGrid(106.4847586, 29.5319042, 106.5991668, 29.5792822) # 以渝中区为例,即拉取矩形对角线坐标

有待改进方向

后续修改可以从并发运行出发,由于获取范围过大,数据保存有一定delay,但是在我的分析中基本没有问题。

再次感谢交通数据实验室对本文的启发与帮助

最后修改:2024 年 12 月 05 日
请我喝杯气泡水?打赏即可~(;´д`)ゞ