网络缓冲区
用户态网络缓冲区
- 网络缓冲区原理
- 为什么需要用户态网络缓冲区
- Linux下如何接收和发送数据包
- 用户态网络缓冲区设计的本质
- 网络缓冲区代码实现
网络缓冲区原理
为什么需要用户态网络缓冲区
在网络开发中,我们经常使用到read/write/recv/send
等系统调用接口,我们需要理解这些函数的本质还是个拷贝函数,我们以read
和write
为例,他们其实就是将数据从用户空间拷贝到内核空间当中,内核态中是存在接收缓冲区和发送缓冲区的。
接下来我们来看read
和write
系统调用函数的含义:
ssize_t read(int fd, void *buf, size_t count);ssize_t write(int fd, const void *buf, size_t count);
对于read
和write
函数来说,都是有返回值的,返回值就代表我们实际上拷贝的数量,也就是我当前需要写入到内核缓冲区当中的数量,而参数之一的count
所代表的就是预估的一个拷贝的数量。
当前我们就需要思考一个问题,用户态的情况下,我们是不知道对应内核态的缓冲区有多大的,我们怎么能保证我们所需要拷贝的数据一次性就能拷贝过去而不是分好几次进行拷贝的呢?如果一次性拷贝不完剩下的数据不就会被丢掉吗,显然是不行的,所以在这儿我们就需要去设置一个用户态缓冲区来保存这些数据,保证其在没有被完整拷贝之前不会被丢弃掉,这也是用户态缓冲区所需要设置的一个重要的原因之一。
Linux下如何接收和发送数据包
我们都知道,网络通信是围绕着整个网络通信协议栈的:
在我们用户态看来,数据包就是 data,在 TCP 协议栈当中,是以 segment 表示,IP 协议中以 packet 表示,MAC 当中以 frame 表示,整个协议栈中,数据包都是以 sk_buffer 来进行流转的,协议栈也只会去识别对应的 sk_buffer。
网络数据其实整个流转流程也是在上图这样一个状态下进行流转的,首先我们来看一下接收数据包的流程:
- 网卡接收到数据报,通过 DMA 将数据包写入到内存(ringbuffer结构当中);
- 网卡向 CPU 发起硬件中断,CPU 收到硬件中断请求,根据中断表查找中断处理函数,调用中断处理函数;
- 中断处理函数将屏蔽硬件中断,发起软件中断(硬件中断是一个线程在执行,不能长时间被占用,避免 CPU 频繁被网卡中断,这儿需要使用软件中断处理耗时操作,避免执行时间过长,CPU 无法响应其他的硬件中断);
- 内核专门线程负责软件中断,从 ringbuffer 当中将数据取出到 sk_buffer 当中(注意,这个是循环操作,直到 ringbuffer 中没有数据);
- 从帧头取出 IP 协议,判断是 IPV4 还是 IPV6 ,去掉帧头帧尾;
- 从 IP 头中看出上一层是 TCP 协议还是 UDP 协议,根据五元组或者是 fd 找到对应的 socket ,将数据提取出来放到对应的 socket 接收缓冲区当中,软件中断处理结束以后开启硬件中断;
- 应用程序通过调用系统调用函数将接收缓冲区当中的数据拷贝到用户的缓冲区当中。
在了解发送数据包的流程的流程时我们需要思考一个问题,UDP/TCP 协议的缓冲区是否一致?
我们要知道对于 UDP 协议来说,他是面向数据报的一种协议,也就是说用户态下发送一个数据包,有多大使用 UDP 协议就会发多大,如果超过对应的长度 UDP 协议就会丢掉多余的部分,也不会重传,这也就意味着 UDP 协议其实是用不到发送缓冲区的,我直接发送原始的数据包即可。但是接收缓冲区却是必不可少的,因为接收数据的过程中我们可能存在一次性接收不完的情况发生,对应的数据就需要先被暂存下来。
再来看一下发送数据包的流程:
- 用户态下调用系统调用函数将数据拷贝到 sk_buffer 当中并且将数据放到 socket 的发送缓冲区当中(TCP);
- 网络协议栈从 socket 的发送缓冲区当中取出 sk_buffer 并且会克隆一个新的 sk_buffer(TCP是支持重传机制的,克隆就是为了保证可以进行重传);
- 根据协议栈向下进行传递,一次增加 TCP/UCP 头部,IP 头部,MAC帧头,帧尾(TCP会进行分段,IP 会进行分片(TCP/UDP 都会));
- 触发软件中断通知网卡驱动程序,有新的数据包需要进行发送;
- 网卡驱动程序依次从发送队列中取出数据 sk_buffer 放到 ringbuffer 当中(内存 DMA 区域,网卡读到);
- 触发网卡发送,发送成功,触发硬件中断,释放掉对应的 ringbuffer 和 sk_buffer(TCP 是克隆的,UDP 是原始的);
- 当收到 TCP 的 ACK 应答以后,就会释放掉原始的 sk_buffer。
对于 TCP 协议来说,他的发送缓冲区是分段设计的,可以参考一下之前的一片文章TCP协议详解,我们在了解了网络数据包的接收和发送原理以后,我们再回来看发送缓冲区与接收缓冲区。
用户态网络缓冲区设计的本质
发送缓冲区
对于发送缓冲区来说,我们可以理解为生产者与消费者速度不一致的问题,生产者生产数据的速度如果大于消费者消费的速度,我们就需要保证生产者发送的数据被接收到,那我们就需要一个缓冲区将数据先保存下来,等待对端对数据进行处理。
另一个解决的问题就是用户态本身不会知道内核中缓冲区有多大,并不是一次将数据都发送完毕,此时就需要预先将数据存储起来,缓存那些没有被发送出去的数据。
接收缓冲区
接收缓冲区同样也是要去解决掉生产者的速度大于消费者速度的问题,跟发送缓冲区一致,而另一个要解决的问题就是粘包问题。
为什么会出现粘包问题?
对于用户态来说,从内核的接收缓冲区当中读取到的数据是不确定的,我们不能保证他就是一个完整的包,他可能是半个包,也可能是一个半的包,如果是半个包,我们就需要先将这个数据包保存下来,等到读取到一个完整包数据以后在进行处理,如果是一个半包的数据,就需要优先去处理一个完整包的数据,将剩下半个包的数据暂存下来,基于这种考虑,就需要用到我们的用户态接收缓冲区。
如何解决粘包问题?
解决粘包问题有两种方式:
- 我们程序员自己去制定一套规则对数据包进行处理,比如说用特殊分隔符界定数据包(
\r\n
),我们再读取到这个数据包的时候,如果读取到的是\r\n
,就证明他之前的数据是一个完整的数据包,此时就进行处理即可; - 用长度去界定数据包,我们可以让一个数据包的头部分配两个字节去保存一个完整的数据包的长度,我们在读取数据的过程中只读取这个长度的数据包,然后进行处理,就保证了我们处理的是一个完整的数据包。
网络缓冲区代码实现
实现一个用户态的网路缓冲区,我们首先需要考虑什么样的数据结构最为合适,第一种就是定长数组,固定长度。
如果使用定长数组的话,会存在的问题就在于:
- 空间大小不确定,会出现分分配空间不足或者是分配的空间太大了,导致空间浪费的现象发生;
- 会频繁的进行数据的腾挪,因为我们读取到一个完整的数据包以后,就需要将剩下的数据腾挪首部的位置,保证下一次的数据读取。
接下来我们可以考虑 ringbuffer 这种环形队列结构:
对于这种结构来说:
- 解决了数据腾挪的问题,因为他是循环的结构,但是他也是固定大小,伸缩性也会比较差,而且还会出现数据离散性的问题。
对于离散性,我们可以只用系统调用函数readv/writev
去解决掉,这两个函数的作用就是用于将多个非连续的内存缓冲区中的数据一次性写入文件描述符,解决掉数据不连续我们依然可以读取到一个 buffer 中的问题。
对于伸缩性,我们可以使用 STL 容器中的 vector 来进行实现,他是可以进行扩容的,那么最终的一个数据结构就是一个 vector 加上 head 与 tail 两个索引来进行设计。
#ifndef __MESSAGE_BUFFER__
#define __MESSAGE_BUFFER__#include <bits/types/struct_iovec.h>
#include <stdint.h>
#include <vector>
#include <cstring>
#include <sys/uio.h>
#include <errno.h>class MessageBuffer
{
public:MessageBuffer() : rpos_(0), wpos_(0){buffer_.resize(4096);}explicit MessageBuffer(std::size_t size) : rpos_(0), wpos_(0){buffer_.resize(size);}// 允许移动构造MessageBuffer(MessageBuffer &&other) noexcept: buffer_(std::move(other.buffer_)), rpos_(other.rpos_), wpos_(other.wpos_){other.rpos_ = 0;other.wpos_ = 0;}// 移动赋值MessageBuffer &operator=(MessageBuffer &&other) noexcept{if (this != &other){buffer_ = std::move(other.buffer_);wpos_ = other.wpos_;rpos_ = other.rpos_;other.wpos_ = 0;other.rpos_ = 0;}return *this;}// 获取头指针uint8_t* GetBasePointer(){return buffer_.data();}// 获取读指针uint8_t* GetReadPointer(){return buffer_.data() + rpos_;}// 获取写指针uint8_t* GetWritePointer(){return buffer_.data() + wpos_;}// 移动读的下标void ReadCompleted(std::size_t size){rpos_ += size;}// 移动写的下标void WriteCompleted(std::size_t size){wpos_ += size;}// 有效数据长度std::size_t GetActiveSize() const{return wpos_ - rpos_;}// 当前空闲空间,不需要腾挪数据std::size_t GetFreeSize() const{return buffer_.size() - wpos_;}// 整个buffer的大小std::size_t GetBufferSize() const{return buffer_.size();}// 腾挪数据void NormalSize(){if (rpos_ > 0) {std::memmove(buffer_.data(), buffer_.data() + rpos_, GetActiveSize());wpos_ -= rpos_;rpos_ = 0;}}// 确定当前空间是否足够,尽可能的不去进行扩容和腾挪数据void EnsureSpace(std::size_t size){if (GetBufferSize() - GetActiveSize() < size) {buffer_.resize(buffer_.size() + std::max(size, buffer_.size() / 2));NormalSize();}else if (GetFreeSize() < size) {NormalSize();}}// 写进用户态缓冲区void Write(const uint8_t* data, std::size_t size){if (size > 0){EnsureSpace(size);std::memcpy(GetWritePointer(), data, size);WriteCompleted(size);}}// 获取到所有的数据std::pair<uint8_t*, std::size_t> GetAllData(){return {GetReadPointer(), GetActiveSize()};}// 获取第一个 \r\n 之前的数据的指针和大小(若未找到返回nullptr和0)std::pair<uint8_t *, std::size_t> GetDataUntilCRLF(){uint8_t* data = GetReadPointer();std::size_t active_size = GetActiveSize();for(size_t i = 0; i < active_size - 1; i++){if(data[i] == '\r' && data[i + 1] == '\n'){return {data, i};}}return {nullptr, 0};}// linux reactor readv// 1. 尽可能的不腾挪数据// 2. 避免了每次都从栈上拷贝到堆上int Recv(int fd, int* err){char extra[65535]; // UDP最大发送长度,大于这个长度需要在应用自己分层struct iovec iov[2];iov[0].iov_base = GetWritePointer();iov[0].iov_len = GetFreeSize();iov[1].iov_base = extra;iov[1].iov_len = 65535;// 通过readv读去离散型数据int n = readv(fd, iov, 2);if (n < 0) {*err = errno;return n;} else if (n == 0) {*err = ECONNRESET;return 0;} else if (n < GetFreeSize()) {WriteCompleted(n);return n;} else {std::size_t extra_size = n - GetFreeSize();WriteCompleted(GetFreeSize());Write(reinterpret_cast<uint8_t*>(extra), extra_size);return n;}}/*char buffer[65535];int n = read(fd, buffer, 65535);if (n == 0) {// 断开连接} else if (n < 0) (// ETif (errno == EINTR){}if (errno == EAGAIN I| errno == EWOULDBLOCK){//读取数据时没有数据可读}else {// 发生错误}else {//读取到数据Write(buffer, n);*/MessageBuffer(const MessageBuffer &) = delete;MessageBuffer &operator=(const MessageBuffer &) = delete;private:std::vector<uint8_t> buffer_;std::size_t rpos_;std::size_t wpos_;
};#endif
注意:
- 我们当前的设计当中,应该尽可能的去保证数据不进行腾挪和扩容,这个也是会产生消耗的;
- 我们从内核的接收缓冲区当中读取数据时,一般情况下都会有一个操作,就是将对应的数据拷贝到我们的栈上,然后在读到对应的用户态缓冲区当中,这相当于是进行了两次数据拷贝,在我们的设计当中,使用了
readv
函数,支持离散性数据拷贝,避免了两次数据拷贝情况的发生,也保证了尽量不去腾挪数据的情况。
注意,我们这儿所谈到的缓冲区是用户态网络缓冲区,跟内核的网络缓冲区是存在区别的,这两个概念是不可以进行混淆的。