当前位置: 首页 > web >正文

FastDDS Transport功能模块初步整理

一. 总体结构

在这里插入图片描述

二. 主要类的功能

2.1 TransportDescriptor和TransportInterface

​ FastDDS中整个Transport类的设计遵循的是设计模式中的建造者模式,其中,TransportDescriptor就是建造者,而TransportInterface则是建造出来的产品。

​ TransportDescriptor是抽象类,申明了后续实现类必须实现的最终要的一个函数,也是建造者需要完成的主要工作,就是建造出实际的Transport对象:

struct RTPS_DllAPI TransportDescriptorInterface
{  .../*** Factory method pattern. It will create and return a TransportInterface* corresponding to this descriptor. This provides an interface to the NetworkFactory* to create the transports without the need to know about their type*/virtual TransportInterface* create_transport() const = 0;...
};

​ TransportInterface定义了Transort的接口,所有的Transport都必须实现该接口,根据传输方式的不同有TCPTransportInterface, UDPTransportInterface和SharedMemTransport,而根据TCP/UDP版本的不同,前两者分别有V4和V6两个派生类。
在这里插入图片描述
TransportInterface接口类的初衷是为了在FastRTPS中隔离传输层的实现,用户需要实现基于特定传输层协议(TCP, UDP, SharedMem)的通道和对应地址之间的代码逻辑。TransportInterface接口类中申明了实现类必须实现的一些接口(init,IsInputChannelOpen,IsLocatorSupported, is_locator_allowed, RemoteToMainLocal,transform_remote_locator等),其中最重要的两个接口是OpenOutputChannelOpenInputChannel这两个接口函数,前者会创建SenderResource而后者创建ReceiverResource。

2.2 TCPTransportInterface和UDPTransportInterface

2.2.1 TCPTransportInterface:

​ TCPTransportInterface定义并且实现了基于TCP传输层的通信操作,主要接口都在TCPTransportInterface中,另外还有一个辅助类TCPAcceptor用于实现TCP独有的连接操作,接收客户端的连接。

​ TCPTransoprtInterface中的perform_listen_operation函数实现基于TCP传输层的数据接收操作,send函数实现基于TCP传输层的数据发送操作,在初始化TCPTransportInterface的时候,如果发现TCPTransportDescriptor中定义了listen_port,那么会额外创建TCPAcceptBasic对象:

TCPv4Transport::TCPv4Transport(const TCPv4TransportDescriptor& descriptor): TCPTransportInterface(LOCATOR_KIND_TCPv4), configuration_(descriptor)
{...// 如果TCPv4TransportDescriptor配置了listening_ports监听端口,则创建TCPAcceptorBasicfor (uint16_t port : configuration_.listening_ports){Locator locator(LOCATOR_KIND_TCPv4, port);create_acceptor_socket(locator);}...
}

​ TCPAcceptBasic可以理解为对于listen socket的封装,主要执行listen操作,返回建立连接了的client socket给到TCPTransoprtInterface:

void TCPAcceptorBasic::accept(TCPTransportInterface* parent)
{...acceptor_.async_accept(socket_,[parent, locator, this](const std::error_code& error){if (!error){auto socket = std::make_shared<tcp::socket>(std::move(socket_));  // 返回建立连接的客户端socketparent->SocketAccepted(socket, locator, error); //TCPTransoprtInterface根据该socket创建ChannelResource}});
}
2.2.2 UDPTransportInterface:

​ UDPTRansportInterface的接口结构比TCPTransportInterface要简单很多,并且接口函数也会少许多,因为没有TCP的连接操作,此外,UDPTransportInterface中没有实现perform_listen_operation接口(UDP的perform_listen_operation接口在UDPChannelResource中实现),send函数实现基于UDP传输层的数据发送操作。

2.2.3 OpenInputChannel和OpenOutputChannel

​ 在2.1 TransportInterface中说过,其最重要的接口就是OpenInputChannel和OpenOutputChannel,下面以UDPTransportInterface来看下这两个接口实现了什么功能。

​ 首先,这两个接口的名字中都带有Channel,但是不要被名字误导,认为这两个接口都会和下面的ChannelResource打交道,从代码看,只有InputChannel才会创建ChannelResource,而OutputChannel创建的是SenderResource。

OpenInputChannel:

bool UDPv4Transport::OpenInputChannel(const Locator& locator,TransportReceiverInterface* receiver,uint32_t maxMsgSize) {...if (!IsInputChannelOpen(locator)){success = OpenAndBindInputSockets(locator, receiver, IPLocator::isMulticast(locator), maxMsgSize);}... // 如果是组播地址,下面还会创建额外的ChannelResource,同时将网卡加入组播地址
}bool UDPTransportInterface::OpenAndBindInputSockets(...) {...std::vector<std::string> vInterfaces = get_binding_interfaces_list(); // 获取白名单网卡列表for (std::string sInterface : vInterfaces){   // 在每张网卡上创建InputChannelResource(一般来说用户会通过白名单限制用于DDS通信的网卡,不太会出现在多个网卡上创建的情况)UDPChannelResource* p_channel_resource;p_channel_resource = CreateInputChannelResource(sInterface, locator, is_multicast, maxMsgSize, receiver);mInputSockets[IPLocator::getPhysicalPort(locator)].push_back(p_channel_resource);}...
}UDPChannelResource* UDPTransportInterface::CreateInputChannelResource(...) {// 创建接收数据用的socketeProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface,IPLocator::getPhysicalPort(locator), is_multicast);// 创建ChannelResource,ChannelResource包装传入的socket,并且对外提供接收数据的统一接口UDPChannelResource* p_channel_resource = new UDPChannelResource(this, unicastSocket, maxMsgSize, locator,sInterface, receiver);return p_channel_resource;
}

OpenOutputChannel:

bool UDPTransportInterface::OpenOutputChannel(SendResourceList& sender_resource_list,   // 函数中创建的SenderResource放在这个list中返回给上层const Locator& locator)
{...if (is_interface_whitelist_empty()) { ... } // 没有配置网卡白名单,会在0.0.0.0地址上创建socket和SenderResourceelse {// 获取网卡列表(需要剔除已经在locator参数上创建SenderResource的网卡)get_unknown_network_interfaces(sender_resource_list, locNames, true);for (const auto& infoIP : locNames){   // 创建用于发送数据的socketeProsimaUDPSocket unicastSocket = OpenAndBindUnicastOutputSocket(generate_endpoint(infoIP.name, port), port);SetSocketOutboundInterface(unicastSocket, infoIP.name);  // 设置多播数据的发送接口...sender_resource_list.emplace_back(  // 创建SenderResource,用于包装上面创建的发送socket的操作接口static_cast<SenderResource*>(new UDPSenderResource(*this, unicastSocket, false, true)));}}...
}

2.3 ChannelResource, TCPChannelResource, UDPChannelResource

​ ChannelResource包装了用于接口数据的socket,并且对外提供相对统一的操作接口(针对不同的传输类型还是有所区别的)
在这里插入图片描述
​ ChannelResource是接口类,主要提供了用于接收数据使用的线程成员thread_以及保存接收到的消息的message_buffer_成员,后续可以看到transportinterface的perform_listen_operation函数就是在这个thread_成员上运行的。

class ChannelResource {...inline void thread(std::thread&& pThread){// 初始化接收数据的线程thread_...}...
};UDPChannelResource::UDPChannelResource(...): ChannelResource(maxMsgSize)...
{	//指定thread_线程运行perform_listen_operation函数thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator));
}

​ TCPChannelResource和UDPChannelResource两者都包含了用于接收数据使用的socket对象,除此以外,两者的接口差距还是挺大的(TCP和UDP的通信流程本来就有区别),TCPChannelResource的接口比较多,实现也很复杂,并且不属于DDS协议中约定的默认通信方式,因此不需要太关注TCP的视线。

​ 这里主要来看UDPChannelResource的接口,UDPChannelResource的接口很简单,就是perform_listen_operation:

void UDPChannelResource::perform_listen_operation(Locator input_locator) {// set thread name for debugpthread_setname_np(pthread_self(), "UDPChannel");   // thread_的线程名称为UDPChannelLocator remote_locator;while (alive()){// Blocking receive.auto& msg = message_buffer();if (!Receive(msg.buffer, msg.max_size, msg.length, remote_locator)) // Receive函数调用socket的recv_from接收数据(阻塞式){continue;}// 将收到的消息交个MessageReceiver去处理(通过message_receiver接口主导到UDPChannelResource中)...}
}

2.4 SenderResource, UDPSenderResource

​ SenderResource从名字看就是用于发送数据的,SenderResource是接口类,其中最重要的一个接口就是send,send内部调用了send_lambda_这个std::function类型的成员,send_lambda_成员有SenderResource的派生类来实现:
在这里插入图片描述
​ 对于UDPSenderResource来说,send_lambda_函数对象依赖UDPTransportInterface的接口来完成最终的数据发送:

send_lambda_ = [this, &transport](...) -> bool{	// 调用UDPTransportInterface::send接口, 并且将socket对象传入return transport.send(data, dataSize, socket_, destination_locators_begin,destination_locators_end, only_multicast_purpose_, whitelisted_,max_blocking_time_point);};

[!TIP]

读到这段代码的时候,不太理解为什么不在UDPSenderResource的send_lambda_中直接实现send的逻辑代码,反而要依赖UDPTransportInterface来实现。个人理解是不是因为SenderResource这个类主要功能还是体现在Resource上,说明其只是一个用来保管发送socket的资源类而已,个人猜想。

​ RTPSParticipantImpl是SenderResource的容器,保存了其创建的所有SenderResource,在发送数据的时候会调用这些SenderResource的接口:

bool sendSync(...) {...for (auto& send_resource : send_resource_list_)  // send_resource_list_中保存了该Participant创建的所有SenderResource{...send_resource->send(msg->buffer, msg->length, &locators_begin, &locators_end, max_blocking_time_point);}...
}

​ 第三章节中的3.2发送数据中会说明SenderResource的send接口是怎么被调用到的,调用栈是如何的。

2.5 ReceiverResource, MessageReciever

​ ReceiverResource实现TransportReceiverInterface接口,TransportReceiverInterface这个接口的用途从名字看就是用于接收Transport传输层收到数据的接口,其实现了TransoprtReceiverInterface接口中的OnDataReceived函数接口,OnDataReceived函数接口在UDPChannelResource的数据接收接口perform_listen_operation函数中被调用:

void UDPChannelResource::perform_listen_operation(Locator input_locator) {...// Processes the data through the CDR Message interface.if (message_receiver() != nullptr){message_receiver()->OnDataReceived(msg.buffer, msg.length, input_locator, remote_locator);}...
}

​ ReceiverResource的OnDataReceived接口实现非常简单,就是将Transoprt收到的CDRMessage交给MessageReceiver,MessageReceiver会对该CDRMessage进行进一步细化的处理,看过DDS协议文档的应该知道一个完整的DDS Message包括Header和Submessages两部分,其中Submessages中包含不止一种子消息(HeartBeat,GAP,Timestamp,Data, DataFrag等等)。
在这里插入图片描述
而MessageReceiver的作用就是解析ReceiverResource传递过来的CDRMessage,解析其中的Header以及各个子消息,可以看到MessageReceiver中定义了各种proess函数用来处理不同的子消息:
在这里插入图片描述
​ 这些proc函数中最重要的一个函数是process_data_message_without_security(假设没有开启TLS功能,开启的话就是with_security),该函数将Data子消息中的payload交给RTPSReader进行处理:

void MessageReceiver::process_data_message_without_security(const EntityId_t& reader_id,CacheChange_t& change)
{auto process_message = [&change](RTPSReader* reader){reader->processDataMsg(&change);};findAllReaders(reader_id, process_message); // 找到对应EntityID的RTPSReader,调用其processDataMsg处理Data子消息
}

三. 主要流程

3.1 初始化

​ 初始化流程中,RTPSParticipantImpl和各种Endpoint会创建各个SenderResource/ReceiverResource,这里主要梳理一下会创建哪些Resource,在什么时候创建的:
在这里插入图片描述
在这里插入图片描述
​ 对于RecevierResource创建的socket,其端口一般是有固定的计算规则的,根据domainid,participantid以及是地址是否为组播地址可以算出固定的端口。

​ 对于SenderResource创建的socket,则是随机绑定未使用的端口,而且因为每个RTPSWriter知道匹配的Reader信息(保存在ReaderProxy/ReaderLocator中),因此,socket绑定的IP都是本地网卡的IP,然后发送的时候使用sendto发送到Reader的地址和端口上去就行了。

3.2 发送数据

从RTPS层看,发送数据时,我们一般向RTPSWriter索取一个CacheChange,然后将要发送的数据填充到CacheChange的payload中,最后将这个CacheChange加入到WriterHistory中。

​ 从我们将数据填充到CacheChange到通过SenderResource关联的socket发送到对端的大致流程如下:
在这里插入图片描述

3.3 接收数据

​ 接收数据的工作从UDPChannelResource的thread_线程开始的,前面说到过,UDPChannelResource的thread_线程被用来运行perform_listen_operation函数,该函数中调用关联的socket执行receive_from操作,从绑定的Locator上读取数据,读取到的数据通过MessageReceiver,RTPSParticipantImpl最终到达RTPSReader手上:
在这里插入图片描述

http://www.xdnf.cn/news/5823.html

相关文章:

  • 《医院网络安全运营能力成熟度评估指南》(试行版)研究解读
  • Spring Boot 的自动配置为 Spring MVC 做了哪些事情?
  • matlab多智能体网络一致性研究
  • 【C++详解】类和对象(上)类的定义、实例化、this指针
  • C++11 ——右值引用和移动语义
  • 手动硬密封固定式对夹V型球阀:复杂介质工况下的高性价比流体控制方案-耀圣
  • 深度学习基础
  • Kotlin-类和对象
  • Angular | 利用 `ChangeDetectorRef` 解决 Angular 动态显示输入框的聚焦问题
  • Java后端开发day48--反射动态代理
  • 【速写】TRL:Trainer的细节与思考(PPO/DPO+LoRA可行性)
  • 【PostgreSQL数据分析实战:从数据清洗到可视化全流程】金融风控分析案例-10.4 模型部署与定期评估
  • 虹科技术 | 简化汽车零部件测试:LIN/CAN总线设备的按键触发功能实现
  • C/C++内存管理
  • const char* 指向字符串数组和字符串的区别
  • css3基于伸缩盒模型生成一个小案例
  • 华三路由器单臂路由配置
  • 数字IC后端培训教程之数字后端项目典型案例分析
  • Spring Boot 的 CommandLineRunner
  • 【爬虫】12306查票
  • android特许权限调试
  • 特伦斯折叠重锤V70:实现专业演奏,从这里开始
  • DES两种加密模式
  • 普林斯顿数学三剑客读本分析。
  • element ui 实现el-form表单校验不通过时自动滚动到不通过的第一项去
  • 【题解-洛谷】B3881 [信息与未来 2015] 拴奶牛
  • 告别静态配置!Spring Boo动态线程池实战指南:Nacos+Prometheus全链路监控
  • 今日行情明日机会——20250512
  • std::move 和 std::forward
  • 图像的EXIF方向信息(Orientation标签)