Use YOLOv8 Pose + ByteTrack to track people in a video, and apply simple rules over keypoint and bounding-box geometry to classify "standing / walking / falling / lying" in real time. The code below is a complete, runnable script with tunable parameters, meant to get a baseline working first and then iterate.
Environment setup
Python 3.8+
Install dependencies
pip install ultralytics opencv-python numpy
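Optionally, a quick sanity check that the dependencies import correctly (the version numbers printed are simply whatever you installed):
# Optional: verify the installation before running the script
import cv2
import numpy as np
import ultralytics
print("opencv:", cv2.__version__)
print("numpy:", np.__version__)
print("ultralytics:", ultralytics.__version__)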
Notes
Model: yolov8n-pose.pt (lightweight); swap in yolov8s/m/l-pose for higher accuracy.
Tracking: the built-in ByteTrack tracker automatically assigns a track id to each person.
Decision logic (thresholds are tunable):
Uses the torso angle (hip-to-shoulder line relative to vertical), the box aspect ratio, the downward hip velocity, the center velocity, etc.
A sudden change from "vertical" to "horizontal" combined with a fast downward hip drop is classified as "falling".
Vertical with a relatively high center speed is classified as "walking"; vertical with a low speed as "standing".
Horizontal for an extended period is classified as "lying". A small numeric illustration of the angle rule is shown below.
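To make the angle rule concrete, here is a quick numeric illustration with made-up coordinates (the midpoints below are hypothetical; the script computes them from the detected keypoints):
import numpy as np
shoulder_mid = np.array([100.0, 50.0])   # hypothetical shoulder midpoint (x, y) in pixels
hip_mid = np.array([104.0, 150.0])       # hypothetical hip midpoint
v = shoulder_mid - hip_mid               # torso vector
ang = np.degrees(np.arctan2(abs(v[0]), abs(v[1])))
print(round(float(ang), 1))              # ~2.3 deg: well below the 35-deg "vertical" threshold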
Code (save as yolo_fall_walk.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
from collections import deque

import cv2
import numpy as np
from ultralytics import YOLO

# COCO keypoint indices
L_SHOULDER, R_SHOULDER = 5, 6
L_HIP, R_HIP = 11, 12

STATE_COLORS = {
    "walking": (0, 200, 0),      # green
    "standing": (200, 200, 0),   # yellow
    "falling": (0, 0, 255),      # red
    "lying": (0, 140, 255),      # orange
    "unknown": (180, 180, 180),  # gray
}


def torso_angle_deg(kps_xy):
    # Return the torso angle relative to vertical: 0 deg = fully upright, 90 deg = horizontal
    if kps_xy is None or len(kps_xy) < 17:
        return None, None
    ls, rs = kps_xy[L_SHOULDER], kps_xy[R_SHOULDER]
    lh, rh = kps_xy[L_HIP], kps_xy[R_HIP]
    # Missing keypoints come back as (0, 0); treat them as unavailable
    if (ls == 0).all() or (rs == 0).all() or (lh == 0).all() or (rh == 0).all():
        return None, None
    shoulder = (ls + rs) / 2.0
    hip = (lh + rh) / 2.0
    v = shoulder - hip
    vx, vy = float(v[0]), float(v[1])
    if abs(vx) < 1e-6 and abs(vy) < 1e-6:
        return None, hip
    # Angle from vertical: atan2(|vx|, |vy|)
    ang = np.degrees(np.arctan2(abs(vx), abs(vy)))
    return ang, hip


class TrackState:
    def __init__(self, hist=15):
        self.history = deque(maxlen=hist)  # metrics from the most recent frames
        self.state = "unknown"
        self.last_state = "unknown"
        self.stable_count = 0
        self.falling_frames = 0
        self.lying_frames = 0
        self.missed = 0


class FallWalkingDetector:
    def __init__(self, fps=30.0, hist=15):
        self.fps = fps if fps and fps > 0 else 30.0
        self.dt = 1.0 / self.fps
        self.tracks = {}
        self.hist = hist
        # Thresholds (tunable)
        self.AR_VERT = 1.2    # vertical: height/width ratio above this
        self.AR_HORI = 0.85   # horizontal: height/width ratio below this
        self.ANG_VERT = 35.0  # vertical torso-angle threshold (deg)
        self.ANG_HORI = 55.0  # horizontal torso-angle threshold (deg)
        self.SPD_WALK = 0.35  # walking speed threshold (normalized by box height, unit: heights/sec)
        self.VHIP_FALL = 1.2  # downward hip speed that triggers a fall (heights/sec)
        self.ANG_JUMP = 20.0  # sudden angle-change threshold (deg)
        self.AR_DROP = 0.3    # threshold for a rapid relative drop in aspect ratio
        self.MIN_FALL_FRAMES = max(2, int(self.fps * 0.1))
        self.MIN_LYING_FRAMES = max(4, int(self.fps * 0.2))

    def _get_track(self, tid):
        if tid not in self.tracks:
            self.tracks[tid] = TrackState(hist=self.hist)
        return self.tracks[tid]

    def _geom_flags(self, ar, ang):
        vertical = (ar is not None and ar > self.AR_VERT) or (ang is not None and ang < self.ANG_VERT)
        horizontal = (ar is not None and ar < self.AR_HORI) or (ang is not None and ang > self.ANG_HORI)
        return vertical, horizontal

    def update(self, tid, bbox, kps_xy):
        st = self._get_track(tid)
        x1, y1, x2, y2 = bbox
        w = max(1.0, x2 - x1)
        h = max(1.0, y2 - y1)
        cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
        ar = h / w
        ang, hip = torso_angle_deg(kps_xy)
        hip_y = hip[1] if isinstance(hip, (list, tuple, np.ndarray)) else None
        # Data from the previous frame
        prev = st.history[-1] if len(st.history) > 0 else None
        spd = 0.0
        vhip = 0.0
        d_ang = 0.0
        ar_drop = 0.0
        prev_vertical = False
        if prev is not None:
            # Center speed (normalized by box height, unit: heights/sec)
            spd = (np.hypot(cx - prev["cx"], cy - prev["cy"]) / h) / self.dt
            # Downward hip speed (image y axis points down, so positive = downward)
            if hip_y is not None and prev["hip_y"] is not None:
                vhip = ((hip_y - prev["hip_y"]) / h) / self.dt
            # Angle change
            if ang is not None and prev["ang"] is not None:
                d_ang = ang - prev["ang"]
            # Relative drop in aspect ratio
            if prev["ar"] is not None and ar is not None and prev["ar"] > 1e-6:
                ar_drop = (prev["ar"] - ar) / prev["ar"]
            # Was the person vertical in any of the last few frames?
            prev_vertical = any(p["vertical"] for p in list(st.history)[-min(5, len(st.history)):])
        vertical, horizontal = self._geom_flags(ar, ang)
        # Fall trigger: vertical -> horizontal + fast hip drop + rapid change in angle or aspect ratio
        fall_trigger = prev is not None and prev_vertical and horizontal and (
            (vhip > self.VHIP_FALL and d_ang > self.ANG_JUMP) or
            (vhip > self.VHIP_FALL and ar_drop > self.AR_DROP)
        )
        # Propose an intended state first (debounced below)
        intended = st.state
        if fall_trigger:
            intended = "falling"
            st.falling_frames += 1
        else:
            st.falling_frames = max(0, st.falling_frames - 1)
            if horizontal:
                st.lying_frames += 1
                intended = "lying" if st.lying_frames >= self.MIN_LYING_FRAMES else st.state or "lying"
            elif vertical:
                st.lying_frames = 0
                intended = "walking" if spd > self.SPD_WALK else "standing"
            else:
                intended = st.state  # keep the current state
        # Simple debouncing to suppress state flicker
        if intended != st.state:
            st.stable_count += 1
            if st.stable_count >= 2:  # switch only after 2 consecutive differing frames
                st.last_state = st.state
                st.state = intended
                st.stable_count = 0
        else:
            st.stable_count = 0
        # Record history
        st.history.append({
            "cx": cx, "cy": cy,
            "hip_y": hip_y,
            "ang": ang,
            "ar": ar,
            "vertical": vertical,
            "horizontal": horizontal,
            "spd": spd,
            "vhip": vhip,
        })
        return st.state, {
            "spd": spd, "vhip": vhip, "ang": ang, "ar": ar,
            "vertical": vertical, "horizontal": horizontal
        }


def draw_label(img, x1, y1, text, color):
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.5
    thickness = 1
    (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
    cv2.rectangle(img, (int(x1), int(y1) - th - 6), (int(x1) + tw + 6, int(y1)), color, -1)
    cv2.putText(img, text, (int(x1) + 3, int(y1) - 4), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--source", type=str, default="0", help="Input source: video path or camera index (e.g. 0)")
    ap.add_argument("--out", type=str, default="", help="Output video path (optional)")
    ap.add_argument("--conf", type=float, default=0.35, help="Detection confidence threshold")
    ap.add_argument("--device", type=str, default="", help="Device: cpu or cuda:0")
    ap.add_argument("--show", action="store_true", help="Show a live window")
    ap.add_argument("--model", type=str, default="yolov8n-pose.pt", help="Path to a YOLOv8 pose model")
    args = ap.parse_args()
    # Read FPS and frame size from the source
    src = int(args.source) if args.source.isdigit() else args.source
    cap = cv2.VideoCapture(src)
    fps = cap.get(cv2.CAP_PROP_FPS)
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    if not fps or fps <= 0:
        fps = 30.0
    detector = FallWalkingDetector(fps=fps, hist=15)
    # Prepare the output video writer
    writer = None
    if args.out:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(args.out, fourcc, fps, (W, H))
    model = YOLO(args.model)
    # Tracking + pose (stream mode yields results frame by frame)
    for res in model.track(
        source=src,
        stream=True,
        conf=args.conf,
        iou=0.5,
        tracker="bytetrack.yaml",
        persist=True,
        classes=[0],  # person only
        device=args.device if args.device else None,
        verbose=False
    ):
        # Original frame
        frame = res.orig_img
        # Optional: start from YOLO's own boxes and skeletons
        frame_drawn = res.plot(line_width=2, font_size=0.6)
        if res.boxes is None or len(res.boxes) == 0:
            if args.show:
                cv2.imshow("Fall & Walk Detection", frame_drawn)
                if cv2.waitKey(1) & 0xFF == 27:
                    break
            if writer:
                writer.write(frame_drawn)
            continue
        boxes = res.boxes.xyxy.cpu().numpy()
        ids = res.boxes.id.cpu().numpy().astype(int) if res.boxes.id is not None else np.arange(len(boxes))
        kps_xy = None
        if res.keypoints is not None and hasattr(res.keypoints, "xy"):
            kps_xy = res.keypoints.xy.cpu().numpy()  # [N, 17, 2]
        for i, box in enumerate(boxes):
            tid = int(ids[i]) if i < len(ids) else i
            kp = kps_xy[i] if kps_xy is not None and i < len(kps_xy) else None
            state, metrics = detector.update(tid, box, kp)
            x1, y1, x2, y2 = box
            color = STATE_COLORS.get(state, (255, 255, 255))
            # Re-draw the box in the state color for clearer visualization
            cv2.rectangle(frame_drawn, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
            label = f"id:{tid} {state}"
            draw_label(frame_drawn, x1, y1, label, color)
            # Optional: overlay some metric values (useful while tuning)
            # dbg = f"spd:{metrics['spd']:.2f} vhip:{metrics['vhip']:.2f} ang:{(metrics['ang'] or 0):.1f}"
            # cv2.putText(frame_drawn, dbg, (int(x1), int(y2) + 16), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
        if args.show:
            cv2.imshow("Fall & Walk Detection", frame_drawn)
            if cv2.waitKey(1) & 0xFF == 27:
                break
        if writer:
            writer.write(frame_drawn)
    if writer:
        writer.release()
    if args.show:
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
Run examples
Process a video file and save the result
python yolo_fall_walk.py --source input.mp4 --out result.mp4 --show
Real-time webcam detection (press ESC to quit)
python yolo_fall_walk.py --source 0 --show
Use a stronger model or specify the device
python yolo_fall_walk.py --source input.mp4 --model yolov8s-pose.pt --device cuda:0
Tuning suggestions
For scenes with vigorous motion, raise VHIP_FALL and ANG_JUMP to reduce false alarms.
For distant, small subjects, lower SPD_WALK (e.g. 0.25).
Indoor falls are mostly a fast drop plus a horizontal torso; tighten AR_HORI (e.g. 0.8) and lower ANG_HORI slightly (e.g. 50).
For more stable decisions, increase MIN_FALL_FRAMES, MIN_LYING_FRAMES, and the history length. One way to apply such overrides is sketched below.
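If you drive FallWalkingDetector from your own loop, the thresholds can be overridden on the instance after construction; the values below are illustrative only, not recommendations:
det = FallWalkingDetector(fps=25.0, hist=20)       # longer history for smoother decisions
det.VHIP_FALL = 1.5                                # raise for scenes with vigorous motion
det.ANG_JUMP = 25.0
det.SPD_WALK = 0.25                                # lower for small, distant subjects
det.AR_HORI = 0.8                                  # stricter "horizontal" test indoors
det.ANG_HORI = 50.0
det.MIN_LYING_FRAMES = max(6, int(det.fps * 0.3))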
Extensions
For more fine-grained action recognition (walk/run/squat/sit/fall), feed the per-track keypoint time series into a lightweight temporal model (e.g. a TCN or 1D-CNN) for a second-stage classification; a minimal sketch follows below.
If a pose model is not an option, a simplified version using only box geometry and speed also works (with lower accuracy).
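As a rough illustration of the second-stage idea, here is a minimal PyTorch sketch of a 1D-CNN over a window of COCO keypoints (17 joints x 2 coords = 34 channels); the window length, class set, and normalization are assumptions for illustration, not part of the script above:
import torch
import torch.nn as nn

class KeypointActionNet(nn.Module):
    # Classifies a fixed-length window of keypoint coordinates for one track id.
    def __init__(self, num_classes=5, in_channels=34):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(in_channels, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),   # pool over the time dimension
            nn.Flatten(),
            nn.Linear(64, num_classes),
        )

    def forward(self, x):              # x: [batch, 34, T]
        return self.net(x)

model = KeypointActionNet(num_classes=5)   # e.g. walk/run/squat/sit/fall
window = torch.randn(1, 34, 30)            # e.g. 30 frames (~1 s at 30 fps), keypoints normalized by box size
logits = model(window)                     # shape [1, 5]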