下面给你一套可落地的思路与示例代码:用 Python + CV 模型实时感知路口四个方向车流量,按需自适应分配绿灯时长,提升通勤效率。
一、整体方案(端到端)
感知层:路口相机(1 个俯视/斜俯视广角,或每个方向 1 个相机)→ RTSP 视频流
识别跟踪:YOLO 检测(car/bus/truck/motorcycle)+ ByteTrack 多目标跟踪
交通量指标:
车队长度/排队数:在“停止线前的队列区域 ROI”统计低速车辆数
到达率(可选):在“上游计数线”统计单位时间过线数
PCU 权重:按车种折算等效小客车数(car=1, moto=0.3, bus=2, truck=2.5)
控制策略:自适应相位控制(最小绿/最大绿/黄灯/全红、按需求分配绿灯,支持gap-out)
执行层:模拟(UI 可视化/MQTT)或对接信号机(Modbus/RS485/PLC)
监控与安全:异常回落固定配时、日志与告警、夜间/恶劣天气策略
二、技术选型
模型:YOLOv8n/s(ultralytics),或 YOLO-World/RT-DETR(更强更慢)
跟踪:ByteTrack(supervision 库封装)
视频:OpenCV 读取 RTSP
加速硬件:Jetson Orin/Xavier、RTX 30/40 系列;CPU 也可但路口多流会吃力
仓库与数据:COCO 预训练可用,场景适配可微调(UA-DETRAC、BDD100K、VisDrone 等)
三、实现步骤1) 相机与标定
固定相机,尽量俯视减小遮挡;保存一帧清晰画面
标定停止线与“队列区域 Polygon(每个方向一块)”,可用鼠标点多边形或以真实尺寸做单应性标定(推荐)
可选:设置“上游计数线”,用于测到达率
2) 检测与跟踪
只保留车辆类(car/motorcycle/bus/truck)
跟踪获得稳定的 track_id 与位移,估计速度与方向
3) 车流量度量
队列判定:落在队列区域且速度低于阈值即视为排队
队列需求= Σ(车辆PCU权重)
相位(NS/EW)需求=对应两个方向的需求之和
可选:到达率叠加入需求,防止刚到达方向长期饥饿
4) 自适应控制(2 相位示例:南北同绿/东西同绿)
约束:最小绿(min_green)、最大绿(max_green)、黄灯(amber)、全红(all_red)
切换条件:
达到最小绿后,若对向需求显著更大或当前相位“gap-out”(一定时间无车/队列清空),则转黄→全红→对向
达最大绿强制切换
公平性:设置需求阈值与加权,防止某方向长时间不得绿
四、核心 Python 示例(YOLOv8 + ByteTrack + OpenCV + 简易自适应控制)依赖安装:pip install ultralytics supervision==0.18.0 opencv-python numpy
示例仅演示单画面(一个广角包含四个方向)。如果你是四路相机,读四个 RTSP 并在同一时钟下融合各自 ROI 即可。
import time import cv2 import numpy as np from ultralytics import YOLO import supervision as sv # 1) 配置区域(需要你根据画面标注) # 每个方向的“队列区域”顶点(顺时针),像素坐标 QUEUE_ROIS = { "N": [(100, 100), (300, 100), (300, 260), (100, 260)], "S": [(100, 700), (300, 700), (300, 540), (100, 540)], "E": [(1000, 300), (1160, 300), (1160, 500), (1000, 500)], "W": [(20, 300), (180, 300), (180, 500), (20, 500)], } # 如需到达率,额外定义上游计数线(示例) LINE_PAIRS = { "N": ((120, 260), (280, 260)), "S": ((120, 540), (280, 540)), "E": ((1000, 320), (1000, 480)), "W": ((180, 320), (180, 480)), } # PCU 权重 PCU = {2: 1.0, 3: 0.3, 5: 2.0, 7: 2.5} # COCO: 2 car, 3 moto, 5 bus, 7 truck INTEREST_CLASSES = set(PCU.keys()) # 速度阈值(像素/秒),建议完成单应性标定后改为 m/s 阈值如 0.5 m/s QUEUE_SPEED_THRESH_PX = 12.0 # 2) 自适应控制器(两相位:NS/EW) class AdaptiveController: def __init__(self, min_green=10, max_green=45, amber=3, all_red=1, gap_out=3, demand_bias=1.15): self.state = "NS_GREEN" # NS_GREEN, NS_AMBER, ALL_RED_TO_EW, EW_GREEN, EW_AMBER, ALL_RED_TO_NS self.t_state = time.time() self.min_green = min_green self.max_green = max_green self.amber = amber self.all_red = all_red self.gap_out = gap_out self.demand_bias = demand_bias self.last_seen_vehicle = {"NS": time.time(), "EW": time.time()} def update_last_seen(self, phase, has_vehicle): if has_vehicle: self.last_seen_vehicle[phase] = time.time() def time_in_state(self): return time.time() - self.t_state def set_state(self, s): self.state = s self.t_state = time.time() def decide(self, demand_NS, demand_EW, has_NS, has_EW): # 更新“是否有车”观察,用于gap-out self.update_last_seen("NS", has_NS) self.update_last_seen("EW", has_EW) t = self.time_in_state() if self.state == "NS_GREEN": # 强制最小绿 if t < self.min_green: return self.output("NS_GREEN") # 最大绿 if t >= self.max_green: self.set_state("NS_AMBER") return self.output("NS_AMBER") # gap-out 或对向需求更大 ns_gap = time.time() - self.last_seen_vehicle["NS"] if (ns_gap >= self.gap_out and demand_EW > 0) or (demand_EW > demand_NS * self.demand_bias): self.set_state("NS_AMBER") return self.output("NS_AMBER") return self.output("NS_GREEN") if self.state == "NS_AMBER": if t >= self.amber: self.set_state("ALL_RED_TO_EW") return self.output("NS_AMBER") if self.state == "ALL_RED_TO_EW": if t >= self.all_red: self.set_state("EW_GREEN") return self.output("ALL_RED") if self.state == "EW_GREEN": if t < self.min_green: return self.output("EW_GREEN") if t >= self.max_green: self.set_state("EW_AMBER") return self.output("EW_AMBER") ew_gap = time.time() - self.last_seen_vehicle["EW"] if (ew_gap >= self.gap_out and demand_NS > 0) or (demand_NS > demand_EW * self.demand_bias): self.set_state("EW_AMBER") return self.output("EW_AMBER") return self.output("EW_GREEN") if self.state == "EW_AMBER": if t >= self.amber: self.set_state("ALL_RED_TO_NS") return self.output("EW_AMBER") if self.state == "ALL_RED_TO_NS": if t >= self.all_red: self.set_state("NS_GREEN") return self.output("ALL_RED") def output(self, mode): # 返回四个方向的灯色 if mode in ["NS_GREEN", "NS_AMBER"]: return {"N": ("G" if mode=="NS_GREEN" else "Y"), "S": ("G" if mode=="NS_GREEN" else "Y"), "E": "R", "W": "R", "state": self.state} if mode in ["EW_GREEN", "EW_AMBER"]: return {"E": ("G" if mode=="EW_GREEN" else "Y"), "W": ("G" if mode=="EW_GREEN" else "Y"), "N": "R", "S": "R", "state": self.state} # ALL_RED return {"N": "R", "S": "R", "E": "R", "W": "R", "state": self.state} # 3) 工具函数 def center_bottom(xyxy): x1, y1, x2, y2 = xyxy return (int((x1+x2)/2), int(y2)) def point_in_poly(pt, poly): # poly: list[(x,y)] res = cv2.pointPolygonTest(np.array(poly, np.int32), pt, False) return res >= 0 # 4) 初始化 model = YOLO("yolov8n.pt") tracker = sv.ByteTrack(track_thresh=0.25, track_buffer=30, match_thresh=0.8) cap = cv2.VideoCapture("your_rtsp_or_video.mp4") # 替换为 RTSP # 计数线(可选) line_zones = {k: sv.LineZone(sv.Point(*p1), sv.Point(*p2)) for k,(p1,p2) in LINE_PAIRS.items()} line_annotators = {k: sv.LineZoneAnnotator(thickness=2, text_thickness=1, text_scale=0.5) for k in line_zones} # 轨迹速度缓存 last_pos = {} # track_id -> (x,y) last_time = {} # track_id -> timestamp controller = AdaptiveController() while True: ok, frame = cap.read() if not ok: break ts = time.time() # 检测 res = model(frame, verbose=False)[0] det = sv.Detections.from_ultralytics(res) mask = np.array([cls in INTEREST_CLASSES for cls in det.class_id], dtype=bool) det = det[mask] # 跟踪 tracked = tracker.update_with_detections(det) # 计算速度与所属方向(基于队列区域) # 聚合每个方向的队列PCU和是否有车 demand = {d: 0.0 for d in QUEUE_ROIS.keys()} has_vehicle = {d: False for d in ["N", "S", "E", "W"]} for i in range(len(tracked)): xyxy = tracked.xyxy[i] cls = int(tracked.class_id[i]) tid = int(tracked.tracker_id[i]) if tracked.tracker_id is not None else -1 c = center_bottom(xyxy) # 速度估计 v = 0.0 if tid != -1: if tid in last_pos: dt = ts - last_time[tid] if dt > 0: dx = c[0] - last_pos[tid][0] dy = c[1] - last_pos[tid][1] v = (dx*dx + dy*dy) ** 0.5 / dt # px/s last_pos[tid] = c last_time[tid] = ts # 属于哪个方向的队列区域 for d, poly in QUEUE_ROIS.items(): if point_in_poly(c, poly): has_vehicle[d] = True # 排队:速度低于阈值 if v <= QUEUE_SPEED_THRESH_PX: demand[d] += PCU.get(cls, 1.0) break # 到达率(可选):过线统计 for k, lz in line_zones.items(): lz.trigger(tracked) # supervision 会用 bbox 的中心进行跨线判断 # 合并相位需求(两相位) demand_NS = demand["N"] + demand["S"] demand_EW = demand["E"] + demand["W"] has_NS = has_vehicle["N"] or has_vehicle["S"] has_EW = has_vehicle["E"] or has_vehicle["W"] lights = controller.decide(demand_NS, demand_EW, has_NS, has_EW) # 叠加可视化 for d, poly in QUEUE_ROIS.items(): color = (0,255,0) if (lights[d] == "G") else ((0,255,255) if lights[d]=="Y" else (0,0,255)) cv2.polylines(frame, [np.array(poly, np.int32)], True, color, 2) cv2.putText(frame, f"{d}: {lights[d]} q_pcu={demand[d]:.1f}", (poly[0][0], poly[0][1]-8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA) # 可选:画计数线 for k, lz in line_zones.items(): line_annotators[k].annotate(frame, lz) cv2.putText(frame, f"Phase={lights['state']} | NS_demand={demand_NS:.1f} EW_demand={demand_EW:.1f}", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (50, 220, 50), 2) cv2.imshow("Adaptive Intersection", frame) if cv2.waitKey(1) & 0xFF == 27: break cap.release() cv2.destroyAllWindows()
说明与关键点
ROI/线段请按你的画面实测点位填写。队列区域建议覆盖停止线前约 30–80 米(按道路限速/存储空间决定)。
队列判定靠“低速 + 区域内”,更稳健可增加“朝向向量与车道方向夹角限制”(即只有朝向停止线方向的才算)。
若做单应性标定(推荐),可将速度阈值/队列长度直接用米和 m/s,鲁棒性更高。
五、单应性标定(可选但强烈建议)
在画面选取 4 个地面点(如车道边角/停止线角点),在真实世界给出对应坐标(米),求单应矩阵 H。
将像素坐标映射到鸟瞰平面后,速度、距离、队列长度均可用物理量。
示例(片段):
# src: 图像坐标, dst: 实际地面坐标(米) src = np.float32([[x1,y1],[x2,y2],[x3,y3],[x4,y4]]) dst = np.float32([[0,0],[W,0],[W,H],[0,H]]) # 设定一个W×H的实际标定平面 H, _ = cv2.findHomography(src, dst) def warp_point(pt, H): px = np.array([pt[0], pt[1], 1.0], dtype=np.float32) wx = H @ px wx /= wx[2] return (float(wx[0]), float(wx[1])) # 米
六、控制策略建议
约束参数参考:min_green 8–12 s,max_green 40–80 s,amber 3–5 s,全红 1–2 s;精确值需依据路口净宽、设计车速、ITE/GB 标准计算。
需求构造:Demand = 队列PCU + α·到达率(PCU/s)。α 取 5–15 可以把“即将排队”的方向提前获绿。
防饿死:设置 demand_bias(如 1.1–1.3),以及最大红灯等待上限(到点强制切换)。
行人/左转相位:若有专用相位,扩展为 4 相位或多相位 FSM;注意相位冲突矩阵与清空时间。
七、工程落地要点
稳定性:恶劣天气/夜间/逆光 → 调参(阈值/曝光)、模型微调、红外补光;置信度过低时回落到固定配时方案。
延迟:单路口总延迟目标 < 200 ms;模型推理 + 编解码为主耗时。
隐私合规:不做人脸/车牌识别;只保留统计与轨迹要素。
硬件:Jetson Orin 运行 4×1080p@15–25fps 可行;或服务器侧汇聚。
对接信号机:强制遵从本地标准与安规;通过 PLC/Modbus 设置线圈输出;务必有看门狗与故障安全态(全红闪)。
八、测试与评估
先用历史视频离线回放验证,再上实时;关键指标:平均延误、排队长度、通行量、相位切换频次。
可借助仿真(SUMO/CityFlow)把“检测到的需求→相位控制”闭环验证。
网友回复
新手如何使用yolo进行图像数据标注与图像识别?
threejs如何将glb三维模型减少面数和定点数粗糙一些减少文件体积大小?
在哪可以免费下载各国的武器飞机坦克航母舰艇导弹三维模型glb文件?
aistudio中build如何破解gemini的api模型调用免费使用?
python如何让红绿灯看懂两侧车流智能调节红绿灯?
python+faster_whisper如何实现实时开会录音并转成文字?
python如何调用微信本地的ocr进行图片文字提取?
有没有浏览器js就能离线运行支持mcp的ai模型?
c#如何调用gemini api实现文本描述来自动化操作电脑软件?
c#如何读取正在打开的word软件正在编辑的内容进行修改操作?