PiscCode轨迹跟踪Mediapipe + OpenCV进阶:速度估算
在智能视频分析和计算机视觉领域,目标检测与目标跟踪是核心技术模块。传统的目标检测只能告诉我们“哪里有物体”,而跟踪技术能够在视频流中持续维护物体的身份信息。当我们进一步希望分析目标的运动状态时,比如速度和轨迹,便需要在跟踪的基础上引入运动估计。
本文结合 Mediapipe 的轻量级检测模型与 OpenCV 的图像处理能力,详细介绍一个支持实时速度计算的检测+跟踪系统——FrameObjectTrackerWithSpeed
,并分享其设计思路、实现方法和优化方向。
一、技术背景
1.1 目标检测与跟踪
目标检测(Object Detection):在单帧图像中识别物体,返回边界框和类别信息。常用模型有 YOLO、SSD、Faster R-CNN 等。
目标跟踪(Object Tracking):在视频中对物体进行持续跟踪,为同一物体分配唯一 ID,使其身份一致。常用方法有 KCF、SORT、DeepSORT 等。
结合检测与跟踪,可以实现:
统计物体经过某一区域的次数;
分析物体运动轨迹与速度;
在监控或交通场景中进行行为分析。
1.2 为什么选择 Mediapipe
Mediapipe 是 Google 开源的跨平台机器学习框架,优点:
轻量高效:EfficientDet Lite 等模型在移动端和桌面端均可实时运行。
易用接口:Python API 封装简单,快速上手。
跨平台部署:支持 Android、iOS、Web、Python。
GPU/NNAPI 优化:可充分利用硬件加速。
在本文中,我们使用 efficientdet_lite0.tflite
模型进行目标检测,兼顾速度与准确率。
1.3 为什么使用 OpenCV
OpenCV 在视频处理与可视化方面功能强大,主要作用:
读取和处理视频帧;
绘制检测框、文字、轨迹线;
图像后处理和调试。
因此,Mediapipe + OpenCV 是轻量级实时视频分析的理想组合。
二、整体架构设计
新的 FrameObjectTrackerWithSpeed
系统由三个核心模块组成:
目标检测器(Detector)
MediapipeObjectDetector
接收视频帧,输出检测框与类别。轨迹管理器(Tracker)
使用 IoU 将检测框与已有轨迹匹配;
新目标创建新轨迹;
丢失目标在超过阈值后删除;
保存历史轨迹点。
速度计算模块(Speed Estimator)
根据轨迹点位置和时间间隔计算速度;
支持像素→实际距离转换(
pixel_to_meter
参数);将速度显示在图像上。
整体流程如下:
视频帧 → Mediapipe检测 → 轨迹匹配/更新 → 速度计算 → 可视化输出
三、核心代码解析
3.1 初始化检测器
base_options = python.BaseOptions(model_asset_path=model_path)
options = vision.ObjectDetectorOptions(base_options=base_options,score_threshold=score_threshold
)
self.detector = vision.ObjectDetector.create_from_options(options)
指定 tflite 模型路径;
设置检测置信度阈值;
Mediapipe 内部会对输入图像做格式转换和推理。
3.2 Track 类设计(带速度计算)
class Track:def __init__(self, tid, bbox, category, trace_len, pixel_to_meter):self.id = tidself.bbox = bboxself.category = categoryself.lost = 0self.trace = []self.trace_len = trace_lenself.pixel_to_meter = pixel_to_meterself.last_time = time.time()self.speed = 0.0
每个轨迹对象维护:
ID、类别、边界框;
丢失计数
lost
;历史轨迹
trace
;速度
speed
;上一次更新时间
last_time
。
更新方法:
def update(self, bbox):self.bbox = bboxcx = (bbox[0]+bbox[2])//2cy = (bbox[1]+bbox[3])//2current_time = time.time()if self.trace:prev_cx, prev_cy = self.trace[-1]dt = current_time - self.last_timeif dt > 0:dx = (cx - prev_cx) * self.pixel_to_meterdy = (cy - prev_cy) * self.pixel_to_meterdistance = (dx**2 + dy**2)**0.5self.speed = distance / dt # m/sself.last_time = current_timeself.trace.append((cx, cy))if len(self.trace) > self.trace_len:self.trace.pop(0)
特点:
中心点坐标变化用于估算移动距离;
使用时间差计算瞬时速度;
支持像素到米的转换,方便真实场景分析。
3.3 IoU 匹配逻辑
@staticmethod
def _iou(boxA, boxB):xA = max(boxA[0], boxB[0])yA = max(boxA[1], boxB[1])xB = min(boxA[2], boxB[2])yB = min(boxA[3], boxB[3])interArea = max(0, xB - xA) * max(0, yB - yA)boxAArea = (boxA[2]-boxA[0])*(boxA[3]-boxA[1])boxBArea = (boxB[2]-boxB[0])*(boxB[3]-boxB[1])return interArea / float(boxAArea + boxBArea - interArea + 1e-6)
用交并比判断检测框与轨迹匹配程度;
IoU 大于阈值认为是同一个目标。
3.4 更新轨迹
def _update_tracks(self, detections):updated_tracks = []used_dets = set()for track in self.tracks:best_iou, best_det = 0, Nonefor i, det in enumerate(detections):if i in used_dets: continuebbox = det.bounding_boxbox_det = (bbox.origin_x, bbox.origin_y, bbox.origin_x+bbox.width, bbox.origin_y+bbox.height)iou_score = self._iou(track.bbox, box_det)if iou_score > best_iou:best_iou, best_det = iou_score, (i, det)if best_det and best_iou > self.iou_threshold:i, det = best_detused_dets.add(i)bbox = det.bounding_boxbox_det = (bbox.origin_x, bbox.origin_y, bbox.origin_x+bbox.width, bbox.origin_y+bbox.height)track.update(box_det)track.lost = 0updated_tracks.append(track)else:track.lost += 1if track.lost <= self.max_lost:updated_tracks.append(track)# 新增未匹配检测for i, det in enumerate(detections):if i in used_dets: continuebbox = det.bounding_boxbox_det = (bbox.origin_x, bbox.origin_y, bbox.origin_x+bbox.width, bbox.origin_y+bbox.height)category = det.categories[0].category_name if det.categories else "obj"new_track = self.Track(self.next_id, box_det, category, self.trace_len, self.pixel_to_meter)new_track.update(box_det)self.next_id += 1updated_tracks.append(new_track)self.tracks = updated_tracks
匹配已有轨迹并更新;
未匹配的检测新建轨迹;
丢失目标计数超过阈值后删除。
3.5 可视化与速度显示
def _draw_tracks(self, frame):annotated = frame.copy()for t in self.tracks:x1, y1, x2, y2 = t.bboxcolor = self._get_color(f"{t.category}_{t.id}")cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 3)label = f"ID {t.id} {t.category} {t.speed:.2f} m/s"...cv2.putText(annotated, label, (text_x, text_y), font, font_scale, color, thickness, cv2.LINE_AA)if len(t.trace) > 1:for i in range(1, len(t.trace)):cv2.line(annotated, t.trace[i-1], t.trace[i], color, 2)return annotated
绘制矩形框和轨迹折线;
显示 ID、类别与速度;
文字居中于框体,视觉效果清晰。
3.6 帧处理调用
cap = cv2.VideoCapture(0)
tracker = FrameObjectTrackerWithSpeed(pixel_to_meter=0.01)while cap.isOpened():ret, frame = cap.read()if not ret: breakannotated = tracker.do(frame, device="cpu")cv2.imshow("Tracking + Speed", annotated)if cv2.waitKey(1) & 0xFF == 27: break
cap.release()
cv2.destroyAllWindows()
使用摄像头捕获视频;
每帧调用
do()
方法进行检测、跟踪和速度计算;实时显示。
四、应用场景
交通监控
车辆检测、跟踪与速度统计;
超速警告或流量分析。
智能安防
人体检测与轨迹分析;
可实现异常行为告警。
体育分析
球员运动轨迹和速度;
训练与战术分析。
机器人视觉
目标跟随;
速度感知与导航。
五、优化与扩展
import cv2import mediapipe as mpimport randomfrom mediapipe.tasks import pythonfrom mediapipe.tasks.python import visionimport timeclass FrameObjectTrackerWithSpeed:"""Mediapipe 检测 + 简易轨迹跟踪 + 速度显示"""def __init__(self, model_path="文件地址/efficientdet_lite0.tflite",score_threshold=0.5, max_lost=5, iou_threshold=0.3,trace_len=30, pixel_to_meter=0.01):""":param model_path: mediapipe tflite 模型路径:param score_threshold: 检测分数阈值:param max_lost: 目标丢失多少帧后删除:param iou_threshold: IOU 阈值用于匹配:param trace_len: 保存的轨迹长度:param pixel_to_meter: 像素到米的比例"""# 初始化 mediapipe detectorbase_options = python.BaseOptions(model_asset_path=model_path)options = vision.ObjectDetectorOptions(base_options=base_options,score_threshold=score_threshold)self.detector = vision.ObjectDetector.create_from_options(options)# tracker 参数self.category_colors = {}self.next_id = 0self.tracks = []self.max_lost = max_lostself.iou_threshold = iou_thresholdself.trace_len = trace_lenself.pixel_to_meter = pixel_to_meterclass Track:"""轨迹对象"""def __init__(self, tid, bbox, category, trace_len, pixel_to_meter):self.id = tidself.bbox = bbox # (x1, y1, x2, y2)self.category = categoryself.lost = 0self.trace = [] # 历史轨迹点self.trace_len = trace_lenself.pixel_to_meter = pixel_to_meterself.last_time = time.time()self.speed = 0.0 # m/sdef update(self, bbox):self.bbox = bboxcx = (bbox[0] + bbox[2]) // 2cy = (bbox[1] + bbox[3]) // 2current_time = time.time()# 计算速度if self.trace:prev_cx, prev_cy = self.trace[-1]dt = current_time - self.last_timeif dt > 0:dx = (cx - prev_cx) * self.pixel_to_meterdy = (cy - prev_cy) * self.pixel_to_meterdistance = (dx ** 2 + dy ** 2) ** 0.5self.speed = distance / dt # m/sself.last_time = current_timeself.trace.append((cx, cy))if len(self.trace) > self.trace_len:self.trace.pop(0)@staticmethoddef _iou(boxA, boxB):"""计算两个框的 IOU"""xA = max(boxA[0], boxB[0])yA = max(boxA[1], boxB[1])xB = min(boxA[2], boxB[2])yB = min(boxA[3], boxB[3])interArea = max(0, xB - xA) * max(0, yB - yA)boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])return interArea / float(boxAArea + boxBArea - interArea + 1e-6)def _get_color(self, unique_name: str):"""为每个目标分配一个随机满饱和度颜色满饱和度颜色:RGB 中至少一个通道为 255,其余随机"""if unique_name not in self.category_colors:channels = [0, 0, 0]max_channel = random.randint(0, 2)channels[max_channel] = 255for i in range(3):if i != max_channel:channels[i] = random.randint(0, 255)self.category_colors[unique_name] = tuple(channels)return self.category_colors[unique_name]def _update_tracks(self, detections):updated_tracks = []used_dets = set()# 尝试匹配已有轨迹for track in self.tracks:best_iou, best_det = 0, Nonefor i, det in enumerate(detections):if i in used_dets:continuebbox = det.bounding_boxbox_det = (bbox.origin_x, bbox.origin_y,bbox.origin_x + bbox.width,bbox.origin_y + bbox.height)iou_score = self._iou(track.bbox, box_det)if iou_score > best_iou:best_iou, best_det = iou_score, (i, det)if best_det and best_iou > self.iou_threshold:i, det = best_detused_dets.add(i)bbox = det.bounding_boxbox_det = (bbox.origin_x, bbox.origin_y,bbox.origin_x + bbox.width,bbox.origin_y + bbox.height)track.update(box_det)track.lost = 0updated_tracks.append(track)else:track.lost += 1if track.lost <= self.max_lost:updated_tracks.append(track)# 新增未匹配的检测for i, det in enumerate(detections):if i in used_dets:continuebbox = det.bounding_boxbox_det = (bbox.origin_x, bbox.origin_y,bbox.origin_x + bbox.width,bbox.origin_y + bbox.height)category = det.categories[0].category_name if det.categories else "obj"unique_name = f"{category}_{self.next_id}"new_track = self.Track(self.next_id, box_det, category, self.trace_len, self.pixel_to_meter)new_track.update(box_det)self.next_id += 1updated_tracks.append(new_track)self.tracks = updated_tracksdef _draw_tracks(self, frame):annotated = frame.copy()for t in self.tracks:x1, y1, x2, y2 = t.bboxcolor = self._get_color(f"{t.category}_{t.id}") # 每个目标唯一颜色cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 3) # 较粗框线label = f"ID {t.id} {t.category} {t.speed:.2f} m/s"font = cv2.FONT_HERSHEY_SIMPLEXfont_scale = 2.0thickness = 3(text_width, text_height), baseline = cv2.getTextSize(label, font, font_scale, thickness)# 文字水平居中对齐到框体中心cx = (x1 + x2) // 2cy = (y1 + y2) // 2text_x = cx - text_width // 2text_y = cy + text_height // 2 # 垂直方向稍微向下一点,让文字居中cv2.putText(annotated, label, (text_x, text_y),font, font_scale, color, thickness, cv2.LINE_AA)# 画轨迹if len(t.trace) > 1:for i in range(1, len(t.trace)):cv2.line(annotated, t.trace[i - 1], t.trace[i], color, 2)return annotateddef do(self, frame,device):"""处理一帧,返回带检测框、轨迹和速度的图像"""if frame is None:return Nonemp_image = mp.Image(image_format=mp.ImageFormat.SRGB,data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))detection_result = self.detector.detect(mp_image)detections = detection_result.detections or []self._update_tracks(detections)return self._draw_tracks(frame)
跟踪优化
引入 Kalman 滤波预测位置;
使用匈牙利算法做全局匹配;
增加外观特征(ReID)避免遮挡导致 ID 混淆。
速度计算优化
采用移动平均平滑瞬时速度;
支持多相机标定实现真实世界速度估计。
性能优化
GPU 或 OpenVINO 加速;
使用更轻量模型(如 EfficientDet Lite2/3)提升帧率。
功能扩展
区域统计与事件触发;
数据记录与可视化分析。
六、总结
本文介绍了一个基于 Mediapipe + OpenCV 的实时目标检测与轨迹跟踪系统,并加入了速度计算功能。核心思路:
Mediapipe 高效检测目标;
IoU 匹配实现简易跟踪;
轨迹点记录并计算速度;
可视化显示框、轨迹和速度。
该系统轻量、可实时运行,适合视频分析、交通监控、体育分析、机器人视觉等场景,并具备进一步优化空间,如使用 Kalman 滤波、多目标匹配或更高精度模型。