+
33
-

回答

下面给你一套可落地的思路与示例代码:用 Python + CV 模型实时感知路口四个方向车流量,按需自适应分配绿灯时长,提升通勤效率。

一、整体方案(端到端)

感知层:路口相机(1 个俯视/斜俯视广角,或每个方向 1 个相机)→ RTSP 视频流

识别跟踪:YOLO 检测(car/bus/truck/motorcycle)+ ByteTrack 多目标跟踪

交通量指标:

车队长度/排队数:在“停止线前的队列区域 ROI”统计低速车辆数

到达率(可选):在“上游计数线”统计单位时间过线数

PCU 权重:按车种折算等效小客车数(car=1, moto=0.3, bus=2, truck=2.5)

控制策略:自适应相位控制(最小绿/最大绿/黄灯/全红、按需求分配绿灯,支持gap-out)

执行层:模拟(UI 可视化/MQTT)或对接信号机(Modbus/RS485/PLC)

监控与安全:异常回落固定配时、日志与告警、夜间/恶劣天气策略

二、技术选型

模型:YOLOv8n/s(ultralytics),或 YOLO-World/RT-DETR(更强更慢)

跟踪:ByteTrack(supervision 库封装)

视频:OpenCV 读取 RTSP

加速硬件:Jetson Orin/Xavier、RTX 30/40 系列;CPU 也可但路口多流会吃力

仓库与数据:COCO 预训练可用,场景适配可微调(UA-DETRAC、BDD100K、VisDrone 等)

三、实现步骤1) 相机与标定

固定相机,尽量俯视减小遮挡;保存一帧清晰画面

标定停止线与“队列区域 Polygon(每个方向一块)”,可用鼠标点多边形或以真实尺寸做单应性标定(推荐)

可选:设置“上游计数线”,用于测到达率

2) 检测与跟踪

只保留车辆类(car/motorcycle/bus/truck)

跟踪获得稳定的 track_id 与位移,估计速度与方向

3) 车流量度量

队列判定:落在队列区域且速度低于阈值即视为排队

队列需求= Σ(车辆PCU权重)

相位(NS/EW)需求=对应两个方向的需求之和

可选:到达率叠加入需求,防止刚到达方向长期饥饿

4) 自适应控制(2 相位示例:南北同绿/东西同绿)

约束:最小绿(min_green)、最大绿(max_green)、黄灯(amber)、全红(all_red)

切换条件:

达到最小绿后,若对向需求显著更大或当前相位“gap-out”(一定时间无车/队列清空),则转黄→全红→对向

达最大绿强制切换

公平性:设置需求阈值与加权,防止某方向长时间不得绿

四、核心 Python 示例(YOLOv8 + ByteTrack + OpenCV + 简易自适应控制)依赖安装:pip install ultralytics supervision==0.18.0 opencv-python numpy

示例仅演示单画面(一个广角包含四个方向)。如果你是四路相机,读四个 RTSP 并在同一时钟下融合各自 ROI 即可。

import time
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv

# 1) 配置区域(需要你根据画面标注)
# 每个方向的“队列区域”顶点(顺时针),像素坐标
QUEUE_ROIS = {
    "N": [(100, 100), (300, 100), (300, 260), (100, 260)],
    "S": [(100, 700), (300, 700), (300, 540), (100, 540)],
    "E": [(1000, 300), (1160, 300), (1160, 500), (1000, 500)],
    "W": [(20, 300), (180, 300), (180, 500), (20, 500)],
}
# 如需到达率,额外定义上游计数线(示例)
LINE_PAIRS = {
    "N": ((120, 260), (280, 260)),
    "S": ((120, 540), (280, 540)),
    "E": ((1000, 320), (1000, 480)),
    "W": ((180, 320), (180, 480)),
}

# PCU 权重
PCU = {2: 1.0, 3: 0.3, 5: 2.0, 7: 2.5}  # COCO: 2 car, 3 moto, 5 bus, 7 truck
INTEREST_CLASSES = set(PCU.keys())

# 速度阈值(像素/秒),建议完成单应性标定后改为 m/s 阈值如 0.5 m/s
QUEUE_SPEED_THRESH_PX = 12.0

# 2) 自适应控制器(两相位:NS/EW)
class AdaptiveController:
    def __init__(self, min_green=10, max_green=45, amber=3, all_red=1, gap_out=3, demand_bias=1.15):
        self.state = "NS_GREEN"  # NS_GREEN, NS_AMBER, ALL_RED_TO_EW, EW_GREEN, EW_AMBER, ALL_RED_TO_NS
        self.t_state = time.time()
        self.min_green = min_green
        self.max_green = max_green
        self.amber = amber
        self.all_red = all_red
        self.gap_out = gap_out
        self.demand_bias = demand_bias
        self.last_seen_vehicle = {"NS": time.time(), "EW": time.time()}

    def update_last_seen(self, phase, has_vehicle):
        if has_vehicle:
            self.last_seen_vehicle[phase] = time.time()

    def time_in_state(self):
        return time.time() - self.t_state

    def set_state(self, s):
        self.state = s
        self.t_state = time.time()

    def decide(self, demand_NS, demand_EW, has_NS, has_EW):
        # 更新“是否有车”观察,用于gap-out
        self.update_last_seen("NS", has_NS)
        self.update_last_seen("EW", has_EW)

        t = self.time_in_state()

        if self.state == "NS_GREEN":
            # 强制最小绿
            if t < self.min_green:
                return self.output("NS_GREEN")
            # 最大绿
            if t >= self.max_green:
                self.set_state("NS_AMBER")
                return self.output("NS_AMBER")
            # gap-out 或对向需求更大
            ns_gap = time.time() - self.last_seen_vehicle["NS"]
            if (ns_gap >= self.gap_out and demand_EW > 0) or (demand_EW > demand_NS * self.demand_bias):
                self.set_state("NS_AMBER")
                return self.output("NS_AMBER")
            return self.output("NS_GREEN")

        if self.state == "NS_AMBER":
            if t >= self.amber:
                self.set_state("ALL_RED_TO_EW")
            return self.output("NS_AMBER")

        if self.state == "ALL_RED_TO_EW":
            if t >= self.all_red:
                self.set_state("EW_GREEN")
            return self.output("ALL_RED")

        if self.state == "EW_GREEN":
            if t < self.min_green:
                return self.output("EW_GREEN")
            if t >= self.max_green:
                self.set_state("EW_AMBER")
                return self.output("EW_AMBER")
            ew_gap = time.time() - self.last_seen_vehicle["EW"]
            if (ew_gap >= self.gap_out and demand_NS > 0) or (demand_NS > demand_EW * self.demand_bias):
                self.set_state("EW_AMBER")
                return self.output("EW_AMBER")
            return self.output("EW_GREEN")

        if self.state == "EW_AMBER":
            if t >= self.amber:
                self.set_state("ALL_RED_TO_NS")
            return self.output("EW_AMBER")

        if self.state == "ALL_RED_TO_NS":
            if t >= self.all_red:
                self.set_state("NS_GREEN")
            return self.output("ALL_RED")

    def output(self, mode):
        # 返回四个方向的灯色
        if mode in ["NS_GREEN", "NS_AMBER"]:
            return {"N": ("G" if mode=="NS_GREEN" else "Y"),
                    "S": ("G" if mode=="NS_GREEN" else "Y"),
                    "E": "R", "W": "R", "state": self.state}
        if mode in ["EW_GREEN", "EW_AMBER"]:
            return {"E": ("G" if mode=="EW_GREEN" else "Y"),
                    "W": ("G" if mode=="EW_GREEN" else "Y"),
                    "N": "R", "S": "R", "state": self.state}
        # ALL_RED
        return {"N": "R", "S": "R", "E": "R", "W": "R", "state": self.state}

# 3) 工具函数
def center_bottom(xyxy):
    x1, y1, x2, y2 = xyxy
    return (int((x1+x2)/2), int(y2))

def point_in_poly(pt, poly):
    # poly: list[(x,y)]
    res = cv2.pointPolygonTest(np.array(poly, np.int32), pt, False)
    return res >= 0

# 4) 初始化
model = YOLO("yolov8n.pt")
tracker = sv.ByteTrack(track_thresh=0.25, track_buffer=30, match_thresh=0.8)
cap = cv2.VideoCapture("your_rtsp_or_video.mp4")  # 替换为 RTSP

# 计数线(可选)
line_zones = {k: sv.LineZone(sv.Point(*p1), sv.Point(*p2)) for k,(p1,p2) in LINE_PAIRS.items()}
line_annotators = {k: sv.LineZoneAnnotator(thickness=2, text_thickness=1, text_scale=0.5) for k in line_zones}

# 轨迹速度缓存
last_pos = {}      # track_id -> (x,y)
last_time = {}     # track_id -> timestamp

controller = AdaptiveController()

while True:
    ok, frame = cap.read()
    if not ok:
        break
    ts = time.time()

    # 检测
    res = model(frame, verbose=False)[0]
    det = sv.Detections.from_ultralytics(res)
    mask = np.array([cls in INTEREST_CLASSES for cls in det.class_id], dtype=bool)
    det = det[mask]

    # 跟踪
    tracked = tracker.update_with_detections(det)

    # 计算速度与所属方向(基于队列区域)
    # 聚合每个方向的队列PCU和是否有车
    demand = {d: 0.0 for d in QUEUE_ROIS.keys()}
    has_vehicle = {d: False for d in ["N", "S", "E", "W"]}

    for i in range(len(tracked)):
        xyxy = tracked.xyxy[i]
        cls = int(tracked.class_id[i])
        tid = int(tracked.tracker_id[i]) if tracked.tracker_id is not None else -1
        c = center_bottom(xyxy)

        # 速度估计
        v = 0.0
        if tid != -1:
            if tid in last_pos:
                dt = ts - last_time[tid]
                if dt > 0:
                    dx = c[0] - last_pos[tid][0]
                    dy = c[1] - last_pos[tid][1]
                    v = (dx*dx + dy*dy) ** 0.5 / dt  # px/s
            last_pos[tid] = c
            last_time[tid] = ts

        # 属于哪个方向的队列区域
        for d, poly in QUEUE_ROIS.items():
            if point_in_poly(c, poly):
                has_vehicle[d] = True
                # 排队:速度低于阈值
                if v <= QUEUE_SPEED_THRESH_PX:
                    demand[d] += PCU.get(cls, 1.0)
                break

        # 到达率(可选):过线统计
        for k, lz in line_zones.items():
            lz.trigger(tracked)  # supervision 会用 bbox 的中心进行跨线判断

    # 合并相位需求(两相位)
    demand_NS = demand["N"] + demand["S"]
    demand_EW = demand["E"] + demand["W"]
    has_NS = has_vehicle["N"] or has_vehicle["S"]
    has_EW = has_vehicle["E"] or has_vehicle["W"]

    lights = controller.decide(demand_NS, demand_EW, has_NS, has_EW)

    # 叠加可视化
    for d, poly in QUEUE_ROIS.items():
        color = (0,255,0) if (lights[d] == "G") else ((0,255,255) if lights[d]=="Y" else (0,0,255))
        cv2.polylines(frame, [np.array(poly, np.int32)], True, color, 2)
        cv2.putText(frame, f"{d}: {lights[d]}  q_pcu={demand[d]:.1f}", (poly[0][0], poly[0][1]-8),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA)

    # 可选:画计数线
    for k, lz in line_zones.items():
        line_annotators[k].annotate(frame, lz)

    cv2.putText(frame, f"Phase={lights['state']} | NS_demand={demand_NS:.1f} EW_demand={demand_EW:.1f}",
                (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (50, 220, 50), 2)
    cv2.imshow("Adaptive Intersection", frame)
    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()

说明与关键点

ROI/线段请按你的画面实测点位填写。队列区域建议覆盖停止线前约 30–80 米(按道路限速/存储空间决定)。

队列判定靠“低速 + 区域内”,更稳健可增加“朝向向量与车道方向夹角限制”(即只有朝向停止线方向的才算)。

若做单应性标定(推荐),可将速度阈值/队列长度直接用米和 m/s,鲁棒性更高。

五、单应性标定(可选但强烈建议)

在画面选取 4 个地面点(如车道边角/停止线角点),在真实世界给出对应坐标(米),求单应矩阵 H。

将像素坐标映射到鸟瞰平面后,速度、距离、队列长度均可用物理量。

示例(片段):

# src: 图像坐标, dst: 实际地面坐标(米)
src = np.float32([[x1,y1],[x2,y2],[x3,y3],[x4,y4]])
dst = np.float32([[0,0],[W,0],[W,H],[0,H]])  # 设定一个W×H的实际标定平面
H, _ = cv2.findHomography(src, dst)

def warp_point(pt, H):
    px = np.array([pt[0], pt[1], 1.0], dtype=np.float32)
    wx = H @ px
    wx /= wx[2]
    return (float(wx[0]), float(wx[1]))  # 米

六、控制策略建议

约束参数参考:min_green 8–12 s,max_green 40–80 s,amber 3–5 s,全红 1–2 s;精确值需依据路口净宽、设计车速、ITE/GB 标准计算。

需求构造:Demand = 队列PCU + α·到达率(PCU/s)。α 取 5–15 可以把“即将排队”的方向提前获绿。

防饿死:设置 demand_bias(如 1.1–1.3),以及最大红灯等待上限(到点强制切换)。

行人/左转相位:若有专用相位,扩展为 4 相位或多相位 FSM;注意相位冲突矩阵与清空时间。

七、工程落地要点

稳定性:恶劣天气/夜间/逆光 → 调参(阈值/曝光)、模型微调、红外补光;置信度过低时回落到固定配时方案。

延迟:单路口总延迟目标 < 200 ms;模型推理 + 编解码为主耗时。

隐私合规:不做人脸/车牌识别;只保留统计与轨迹要素。

硬件:Jetson Orin 运行 4×1080p@15–25fps 可行;或服务器侧汇聚。

对接信号机:强制遵从本地标准与安规;通过 PLC/Modbus 设置线圈输出;务必有看门狗与故障安全态(全红闪)。

八、测试与评估

先用历史视频离线回放验证,再上实时;关键指标:平均延误、排队长度、通行量、相位切换频次。

可借助仿真(SUMO/CityFlow)把“检测到的需求→相位控制”闭环验证。

网友回复

我知道答案,我要回答