当前位置: 首页 > news >正文

Muduo网络库流程分析

目录

Tcp模块的流程 

测试代码

TcpServer初始化阶段详解

 TcpServer构造函数执行流程

EventLoop创建详细步骤:

Channel与Poller的关联

设置Acceptor回调函数

启动监听

启动服务器完整流程

连接建立流程详解

新连接到达流程

连接初始化详解

连接就绪详解

数据收发流程详解

数据接收流程

 数据发送流程

关于Http模块的流程

HTTP请求的接收流程

HTTP响应的发送流程

模块初始化都干了什么事情

Poller对象的创建

EventLoop对象的创建

 Channel对象的创建

Socket对象的创建

Acceptor对象的创建

TcpServer对象的创建

​编辑LoopThreadPool对象的创建

LoopThread对象的创建

Timer_wheel对象的创建

Connection对象的创建

1. 连接触发监听套接字的可读事件

2. Poller检测到事件

3. EventLoop处理就绪事件

4. Acceptor的HandleRead方法被调用

5. TcpServer::NewConnection方法被调用

6. Connection对象创建

7. Connection::Established方法执行

8. Connection::EstablishedInLoop方法执行

9. 启动读事件监控

10. 更新Poller中的事件监控

11. 调用连接建立回调函数

12. 连接就绪,等待数据交互


Tcp模块的流程 

EventLoop模块

构造函数 

class EventLoop {private:using Functor = std::function<void()>;std::thread::id _thread_id;//线程IDint _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;Poller _poller;//进行所有描述符的事件监控std::vector<Functor> _tasks;//任务池std::mutex _mutex;//实现任务池操作的线程安全TimerWheel _timer_wheel;//定时器模块public:EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_timer_wheel(this) {//给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));//启动eventfd的读事件监控_event_channel->EnableRead();}
};

 Start接口

        void Start() {while(1) {//1. 事件监控, std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件处理。 for (auto &channel : actives) {channel->HandleEvent();}//3. 执行任务RunAllTask();}}

 TcpServer模块

class TcpServer {    private:uint64_t _next_id;      //这是一个自动增长的连接ID,int _port;int _timeout;           //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop;    //这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;    //这是监听套接字的管理对象LoopThreadPool _pool;   //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;    public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }//用于添加一个定时任务void RunAfter(const Functor &task, int delay) {_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start() { _pool.Create();  _baseloop.Start(); }
};

Poller模块

class Poller {private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;public:Poller() {_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();//退出程序}}
};

 Acceptor模块

class Acceptor {private:Socket _socket;//用于创建监听套接字EventLoop *_loop; //用于对监听套接字进行事件监控Channel _channel; //用于对监听套接字进行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;public:/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*//*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) {_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }void Listen() { _channel.EnableRead(); }void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return ;}if (_accept_callback) _accept_callback(newfd);}
};

 Channel模块

构造函数 

class Channel {private:int _fd;EventLoop *_loop;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件using EventCallback = std::function<void()>;EventCallback _read_callback;   //可读事件被触发的回调函数EventCallback _write_callback;  //可写事件被触发的回调函数EventCallback _error_callback;  //错误事件被触发的回调函数EventCallback _close_callback;  //连接断开事件被触发的回调函数EventCallback _event_callback;  //任意事件被触发的回调函数public:Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}
};

LoopThreadPool模块 

构造函数 

class LoopThreadPool {private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vector<LoopThread*> _threads;std::vector<EventLoop *> _loops;public:LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}
};

Create接口 

        void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;}

Connection模块

class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection> {private:uint64_t _conn_id;  // 连接的唯一ID,便于连接的管理和查找//uint64_t _timer_id;   //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器IDint _sockfd;        // 连接关联的文件描述符bool _enable_inactive_release;  // 连接是否启动非活跃销毁的判断标志,默认为falseEventLoop *_loop;   // 连接所关联的一个EventLoopConnStatu _statu;   // 连接状态Socket _socket;     // 套接字操作管理Channel _channel;   // 连接的事件管理Buffer _in_buffer;  // 输入缓冲区---存放从socket中读取到的数据Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据Any _context;       // 请求的接收处理上下文/*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*//*换句话说,这几个回调都是组件使用者使用的*/using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}
};

一个Muduo网络库的流程:

在main线程中首先创建一个TcpServer对象_server,

这个_server在构造的时候创建main_loop,从属线程池_pool,_acceptor,设置非活跃连接时间,非活跃连接关闭为false,并且把从属线程池也注册到main_loop上,把acceptor注册到main_loop上,然后给_acceptor绑定新连接到来的回调函数(连接ID递增,增加连接计数器,确保每个连接有唯一ID,创建连接对象:PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)),创建新的Connection智能指针对象,通过线程池获取下一个可用的EventLoop (_pool.NextLoop()),传入新的连接ID和套接字文件描述符,设置回调函数,启用非活跃连接超时检测,初始化连接:conn->Established() 将连接标记为已建立,可能包括:更改连接状态为CONNECTED,启用读事件监听,调用连接建立回调函数,保存连接:将新连接添加到连接管理容器中,使用连接ID作为键,这样服务器可以通过ID快速查找和管理连接),然后调用Listen接口用来监听新连接的到来

这个main_loop在构造的时候创建了_event_channel(使用智能指针管理),_poller,_event_fd,_thread_id(初始化为当前线程的ID,),_task,_mutex,_timer_wheel,然后给_event_channel绑定可读事件回调函数,并且启动eventfd的读事件监控

这个_poller在构造的时候创建了_epfd存储由epoll_create()返回的epoll文件描述符的整数,_evs是一个epoll_event结构体数组,用于存储事件,_channels是一个哈希表,将文件描述符(int)与Channel指针关联起来,使用epoll_create()创建一个epoll实例

这个_pool在构造的时候创建了 _thread_count;线程池中IO线程的数量, _next_idx用于轮询分配线程的下标(实现负载均衡),EventLoop *_baseloop指向主线程的EventLoop,std::vector<LoopThread*> _threads保存所有IO线程对象的指针,std::vector<EventLoop *> _loops保存所有IO线程对应的EventLoop指针

这个_acceptor在构造的时候创建了_socket用于创建监听套接字,EventLoop *_loop用于对监听套接字进行事件监控,Channel _channel用于对监听套接字进行事件管理 _socket(CreateServer(port))创建监听socket并绑定端口, _loop(loop)保存主事件循环指针,_channel(loop, _socket.Fd())创建Channel,管理监听socket的事件,并且 _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this))它使用 std::bind 将 Acceptor 类的 HandleRead 成员函数绑定为当通道上有数据可读时将被调用的回调函数。

这个socket在构造的时候会创一个文件描述符_sockfd,当调用CreateServer会创建套接字,绑定地址,开始监听,设置非阻塞,启动地址重用

这个_channel在构造的时候创建了个_fd,*_loop指针,需要监控的事件类型_events,已经触发的事件类型_revents,各种事件回调函数的设置,并且初始化_loop,将Channel与特定的事件循环(EventLoop)关联起来。以便这个loop能管理和调度该Channel上的事件。

这个conn在构造的时候,会把该conn注册到传入的loop中,同时也会创建一个channel,把这个channel注册到这个loop中,同时也需要把sockfd注册到channel上,因为Channel知道它要监听哪个文件描述符(sockfd),Channel知道它的事件应该由哪个事件循环(loop)处理,并且给这个channel绑定上读/写/错误/关闭的事件回调函数,然后会创建 _in_buffer用户态输入缓冲区---存放从socket中读取到的数据。_out_buffer用户态输出缓冲区---存放要发送给对端的数据。当客户端发送数据时,SubLoop检测到读事件,调用lcpconnection::handleRead进行数据的读取,操作系统内核接收数据,暂存在socket的接收缓冲区,然后再把数据被读入InputBuffer,调用用户注册的messageCallback,将数据传递给上层应用处理,上层应用根据自己的业务逻辑处理数据。然后服务器构建好了要把数据响应给上层应用调用TcpConnection::send()发送数据,数据被写入OutputBuffer,然后把OutputBuffer的数据尝试发送到内核缓冲区,如果不能完全发送,则:将剩余数据留在OutputBuffer中,向Poller注册该连接socket的写事件,当socket可写时,SubLoop检测到写事件,继续发送OutputBuffer中的数据,数据发送完毕后,取消写事件关注

测试代码

#include "m_server.h"// 简单的回显服务器测试
class EchoServer {
private:TcpServer _server;public:EchoServer(int port) : _server(port) {// 设置服务器回调函数_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));_server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));// 启用非活跃连接超时释放(60秒)_server.EnableInactiveRelease(60);// 设置工作线程数量_server.SetThreadCount(4);}// 连接建立回调void OnConnected(const PtrConnection& conn) {INF_LOG("新连接建立: %d", conn->Id());// 发送欢迎消息std::string welcome = "欢迎连接到回显服务器!\n";conn->Send(welcome.c_str(), welcome.size());}// 消息处理回调void OnMessage(const PtrConnection& conn, Buffer* buf) {// 读取所有可读数据std::string msg = buf->ReadAsStringAndPop(buf->ReadAbleSize());INF_LOG("收到消息[%d]: %s", conn->Id(), msg.c_str());// 回显消息std::string echo_prefix = "回显: ";std::string response = echo_prefix + msg;conn->Send(response.c_str(), response.size());}// 连接关闭回调void OnClosed(const PtrConnection& conn) {INF_LOG("连接关闭: %d", conn->Id());}// 启动服务器void Start() {INF_LOG("回显服务器启动在端口: %d", 8080);_server.Start();}
};int main(int argc, char* argv[]) {int port = 8080;if (argc > 1) {port = atoi(argv[1]);}INF_LOG("启动回显服务器,端口: %d", port);// 创建并启动服务器EchoServer server(port);server.Start();return 0;
}

TcpServer初始化阶段详解

 TcpServer构造函数执行流程

private:uint64_t _next_id;      //这是一个自动增长的连接ID,int _port;int _timeout;           //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop;    //这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;    //这是监听套接字的管理对象LoopThreadPool _pool;   //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
TcpServer(int port):_port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上
}

详细步骤:

成员初始化:

  • _port: 存储服务器监听端口
  • _next_id: 连接ID计数器初始化为0
  • _enable_inactive_release: 非活跃连接释放标志设为false

当TcpServer构造函数中初始化_baseloop成员变量时,会创建EventLoop对象:

EventLoop::EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_poller(),  // 创建Poller对象_timer_wheel(this) {// 设置eventfd的读事件回调_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();
}

EventLoop创建详细步骤:

初始化线程ID: 

   _thread_id = std::this_thread::get_id()
  • 记录创建EventLoop的线程ID,用于后续线程安全检查 

创建eventfd:

   _event_fd = CreateEventFd()static int CreateEventFd() {int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0) {ERR_LOG("CREATE EVENTFD FAILED!!");abort();}return efd;}
  • 创建一个eventfd用于线程间通信,设置为非阻塞模式

创建event_channel:

   _event_channel = new Channel(this, _event_fd)
  • 创建一个Channel对象管理eventfd的事件

创建Poller对象:

   _poller()  // 默认构造函数
  • 创建Poller对象用于事件监控

创建TimerWheel对象

   _timer_wheel(this)
  •  创建定时器管理对象

设置eventfd回调:

   _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
  •  设置eventfd的读事件回调函数

启用eventfd读事件监控:

   _event_channel->EnableRead();
  • 启用eventfd的读事件监控,这会将eventfd添加到Poller中 

 Poller创建过程

当EventLoop构造函数中初始化_poller成员变量时,会创建Poller对象:

Poller::Poller() {_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();}
}

Channel与Poller的关联

当调用Channel::EnableRead()等方法时,会将Channel添加到Poller中:

      Acceptor创建过程:

      • 传入&_baseloop和port构造Acceptor
      • Acceptor构造函数内部用CreateServer(port)创建监听套接字:
      Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) {_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
      }
      • CreateServer 内部调用:
      int CreateServer(int port) {bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();
      }
      • Socket::CreateServer 执行五个关键操作:
      bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {//1. 创建套接字if (Create() == false) return false;//2. 设置非阻塞(如果需要)if (block_flag) NonBlock();//3. 绑定地址if (Bind(ip, port) == false) return false;//4. 开始监听(默认最大连接队列1024)if (Listen() == false) return false;//5. 启动地址端口重用ReuseAddress();return true;
      }
      • 创建Channel对象管理监听套接字,设置读事件回调函数为 Acceptor::HandleRead

      Channel构造:

         Channel(EventLoop *loop, int fd): _fd(fd), _events(0), _revents(0), _loop(loop) {}
        • 将监听套接字的文件描述符与Channel关联
        • 初始化事件标志和关联的EventLoop
        • 此时还未设置任何监控事件(events = 0)

        设置回调函数

           _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
        • 将Acceptor::HandleRead方法绑定为Channel的读事件回调
        • 当监听套接字可读时(有新连接到达),会触发此回调

         HandleRead实现

           void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);}
        • 接受新连接,获取新连接的文件描述符
        • 调用预设的_accept_callback处理新连接(即TcpServer::NewConnection)

        线程池创建:

        • 构造 pool(&_baseloop),传入主事件循环指针
        • 此时线程池只是初始化,没有创建工作线程,等待后续 SetThreadCount 和 Create 调用

        设置Acceptor回调函数

        _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));

        详细解析:

        回调绑定机制:

        • 使用 std::bind 创建函数对象,绑定 TcpServer::NewConnection 成员函数
        • this 指针作为第一个参数,确保回调能访问到TcpServer对象
        • std::placeholders::1 占位符表示将来自Acceptor的新连接fd传给NewConnection

        NewConnection函数职责:

        void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));
        }

        启动监听

        _acceptor.Listen();

        内部实现详解:

        Acceptor::Listen内部:

           void Listen() { _channel.EnableRead(); }
        • 调用Channel的EnableRead方法启动监听套接字的读事件监控

        Channel::EnableRead内部:

             void EnableRead() { _events |= EPOLLIN; Update(); }
        • 将EPOLLIN标志添加到_events中
        • 调用Update更新事件监控

        Channel::Update内部: 

             void Update() { return _loop->UpdateEvent(this); }
        • 调用EventLoop的UpdateEvent方法

        EventLoop::UpdateEvent内部:

        void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
        • 调用Poller的UpdateEvent方法

        Poller::UpdateEvent内部:

        void UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {//不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);
        }

         

        • 检查Channel是否已在监控集合中
        • 如果不存在, 添加到_channels映射并调用epoll_ctl添加监控
        • 如果存在, 调用epoll_ctl修改监控事件

        Poller::Update内部:

        void Update(Channel *channel, int op) {int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!");}return;
        }
        • 准备epoll_event结构
        • 调用epoll_ctl将监听套接字添加到epoll实例
        • 此时监听套接字的可读事件(有新连接)开始被监控

        启动服务器完整流程

        void Start() {_pool.Create();  // 创建线程池中的工作线程_baseloop.Start(); // 启动主事件循环
        }

         线程池创建:

        void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;
        }
        • 线程池创建指定数量的LoopThread
        • 调用GetLoop获取并保存每个线程的EventLoop指针
        • GetLoop会等待线程创建并初始化EventLoop
        class LoopThread {
        private:std::mutex _mutex;          // 互斥锁std::condition_variable _cond;   // 条件变量EventLoop *_loop;       // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread;    // EventLoop对应的线程private:void ThreadEntry() {EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);//加锁_loop = &loop;_cond.notify_all();}loop.Start();}public:LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex);//加锁_cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞loop = _loop;}return loop;}
        };

         线程创建:

           LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
        • 构造函数初始化_loop为NULL
        • 创建线程,给这个线程绑定入口函数为ThreadEntry
        • 线程立即开始执行
           void ThreadEntry() {EventLoop loop;  // 在线程栈上创建EventLoop对象{std::unique_lock<std::mutex> lock(_mutex);  // 加锁_loop = &loop;  // 将指针指向线程栈上的EventLoop_cond.notify_all();  // 通知等待的线程}loop.Start();  // 开始事件循环}
        • 在线程内部创建EventLoop对象
        • 通过互斥锁保护设置_loop指针
        • 通知可能等待的GetLoop()调用
        • 启动事件循环

         获取EventLoop指针

           EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex);  // 加锁_cond.wait(lock, [&](){ return _loop != NULL; });  // loop为NULL就一直阻塞loop = _loop;}return loop;}
        • 主线程调用此函数获取工作线程的EventLoop
        • 使用条件变量等待EventLoop创建完成
        • 返回EventLoop指针

        主事件循环启动:

        void Start() {while(1) {//1. 事件监控std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件处理for (auto &channel : actives) {channel->HandleEvent();}
        • 进入无限循环,监控事件、处理就绪事件、执行任务队列
        • 此时服务器完全启动,等待客户端连接 

        连接建立流程详解

        新连接到达流程

        新连接到达,监听套接字可读事件触发,

        Acceptor::HandleRead被调用

        void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);
        }
        • Poller检测到监听套接字可读(有新连接请求)
        • EventLoop的事件循环调用Channel::HandleEvent
        • Channel根据事件类型调用Acceptor::HandleRead
        • 调用Socket::Accept获取新连接的文件描述符
        • 触发_accept_callback即TcpServer::NewConnection

        Socket::Accept实现:

        int Accept() {// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0) {ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;
        }
        • 调用系统调用accept接受新连接
        • 返回新连接的文件描述符

        连接初始化详解

        1. TcpServer::NewConnection实现:
        void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));
        }
        1. 工作线程选择机制:
        EventLoop* NextLoop() {if (_thread_count == 0) {return _baseloop;}_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];
        }
        • 如果没有工作线程,使用主线程的EventLoop
        • 否则采用简单的轮询算法选择一个工作线程
        • 确保连接均匀分布在各个线程中
        1. Connection构造过程:
        Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        }
        • 初始化连接ID和状态
        • 创建Socket和Channel对象管理连接
        • 设置Channel的各种事件回调
        1. 非活跃连接释放机制:
        void EnableInactiveRelease(int sec) {_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
        }void EnableInactiveReleaseInLoop(int sec) {_enable_inactive_release = true;if (_loop->HasTimer(_conn_id)) {return _loop->TimerRefresh(_conn_id);}_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
        }
        • 将操作封装为任务,确保在正确的线程中执行
        • 添加定时器任务,超时后自动释放连接
        • 使用连接ID作为定时器ID

        连接就绪详解

        1. Connection::Established实现:
        void Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        • 使用RunInLoop确保在连接所属的EventLoop线程中执行
        1. Connection::EstablishedInLoop实现:
        void EstablishedInLoop() {assert(_statu == CONNECTING);_statu = CONNECTED;_channel.EnableRead();if (_connected_callback) _connected_callback(shared_from_this());
        }
        • 断言确保当前状态为CONNECTING
        • 将状态更新为CONNECTED
        • 调用Channel::EnableRead启动读事件监控:
          void EnableRead() { _events |= EPOLLIN; Update(); }
        • 最后调用用户设置的连接建立回调
        1. shared_from_this机制:
        _connected_callback(shared_from_this())
        • Connection继承自std::enable_shared_from_this
        • shared_from_this()返回管理当前对象的shared_ptr
        • 确保回调中使用的Connection对象生命周期安全
        1. 回调函数调用:
        • 回调函数是在连接分配到的工作线程中执行的
        • 这确保了每个连接的所有操作都在同一个线程中进行
        • 避免了多线程并发访问导致的竞态条件
        1. 连接保存到管理表:
        _conns.insert(std::make_pair(_next_id, conn));
        • 将连接保存到TcpServer的_conns哈希表中
        • 使用连接ID作为键,便于后续查找和管理

        数据收发流程详解

        数据接收流程

        1. 可读事件触发,Connection::HandleRead被调用

        当客户端发送数据到服务器时,连接对应的socket变为可读状态,触发事件处理:

        // Channel::HandleEvent内部逻辑(事件分发)
        void HandleEvent() {if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {if (_read_callback) _read_callback();}// ...其他事件处理
        }
        • Poller检测到连接socket可读
        • EventLoop调用对应Channel的HandleEvent
        • Channel根据事件类型调用Connection::HandleRead

        Connection::HandleRead实现详解

        void HandleRead() {// 1. 接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if (ret < 0) {// 出错了,不能直接关闭连接return ShutdownInLoop();}// 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动_in_buffer.WriteAndPush(buf, ret);// 2. 调用message_callback进行业务处理if (_in_buffer.ReadAbleSize() > 0) {// shared_from_this--从当前对象自身获取自身的shared_ptr管理对象return _message_callback(shared_from_this(), &_in_buffer);}
        }

         Socket::NonBlockRecv实现

        ssize_t NonBlockRecv(void *buf, size_t len) {return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
        }ssize_t Recv(void *buf, size_t len, int flag = 0) {// ssize_t recv(int sockfd, void *buf, size_t len, int flag);ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0) {// EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误// EINTR  表示当前socket的阻塞等待,被信号打断了if (errno == EAGAIN || errno == EINTR) {return 0; // 表示这次接收没有接收到数据}ERR_LOG("SOCKET RECV FAILED!!");return -1;}return ret; // 实际接收的数据长度
        }

         Buffer::WriteAndPush实现

        void WriteAndPush(const void *data, uint64_t len) {Write(data, len);MoveWriteOffset(len);
        }void Write(const void *data, uint64_t len) {// 1. 保证有足够空间,2. 拷贝数据进去if (len == 0) return;EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WritePosition());
        }

         调用用户消息回调

        _message_callback(shared_from_this(), &_in_buffer);
        • 传递连接管理对象和输入缓冲区
        • 在回显服务器示例中,对应EchoServer::OnMessage
        • 用户在回调中可以读取缓冲区数据并进行处理

         数据发送流程

        用户调用Connection::Send发送数据

        void Send(const char *data, size_t len) {Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
        }
        • 创建临时Buffer对象存储数据
        • 使用std::move优化性能,避免拷贝
        • 通过RunInLoop确保在正确的线程中执行SendInLoop

        Connection::SendInLoop实现

        void SendInLoop(Buffer &buf) {if (_statu == DISCONNECTED) return;_out_buffer.WriteBufferAndPush(buf);if (_channel.WriteAble() == false) {_channel.EnableWrite();}
        }
        • 检查连接状态,断开则不发送
        • 将数据写入输出缓冲区
        • 如果未启动写事件监控,则启动

        Channel::EnableWrite实现

        void EnableWrite() { _events |= EPOLLOUT; Update(); }
        • 添加EPOLLOUT事件到监控
        • 通过Update更新事件监控

        连接可写事件触发,Connection::HandleWrite被调用

        void HandleWrite() {// _out_buffer中保存的数据就是要发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret < 0) {// 发送错误就该关闭连接了if (_in_buffer.ReadAbleSize() > 0) {_message_callback(shared_from_this(), &_in_buffer);}return Release(); // 这时候就是实际的关闭释放操作了}_out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动if (_out_buffer.ReadAbleSize() == 0) {_channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控// 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放if (_statu == DISCONNECTING) {return Release();}}return;
        }

         Socket::NonBlockSend实现

        ssize_t NonBlockSend(void *buf, size_t len) {if (len == 0) return 0;return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞
        }ssize_t Send(const void *buf, size_t len, int flag = 0) {// ssize_t send(int sockfd, void *data, size_t len, int flag);ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0) {if (errno == EAGAIN || errno == EINTR) {return 0;}ERR_LOG("SOCKET SEND FAILED!!");return -1;}return ret; // 实际发送的数据长度
        }

        处理发送结果

        • 如果发送出错(ret < 0):
        • 处理剩余输入数据
        • 调用Release关闭连接
        • 如果发送成功:
        • 更新缓冲区读偏移(MoveReadOffset(ret))
        • 如果缓冲区为空,关闭写事件监控(DisableWrite())
        • 如果连接状态为DISCONNECTING且所有数据已发送,释放连接

         Channel::DisableWrite实现

        void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
        • 从事件标志中移除EPOLLOUT
        • 通过Update更新事件监控

        关于Http模块的流程

        HTTP请求的接收流程

        客户端发送HTTP请求

        • 数据通过TCP连接到达服务器。

        数据进入内核接收缓冲区

        • 操作系统内核接收数据,暂存在socket的接收缓冲区。

        muduo通过read()读取数据到in_buffer

        • muduo检测到socket可读事件,调用read()把数据读到in_buffer。

        HTTP解析器处理in_buffer

        • muduo的HTTP模块会不断检查in_buffer,尝试解析出完整的HTTP请求(包括请求行,请求头,请求体)。
        • 如果数据不完整(比如POST体还没收全),就继续等待数据到来。

        解析出完整请求后,调用用户回调

        • 一旦解析出完整的HTTP请求,muduo会调用你注册的HTTP请求处理回调(如onRequest),把HttpRequest对象传给你的业务代码。

        HTTP响应的发送流程

        你的业务代码生成HttpResponse

        • 你在回调里构造HttpResponse对象,设置响应内容、状态码、头部等。

        muduo把HttpResponse序列化到out_buffer

        • muduo会把HttpResponse对象序列化成HTTP协议格式的字符串,写入out_buffer。

        尝试write()到socket

        • muduo尝试把out_buffer的数据写入socket(内核发送缓冲区)。
        • 如果一次写不完,剩余数据继续留在out_buffer,等待下次socket可写时再发。

        客户端收到HTTP响应

        • 数据通过网络传输到客户端,客户端解析HTTP响应。

        首先是会创建个TCPserver对象_server,然后就是然后这个_server成员列表会创建个_baseloop对象,_acceptor对象,_pool对象,会先执行它们的默认构造函数,base_loop对象的成员列表中会创建poller对象,channel对象,timer_wheel对象

        模块初始化都干了什么事情

        Poller对象的创建

        EventLoop对象的创建

         Channel对象的创建

        Socket对象的创建

        Acceptor对象的创建

        TcpServer对象的创建

        LoopThreadPool对象的创建

        LoopThread对象的创建

        Timer_wheel对象的创建

        Connection对象的创建

        1. 连接触发监听套接字的可读事件

        客户端发起连接请求(connect),操作系统内核接收到SYN包,完成三次握手后,监听套接字变为可读状态(有新连接待接受)。

        2. Poller检测到事件

        在主事件循环(baseloop)中:

        - _poller.Poll(&actives)检测到监听套接字的可读事件

        • 将监听套接字对应的Channel添加到活跃Channel列表中
        3. EventLoop处理就绪事件

        在主事件循环的事件处理阶段:

        for (auto &channel : actives) {channel->HandleEvent();
        }
        • 调用监听套接字Channel的HandleEvent方法
        • Channel根据就绪事件类型调用相应的回调函数
        4. Acceptor的HandleRead方法被调用

        监听套接字的可读事件触发了Channel的读回调,即Acceptor::HandleRead:

        void Acceptor::HandleRead() {int newfd = _socket.Accept();  // 接受新连接if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);  // 调用接受连接回调
        }
        5. TcpServer::NewConnection方法被调用

        Acceptor的接受连接回调是TcpServer::NewConnection:

        void TcpServer::NewConnection(int fd) {_next_id++;  // 生成新的连接ID// 选择一个EventLoop来处理这个连接(负载均衡)EventLoop* loop = _pool.NextLoop();// 创建Connection对象管理这个连接PtrConnection conn(new Connection(loop, _next_id, fd));// 设置Connection的各种回调函数conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));// 如果启用了非活跃连接超时释放,设置超时时间if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);// 完成连接初始化conn->Established();// 将连接添加到管理表_conns.insert(std::make_pair(_next_id, conn));
        }
        6. Connection对象创建

        当执行new Connection(loop, _next_id, fd)时:

        • 初始化成员变量:连接ID、文件描述符、EventLoop指针等
        • 创建Socket对象包装文件描述符
        • 创建Channel对象关联到EventLoop和连接套接字
        • 设置Channel的各种事件回调函数
        • 连接状态设置为CONNECTING
        7. Connection::Established方法执行
        void Connection::Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        • 将EstablishedInLoop任务放入连接所属的EventLoop的任务队列
        • 这确保了连接的初始化在正确的线程中执行
        8. Connection::EstablishedInLoop方法执行

        在连接所属的EventLoop线程中:

        void Connection::EstablishedInLoop() {assert(_statu == CONNECTING);  // 确保当前状态是CONNECTING_statu = CONNECTED;  // 修改连接状态为CONNECTED// 启动读事件监控_channel.EnableRead();// 调用连接建立回调函数if (_connected_callback) _connected_callback(shared_from_this());
        }
        9. 启动读事件监控

        当执行_channel.EnableRead()时:

        void Channel::EnableRead() { _events |= EPOLLIN; Update(); 
        }void Channel::Update() {// 通过EventLoop更新事件监控_loop->UpdateEvent(this);
        }
        10. 更新Poller中的事件监控

        EventLoop::UpdateEvent调用Poller::UpdateEvent:

        void Poller::UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);
        }void Update(Channel *channel, int op) {int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!!");}return;
        }
        • 将连接套接字添加到epoll的监控列表中
        • 设置监控的事件类型(EPOLLIN表示可读事件)
        11. 调用连接建立回调函数

        执行用户设置的连接建立回调函数:

        if (_connected_callback) _connected_callback(shared_from_this());
        • 将当前Connection的shared_ptr传递给回调函数
        • 用户可以在回调函数中处理连接建立事件
        12. 连接就绪,等待数据交互

        至此,新连接的处理流程完成:

        • 连接已被接受并分配了唯一ID
        • 连接被分配给了一个EventLoop进行管理
        • 连接的读事件监控已启动
        • 连接状态已设置为CONNECTED
        • 连接已添加到TcpServer的连接管理表中
        • 用户的连接建立回调函数已被调用

        连接现在处于就绪状态,可以进行数据收发。当有数据到达时,会触发连接套接字的可读事件,从而调用Connection::HandleRead方法处理数据。

        http://www.xdnf.cn/news/641917.html

        相关文章:

      1. quill 富文本多张图片排序
      2. SRS流媒体服务器之RTC播放环境搭建
      3. 揭开C语言指针的神秘面纱:地址、变量与“指向”的力量
      4. systemverilog的单精度浮点和双精度浮点
      5. AI测试怎么做投入产出比分析以及人员分配?
      6. YOLOV8涨点技巧之DSS模块(一种轻量化火灾检测模型)
      7. Unity引擎源码-物理系统详解-其三
      8. C++23 std::out_ptr 和 std::inout_ptr:提升 C 互操作性
      9. 锁与死锁的诊断:如何通过 SHOW ENGINE INNODB STATUS 解锁瓶颈
      10. 加密货币投资亏损后,能否以“欺诈”或“不当销售”索赔?
      11. 如何在 Windows 11 上安装 Ubuntu 20.04 WSL2
      12. 《红警2000》游戏信息
      13. YOLOv8源码修改(5)- YOLO知识蒸馏(下)设置蒸馏超参数:以yolov8-pose为例
      14. Karakeep | 支持Docker/NAS 私有化部署!稍后阅读工具告别云端依赖,让知识收藏更有序
      15. 机器学习---特征降维
      16. C++指针与引用:const修饰的奥秘
      17. 视频剪辑SDK定制开发技术方案与报价书优雅草卓伊凡
      18. pinia状态管理使用
      19. 星际旅行家(广度优先搜索+邻接表)
      20. 直流电机 pwm 调速
      21. 第五十一节:增强现实基础-单应性矩阵计算
      22. MySQL#Select语句执行过程
      23. LLMs之Qwen:《Qwen3 Technical Report》翻译与解读
      24. 2025年5月系分论文题(回忆版)
      25. C# 怎么做chat柱状图能实现不同的颜色,还带游标
      26. 篇章二 基础——包装类
      27. ADS学习笔记(二) 交流小信号仿真
      28. Windows逆向工程提升之x86结构化异常SEH处理机制
      29. Java 可扩展状态系统设计:备忘录模式的工程化实践与架构演进
      30. TCP建立连接为什么不是两次握手,而是三次,为什么不能在第二次握手时就建立连接?