当前位置: 首页 > news >正文

LCM中间件入门(2):LCM核心实现原理解析

文章目录

        • 一、`good()`函数:LCM实例状态检查的实现原理
          • 1. 实现逻辑
          • 2. 简化代码示例(C语言核心逻辑)
        • 二、`publish()`:向指定channel发送消息的原理
          • 1. 完整流程拆解
          • 2. 简化代码示例(C++核心逻辑)
        • 三、`subscribe()`:接收指定channel消息的原理
          • 1. 完整流程拆解
          • 2. 简化代码示例(C++核心逻辑)
      • 四、整体协同机制总结

LCM的核心功能基于C语言实现(C++接口为其封装),其底层通过 UDP网络通信消息序列化回调管理实现发布-订阅模式。以下从代码原理层面解析关键函数的工作机制。

一、good()函数:LCM实例状态检查的实现原理

good()函数用于判断LCM实例是否初始化成功,其核心是检查LCM内部关键资源的有效性。

1. 实现逻辑

LCM实例(lcm_t结构体)的核心成员包括:

  • 网络套接字(socket_fd):用于收发数据的UDP套接字;
  • 线程状态(thread_running):接收消息的后台线程是否启动;
  • 错误码(error):记录初始化或运行中的错误状态。

good()函数的本质是检查这些成员是否处于“可用状态”:

  • 套接字是否成功创建(socket_fd != -1);
  • 后台线程是否正常运行(针对需要异步接收的模式);
  • 无致命错误(error == 0)。
2. 简化代码示例(C语言核心逻辑)
// LCM实例结构体(简化)
typedef struct {int socket_fd;          // UDP套接字描述符int thread_running;     // 接收线程状态(1=运行,0=停止)int error;              // 错误码(0=无错误)// 其他成员:回调表、组播地址等
} lcm_t;// good()函数实现
int lcm_good(lcm_t *lcm) {return (lcm != NULL && lcm->socket_fd != -1 && lcm->thread_running && lcm->error == 0);
}

在C++接口中,lcm::LCM::good()是对上述C函数的封装,返回bool类型。

二、publish():向指定channel发送消息的原理

publish()的核心是将消息序列化为字节流,并通过UDP组播发送到与channel关联的网络地址,同时在数据包中嵌入channel标识。

1. 完整流程拆解
  1. 消息序列化
    根据.lcm文件生成的编解码函数(如example_temperature_t_pack()),将消息结构体转换为二进制字节流(解决跨平台数据格式差异)。

  2. channel与网络地址映射
    LCM默认将channel名称映射为UDP组播地址(239.255.x.y,其中x.y由channel名称的哈希值计算得出),确保同一channel的消息仅被订阅该channel的节点接收。

  3. 数据包封装
    构造LCM协议数据包,格式为:

    [4字节魔数] + [4字节消息长度] + [channel名称] + [序列化的消息数据]
    

    魔数(0x4C434D00,即"LCM\0")用于接收方识别LCM数据包。

  4. UDP发送
    通过LCM实例的套接字将数据包发送到channel对应的组播地址。

2. 简化代码示例(C++核心逻辑)
// C++ publish()接口
void LCM::publish(const std::string& channel, const void* data, size_t len) {if (!good()) return;  // 检查实例状态// 1. 计算channel对应的组播地址(基于哈希)struct sockaddr_in addr;lcm_channel_to_multicast(channel.c_str(), &addr);  // 内部哈希映射// 2. 封装LCM协议头uint8_t header[8];header[0] = 0x4C; header[1] = 0x43; header[2] = 0x4D; header[3] = 0x00;  // 魔数*(uint32_t*)(header + 4) = htonl(len);  // 消息长度(网络字节序)// 3. 拼接完整数据包:头 + channel + 消息数据std::vector<uint8_t> packet;packet.insert(packet.end(), header, header + 8);packet.insert(packet.end(), channel.begin(), channel.end());packet.push_back('\0');  // channel以空字符结尾packet.insert(packet.end(), (uint8_t*)data, (uint8_t*)data + len);// 4. 通过UDP发送到组播地址sendto(lcm->socket_fd, packet.data(), packet.size(), 0,(struct sockaddr*)&addr, sizeof(addr));
}
三、subscribe():接收指定channel消息的原理

subscribe()的核心是注册回调函数并与channel绑定,后台线程接收数据包后,根据channel查找对应的回调并触发执行。

1. 完整流程拆解
  1. 回调函数注册
    订阅时,LCM将channel名称消息类型回调函数存储在内部的回调表(哈希表,channel -> 回调列表)中。

  2. 后台接收线程
    LCM初始化时启动一个后台线程,循环从套接字读取UDP数据包:

    • 解析数据包,验证魔数和格式;
    • 提取channel名称和序列化的消息数据。
  3. 消息路由与反序列化
    根据解析出的channel名称,在回调表中查找对应的回调函数:

    • 若找到,调用自动生成的反序列化函数(如example_temperature_t_unpack()),将字节流转换为消息结构体;
    • 调用注册的回调函数,传入消息结构体。
  4. 线程安全处理
    回调函数的执行在后台线程中进行,若需在多线程环境中使用,需用户自行添加同步机制(如互斥锁)。

2. 简化代码示例(C++核心逻辑)
// 回调表结构(简化):channel -> 回调函数列表
typedef struct {std::unordered_map<std::string, std::vector<Callback>> callbacks;std::mutex mutex;  // 保护回调表的线程安全
} CallbackTable;// 订阅函数实现
void LCM::subscribe(const std::string& channel, void (*callback)(const ReceiveBuffer*, const std::string&, void*),void* userdata) {std::lock_guard<std::mutex> lock(callback_table.mutex);// 将回调函数注册到channel对应的列表中callback_table.callbacks[channel].emplace_back(callback, userdata);
}// 后台接收线程逻辑
void receive_thread(lcm_t* lcm) {while (lcm->thread_running) {uint8_t buffer[65536];  // UDP最大包长struct sockaddr_in sender;socklen_t sender_len = sizeof(sender);ssize_t n = recvfrom(lcm->socket_fd, buffer, sizeof(buffer), 0,(struct sockaddr*)&sender, &sender_len);if (n <= 0) continue;// 解析数据包:检查魔数、提取channel和消息数据if (buffer[0] != 0x4C || buffer[1] != 0x43 || buffer[2] != 0x4D || buffer[3] != 0x00)continue;  // 非LCM数据包,忽略uint32_t msg_len = ntohl(*(uint32_t*)(buffer + 4));std::string channel = (char*)(buffer + 8);  // channel以空字符结尾const uint8_t* msg_data = buffer + 8 + channel.size() + 1;// 查找回调并执行std::lock_guard<std::mutex> lock(callback_table.mutex);auto it = callback_table.callbacks.find(channel);if (it != callback_table.callbacks.end()) {for (auto& cb : it->second) {// 构造接收缓冲区,调用回调ReceiveBuffer rbuf{msg_data, msg_len};cb.function(&rbuf, channel, cb.userdata);}}}
}

四、整体协同机制总结

工作原理交互图如下:

PublisherLCM_Pub (发布者实例)Network (UDP组播)LCM_Sub (订阅者实例)Subscriber初始化LCM实例创建lcm_t结构体1. 创建UDP套接字2. 启动接收线程3. 初始化错误码为0调用good()返回状态 (socket有效+线程运行+无错误)初始化LCM实例创建lcm_t结构体1. 创建UDP套接字2. 启动接收线程3. 初始化回调表(空)调用good()返回状态 (socket有效+线程运行+无错误)订阅通道THERMOMETER调用subscribe("THERMOMETER", 回调函数)1. 加锁保护回调表2. 将回调函数注册到"THERMOMETER"条目下订阅完成发布消息到通道构造Temperature消息(温度值、时间戳等)调用publish("THERMOMETER", 消息)publish()内部处理1. 消息序列化(调用自动生成的pack函数)2. 计算channel哈希映射到组播地址(239.255.x.y)3. 封装LCM数据包(魔数+长度+channel+序列化数据)通过UDP发送数据包到组播地址接收UDP数据包(含"THERMOMETER"标识)接收线程处理1. 验证魔数和数据包格式2. 提取channel("THERMOMETER")和序列化数据3. 消息反序列化(调用自动生成的unpack函数)4. 查找回调表中"THERMOMETER"对应的回调函数触发回调函数(传入反序列化后的消息)执行回调逻辑(打印温度数据等)PublisherLCM_Pub (发布者实例)Network (UDP组播)LCM_Sub (订阅者实例)Subscriber
  1. 初始化阶段lcm_t实例创建套接字、启动接收线程,`good(
  2. )`验证这些资源是否就绪。
  3. 发布阶段publish()将消息序列化,通过channel映射的组播地址发送UDP包,嵌入channel标识。
  4. 订阅阶段subscribe()将回调注册到channel对应的哈希表;接收线程解析UDP包,根据channel查找回调,反序列化消息后触发执行。

这种设计实现了无中心节点的轻量化通信,通过UDP组播和哈希映射保证低延迟,适用于实时系统中基于channel的高效数据交互。

http://www.xdnf.cn/news/1218439.html

相关文章:

  • InfluxDB 与 Python 框架结合:Django 应用案例(二)
  • kmp复习,需要多看多练
  • Kubernetes 应用部署实战:为什么需要 Kubernetes?
  • InfluxDB 与 Python 框架结合:Django 应用案例(三)
  • Java Matcher对象中find()与matches()的区别
  • QT6 Python UI文件转换PY文件的方法
  • HttpServletRequest 和 HttpServletResponse核心接口区别
  • 哈希的概念及其应用
  • linux线程封装和互斥
  • Flutter Chen Generator - yaml配置使用
  • 了解SQL
  • 从姑苏区人工智能大模型基础设施招标|学习服务器、AI处理器、GPU
  • 【车联网kafka】Kafka核心架构与实战经验(第二篇)
  • 防火墙安全实验
  • 《秋招在即!Redis数据类型面试题解析》
  • Vue3+Vite项目如何简单使用tsx
  • SpringBoot+SpringAI打造智能对话机器人
  • MySQL 8.0 OCP 1Z0-908 题目解析(38)
  • Kafka Streams窗口技术全解析:从理论到电商实时分析实战
  • TTS语音合成|GPT-SoVITS语音合成服务器部署,实现http访问
  • Linux多线程线程控制
  • 前端核心技术Node.js(五)——Mongodb、Mongoose和接口
  • 计算机网络学习(一、Cisco Packet Tracer软件安装)
  • 计算机网络学习--------三次握手与四次挥手
  • diffusion原理和代码延伸笔记1——扩散桥,GOUB,UniDB
  • 【计算机网络】5传输层
  • 网络与信息安全有哪些岗位:(4)应急响应工程师
  • 【网络安全】等级保护2.0解决方案
  • 物联网与AI深度融合,赋能企业多样化物联需求
  • Redis实战(4)-- BitMap结构与使用