当前位置: 首页 > news >正文

录制mp4

目录

单线程保存mp4

多线程保存mp4 rtsp

ffmpeg录制mp4


单线程保存mp4


import cv2
import imageiocv2.namedWindow('photo', 0)  # 0窗口大小可以任意拖动,1自适应
cv2.resizeWindow('photo', 1280, 720)
url ="rtsp://admin:aa123456@192.168.1.64/h264/ch1/main/av_stream"
cap = cv2.VideoCapture(1)
ret = cap.isOpened()
imgs = []
fps =30
index = 0
count = 0
strat_record = False
while (ret):ret, img = cap.read()if not ret: breakcv2.imshow('photo', img)img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)if strat_record:imgs.append(img)index +=1if index %300 == 299 and strat_record:count+=1save_video_path = f'lanqiu_{count}.mp4'imageio.mimsave(save_video_path, imgs, fps=fps, macro_block_size=None)imgs=[]key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('s'):strat_record = Trueprint("start_record", strat_record)elif key == ord('e'):strat_record = Falseprint("end_record", strat_record)
cap.release()
save_video_path = f'lanqiu_{count}.mp4'
imageio.mimsave(save_video_path, imgs, fps=fps, macro_block_size=None)

多线程保存mp4 rtsp

import cv2
import threading
import queue
import time# 参数设置
url = "rtsp://admin:aa123456@192.168.1.64/h264/ch1/main/av_stream"
fps = 30
segment_time = 10  # 每段录制 10 秒
fourcc = cv2.VideoWriter_fourcc(*'mp4v')# 用于保存帧的队列
frame_queue = queue.Queue()
recording = False
stop_signal = False
video_count = 0# 保存线程函数
def save_video_worker():global video_countwhile True:if stop_signal and frame_queue.empty():breakframes = []start_time = time.time()while time.time() - start_time < segment_time:try:frame = frame_queue.get(timeout=1)frames.append(frame)except queue.Empty:continueif frames:h, w = frames[0].shape[:2]video_count += 1save_path = f'lanqiu_{video_count}.mp4'out = cv2.VideoWriter(save_path, fourcc, fps, (w, h))for f in frames:out.write(f)out.release()print(f"[保存完成] {save_path}")# 启动摄像头
cap = cv2.VideoCapture(url)
ret = cap.isOpened()cv2.namedWindow('photo', 0)
cv2.resizeWindow('photo', 1280, 720)# 开启保存线程(一直运行,直到设置 stop_signal)
thread = threading.Thread(target=save_video_worker)
thread.start()while ret:ret, frame = cap.read()if not ret:breakcv2.imshow('photo', frame)key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('s') and not recording:recording = Trueprint("[开始录制]")elif key == ord('e') and recording:recording = Falseprint("[停止录制]")if recording:frame_queue.put(frame.copy())  # 用 copy 避免线程间冲突cap.release()
stop_signal = True
thread.join()
cv2.destroyAllWindows()

ffmpeg录制mp4

import subprocess
import threading
import queue
import time
import cv2
import numpy as np# === 参数设置 ===
rtsp_url = "rtsp://admin:aa123456@192.168.1.64/h264/ch1/main/av_stream"
width, height = 1280, 720
fps = 25
segment_time = 10  # 每段录制时间(秒)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')recording = False
stop_signal = False
video_count = 0frame_queue = queue.Queue()# === 保存线程函数 ===
def save_video_worker():global video_countwhile not stop_signal or not frame_queue.empty():frames = []start_time = time.time()while time.time() - start_time < segment_time:try:frame = frame_queue.get(timeout=1)frames.append(frame)except queue.Empty:continueif frames:video_count += 1out = cv2.VideoWriter(f'video_segment_{video_count}.mp4', fourcc, fps, (width, height))for f in frames:out.write(f)out.release()print(f"[保存完成] video_segment_{video_count}.mp4")# === 启动 FFmpeg 读取 RTSP ===
ffmpeg_cmd = [r'E:\soft\ffmpeg-7.1.1-full_build\ffmpeg-7.1.1-full_build\bin\ffmpeg.exe','-rtsp_transport', 'tcp','-i', rtsp_url,'-f', 'rawvideo','-pix_fmt', 'bgr24','-vf', f'scale={width}:{height}','-'
]pipe = subprocess.Popen(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8)# === 启动保存线程 ===
thread = threading.Thread(target=save_video_worker)
thread.start()# === 实时显示和按键控制 ===
cv2.namedWindow("photo", 0)
cv2.resizeWindow("photo", width, height)try:while True:raw_frame = pipe.stdout.read(width * height * 3)if not raw_frame:print("视频读取失败,退出")breakframe = np.frombuffer(raw_frame, np.uint8).reshape((height, width, 3))cv2.imshow("photo", frame)key = cv2.waitKey(1) & 0xFFif key == ord('q'):breakelif key == ord('s') and not recording:recording = Trueprint("[开始录制]")elif key == ord('e') and recording:recording = Falseprint("[停止录制]")if recording:frame_queue.put(frame.copy())except KeyboardInterrupt:print("中断退出")# === 清理资源 ===
stop_signal = True
thread.join()
pipe.terminate()
cv2.destroyAllWindows()

http://www.xdnf.cn/news/871489.html

相关文章:

  • Kali Linux 安全工具解析
  • Ros(控制机器人运动)
  • .NET 原生驾驭 AI 新基建实战系列(四):Qdrant ── 实时高效的向量搜索利器
  • JVMTI 在安卓逆向工程中的应用
  • 【Oracle】存储过程
  • 纹理压缩格式优化
  • OpenCV 自带颜色表实现各种滤镜
  • 【Netty源码分析总结】
  • 使用 Unstructured 开源库快速入门指南
  • 机器学习:聚类算法
  • Python爬虫:trafilatura 的详细使用(快速提取正文和评论以及结构,转换为 TXT、CSV 和 XML)
  • Day44打卡 @浙大疏锦行
  • 01-Redis介绍与安装
  • pyqt5 安装失败
  • 查看服务应用是否有跑起来命令
  • 机器学习算法分类
  • 3D旋转动态爱心 - Python创意代码
  • MySQL ACID 面试深度解析:原理、实现与面试实战
  • Pytest+Selenium UI自动化测试实战实例
  • MCP:AI应用的通用接口,如何重塑大模型与外部系统的连接?
  • 小米又开源了,一个多模态大模型 + 一个生不逢时的推理大模型
  • CppCon 2015 学习:Bridging Languages Cross-Platform
  • 循环链表与循环队列的区分与对比
  • 防火墙iptables项目实战
  • Java 二维码
  • ROS2性能狂飙:C++11移动语义‘偷梁换柱’实战
  • CSP严格模式返回不存在的爬虫相关文件
  • C#和C++在编译过程中的文件区分
  • 树莓派上遇到插入耳机后显示“无输入设备”问题
  • 格恩朗椭圆齿轮流量计 精准计量 赋能工业