
Use YOLOv8 Pose with ByteTrack to track people in a video, and apply simple rules over keypoint and bounding-box geometry to classify each person as standing, walking, falling, or lying in real time. The code below is a complete, runnable script with tunable parameters: get it working first, then iterate.

Environment setup

Python 3.8+

Install dependencies:

pip install ultralytics opencv-python numpy

Notes

Model: yolov8n-pose.pt (lightweight); swap in yolov8s/m/l-pose for higher accuracy.

Tracking: the built-in ByteTrack tracker assigns a track id to each person automatically.

Decision logic (all thresholds tunable):

Uses the torso angle (hip-to-shoulder line relative to vertical; see the toy check after this list), the bounding-box aspect ratio, the downward hip velocity, and the center velocity.

A sudden transition from vertical to horizontal with a fast downward hip drop is classified as falling.

Vertical with a large center velocity is walking; vertical with a small velocity is standing.

Horizontal for an extended period is lying.
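
To make the angle rule concrete, here is a standalone toy check with made-up coordinates; it uses the same formula as torso_angle_deg in the script below:

import numpy as np

def torso_angle(shoulder, hip):
    # Angle of the hip->shoulder vector relative to vertical:
    # 0 deg = upright, 90 deg = horizontal.
    v = np.asarray(shoulder, dtype=float) - np.asarray(hip, dtype=float)
    return np.degrees(np.arctan2(abs(v[0]), abs(v[1])))

print(torso_angle((100, 50), (100, 120)))  # 0.0   -> below ANG_VERT=35, counts as vertical
print(torso_angle((160, 95), (100, 100)))  # ~85.2 -> above ANG_HORI=55, counts as horizontal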

Code (save as yolo_fall_walk.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
from collections import deque
import cv2
import numpy as np
from ultralytics import YOLO

# COCO keypoint indices (left/right shoulder, left/right hip)
L_SHOULDER, R_SHOULDER = 5, 6
L_HIP, R_HIP = 11, 12

STATE_COLORS = {
    "walking": (0, 200, 0),     # 绿
    "standing": (200, 200, 0),  # 黄
    "falling": (0, 0, 255),     # 红
    "lying": (0, 140, 255),     # 橙
    "unknown": (180, 180, 180), # 灰
}

def torso_angle_deg(kps_xy):
    # Return the torso angle relative to vertical: 0 deg = upright, 90 deg = horizontal.
    if kps_xy is None or len(kps_xy) < 17:
        return None, None
    ls, rs = kps_xy[L_SHOULDER], kps_xy[R_SHOULDER]
    lh, rh = kps_xy[L_HIP], kps_xy[R_HIP]
    # Missing keypoints come back as (0, 0); treat them as unavailable.
    if (ls == 0).all() or (rs == 0).all() or (lh == 0).all() or (rh == 0).all():
        return None, None
    shoulder = (ls + rs) / 2.0
    hip = (lh + rh) / 2.0
    v = shoulder - hip
    vx, vy = float(v[0]), float(v[1])
    if abs(vx) < 1e-6 and abs(vy) < 1e-6:
        return None, hip
    # Angle from vertical: atan2(|vx|, |vy|)
    ang = np.degrees(np.arctan2(abs(vx), abs(vy)))
    return ang, hip

class TrackState:
    def __init__(self, hist=15):
        self.history = deque(maxlen=hist)  # metrics from the last few frames
        self.state = "unknown"
        self.last_state = "unknown"
        self.stable_count = 0
        self.falling_frames = 0
        self.lying_frames = 0
        self.missed = 0

class FallWalkingDetector:
    def __init__(self, fps=30.0, hist=15):
        self.fps = fps if fps and fps > 0 else 30.0
        self.dt = 1.0 / self.fps
        self.tracks = {}
        self.hist = hist

        # Tunable thresholds
        self.AR_VERT = 1.2     # vertical: box height/width ratio above this
        self.AR_HORI = 0.85    # horizontal: box height/width ratio below this
        self.ANG_VERT = 35.0   # torso angle below this counts as vertical (deg)
        self.ANG_HORI = 55.0   # torso angle above this counts as horizontal (deg)
        self.SPD_WALK = 0.35   # walking speed threshold (normalized by box height, heights/sec)
        self.VHIP_FALL = 1.2   # downward hip speed that triggers a fall (heights/sec)
        self.ANG_JUMP = 20.0   # sudden angle-change threshold (deg)
        self.AR_DROP = 0.3     # rapid aspect-ratio drop threshold (fraction)
        self.MIN_FALL_FRAMES = max(2, int(self.fps * 0.1))
        self.MIN_LYING_FRAMES = max(4, int(self.fps * 0.2))

    def _get_track(self, tid):
        if tid not in self.tracks:
            self.tracks[tid] = TrackState(hist=self.hist)
        return self.tracks[tid]

    def _geom_flags(self, ar, ang):
        vertical = (ar is not None and ar > self.AR_VERT) or (ang is not None and ang < self.ANG_VERT)
        horizontal = (ar is not None and ar < self.AR_HORI) or (ang is not None and ang > self.ANG_HORI)
        return vertical, horizontal

    def update(self, tid, bbox, kps_xy):
        st = self._get_track(tid)
        x1, y1, x2, y2 = bbox
        w = max(1.0, x2 - x1)
        h = max(1.0, y2 - y1)
        cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
        ar = h / w

        ang, hip = torso_angle_deg(kps_xy)
        hip_y = hip[1] if isinstance(hip, (list, tuple, np.ndarray)) else None

        # Previous-frame data
        prev = st.history[-1] if len(st.history) > 0 else None
        spd = 0.0
        vhip = 0.0
        d_ang = 0.0
        ar_drop = 0.0
        prev_vertical = False

        if prev is not None:
            # Center speed (normalized by box height, heights/sec)
            spd = (np.hypot(cx - prev["cx"], cy - prev["cy"]) / h) / self.dt
            # Downward hip speed (image y axis points down, so positive = dropping)
            if hip_y is not None and prev["hip_y"] is not None:
                vhip = ((hip_y - prev["hip_y"]) / h) / self.dt
            # Angle change since the previous frame
            if ang is not None and prev["ang"] is not None:
                d_ang = ang - prev["ang"]
            # Fractional drop in aspect ratio
            if prev["ar"] is not None and ar is not None and prev["ar"] > 1e-6:
                ar_drop = (prev["ar"] - ar) / prev["ar"]
            # Was the person vertical in any of the last few frames?
            prev_vertical = any(p["vertical"] for p in list(st.history)[-min(5, len(st.history)):])

        vertical, horizontal = self._geom_flags(ar, ang)

        # Fall trigger: vertical -> horizontal, plus a fast hip drop with a rapid angle or aspect-ratio change
        fall_trigger = prev is not None and prev_vertical and horizontal and (
            (vhip > self.VHIP_FALL and d_ang > self.ANG_JUMP) or
            (vhip > self.VHIP_FALL and ar_drop > self.AR_DROP)
        )

        # Propose an intended state first (debounced below)
        intended = st.state
        if fall_trigger:
            intended = "falling"
            st.falling_frames += 1
        else:
            st.falling_frames = max(0, st.falling_frames - 1)
            if horizontal:
                st.lying_frames += 1
                intended = "lying" if st.lying_frames >= self.MIN_LYING_FRAMES else st.state or "lying"
            elif vertical:
                st.lying_frames = 0
                intended = "walking" if spd > self.SPD_WALK else "standing"
            else:
                intended = st.state  # keep the current state

        # Simple debouncing to suppress state flicker
        if intended != st.state:
            st.stable_count += 1
            if st.stable_count >= 2:  # require 2 consecutive frames before switching
                st.last_state = st.state
                st.state = intended
                st.stable_count = 0
        else:
            st.stable_count = 0

        # Record per-frame metrics in history
        st.history.append({
            "cx": cx, "cy": cy,
            "hip_y": hip_y,
            "ang": ang,
            "ar": ar,
            "vertical": vertical,
            "horizontal": horizontal,
            "spd": spd,
            "vhip": vhip,
        })

        return st.state, {
            "spd": spd, "vhip": vhip, "ang": ang, "ar": ar,
            "vertical": vertical, "horizontal": horizontal
        }

def draw_label(img, x1, y1, text, color):
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.5
    thickness = 1
    (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
    cv2.rectangle(img, (int(x1), int(y1) - th - 6), (int(x1) + tw + 6, int(y1)), color, -1)
    cv2.putText(img, text, (int(x1) + 3, int(y1) - 4), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--source", type=str, default="0", help="输入源:视频路径或摄像头索引(如0)")
    ap.add_argument("--out", type=str, default="", help="输出视频路径(可选)")
    ap.add_argument("--conf", type=float, default=0.35, help="检测置信度阈值")
    ap.add_argument("--device", type=str, default="", help="设备: cpu 或 cuda:0")
    ap.add_argument("--show", action="store_true", help="窗口实时显示")
    ap.add_argument("--model", type=str, default="yolov8n-pose.pt", help="YOLOv8 pose 模型路径")
    args = ap.parse_args()

    # Probe FPS and frame size from the source
    src = int(args.source) if args.source.isdigit() else args.source
    cap = cv2.VideoCapture(src)
    fps = cap.get(cv2.CAP_PROP_FPS)
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    if not fps or fps <= 0:
        fps = 30.0

    detector = FallWalkingDetector(fps=fps, hist=15)

    # Prepare the output video writer
    writer = None
    if args.out:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(args.out, fourcc, fps, (W, H))

    model = YOLO(args.model)

    # Tracking + pose (stream mode yields results frame by frame)
    for res in model.track(
        source=src,
        stream=True,
        conf=args.conf,
        iou=0.5,
        tracker="bytetrack.yaml",
        persist=True,
        classes=[0],  # person class only
        device=args.device if args.device else None,
        verbose=False
    ):
        # Original frame (kept in case you want to draw on a clean image)
        frame = res.orig_img

        # Optional: draw YOLO's own boxes and skeleton first
        frame_drawn = res.plot(line_width=2, font_size=0.6)

        if res.boxes is None or len(res.boxes) == 0:
            if args.show:
                cv2.imshow("Fall & Walk Detection", frame_drawn)
                if cv2.waitKey(1) & 0xFF == 27:
                    break
            if writer:
                writer.write(frame_drawn)
            continue

        boxes = res.boxes.xyxy.cpu().numpy()
        ids = res.boxes.id.cpu().numpy().astype(int) if res.boxes.id is not None else np.arange(len(boxes))
        kps_xy = None
        if res.keypoints is not None and hasattr(res.keypoints, "xy"):
            kps_xy = res.keypoints.xy.cpu().numpy()  # [N, 17, 2]

        for i, box in enumerate(boxes):
            tid = int(ids[i]) if i < len(ids) else i
            kp = kps_xy[i] if kps_xy is not None and i < len(kps_xy) else None
            state, metrics = detector.update(tid, box, kp)

            x1, y1, x2, y2 = box
            color = STATE_COLORS.get(state, (255, 255, 255))
            # Repaint the box in the state color for clearer visualization
            cv2.rectangle(frame_drawn, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)

            label = f"id:{tid} {state}"
            draw_label(frame_drawn, x1, y1, label, color)

            # Optional: overlay raw metrics (useful when tuning thresholds)
            # dbg = f"spd:{metrics['spd']:.2f} vhip:{metrics['vhip']:.2f} ang:{(metrics['ang'] or 0):.1f}"
            # cv2.putText(frame_drawn, dbg, (int(x1), int(y2) + 16), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)

        if args.show:
            cv2.imshow("Fall & Walk Detection", frame_drawn)
            if cv2.waitKey(1) & 0xFF == 27:
                break

        if writer:
            writer.write(frame_drawn)

    if writer:
        writer.release()
    if args.show:
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Run examples

Process a video file and save the result:

python yolo_fall_walk.py --source input.mp4 --out result.mp4 --show

Live camera detection (press ESC to quit):

python yolo_fall_walk.py --source 0 --show

Use a stronger model or specify the device:

python yolo_fall_walk.py --source input.mp4 --model yolov8s-pose.pt --device cuda:0

Tuning tips

For scenes with vigorous motion, raise VHIP_FALL and ANG_JUMP to reduce false alarms.

For distant, small subjects, lower SPD_WALK (e.g. to 0.25).

Indoor falls are mostly a fast drop ending with a horizontal torso; tighten AR_HORI (e.g. to 0.8) and lower ANG_HORI slightly (e.g. to 50).

For more stable decisions, increase MIN_FALL_FRAMES, MIN_LYING_FRAMES, and the history length. A sketch of overriding these thresholds follows this list.
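
All thresholds are plain instance attributes of FallWalkingDetector, so they can be overridden after construction without editing the class. A minimal sketch (the values shown are illustrative, not recommendations):

from yolo_fall_walk import FallWalkingDetector

detector = FallWalkingDetector(fps=25.0, hist=20)  # longer history for extra stability
detector.VHIP_FALL = 1.5   # stricter fall trigger for high-motion scenes
detector.ANG_JUMP = 25.0   # require a larger sudden angle change
detector.SPD_WALK = 0.25   # more sensitive walking threshold for distant subjects
detector.AR_HORI = 0.8     # tighter horizontal aspect-ratio cutoff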

Extensions

For finer-grained action recognition (walk/run/squat/sit/fall), feed each track's keypoint time series into a lightweight temporal model (e.g. a TCN or 1D-CNN) for second-stage classification; a minimal sketch follows this list.

When a pose model is not an option, a simplified version using only box geometry and velocity also works, at some cost in accuracy.
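
As a starting point for the TCN/1D-CNN idea, here is a minimal PyTorch sketch (PyTorch is assumed to be installed; the class name, layer sizes, window length, and five-class output are illustrative assumptions, not part of the script above):

import torch
import torch.nn as nn

class KeypointActionNet(nn.Module):  # hypothetical name
    # Classifies a window of T frames of 17 COCO keypoints (x, y),
    # flattened to 34 channels per frame.
    def __init__(self, num_classes=5, channels=34):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(channels, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),  # pool over time -> fixed-size feature
        )
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):  # x: [batch, 34, T]
        return self.fc(self.net(x).squeeze(-1))

model = KeypointActionNet()
window = torch.randn(1, 34, 30)  # placeholder for 30 frames of normalized keypoints
logits = model(window)
print(logits.argmax(dim=1))      # predicted action class index

In practice you would buffer per-track keypoints (e.g. in TrackState.history), normalize them by the box size, and train this classifier on labeled clips.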
