在ROS中实现消息通信和服务通信Python
在ROS中实现消息通信和服务通信,需创建功能包并编写相应代码。以下是详细步骤:
1. 创建工作空间和功能包
# 创建并初始化工作空间
mkdir -p ~/catkin_ws/src
cd ~/catkin_ws
catkin_make# 创建功能包(添加必要依赖)
cd src
catkin_create_pkg msg_pkg_demo roscpp rospy std_msgs message_generation message_runtime
2. 创建自定义消息和服务
# 创建msg和srv目录
cd msg_pkg_demo
mkdir msg srv# 创建消息文件 msg/DemoMsg.msg
echo -e "string content\nint32 id" > msg/DemoMsg.msg# 创建服务文件 srv/DemoSrv.srv
echo -e "string request\n---\nstring response" > srv/DemoSrv.srv
3. 修改配置文件
package.xml 添加:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
CMakeLists.txt 修改:
# 添加message_generation
find_package(catkin REQUIRED COMPONENTSroscpprospystd_msgsmessage_generation
)# 声明消息/服务
add_message_files(FILES DemoMsg.msg)
add_service_files(FILES DemoSrv.srv)# 生成消息
generate_messages(DEPENDENCIES std_msgs)# 添加运行时依赖
catkin_package(CATKIN_DEPENDS message_runtime roscpp rospy
)
4. 编写节点代码
创建 scripts 目录存放Python节点:
mkdir scripts
发布者节点 (scripts/publisher.py)
#!/usr/bin/env python
import rospy
from msg_pkg_demo.msg import DemoMsgdef publisher():pub = rospy.Publisher('demo_topic', DemoMsg, queue_size=10)rospy.init_node('publisher_node', anonymous=True)rate = rospy.Rate(1) # 1Hzmsg_id = 0while not rospy.is_shutdown():msg = DemoMsg()msg.content = "Hello ROS!"msg.id = msg_idpub.publish(msg)rospy.loginfo(f"Published: {msg.content} ID: {msg.id}")msg_id += 1rate.sleep()if __name__ == '__main__':try:publisher()except rospy.ROSInterruptException:pass
订阅者节点 (scripts/subscriber.py)
#!/usr/bin/env python
import rospy
from msg_pkg_demo.msg import DemoMsgdef callback(data):rospy.loginfo(f"Received: {data.content} ID: {data.id}")def subscriber():rospy.init_node('subscriber_node', anonymous=True)rospy.Subscriber('demo_topic', DemoMsg, callback)rospy.spin()if __name__ == '__main__':subscriber()
服务端节点 (scripts/server.py)
#!/usr/bin/env python
import rospy
from msg_pkg_demo.srv import DemoSrv, DemoSrvResponsedef handle_request(req):rospy.loginfo(f"Request: {req.request}")return DemoSrvResponse(f"Processed: {req.request}")def server():rospy.init_node('service_server')s = rospy.Service('demo_service', DemoSrv, handle_request)rospy.loginfo("Service Ready")rospy.spin()if __name__ == '__main__':server()
客户端节点 (scripts/client.py)
#!/usr/bin/env python
import rospy
from msg_pkg_demo.srv import DemoSrvdef service_client():rospy.wait_for_service('demo_service')try:demo_service = rospy.ServiceProxy('demo_service', DemoSrv)resp = demo_service("Test Request")rospy.loginfo(f"Response: {resp.response}")except rospy.ServiceException as e:rospy.logerr(f"Service call failed: {e}")if __name__ == '__main__':rospy.init_node('service_client')service_client()
5. 设置执行权限
chmod +x scripts/*.py
6. 编译并测试
# 编译工作空间
cd ~/catkin_ws
catkin_make# 加载环境
source devel/setup.bash
测试消息通信
终端1:发布者
rosrun msg_pkg_demo publisher.py
终端2:订阅者
rosrun msg_pkg_demo subscriber.py
测试服务通信
终端1:服务端
rosrun msg_pkg_demo server.py
终端2:客户端
rosrun msg_pkg_demo client.py
预期输出
- 发布者终端:每秒输出
Published: Hello ROS! ID: [递增数字]
- 订阅者终端:实时显示接收到的消息内容
- 服务端终端:显示 “Service Ready” 和客户端请求内容
- 客户端终端:显示服务端返回的响应字符串
关键说明
- 消息/服务生成:ROS会在编译时自动生成Python/C++消息代码(位于
devel/lib/python2.7/dist-packages/msg_pkg_demo
) - 依赖关系:
message_generation
:编译时生成消息message_runtime
:运行时依赖
- 通信类型:
- 话题通信:异步发布/订阅(
publisher.py
↔subscriber.py
) - 服务通信:同步请求/响应(
client.py
→server.py
)
- 话题通信:异步发布/订阅(
通过此实现,您已完整演示了ROS中的消息通信(话题)和服务通信机制。可根据需要扩展消息/服务数据结构或节点逻辑。