当前位置: 首页 > java >正文

openpi π₀ 项目部署运行逻辑(三)——策略推理服务器 serve_policy.py

 π₀ 主控脚本都在 scripts 中:

其中,serve_policy.py 是 openpi 中的策略推理服务端脚本,作用为:启动一个 WebSocket 服务器,加载预训练策略模型,等待外部请求(如来自 main.py 的控制程序),然后执行动作推理并返回结果

一句话总结一下:

将一个 Pi0 策略模型部署为网络服务(WebSocket API),供机器人主控进程远程调用

目录

1 库引用

2 参数定义

3 加载策略模型

4 启动推理服务

5 使用方法总结

5.1 使用方法

5.2 问题分析


1 库引用

import dataclasses  # 用于创建结构化参数对象
import enum          # 用于定义环境枚举类型
import logging       # 用于日志输出
import socket        # 获取本机 IP 地址信息import tyro          # 命令行参数解析库,比 argparse 更现代# 引入策略和策略配置模块
from openpi.policies import policy as _policy
from openpi.policies import policy_config as _policy_config
# 引入 WebSocket 推理服务模块
from openpi.serving import websocket_policy_server
# 引入训练配置模块
from openpi.training import config as _config

2 参数定义

# 定义支持的机器人环境枚举类型
class EnvMode(enum.Enum):ALOHA = "aloha"ALOHA_SIM = "aloha_sim"DROID = "droid"LIBERO = "libero"# 定义用于加载 checkpoint 的参数结构
@dataclasses.dataclass
class Checkpoint:config: str  # 模型对应的训练配置名,如 "pi0_aloha_sim"dir: str     # checkpoint 目录路径(可以是本地或 S3)# 定义默认策略类型占位符结构
@dataclasses.dataclass
class Default:pass  # 使用默认模型配置(例如从 DEFAULT_CHECKPOINT 中选择)# 定义主入口参数结构体
@dataclasses.dataclass
class Args:env: EnvMode = EnvMode.ALOHA_SIM         # 使用的机器人环境default_prompt: str | None = None        # 默认语言提示词port: int = 8000                         # WebSocket 服务端口record: bool = False                     # 是否记录策略输出(用于调试)policy: Checkpoint | Default = dataclasses.field(default_factory=Default)# 定义每个环境对应的默认 checkpoint
DEFAULT_CHECKPOINT: dict[EnvMode, Checkpoint] = {EnvMode.ALOHA: Checkpoint(config="pi0_aloha",dir="s3://openpi-assets/checkpoints/pi0_base",),EnvMode.ALOHA_SIM: Checkpoint(config="pi0_aloha_sim",dir="s3://openpi-assets/checkpoints/pi0_aloha_sim",),EnvMode.DROID: Checkpoint(config="pi0_fast_droid",dir="s3://openpi-assets/checkpoints/pi0_fast_droid",),EnvMode.LIBERO: Checkpoint(config="pi0_fast_libero",dir="s3://openpi-assets/checkpoints/pi0_fast_libero",),
}

其中,class Args 使用 Tyro 来从命令行解析参数,包括以下选项:

  • --env:选择环境,如 ALOHA, DROID(仅在使用默认模型时起作用)
  • --default_prompt:语言提示词(自然语言)
  • --port:监听的端口(默认 8000)
  • --record:是否开启动作记录(用于调试)
  • --policy:加载指定 checkpoint,支持:1. Checkpoint(config="...", dir="...")  2. Default() 使用内置 checkpoint

3 加载策略模型

# 从默认环境创建策略模型
def create_default_policy(env: EnvMode, *, default_prompt: str | None = None) -> _policy.Policy:if checkpoint := DEFAULT_CHECKPOINT.get(env):return _policy_config.create_trained_policy(_config.get_config(checkpoint.config),  # 加载配置checkpoint.dir,                         # 加载参数default_prompt=default_prompt           # 设置默认语言提示)raise ValueError(f"Unsupported environment mode: {env}")# 从命令行参数创建策略模型
def create_policy(args: Args) -> _policy.Policy:match args.policy:case Checkpoint():  # 如果显式指定 checkpointreturn _policy_config.create_trained_policy(_config.get_config(args.policy.config),args.policy.dir,default_prompt=args.default_prompt)case Default():     # 否则使用默认模型return create_default_policy(args.env, default_prompt=args.default_prompt)

分两种情况:

1. 手动指定模型与路径,即增加参数 --policy:

--policy 'Checkpoint(config="pi0_aloha", dir="s3://openpi-assets/checkpoints/pi0_base")'

2. 使用内置默认策略(会从 S3 下载),即不指定参数 --policy:

uv run scripts/serve_policy.py --env ALOHA --default_prompt='fold the towel'

4 启动推理服务

# 主逻辑函数
def main(args: Args) -> None:policy = create_policy(args)            # 加载策略对象policy_metadata = policy.metadata       # 获取策略的元信息if args.record:                         # 如果启用记录,包裹为记录器policy = _policy.PolicyRecorder(policy, "policy_records")# 获取本机 IP 和主机名(日志打印)hostname = socket.gethostname()local_ip = socket.gethostbyname(hostname)logging.info("Creating server (host: %s, ip: %s)", hostname, local_ip)# 创建 WebSocket 推理服务对象server = websocket_policy_server.WebsocketPolicyServer(policy=policy,host="0.0.0.0",               # 监听所有接口port=args.port,               # 指定端口metadata=policy_metadata,    # 附加元信息)server.serve_forever()           # 启动阻塞服务循环# 脚本入口:解析 CLI 参数并运行主函数
if __name__ == "__main__":logging.basicConfig(level=logging.INFO, force=True)  # 初始化日志main(tyro.cli(Args))  # 使用 tyro 从命令行解析 Args 并运行

启动一个 WebSocket 服务端,监听端口,等待来自主控系统(如 main.py)的动作请求,完成如下功能:

  • 接收来自客户端的 observation(图像、语言提示等)
  • 执行 policy.infer(...) 
  • 返回预测的动作结果

5 使用方法总结

5.1 使用方法

✅ 最简单用法(使用默认模型):

uv run scripts/serve_policy.py --env ALOHA --default_prompt='fold the towel'

自动从 S3 下载 pi0_aloha 的默认模型,并运行推理服务器,端口0.0.0.0:8000

✅ 使用本地模型:

uv run scripts/serve_policy.py policy:checkpoint \--policy.config=pi0_aloha \--policy.dir=/home/yejiangchen/.cache/openpi/openpi-assets/checkpoints/pi0_aloha_towel

✅ 启用推理行为记录:

uv run scripts/serve_policy.py --record ...

将推理行为记录下来,便于调试

如果在使用本地模型时出现 bug,运行以下命令查看:

uv run scripts/serve_policy.py policy:checkpoint --help

运行成功可以看到:

可以看到日志:

  • 模型已经被正常加载(包括 JAX/Flax 环境、参数、norm stats)
  • 推理服务已启动,在 0.0.0.0:8000 监听 WebSocket 请求
  • 各种 INFO 日志都是正常的底层库启动信息(比如尝试加载 rocm、tpu,最后还是用的 CUDA/CPU,不影响使用)
  • 检查点参数与归一化参数(norm stats)都已找到并成功加载

5.2 问题分析

可以看到 GPU 启动问题,检查 JAX 是否用上 GPU,输入:

(pi0) yejiangchen@yejiangchen-System-Product-Name:~/Desktop/Codes/openpi-main$ python
Python 3.11.11 (main, Dec 11 2024, 16:28:39) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import jax
>>> print(jax.devices())
[CudaDevice(id=0)]

可以看到:jax.devices() 输出为 [CudaDevice(id=0)],这说明 JAX 在当前环境下已经可以正确识别并调用 GPU(CUDA)。 CUDA 驱动、jax、jaxlib 安装都没有问题

其实只要 jax.devices() 显示 CudaDevice,模型实际计算就会在 GPU 上执行,即便启动日志没高亮显示 gpu 关键字也没关系

JAX/Flax 框架有时不会像 PyTorch 那样在日志里明确输出 “Using CUDA”。它只会在代码第一次涉及到数组/张量运算时,将数据移动到 GPU。如果已经能看到 [CudaDevice(id=0)],说明后续所有计算都会尽量在 GPU 上进行

运行时使用 nvtop 查看 GPU 占用:

安装:

sudo apt install nvtop

使用:

nvtop

可以看到 GPU 基本拉满了

http://www.xdnf.cn/news/8977.html

相关文章:

  • 中小企业AI算力如何选?【显卡租赁】VS【自建服务器】
  • 语音识别——文本转语音
  • 5.26 day 29
  • 论文阅读:Self-Planning Code Generation with Large Language Models
  • AOSP编译错误
  • Linux云计算训练营笔记day16(Linux周期性计划任务、Python)
  • OpenCV (C/C++) 中使用 Sobel 算子进行边缘检测
  • 【读书笔记】《编码:隐匿在计算机软硬件背后的语言》02 门
  • 【杂谈】------使用 __int128 处理超大整数计算
  • Haproxy 基础知识点
  • Halo:一个强大易用的国产开源建站工具
  • kafka实践与C++操作kafka
  • (自用)Java学习-5.14(注册,盐值加密,模糊查询)
  • Vue-模版绑定指令语法/什么是Vue组件
  • 小巧高效的目录索引生成软件
  • 「AR眼镜+智慧应急管理平台+视频联网」——矿山能源数智化转型的“安全之眼”与“效率引擎”
  • ffmpeg转换竖屏(画面是横屏旋转90度的竖屏文件格式)视频到横屏
  • SBT开源构建工具
  • 萤石云实际视频实时接入(生产环境)
  • Milvus分区-分片-段结构详解与最佳实践
  • java写一个简单的冒泡排序
  • 鸿蒙OSUniApp 制作简单的页面跳转与参数传递功能#三方框架 #Uniapp
  • 前端性能优化:如何让网页加载更快?
  • Oracle SHARED POOLRESERVED FREE LIST
  • OWA登录问题分析与解决方案
  • Vite 介绍
  • 【算法提升】牛牛冲钻五 最长无重复子数组 重排字符串 one_day
  • Hive 分桶(Bucketing)深度解析:原理、实战与核心概念对比
  • 水墨色调中国风PPT模版分享
  • 商务风企业公司推广培训计划PPT模版分享