当前位置: 首页 > ai >正文

使用 ROS2 构建客户端-服务器通信:一个简单的计算器示例

本指南将详细介绍如何创建两个 ROS2 Python 节点(一个客户端和一个服务器),通过发布/订阅机制实现相互通信,完成数字计算请求与结果返回的功能。我们将使用 std_msgs/String 消息类型来传递数据。

1. 创建 ROS2 Python 包

首先,使用 ros2 pkg create 命令创建两个独立的 Python 包,分别用于客户端和服务器。

# 创建客户端包
ros2 pkg create calculator_client_py \--build-type ament_python \--dependencies rclpy \--node-name calculator_client_node# 创建服务器包
ros2 pkg create calculator_server_py \--build-type ament_python \--dependencies rclpy \--node-name calculator_server_node

命令选项说明:

  • --build-type ament_python:指定该包为 Python 包,并使用 ament 构建系统。
  • --dependencies rclpy:声明依赖 rclpy 库,这是 Python 版本的 ROS2 客户端库。
  • --node-name:在包内创建一个名为 calculator_client_node.pycalculator_server_node.py 的基础节点文件。
2. 实现客户端节点 (calculator_client_node.py)

客户端负责接收用户输入,将计算请求(两个数字和一个运算符)打包成字符串消息,并发布到 calculation_request 话题。同时,它订阅 calculation_response 话题以接收服务器的计算结果。

[calculator_client_node.py]
#!/usr/bin/env python3import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass CalculatorClient(Node):def __init__(self):super().__init__('calculator_client')# 创建发布者,发布请求到 'calculation_request' 主题self.publisher_ = self.create_publisher(String, 'calculation_request', 10)# 创建订阅者,订阅来自 'calculation_response' 主题的响应self.subscription = self.create_subscription(String,'calculation_response',self.listener_callback,10)self.subscription  # 防止未使用变量警告def listener_callback(self, msg):# 回调函数:当收到计算结果时被调用self.get_logger().info(f'Received result: {msg.data}')def send_request(self, num1, operator, num2):# 发送请求函数msg = String()# 格式: "num1,operator,num2"msg.data = f"{num1},{operator},{num2}"self.publisher_.publish(msg)self.get_logger().info(f'Sent request: {num1} {operator} {num2}')def main(args=None):rclpy.init(args=args)calculator_client = CalculatorClient()print("Supported operations: +, -, *, /")try:while rclpy.ok():# 获取用户输入user_input = input("Enter calculation (e.g., '3 + 5') or 'exit' to quit: ")if user_input.lower() == 'exit':breaktry:parts = user_input.split()if len(parts) != 3:print("Invalid input format. Please use 'number operator number'.")continuenum1_str, operator, num2_str = parts# 验证运算符if operator not in ['+', '-', '*', '/']:print(f"Unsupported operator '{operator}'. Please use +, -, *, or /.")continue# 尝试转换为浮点数num1 = float(num1_str)num2 = float(num2_str)# 发送请求calculator_client.send_request(num1, operator, num2)except ValueError:print("Invalid input. Please ensure numbers are valid.")continue# 旋转一次以处理可能的回调rclpy.spin_once(calculator_client, timeout_sec=0.1)except KeyboardInterrupt:print("\nShutting down client...")finally:calculator_client.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
3. 实现服务器节点 (calculator_server_node.py)

服务器节点订阅 calculation_request 话题。当收到请求时,它解析消息,执行计算,并将结果(或错误信息)发布到 calculation_response 话题。

[calculator_server_node.py]
#!/usr/bin/env python3import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass CalculatorServer(Node):def __init__(self):super().__init__('calculator_server')# 创建订阅者,订阅 'calculation_request' 主题的请求self.subscription = self.create_subscription(String,'calculation_request',self.listener_callback,10)self.subscription  # 防止未使用变量警告# 创建发布者,发布响应到 'calculation_response' 主题self.publisher_ = self.create_publisher(String, 'calculation_response', 10)def listener_callback(self, msg):# 回调函数:当收到计算请求时被调用try:# 解析收到的数据 "num1,operator,num2"data_str = msg.datanum1_str, operator, num2_str = data_str.split(',')num1 = float(num1_str)num2 = float(num2_str)# 执行计算if operator == '+':result = num1 + num2elif operator == '-':result = num1 - num2elif operator == '*':result = num1 * num2elif operator == '/':if num2 == 0:raise ZeroDivisionError("Division by zero is not allowed.")result = num1 / num2else:# 这在客户端已经检查过,但作为服务端健壮性考虑raise ValueError(f"Unsupported operator: {operator}")# 准备并发布响应response_msg = String()response_msg.data = f"{num1} {operator} {num2} = {result}"self.publisher_.publish(response_msg)self.get_logger().info(f'Processed request: {num1} {operator} {num2} = {result}')except ValueError as e:# 处理解析错误或不支持的运算符error_msg = String()error_msg.data = f"Error: Invalid request data '{msg.data}'. Details: {e}"self.publisher_.publish(error_msg)self.get_logger().warn(f'Failed to process request: {msg.data}. Error: {e}')except ZeroDivisionError as e:# 处理除零错误error_msg = String()error_msg.data = f"Error: {e}"self.publisher_.publish(error_msg)self.get_logger().warn(f'Calculation error for request: {msg.data}. Error: {e}')except Exception as e: # 捕获其他未预期的错误error_msg = String()error_msg.data = f"Unexpected error occurred: {e}"self.publisher_.publish(error_msg)self.get_logger().error(f'Unexpected error processing request: {msg.data}. Error: {e}')def main(args=None):rclpy.init(args=args)calculator_server = CalculatorServer()try:print("Calculator server is running... Waiting for requests.")# 保持节点运行,等待消息rclpy.spin(calculator_server)except KeyboardInterrupt:print("\nShutting down server...")finally:calculator_server.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()

Additional Notes:

  • The created node will need manual modification to implement the actual calculator service logic
  • After creation, you would typically:
    1. Edit the node file to implement service callbacks
    2. Add message/service dependencies if needed
    3. Build with colcon build
    4. Source the workspace
    5. Run with ros2 run calculator_server_py calculator_server_node
4. 构建与运行
  1. 构建包

colcon build --packages-select calculator_client_py calculator_server_py
  • 激活环境

source install/setup.bash
  • 启动节点

    • 在一个终端中启动服务器(必须先启动):
ros2 run calculator_server_py calculator_server_node
  • 在另一个终端中启动客户端
    • ros2 run calculator_client_py calculator_client_node
  • 交互: 在客户端终端输入类似 3 + 5 的表达式,服务器会计算并将结果返回,客户端会显示结果。

启动客户端Node

启动服务端Node

http://www.xdnf.cn/news/18822.html

相关文章:

  • 储能变流器学习之MPPT
  • 汽车盲点检测系统的网络安全分析和设计
  • k8s-容器化部署论坛和商城服务
  • 开源 | 推荐一套企业级开源AI人工智能训练推理平台(数算岛):完整代码包含多租户、分布式训练、模型市场、多框架支持、边缘端适配、云边协同协议:
  • PMP项目管理知识点-⑮预测型项目概念辨析
  • Web 自动化测试常用函数实战(一)
  • Unity自定义Inspector面板之使用多选框模拟单选框
  • 测试分类(超详解)
  • vue拖动排序,vue使用 HTML5 的draggable拖放 API实现内容拖并排序,并更新数组数据
  • 基于SpringBoot的社区儿童疫苗接种预约系统设计与实现(代码+数据库+LW)
  • 【高级机器学习】3. Convex Optimisation
  • 无限长直导线周围电场分布的MATLAB
  • 【MATLAB例程】二维平面上的多目标TOA定位,目标和TOA基站的数量、位置可自行设置。附代码下载链接
  • 浅谈Elasticsearch数据写入流程的refresh和flush操作
  • ICDE 2025 | 包含OPTIONAL和UNION表达式的SPARQL查询的高效执行方法
  • 硬件开发_基于物联网的儿童座椅系统
  • 3.【鸿蒙应用开发实战: 从入门到精通】开发入门 Hello World
  • 7、prefix-tuning、P-tuning、Prompt-tuning
  • 基于数据安全的旅游民宿租赁系统
  • 音频时长裁剪工具:高效处理音频,让内容创作更轻松
  • docker 所有常用命令,配上思维导图,加图表显示
  • 配送算法16 A Deep Reinforcement Learning Approach for the Meal Delivery Problem
  • 【Linux】用户与用户组管理
  • 【C语言强化训练16天】--从基础到进阶的蜕变之旅:Day14
  • 蓝桥杯算法之基础知识(3)——Python的idle的快捷键设置(idle改键)
  • OpenCV实战1.信用卡数字识别
  • 极简风格PDF格式转换解决方案
  • 人工智能安全地图:将人工智能漏洞与现实世界的影响联系起来
  • Linux 系统核心调优:CPU、磁盘 I/O、网络与内核参数实战
  • Java全栈开发面试实录:从基础到实战的深度探索