Track people in video with YOLOv8 Pose + ByteTrack, and classify each person as "standing / walking / falling / lying" in real time using simple rules over keypoint and bounding-box geometry. The code below is a complete, runnable script with tunable parameters, intended for getting a baseline working first and then iterating.
Environment setup
Python 3.8+
Install dependencies
pip install ultralytics opencv-python numpy
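Optionally, a quick sanity check that the install works. This minimal snippet loads the pose model (downloading yolov8n-pose.pt on first run) and runs it on a blank image:

import numpy as np
from ultralytics import YOLO

# Loads the pose model and runs one inference on a black frame.
model = YOLO("yolov8n-pose.pt")
results = model(np.zeros((480, 640, 3), dtype=np.uint8), verbose=False)
print("ok, detections:", len(results[0].boxes))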
Notes
Model: yolov8n-pose.pt (lightweight); swap in yolov8s/m/l-pose for higher accuracy.
Tracking: the built-in ByteTrack tracker automatically assigns a track id to each person.
Classification rules (all thresholds are tunable):
Use the torso angle (hip-to-shoulder line relative to vertical), the box aspect ratio, the downward hip velocity, the center velocity, and similar geometric features.
A sudden transition from "vertical" to "horizontal" combined with a fast downward hip velocity is classified as "falling".
Vertical with a high center velocity is "walking"; vertical with a low velocity is "standing".
Horizontal for an extended period is "lying". (A worked numeric example of the torso-angle rule follows this list.)
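As a worked example of the angle rule, suppose a detected person's shoulder and hip midpoints are at the pixel coordinates below (values made up for illustration; thresholds match the script that follows):

import numpy as np

# Torso vector points from the hip midpoint to the shoulder midpoint.
shoulder = np.array([310.0, 120.0])
hip = np.array([300.0, 260.0])
vx, vy = shoulder - hip  # (10, -140): nearly straight up in image coordinates

# Angle from vertical: atan2(|vx|, |vy|); 0 deg = upright, 90 deg = horizontal.
ang = np.degrees(np.arctan2(abs(vx), abs(vy)))
print(f"{ang:.1f} deg")  # ~4.1 deg, well under ANG_VERT=35 -> "vertical"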
Code (save as yolo_fall_walk.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
from collections import deque

import cv2
import numpy as np
from ultralytics import YOLO

# COCO keypoint indices
L_SHOULDER, R_SHOULDER = 5, 6
L_HIP, R_HIP = 11, 12

STATE_COLORS = {
    "walking": (0, 200, 0),      # green
    "standing": (200, 200, 0),   # yellow
    "falling": (0, 0, 255),      # red
    "lying": (0, 140, 255),      # orange
    "unknown": (180, 180, 180),  # gray
}


def torso_angle_deg(kps_xy):
    """Return (torso angle relative to vertical, hip midpoint).
    0 deg = fully upright, 90 deg = horizontal."""
    if kps_xy is None or len(kps_xy) < 17:
        return None, None
    ls, rs = kps_xy[L_SHOULDER], kps_xy[R_SHOULDER]
    lh, rh = kps_xy[L_HIP], kps_xy[R_HIP]
    # Missing keypoints are encoded as (0, 0)
    if (ls == 0).all() or (rs == 0).all() or (lh == 0).all() or (rh == 0).all():
        return None, None
    shoulder = (ls + rs) / 2.0
    hip = (lh + rh) / 2.0
    v = shoulder - hip
    vx, vy = float(v[0]), float(v[1])
    if abs(vx) < 1e-6 and abs(vy) < 1e-6:
        return None, hip
    # Angle from vertical: atan2(|vx|, |vy|)
    ang = np.degrees(np.arctan2(abs(vx), abs(vy)))
    return ang, hip


class TrackState:
    def __init__(self, hist=15):
        self.history = deque(maxlen=hist)  # metrics of the last few frames
        self.state = "unknown"
        self.last_state = "unknown"
        self.stable_count = 0
        self.falling_frames = 0
        self.lying_frames = 0
        self.missed = 0


class FallWalkingDetector:
    def __init__(self, fps=30.0, hist=15):
        self.fps = fps if fps and fps > 0 else 30.0
        self.dt = 1.0 / self.fps
        self.tracks = {}
        self.hist = hist
        # Tunable thresholds
        self.AR_VERT = 1.2     # vertical if height/width ratio is above this
        self.AR_HORI = 0.85    # horizontal if height/width ratio is below this
        self.ANG_VERT = 35.0   # vertical torso-angle threshold (deg)
        self.ANG_HORI = 55.0   # horizontal torso-angle threshold (deg)
        self.SPD_WALK = 0.35   # walking speed threshold (box heights / second)
        self.VHIP_FALL = 1.2   # downward hip speed that triggers a fall (heights / second)
        self.ANG_JUMP = 20.0   # sudden torso-angle change threshold (deg)
        self.AR_DROP = 0.3     # fast relative aspect-ratio drop threshold
        self.MIN_FALL_FRAMES = max(2, int(self.fps * 0.1))
        self.MIN_LYING_FRAMES = max(4, int(self.fps * 0.2))

    def _get_track(self, tid):
        if tid not in self.tracks:
            self.tracks[tid] = TrackState(hist=self.hist)
        return self.tracks[tid]

    def _geom_flags(self, ar, ang):
        vertical = (ar is not None and ar > self.AR_VERT) or \
                   (ang is not None and ang < self.ANG_VERT)
        horizontal = (ar is not None and ar < self.AR_HORI) or \
                     (ang is not None and ang > self.ANG_HORI)
        return vertical, horizontal

    def update(self, tid, bbox, kps_xy):
        st = self._get_track(tid)
        x1, y1, x2, y2 = bbox
        w = max(1.0, x2 - x1)
        h = max(1.0, y2 - y1)
        cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
        ar = h / w
        ang, hip = torso_angle_deg(kps_xy)
        hip_y = float(hip[1]) if hip is not None else None

        # Metrics from the previous frame
        prev = st.history[-1] if len(st.history) > 0 else None
        spd = 0.0
        vhip = 0.0
        d_ang = 0.0
        ar_drop = 0.0
        prev_vertical = False
        if prev is not None:
            # Center speed (normalized by box height, in heights/second)
            spd = (np.hypot(cx - prev["cx"], cy - prev["cy"]) / h) / self.dt
            # Downward hip speed (image y axis points down)
            if hip_y is not None and prev["hip_y"] is not None:
                vhip = ((hip_y - prev["hip_y"]) / h) / self.dt
            # Torso-angle change
            if ang is not None and prev["ang"] is not None:
                d_ang = ang - prev["ang"]
            # Relative aspect-ratio drop
            if prev["ar"] is not None and ar is not None and prev["ar"] > 1e-6:
                ar_drop = (prev["ar"] - ar) / prev["ar"]
            # Was the person vertical in any of the last few frames?
            prev_vertical = any(
                p["vertical"] for p in list(st.history)[-min(5, len(st.history)):]
            )

        vertical, horizontal = self._geom_flags(ar, ang)

        # Fall trigger: vertical -> horizontal + fast hip drop
        #               + rapid angle change or aspect-ratio drop
        fall_trigger = prev is not None and prev_vertical and horizontal and (
            (vhip > self.VHIP_FALL and d_ang > self.ANG_JUMP) or
            (vhip > self.VHIP_FALL and ar_drop > self.AR_DROP)
        )

        if fall_trigger:
            st.falling_frames += 1
        else:
            st.falling_frames = max(0, st.falling_frames - 1)

        # Intended state, before debouncing
        intended = st.state
        if st.falling_frames >= self.MIN_FALL_FRAMES:
            intended = "falling"
        elif horizontal:
            st.lying_frames += 1
            # Keep the current state until "lying" is confirmed
            intended = "lying" if st.lying_frames >= self.MIN_LYING_FRAMES else st.state
        elif vertical:
            st.lying_frames = 0
            intended = "walking" if spd > self.SPD_WALK else "standing"
        else:
            intended = st.state  # hold

        # Simple debouncing against state flicker
        if intended != st.state:
            st.stable_count += 1
            if st.stable_count >= 2:  # switch only after 2 consecutive frames
                st.last_state = st.state
                st.state = intended
                st.stable_count = 0
        else:
            st.stable_count = 0

        # Record history
        st.history.append({
            "cx": cx, "cy": cy, "hip_y": hip_y, "ang": ang, "ar": ar,
            "vertical": vertical, "horizontal": horizontal,
            "spd": spd, "vhip": vhip,
        })
        return st.state, {
            "spd": spd, "vhip": vhip, "ang": ang, "ar": ar,
            "vertical": vertical, "horizontal": horizontal,
        }


def draw_label(img, x1, y1, text, color):
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.5
    thickness = 1
    (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
    cv2.rectangle(img, (int(x1), int(y1) - th - 6),
                  (int(x1) + tw + 6, int(y1)), color, -1)
    cv2.putText(img, text, (int(x1) + 3, int(y1) - 4), font, scale,
                (255, 255, 255), thickness, cv2.LINE_AA)


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--source", type=str, default="0",
                    help="input: video path or camera index (e.g. 0)")
    ap.add_argument("--out", type=str, default="", help="output video path (optional)")
    ap.add_argument("--conf", type=float, default=0.35, help="detection confidence threshold")
    ap.add_argument("--device", type=str, default="", help="device: cpu or cuda:0")
    ap.add_argument("--show", action="store_true", help="show a live window")
    ap.add_argument("--model", type=str, default="yolov8n-pose.pt",
                    help="YOLOv8 pose model path")
    args = ap.parse_args()

    # Read FPS and frame size from the source
    src = int(args.source) if args.source.isdigit() else args.source
    cap = cv2.VideoCapture(src)
    fps = cap.get(cv2.CAP_PROP_FPS)
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    if not fps or fps <= 0:
        fps = 30.0

    detector = FallWalkingDetector(fps=fps, hist=15)

    # Prepare the output video
    writer = None
    if args.out:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(args.out, fourcc, fps, (W, H))

    model = YOLO(args.model)

    # Tracking + pose (stream mode yields results frame by frame)
    for res in model.track(
        source=src,
        stream=True,
        conf=args.conf,
        iou=0.5,
        tracker="bytetrack.yaml",
        persist=True,
        classes=[0],  # person only
        device=args.device if args.device else None,
        verbose=False,
    ):
        # Optionally start from YOLO's own boxes and skeleton overlay
        frame_drawn = res.plot(line_width=2, font_size=0.6)

        if res.boxes is None or len(res.boxes) == 0:
            if args.show:
                cv2.imshow("Fall & Walk Detection", frame_drawn)
                if cv2.waitKey(1) & 0xFF == 27:
                    break
            if writer:
                writer.write(frame_drawn)
            continue

        boxes = res.boxes.xyxy.cpu().numpy()
        ids = (res.boxes.id.cpu().numpy().astype(int)
               if res.boxes.id is not None else np.arange(len(boxes)))
        kps_xy = None
        if res.keypoints is not None and hasattr(res.keypoints, "xy"):
            kps_xy = res.keypoints.xy.cpu().numpy()  # [N, 17, 2]

        for i, box in enumerate(boxes):
            tid = int(ids[i]) if i < len(ids) else i
            kp = kps_xy[i] if kps_xy is not None and i < len(kps_xy) else None
            state, metrics = detector.update(tid, box, kp)

            x1, y1, x2, y2 = box
            color = STATE_COLORS.get(state, (255, 255, 255))
            # Redraw the box in the state color for clearer visualization
            cv2.rectangle(frame_drawn, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
            label = f"id:{tid} {state}"
            draw_label(frame_drawn, x1, y1, label, color)

            # Optional: show raw metrics (useful for tuning)
            # dbg = f"spd:{metrics['spd']:.2f} vhip:{metrics['vhip']:.2f} ang:{(metrics['ang'] or 0):.1f}"
            # cv2.putText(frame_drawn, dbg, (int(x1), int(y2) + 16),
            #             cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)

        if args.show:
            cv2.imshow("Fall & Walk Detection", frame_drawn)
            if cv2.waitKey(1) & 0xFF == 27:
                break
        if writer:
            writer.write(frame_drawn)

    if writer:
        writer.release()
    if args.show:
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
Run examples
Process a video file and save the result
python yolo_fall_walk.py --source input.mp4 --out result.mp4 --show
Real-time webcam detection (press ESC to quit)
python yolo_fall_walk.py --source 0 --show
Use a stronger model or specify a device
python yolo_fall_walk.py --source input.mp4 --model yolov8s-pose.pt --device cuda:0
Tuning tips
For scenes with vigorous motion, raise VHIP_FALL and ANG_JUMP to reduce false positives.
For distant, small subjects, lower SPD_WALK (e.g. 0.25).
Indoor falls are mostly a fast drop ending with a horizontal torso; tighten AR_HORI (e.g. 0.8) and lower ANG_HORI slightly (e.g. 50).
For more stable decisions, increase MIN_FALL_FRAMES, MIN_LYING_FRAMES, and the history length. (One way to apply such overrides is sketched after this list.)
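Since the thresholds are plain attributes on FallWalkingDetector, one way to try new values without editing the class is to override them after construction. The values below are illustrative, not recommendations:

from yolo_fall_walk import FallWalkingDetector

detector = FallWalkingDetector(fps=30.0, hist=25)  # longer per-track history
detector.AR_HORI = 0.8       # stricter "horizontal" aspect ratio
detector.ANG_HORI = 50.0     # lower horizontal-angle threshold
detector.SPD_WALK = 0.25     # more sensitive walking detection for distant subjects
detector.MIN_LYING_FRAMES = max(8, int(detector.fps * 0.4))  # steadier "lying" decision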
Extensions
For more precise action recognition (walk/run/squat/sit/fall), feed the keypoint time series into a lightweight temporal model (e.g. a TCN or 1D-CNN) as a second-stage classifier, as sketched below.
If a pose model is not an option, a simplified version can use box geometry plus velocity alone, at some cost in accuracy; a minimal sketch also follows.
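A minimal sketch of such a second-stage classifier, assuming PyTorch is available (it is not in the dependency list above). The input is a window of T frames of 17 (x, y) keypoints flattened to 34 channels; the five-class output and the window length are illustrative assumptions:

import torch
import torch.nn as nn

class KeypointTCN(nn.Module):
    """Tiny 1D-CNN over a keypoint time series: input shape (batch, 34, T)."""
    def __init__(self, n_classes=5, channels=34):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(channels, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),  # pool over the time dimension
            nn.Flatten(),
            nn.Linear(64, n_classes),
        )

    def forward(self, x):
        return self.net(x)

clf = KeypointTCN(n_classes=5)
window = torch.randn(1, 34, 30)  # 30-frame window of normalized keypoints
print(clf(window).shape)  # torch.Size([1, 5])

And a minimal sketch of the box-only fallback, reusing the same aspect-ratio and center-speed features as the main script but without any keypoints (thresholds copied from the script; without pose there is no fall dynamics signal, so a horizontal box is simply "lying"):

import numpy as np

def bbox_state(prev, box, dt, ar_vert=1.2, ar_hori=0.85, spd_walk=0.35):
    """Classify one track from bounding boxes alone.
    prev: (cx, cy) from the last frame or None; box: (x1, y1, x2, y2)."""
    x1, y1, x2, y2 = box
    w, h = max(1.0, x2 - x1), max(1.0, y2 - y1)
    cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
    ar = h / w
    spd = 0.0
    if prev is not None:
        spd = (np.hypot(cx - prev[0], cy - prev[1]) / h) / dt  # heights/second
    if ar < ar_hori:
        state = "lying"
    elif ar > ar_vert:
        state = "walking" if spd > spd_walk else "standing"
    else:
        state = "unknown"
    return state, (cx, cy)

state, prev = bbox_state(None, (100, 50, 160, 230), dt=1 / 30)
print(state)  # "standing": tall box, no speed on the first frame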