【ROS2】 核心概念5——服务(service)
主要参考:https://book.guyuehome.com/ROS/2.核心概念/2.4_话题/
官方文档: https://docs.ros.org/en/humble/Tutorials/Services/Understanding-ROS2-Services.html
话题和服务是ROS中最为常用的两种数据通信方法,
话题(topic)适合传感器、控制指令等周期性、单向传输的数据
服务(service)适合一问一答,同步性要求更高,比如获取机器视觉识别到的目标位置
一、ROS2的简介及相关文章
ROS的安装:【ROS2】机器人操作系统安装到Ubuntu22.04简介(手动
工程空间:(workspace): 【ROS2 】核心概念1——工作空间(workspace)-CSDN博客
功能包(Package):解耦的功能代码,包含节点、接口、配置等 【ROS2】 核心概念2——功能包package_ros功能包概念-CSDN博客
节点(Node):可位于不同计算机、不同语言、可独立运行进程 【ROS2】核心概念3——节点(node)
话题(Topic):节点间异步传递数据的“桥梁”,基于发布/订阅模型(适合传感器、周期性指令数据流)。【ROS2】核心概念4——话题(node)-CSDN博客
服务(Service):同步的“你问我答”机制,适用于需即时响应的短任务(如开关控制)。
动作(Action):复杂行为的“流程管理”,支持长时间任务、反馈与取消(如导航到目标点)。
通信接口(Interface):数据传递的“标准结构”,定义消息(.msg)、服务(.srv)、动作(.action)格式。
参数(Parameter):机器人系统的“全局字典”,动态键值对配置(如超时时间、IP地址)。
分布式通信(Distributed Communication):跨多计算平台的“任务分配”,支持机器人集群协同工作。
DDS(Data Distribution Service):ROS2底层的“神经网络”,实现实时、可靠的通信中间件
二、ROS2代码
2.1 创建工程空间,任意路径下
mkdir -p service_ws/src cd service_ws/src
2.2 创建功能包,python形式
ros2 pkg create --build-type ament_python learn_service_py
2.3 服务端代码
实现2个通信,一个是话题订阅usb相机的,接收到图片信息后,检测红色并进行坐标计算
一个是创建一个服务,当访问时返回一个坐标(x,y),如果没有图像,返回0,0
detect_service.py
你的路径/service_ws/src/learn_service_py/learn_service_py/detect_service.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from sensor_msgs.msg import Image # 图像消息类型
import numpy as np # Python数值计算库
from cv_bridge import CvBridge # ROS与OpenCV图像转换类
import cv2 # Opencv图像处理库
from std_srvs.srv import Trigger # 自定义的服务接口lower_red = np.array([0, 90, 128]) # 红色的HSV阈值下限
upper_red = np.array([180, 255, 255]) # 红色的HSV阈值上限class ImageSubscriber(Node):def __init__(self, name):super().__init__(name) # ROS2节点父类初始化self.sub = self.create_subscription(Image, 'image_raw', self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换self.srv = self.create_service(Trigger, # 创建服务器对象(接口类型、服务名、服务器回调函数)'get_target_position',self.object_position_callback) self.objectX = 0self.objectY = 0 def object_detect(self, image):hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 图像从BGR颜色模型转换为HSV模型mask_red = cv2.inRange(hsv_img, lower_red, upper_red) # 图像二值化contours, hierarchy = cv2.findContours(mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) # 图像中轮廓检测for cnt in contours: # 去除一些轮廓面积太小的噪声if cnt.shape[0] < 150:continue(x, y, w, h) = cv2.boundingRect(cnt) # 得到苹果所在轮廓的左上角xy像素坐标及轮廓范围的宽和高cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2) # 将苹果的轮廓勾勒出来cv2.circle(image, (int(x+w/2), int(y+h/2)), 5,(0, 255, 0), -1) # 将苹果的图像中心点画出来self.objectX = int(x+w/2)self.objectY = int(y+h/2)cv2.imshow("object", image) # 使用OpenCV显示处理后的图像效果cv2.waitKey(50)# 表示收到订阅消息后的回调函数def listener_callback(self, data):self.get_logger().info('Receiving video frame') # 输出日志信息,提示已进入回调函数image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8') # 将ROS的图像消息转化成OpenCV图像self.object_detect(image) # 苹果检测def object_position_callback(self, request, response): # 创建回调函数,执行收到请求后对数据的处理# 填充响应数据response.success = Trueresponse.message = f"x:{self.objectX},y:{self.objectY}" # 将坐标编码到message字段self.get_logger().info('Object position\nx: %d y: %d' %(self.objectX , self.objectX )) # 输出日志信息,提示已经反馈return responsedef main(args=None): # ROS2节点主入口main函数rclpy.init(args=args) # ROS2 Python接口初始化node = ImageSubscriber("service_object_server") # 创建ROS2节点对象并进行初始化rclpy.spin(node) # 循环等待ROS2退出node.destroy_node() # 销毁节点对象rclpy.shutdown() # 关闭ROS2 Python接口
2.4 客户端代码
detect_client.py
你的路径/service_ws/src/learn_service_py/learn_service_py/detect_client.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_srvs.srv import Triggerclass ObjectDetectionClient(Node):def __init__(self):super().__init__('object_detection_client')self.cli = self.create_client(Trigger, 'get_target_position')while not self.cli.wait_for_service(timeout_sec=1.0):self.get_logger().info('等待服务上线...')self.req = Trigger.Request()def send_request(self):# Trigger接口无需设置请求参数self.future = self.cli.call_async(self.req)self.get_logger().info("已发送检测请求")def main(args=None):rclpy.init(args=args)client = ObjectDetectionClient()client.send_request()while rclpy.ok():rclpy.spin_once(client)if client.future.done():try:response = client.future.result()if response.success:# 解析message字段中的坐标coords = response.message.split(',')x = int(coords[0].split(':')[1])y = int(coords[1].split(':')[1])client.get_logger().info(f"目标坐标解析结果: X={x}, Y={y}")else:client.get_logger().error("目标检测失败")except Exception as e:client.get_logger().error(f"服务调用异常: {str(e)}")breakclient.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
2.5 配置代码接口
service_ws/src/learn_service_py/setup.py
service_detect_srv ,service_detect_clt就是我们最后执行节点的名字
entry_points={'console_scripts': ['service_detect_srv=learn_service_py.detect_service:main','service_detect_clt=learn_service_py.detect_client:main' , ],},
2.6 编译与环境
cd service_ws
colcon build
路径临时添加到环境变量(每新开一个终端都需要)
cd service_ws
source install/setup.bash
2.7 运行节点
如果有usb摄像头,可以安装统一的驱动
sudo apt install ros-humble-usb-cam
conda deactivate #如果存在虚拟环境的话
cd /home/zengxy/code/ROS2/dev_ws
source install/setup.bashros2 run usb_cam usb_cam_node_exe # 要有摄像头 # sudo apt install ros-humble-usb-cam
ros2 run learn_service_py service_detect_srv
ros2 run learn_service_py service_detect_clt # 如果无usb启动得到的是0.0
结果
服务端,在usb启动后
客户端请求一次的效果