下面给你一套可落地的思路与示例代码:用 Python + CV 模型实时感知路口四个方向车流量,按需自适应分配绿灯时长,提升通勤效率。
一、整体方案(端到端)
感知层:路口相机(1 个俯视/斜俯视广角,或每个方向 1 个相机)→ RTSP 视频流
识别跟踪:YOLO 检测(car/bus/truck/motorcycle)+ ByteTrack 多目标跟踪
交通量指标:
车队长度/排队数:在“停止线前的队列区域 ROI”统计低速车辆数
到达率(可选):在“上游计数线”统计单位时间过线数
PCU 权重:按车种折算等效小客车数(car=1, moto=0.3, bus=2, truck=2.5)
控制策略:自适应相位控制(最小绿/最大绿/黄灯/全红、按需求分配绿灯,支持gap-out)
执行层:模拟(UI 可视化/MQTT)或对接信号机(Modbus/RS485/PLC)
监控与安全:异常回落固定配时、日志与告警、夜间/恶劣天气策略
二、技术选型
模型:YOLOv8n/s(ultralytics),或 YOLO-World/RT-DETR(更强更慢)
跟踪:ByteTrack(supervision 库封装)
视频:OpenCV 读取 RTSP
加速硬件:Jetson Orin/Xavier、RTX 30/40 系列;CPU 也可但路口多流会吃力
仓库与数据:COCO 预训练可用,场景适配可微调(UA-DETRAC、BDD100K、VisDrone 等)
三、实现步骤1) 相机与标定
固定相机,尽量俯视减小遮挡;保存一帧清晰画面
标定停止线与“队列区域 Polygon(每个方向一块)”,可用鼠标点多边形或以真实尺寸做单应性标定(推荐)
可选:设置“上游计数线”,用于测到达率
2) 检测与跟踪
只保留车辆类(car/motorcycle/bus/truck)
跟踪获得稳定的 track_id 与位移,估计速度与方向
3) 车流量度量
队列判定:落在队列区域且速度低于阈值即视为排队
队列需求= Σ(车辆PCU权重)
相位(NS/EW)需求=对应两个方向的需求之和
可选:到达率叠加入需求,防止刚到达方向长期饥饿
4) 自适应控制(2 相位示例:南北同绿/东西同绿)
约束:最小绿(min_green)、最大绿(max_green)、黄灯(amber)、全红(all_red)
切换条件:
达到最小绿后,若对向需求显著更大或当前相位“gap-out”(一定时间无车/队列清空),则转黄→全红→对向
达最大绿强制切换
公平性:设置需求阈值与加权,防止某方向长时间不得绿
四、核心 Python 示例(YOLOv8 + ByteTrack + OpenCV + 简易自适应控制)依赖安装:pip install ultralytics supervision==0.18.0 opencv-python numpy
示例仅演示单画面(一个广角包含四个方向)。如果你是四路相机,读四个 RTSP 并在同一时钟下融合各自 ROI 即可。
import time
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
# 1) 配置区域(需要你根据画面标注)
# 每个方向的“队列区域”顶点(顺时针),像素坐标
QUEUE_ROIS = {
"N": [(100, 100), (300, 100), (300, 260), (100, 260)],
"S": [(100, 700), (300, 700), (300, 540), (100, 540)],
"E": [(1000, 300), (1160, 300), (1160, 500), (1000, 500)],
"W": [(20, 300), (180, 300), (180, 500), (20, 500)],
}
# 如需到达率,额外定义上游计数线(示例)
LINE_PAIRS = {
"N": ((120, 260), (280, 260)),
"S": ((120, 540), (280, 540)),
"E": ((1000, 320), (1000, 480)),
"W": ((180, 320), (180, 480)),
}
# PCU 权重
PCU = {2: 1.0, 3: 0.3, 5: 2.0, 7: 2.5} # COCO: 2 car, 3 moto, 5 bus, 7 truck
INTEREST_CLASSES = set(PCU.keys())
# 速度阈值(像素/秒),建议完成单应性标定后改为 m/s 阈值如 0.5 m/s
QUEUE_SPEED_THRESH_PX = 12.0
# 2) 自适应控制器(两相位:NS/EW)
class AdaptiveController:
def __init__(self, min_green=10, max_green=45, amber=3, all_red=1, gap_out=3, demand_bias=1.15):
self.state = "NS_GREEN" # NS_GREEN, NS_AMBER, ALL_RED_TO_EW, EW_GREEN, EW_AMBER, ALL_RED_TO_NS
self.t_state = time.time()
self.min_green = min_green
self.max_green = max_green
self.amber = amber
self.all_red = all_red
self.gap_out = gap_out
self.demand_bias = demand_bias
self.last_seen_vehicle = {"NS": time.time(), "EW": time.time()}
def update_last_seen(self, phase, has_vehicle):
if has_vehicle:
self.last_seen_vehicle[phase] = time.time()
def time_in_state(self):
return time.time() - self.t_state
def set_state(self, s):
self.state = s
self.t_state = time.time()
def decide(self, demand_NS, demand_EW, has_NS, has_EW):
# 更新“是否有车”观察,用于gap-out
self.update_last_seen("NS", has_NS)
self.update_last_seen("EW", has_EW)
t = self.time_in_state()
if self.state == "NS_GREEN":
# 强制最小绿
if t < self.min_green:
return self.output("NS_GREEN")
# 最大绿
if t >= self.max_green:
self.set_state("NS_AMBER")
return self.output("NS_AMBER")
# gap-out 或对向需求更大
ns_gap = time.time() - self.last_seen_vehicle["NS"]
if (ns_gap >= self.gap_out and demand_EW > 0) or (demand_EW > demand_NS * self.demand_bias):
self.set_state("NS_AMBER")
return self.output("NS_AMBER")
return self.output("NS_GREEN")
if self.state == "NS_AMBER":
if t >= self.amber:
self.set_state("ALL_RED_TO_EW")
return self.output("NS_AMBER")
if self.state == "ALL_RED_TO_EW":
if t >= self.all_red:
self.set_state("EW_GREEN")
return self.output("ALL_RED")
if self.state == "EW_GREEN":
if t < self.min_green:
return self.output("EW_GREEN")
if t >= self.max_green:
self.set_state("EW_AMBER")
return self.output("EW_AMBER")
ew_gap = time.time() - self.last_seen_vehicle["EW"]
if (ew_gap >= self.gap_out and demand_NS > 0) or (demand_NS > demand_EW * self.demand_bias):
self.set_state("EW_AMBER")
return self.output("EW_AMBER")
return self.output("EW_GREEN")
if self.state == "EW_AMBER":
if t >= self.amber:
self.set_state("ALL_RED_TO_NS")
return self.output("EW_AMBER")
if self.state == "ALL_RED_TO_NS":
if t >= self.all_red:
self.set_state("NS_GREEN")
return self.output("ALL_RED")
def output(self, mode):
# 返回四个方向的灯色
if mode in ["NS_GREEN", "NS_AMBER"]:
return {"N": ("G" if mode=="NS_GREEN" else "Y"),
"S": ("G" if mode=="NS_GREEN" else "Y"),
"E": "R", "W": "R", "state": self.state}
if mode in ["EW_GREEN", "EW_AMBER"]:
return {"E": ("G" if mode=="EW_GREEN" else "Y"),
"W": ("G" if mode=="EW_GREEN" else "Y"),
"N": "R", "S": "R", "state": self.state}
# ALL_RED
return {"N": "R", "S": "R", "E": "R", "W": "R", "state": self.state}
# 3) 工具函数
def center_bottom(xyxy):
x1, y1, x2, y2 = xyxy
return (int((x1+x2)/2), int(y2))
def point_in_poly(pt, poly):
# poly: list[(x,y)]
res = cv2.pointPolygonTest(np.array(poly, np.int32), pt, False)
return res >= 0
# 4) 初始化
model = YOLO("yolov8n.pt")
tracker = sv.ByteTrack(track_thresh=0.25, track_buffer=30, match_thresh=0.8)
cap = cv2.VideoCapture("your_rtsp_or_video.mp4") # 替换为 RTSP
# 计数线(可选)
line_zones = {k: sv.LineZone(sv.Point(*p1), sv.Point(*p2)) for k,(p1,p2) in LINE_PAIRS.items()}
line_annotators = {k: sv.LineZoneAnnotator(thickness=2, text_thickness=1, text_scale=0.5) for k in line_zones}
# 轨迹速度缓存
last_pos = {} # track_id -> (x,y)
last_time = {} # track_id -> timestamp
controller = AdaptiveController()
while True:
ok, frame = cap.read()
if not ok:
break
ts = time.time()
# 检测
res = model(frame, verbose=False)[0]
det = sv.Detections.from_ultralytics(res)
mask = np.array([cls in INTEREST_CLASSES for cls in det.class_id], dtype=bool)
det = det[mask]
# 跟踪
tracked = tracker.update_with_detections(det)
# 计算速度与所属方向(基于队列区域)
# 聚合每个方向的队列PCU和是否有车
demand = {d: 0.0 for d in QUEUE_ROIS.keys()}
has_vehicle = {d: False for d in ["N", "S", "E", "W"]}
for i in range(len(tracked)):
xyxy = tracked.xyxy[i]
cls = int(tracked.class_id[i])
tid = int(tracked.tracker_id[i]) if tracked.tracker_id is not None else -1
c = center_bottom(xyxy)
# 速度估计
v = 0.0
if tid != -1:
if tid in last_pos:
dt = ts - last_time[tid]
if dt > 0:
dx = c[0] - last_pos[tid][0]
dy = c[1] - last_pos[tid][1]
v = (dx*dx + dy*dy) ** 0.5 / dt # px/s
last_pos[tid] = c
last_time[tid] = ts
# 属于哪个方向的队列区域
for d, poly in QUEUE_ROIS.items():
if point_in_poly(c, poly):
has_vehicle[d] = True
# 排队:速度低于阈值
if v <= QUEUE_SPEED_THRESH_PX:
demand[d] += PCU.get(cls, 1.0)
break
# 到达率(可选):过线统计
for k, lz in line_zones.items():
lz.trigger(tracked) # supervision 会用 bbox 的中心进行跨线判断
# 合并相位需求(两相位)
demand_NS = demand["N"] + demand["S"]
demand_EW = demand["E"] + demand["W"]
has_NS = has_vehicle["N"] or has_vehicle["S"]
has_EW = has_vehicle["E"] or has_vehicle["W"]
lights = controller.decide(demand_NS, demand_EW, has_NS, has_EW)
# 叠加可视化
for d, poly in QUEUE_ROIS.items():
color = (0,255,0) if (lights[d] == "G") else ((0,255,255) if lights[d]=="Y" else (0,0,255))
cv2.polylines(frame, [np.array(poly, np.int32)], True, color, 2)
cv2.putText(frame, f"{d}: {lights[d]} q_pcu={demand[d]:.1f}", (poly[0][0], poly[0][1]-8),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA)
# 可选:画计数线
for k, lz in line_zones.items():
line_annotators[k].annotate(frame, lz)
cv2.putText(frame, f"Phase={lights['state']} | NS_demand={demand_NS:.1f} EW_demand={demand_EW:.1f}",
(30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (50, 220, 50), 2)
cv2.imshow("Adaptive Intersection", frame)
if cv2.waitKey(1) & 0xFF == 27:
break
cap.release()
cv2.destroyAllWindows() 说明与关键点
ROI/线段请按你的画面实测点位填写。队列区域建议覆盖停止线前约 30–80 米(按道路限速/存储空间决定)。
队列判定靠“低速 + 区域内”,更稳健可增加“朝向向量与车道方向夹角限制”(即只有朝向停止线方向的才算)。
若做单应性标定(推荐),可将速度阈值/队列长度直接用米和 m/s,鲁棒性更高。
五、单应性标定(可选但强烈建议)
在画面选取 4 个地面点(如车道边角/停止线角点),在真实世界给出对应坐标(米),求单应矩阵 H。
将像素坐标映射到鸟瞰平面后,速度、距离、队列长度均可用物理量。
示例(片段):
# src: 图像坐标, dst: 实际地面坐标(米)
src = np.float32([[x1,y1],[x2,y2],[x3,y3],[x4,y4]])
dst = np.float32([[0,0],[W,0],[W,H],[0,H]]) # 设定一个W×H的实际标定平面
H, _ = cv2.findHomography(src, dst)
def warp_point(pt, H):
px = np.array([pt[0], pt[1], 1.0], dtype=np.float32)
wx = H @ px
wx /= wx[2]
return (float(wx[0]), float(wx[1])) # 米 六、控制策略建议
约束参数参考:min_green 8–12 s,max_green 40–80 s,amber 3–5 s,全红 1–2 s;精确值需依据路口净宽、设计车速、ITE/GB 标准计算。
需求构造:Demand = 队列PCU + α·到达率(PCU/s)。α 取 5–15 可以把“即将排队”的方向提前获绿。
防饿死:设置 demand_bias(如 1.1–1.3),以及最大红灯等待上限(到点强制切换)。
行人/左转相位:若有专用相位,扩展为 4 相位或多相位 FSM;注意相位冲突矩阵与清空时间。
七、工程落地要点
稳定性:恶劣天气/夜间/逆光 → 调参(阈值/曝光)、模型微调、红外补光;置信度过低时回落到固定配时方案。
延迟:单路口总延迟目标 < 200 ms;模型推理 + 编解码为主耗时。
隐私合规:不做人脸/车牌识别;只保留统计与轨迹要素。
硬件:Jetson Orin 运行 4×1080p@15–25fps 可行;或服务器侧汇聚。
对接信号机:强制遵从本地标准与安规;通过 PLC/Modbus 设置线圈输出;务必有看门狗与故障安全态(全红闪)。
八、测试与评估
先用历史视频离线回放验证,再上实时;关键指标:平均延误、排队长度、通行量、相位切换频次。
可借助仿真(SUMO/CityFlow)把“检测到的需求→相位控制”闭环验证。
网友回复


