linux网络编程之读缓冲区设计
目录
一、设计动机
二、缓冲区结构设计
三、核心组件
1. 成员变量
2. 构造与移动语义
3. 指针访问接口
4. 大小访问
5. 读写指针位置更新
6. 内存整理
7. 扩容策略
8. 写入、访问接口
9. 高效 readv() 封装
四、使用方式
五、完整代码
在高并发、高吞吐的网络服务器开发中,缓冲区(Buffer) 的设计直接影响到 I/O 的效率、内存管理的开销和整体系统性能。本文以一个高性能的读缓冲区实现 MessageBuffer
为例,详细介绍其设计理念、内存管理策略、API 设计及其在 Reactor 模型下的实际应用。
一、设计动机
在典型的基于 Reactor 模型 的 TCP 服务端架构中,I/O 操作必须是非阻塞的,并且应该尽可能减少系统调用次数与内存拷贝。
二、缓冲区结构设计
缓冲区整体结构如下:
rpos_
:读指针,指向当前有效数据的起始位置;
wpos_
:写指针,指向当前可写位置;
rpos_ ~ wpos_
:为有效数据区域;
wpos_ ~ buffer_.size()
:为空闲可写区域;
三、核心组件
1. 成员变量
std::vector<uint8_t> buffer_; // 内部缓冲区存储
std::size_t rpos_; // 当前读取位置
std::size_t wpos_; // 当前写入位置
使用 std::vector<uint8_t> 作为底层容器,便于动态扩容并具备良好的内存局部性。
2. 构造与移动语义
MessageBuffer(); // 默认构造,初始大小 4KB
explicit MessageBuffer(std::size_t size); // 指定初始大小
MessageBuffer(const MessageBuffer &) = delete;
MessageBuffer &operator=(const MessageBuffer &) = delete;
禁止拷贝,防止误用带来大数据开销;
支持移动构造与移动赋值,适用于高性能场景中的资源转移。
3. 指针访问接口
uint8_t *GetBasePointer();
uint8_t *GetReadPointer();
uint8_t *GetWritePointer();
提供裸指针访问,方便 readv()
等系统调用直接使用,不做额外封装,避免损失性能。
4. 大小访问
std::size_t GetActiveSize(); // 有效数据大小
std::size_t GetFreeSize(); // 可写区域大小
std::size_t GetBufferSize(); // 整体缓冲区大小
用于判断是否需要整理或扩容。
5. 读写指针位置更新
void ReadCompleted(std::size_t size);
void WriteCompleted(std::size_t size);
读写后更新
rpos_
和wpos_
,便于精确控制缓冲区状态。
6. 内存整理
void Normalize();
当
rpos_ > 0
且有空间碎片时,进行前移整理,把有效数据移动到起始位置
7. 扩容策略
void EnsureFreeSpace(std::size_t size);
确保有足够的空闲空间写入数据,逻辑:
若剩余空间不足,先 Normalize;
若整理后仍不足,再扩容;
8. 写入、访问接口
void Write(const uint8_t *data, std::size_t size);
std::pair<uint8_t *, std::size_t> GetAllData();
.高效写入接口,封装空间检查、内存拷贝与写指针推进逻辑。
.获取所有有效数据,返回读指针位置和有效数据长度
9. 高效 readv()
封装
int Recv(int fd, int *err);
该函数封装了对
readv
的使用逻辑,实现高效的数据读取。核心设计如下:
使用两块缓冲区:
主缓冲区(堆上):写入已有空间;
临时缓冲区(栈上):用于超出部分的数据,大小为 65535 字节;
统一系统调用
readv(fd, iov, 2)
读取数据到两块区域;避免多次
recv()
导致系统调用开销与阻塞问题;提高整体吞吐量和 CPU 利用率;
四、使用方式
在典型的 Reactor
框架中,MessageBuffer
会被作为用于管理客户端连接的对象的成员,用于管理连接读缓冲区:
void TcpConn::HandleRead() {int err = 0;// 调用 MessageBuffer::Recv() 使用 readv() 读取数据,读取数据到缓冲区中。int n = input_buffer_.Recv(fd_, &err);if (n > 0 && read_cb_) {// 数据读取成功,交给上层逻辑处理} else if (n == 0 || (n < 0 && err != EAGAIN && err != EWOULDBLOCK)) { // 连接关闭或错误则关闭连接。// 关闭连接}
}
五、完整代码
/* 用于 TCP 中收发数据的缓冲区封装,具备高效的内存管理策略,避免频繁拷贝。* +-------------------------+* | valid | free |* |<---data----->| |* ^ ^ ^* rpos_ wpos_ buffer_.size()** rpos_:已读位置(读指针)* wpos_:已写位置(写指针)* rpos_ ~ wpos_:是有效数据区* wpos_ ~ end: 是可写空闲区 ** 写入数据后 wpos_ 右移,读取数据后 rpos_ 右移,不移动内存。只有在空间不够或清理时才做 memmove*/
class MessageBuffer
{
public:// 默认构造 初始大小设为4KBMessageBuffer() : rpos_(0), wpos_(0){buffer_.resize(4096);}// 有参构造 允许手动指定初始大小explicit MessageBuffer(std::size_t size) : rpos_(0), wpos_(0){buffer_.resize(size);}// 不允许拷贝和赋值MessageBuffer(const MessageBuffer &) = delete;MessageBuffer &operator=(const MessageBuffer &) = delete;// 允许移动构造MessageBuffer(MessageBuffer &&other) noexcept: buffer_(std::move(other.buffer_)), rpos_(other.rpos_), wpos_(other.wpos_){other.rpos_ = 0;other.wpos_ = 0;}// 允许移动赋值MessageBuffer &operator=(MessageBuffer &&other) noexcept{if (this != &other){buffer_ = std::move(other.buffer_);rpos_ = other.rpos_;wpos_ = other.wpos_;other.rpos_ = 0;other.wpos_ = 0;}return *this;}/* 这三个接口用于底层 readv() / writev() 或用户逻辑操作数据。*/// 返回缓冲区起始地址uint8_t *GetBasePointer(){return buffer_.data();}// 返回当前可读数据的起始地址uint8_t *GetReadPointer(){return buffer_.data() + rpos_;}// 返回当前可写的起始地址uint8_t *GetWritePointer(){return buffer_.data() + wpos_;}/* 位置管理 */// 数据被上层读取后,更新读指针void ReadCompleted(std::size_t size){rpos_ += size;}// 数据写入后,更新写指针void WriteCompleted(std::size_t size){wpos_ += size;}/* 获取大小信息 */// 有效数据大小std::size_t GetActiveSize() const{return wpos_ - rpos_;}// 可写空间大小std::size_t GetFreeSize() const{return buffer_.size() - wpos_;}// 整个缓冲区大小std::size_t GetBufferSize() const{return buffer_.size();}/* 内存整理(腾挪数据) */void Normalize(){if (rpos_ > 0){ // 将未读数据移动到缓冲区起始,以“回收”前部空间。// 使用 std::memmove 而非 std::memcpy 是因为:// memmove 安全处理重叠区域(src 和 dest 可能重叠)// memcpy 在重叠情况下会引发未定义行为std::memmove(buffer_.data(), buffer_.data() + rpos_, GetActiveSize());wpos_ -= rpos_; // 调整写指针rpos_ = 0; // 重置读指针}}/* 空间保证机制 */void EnsureFreeSpace(std::size_t size){if (GetBufferSize() - GetActiveSize() < size){// 总容量不足,先整理再扩容Normalize();buffer_.resize(buffer_.size() + std::max(size, buffer_.size() / 2));// 指数扩容// buffer_.resize(buffer_.size() * 2 + size);}else if (GetFreeSize() < size){// 只整理腾出空间即可Normalize();}}/* 写入接口 */void Write(const uint8_t *data, std::size_t size){if (size > 0){EnsureFreeSpace(size); // 保证有空间std::memcpy(GetWritePointer(), data, size); // 写数据到缓冲区WriteCompleted(size); // 更新写指针}}/* 获取所有数据接口 */std::pair<uint8_t *, std::size_t> GetAllData(){return {GetReadPointer(), GetActiveSize()};}/* 获取第一个 \r\n 之前的数据(若未找到返回nullptr和0) */std::pair<uint8_t *, std::size_t> GetDataUntilCRLF(){uint8_t *data = GetReadPointer();std::size_t active_size = GetActiveSize();if (active_size < 2) return {nullptr, 0};for (std::size_t i = 0; i < active_size - 1; ++i){if (data[i] == '\r' && data[i + 1] == '\n'){return {data, i}; // 数据长度为i,不包含\r\n}}return {nullptr, 0}; // 未找到}/* 接收函数** 为什么用 readv 而不是多次 recv()?* 使用 recv 函数* recv(fd, main_buf, main_size);* recv(fd, tmp_buf, tmp_size);* 每次调用都要用户态 ⇄ 内核态切换;如果数据未到齐,第二次 recv() 可能会阻塞(或返回 EAGAIN);不利于 Reactor 非阻塞模型。* 使用 readv 函数* struct iovec iov[2];* readv(fd, iov, 2);* 单次系统调用,效率更高;分散读(scatter read),将数据写入多个 buffer;不需要腾挪主缓冲数据(零拷贝);一次就能完整接收数据,提高吞吐量;**/// 1. 尽可能的不腾挪数据// 2. 避免了每次都从栈上拷贝到堆上int Recv(int fd, int *err){char extra[65535]; // 临时扩展缓冲区(栈上)65535struct iovec iov[2];// 第一块:主缓冲区的剩余空间iov[0].iov_base = GetWritePointer();iov[0].iov_len = GetFreeSize();// 第二块:用于溢出的临时空间iov[1].iov_base = extra;iov[1].iov_len = sizeof(extra);// 分散读取两块缓冲区ssize_t n = readv(fd, iov, 2);if (n < 0) // 读取失败{*err = errno;return n;}else if (n == 0) // 对端关闭连接{*err = ECONNRESET;return 0;}else if (n <= GetFreeSize()) // 读到的数据 ≤ 主缓冲区剩余空间,直接读到主缓冲区中{WriteCompleted(n);return n;}else // 剩余空间不足,部分写入主缓冲,超出部分写到临时空间{// WRN: GetfreeSize() 在 WriteCompleted() 中会被更新, extra_size 需要提前计算std::size_t extra_size = n - GetFreeSize();WriteCompleted(GetFreeSize()); // 先写满主缓冲Write(reinterpret_cast<uint8_t *>(extra), extra_size); // 再把extra内容写入return n;}}private:std::vector<uint8_t> buffer_; // 实际数据缓冲区std::size_t rpos_; // 当前读取位置std::size_t wpos_; // 当前写入位置
};