Muduo网络库流程分析
目录
Tcp模块的流程
测试代码
TcpServer初始化阶段详解
TcpServer构造函数执行流程
EventLoop创建详细步骤:
Channel与Poller的关联
设置Acceptor回调函数
启动监听
启动服务器完整流程
连接建立流程详解
新连接到达流程
连接初始化详解
连接就绪详解
数据收发流程详解
数据接收流程
数据发送流程
关于Http模块的流程
HTTP请求的接收流程
HTTP响应的发送流程
模块初始化都干了什么事情
Poller对象的创建
EventLoop对象的创建
Channel对象的创建
Socket对象的创建
Acceptor对象的创建
TcpServer对象的创建
编辑LoopThreadPool对象的创建
LoopThread对象的创建
Timer_wheel对象的创建
Connection对象的创建
1. 连接触发监听套接字的可读事件
2. Poller检测到事件
3. EventLoop处理就绪事件
4. Acceptor的HandleRead方法被调用
5. TcpServer::NewConnection方法被调用
6. Connection对象创建
7. Connection::Established方法执行
8. Connection::EstablishedInLoop方法执行
9. 启动读事件监控
10. 更新Poller中的事件监控
11. 调用连接建立回调函数
12. 连接就绪,等待数据交互
Tcp模块的流程
EventLoop模块
构造函数
class EventLoop {private:using Functor = std::function<void()>;std::thread::id _thread_id;//线程IDint _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;Poller _poller;//进行所有描述符的事件监控std::vector<Functor> _tasks;//任务池std::mutex _mutex;//实现任务池操作的线程安全TimerWheel _timer_wheel;//定时器模块public:EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_timer_wheel(this) {//给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));//启动eventfd的读事件监控_event_channel->EnableRead();}
};
Start接口
void Start() {while(1) {//1. 事件监控, std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件处理。 for (auto &channel : actives) {channel->HandleEvent();}//3. 执行任务RunAllTask();}}
TcpServer模块
class TcpServer { private:uint64_t _next_id; //这是一个自动增长的连接ID,int _port;int _timeout; //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor; //这是监听套接字的管理对象LoopThreadPool _pool; //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback; public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }//用于添加一个定时任务void RunAfter(const Functor &task, int delay) {_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start() { _pool.Create(); _baseloop.Start(); }
};
Poller模块
class Poller {private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;public:Poller() {_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();//退出程序}}
};
Acceptor模块
class Acceptor {private:Socket _socket;//用于创建监听套接字EventLoop *_loop; //用于对监听套接字进行事件监控Channel _channel; //用于对监听套接字进行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;public:/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*//*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) {_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }void Listen() { _channel.EnableRead(); }void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return ;}if (_accept_callback) _accept_callback(newfd);}
};
Channel模块
构造函数
class Channel {private:int _fd;EventLoop *_loop;uint32_t _events; // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件using EventCallback = std::function<void()>;EventCallback _read_callback; //可读事件被触发的回调函数EventCallback _write_callback; //可写事件被触发的回调函数EventCallback _error_callback; //错误事件被触发的回调函数EventCallback _close_callback; //连接断开事件被触发的回调函数EventCallback _event_callback; //任意事件被触发的回调函数public:Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}
};
LoopThreadPool模块
构造函数
class LoopThreadPool {private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vector<LoopThread*> _threads;std::vector<EventLoop *> _loops;public:LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}
};
Create接口
void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;}
Connection模块
class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection> {private:uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找//uint64_t _timer_id; //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器IDint _sockfd; // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为falseEventLoop *_loop; // 连接所关联的一个EventLoopConnStatu _statu; // 连接状态Socket _socket; // 套接字操作管理Channel _channel; // 连接的事件管理Buffer _in_buffer; // 输入缓冲区---存放从socket中读取到的数据Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据Any _context; // 请求的接收处理上下文/*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*//*换句话说,这几个回调都是组件使用者使用的*/using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}
};
一个Muduo网络库的流程:
在main线程中首先创建一个TcpServer对象_server,
这个_server在构造的时候创建main_loop,从属线程池_pool,_acceptor,设置非活跃连接时间,非活跃连接关闭为false,并且把从属线程池也注册到main_loop上,把acceptor注册到main_loop上,然后给_acceptor绑定新连接到来的回调函数(连接ID递增,增加连接计数器,确保每个连接有唯一ID,创建连接对象:PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)),创建新的Connection智能指针对象,通过线程池获取下一个可用的EventLoop (_pool.NextLoop()),传入新的连接ID和套接字文件描述符,设置回调函数,启用非活跃连接超时检测,初始化连接:conn->Established() 将连接标记为已建立,可能包括:更改连接状态为CONNECTED,启用读事件监听,调用连接建立回调函数,保存连接:将新连接添加到连接管理容器中,使用连接ID作为键,这样服务器可以通过ID快速查找和管理连接),然后调用Listen接口用来监听新连接的到来
这个main_loop在构造的时候创建了_event_channel(使用智能指针管理),_poller,_event_fd,_thread_id(初始化为当前线程的ID,),_task,_mutex,_timer_wheel,然后给_event_channel绑定可读事件回调函数,并且启动eventfd的读事件监控
这个_poller在构造的时候创建了_epfd存储由epoll_create()返回的epoll文件描述符的整数,_evs是一个epoll_event结构体数组,用于存储事件,_channels是一个哈希表,将文件描述符(int)与Channel指针关联起来,使用epoll_create()创建一个epoll实例
这个_pool在构造的时候创建了 _thread_count;线程池中IO线程的数量, _next_idx用于轮询分配线程的下标(实现负载均衡),EventLoop *_baseloop指向主线程的EventLoop,std::vector<LoopThread*> _threads保存所有IO线程对象的指针,std::vector<EventLoop *> _loops保存所有IO线程对应的EventLoop指针
这个_acceptor在构造的时候创建了_socket用于创建监听套接字,EventLoop *_loop用于对监听套接字进行事件监控,Channel _channel用于对监听套接字进行事件管理 _socket(CreateServer(port))创建监听socket并绑定端口, _loop(loop)保存主事件循环指针,_channel(loop, _socket.Fd())创建Channel,管理监听socket的事件,并且 _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this))它使用 std::bind 将 Acceptor 类的 HandleRead 成员函数绑定为当通道上有数据可读时将被调用的回调函数。
这个socket在构造的时候会创一个文件描述符_sockfd,当调用CreateServer会创建套接字,绑定地址,开始监听,设置非阻塞,启动地址重用
这个_channel在构造的时候创建了个_fd,*_loop指针,需要监控的事件类型_events,已经触发的事件类型_revents,各种事件回调函数的设置,并且初始化_loop,将Channel与特定的事件循环(EventLoop)关联起来。以便这个loop能管理和调度该Channel上的事件。
这个conn在构造的时候,会把该conn注册到传入的loop中,同时也会创建一个channel,把这个channel注册到这个loop中,同时也需要把sockfd注册到channel上,因为Channel知道它要监听哪个文件描述符(sockfd),Channel知道它的事件应该由哪个事件循环(loop)处理,并且给这个channel绑定上读/写/错误/关闭的事件回调函数,然后会创建 _in_buffer用户态输入缓冲区---存放从socket中读取到的数据。_out_buffer用户态输出缓冲区---存放要发送给对端的数据。当客户端发送数据时,SubLoop检测到读事件,调用lcpconnection::handleRead进行数据的读取,操作系统内核接收数据,暂存在socket的接收缓冲区,然后再把数据被读入InputBuffer,调用用户注册的messageCallback,将数据传递给上层应用处理,上层应用根据自己的业务逻辑处理数据。然后服务器构建好了要把数据响应给上层应用调用TcpConnection::send()发送数据,数据被写入OutputBuffer,然后把OutputBuffer的数据尝试发送到内核缓冲区,如果不能完全发送,则:将剩余数据留在OutputBuffer中,向Poller注册该连接socket的写事件,当socket可写时,SubLoop检测到写事件,继续发送OutputBuffer中的数据,数据发送完毕后,取消写事件关注
测试代码
#include "m_server.h"// 简单的回显服务器测试
class EchoServer {
private:TcpServer _server;public:EchoServer(int port) : _server(port) {// 设置服务器回调函数_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));_server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));// 启用非活跃连接超时释放(60秒)_server.EnableInactiveRelease(60);// 设置工作线程数量_server.SetThreadCount(4);}// 连接建立回调void OnConnected(const PtrConnection& conn) {INF_LOG("新连接建立: %d", conn->Id());// 发送欢迎消息std::string welcome = "欢迎连接到回显服务器!\n";conn->Send(welcome.c_str(), welcome.size());}// 消息处理回调void OnMessage(const PtrConnection& conn, Buffer* buf) {// 读取所有可读数据std::string msg = buf->ReadAsStringAndPop(buf->ReadAbleSize());INF_LOG("收到消息[%d]: %s", conn->Id(), msg.c_str());// 回显消息std::string echo_prefix = "回显: ";std::string response = echo_prefix + msg;conn->Send(response.c_str(), response.size());}// 连接关闭回调void OnClosed(const PtrConnection& conn) {INF_LOG("连接关闭: %d", conn->Id());}// 启动服务器void Start() {INF_LOG("回显服务器启动在端口: %d", 8080);_server.Start();}
};int main(int argc, char* argv[]) {int port = 8080;if (argc > 1) {port = atoi(argv[1]);}INF_LOG("启动回显服务器,端口: %d", port);// 创建并启动服务器EchoServer server(port);server.Start();return 0;
}
TcpServer初始化阶段详解
TcpServer构造函数执行流程
private:uint64_t _next_id; //这是一个自动增长的连接ID,int _port;int _timeout; //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor; //这是监听套接字的管理对象LoopThreadPool _pool; //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
TcpServer(int port):_port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上
}
详细步骤:
成员初始化:
- _port: 存储服务器监听端口
- _next_id: 连接ID计数器初始化为0
- _enable_inactive_release: 非活跃连接释放标志设为false
当TcpServer构造函数中初始化_baseloop成员变量时,会创建EventLoop对象:
EventLoop::EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_poller(), // 创建Poller对象_timer_wheel(this) {// 设置eventfd的读事件回调_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();
}
EventLoop创建详细步骤:
初始化线程ID:
_thread_id = std::this_thread::get_id()
- 记录创建EventLoop的线程ID,用于后续线程安全检查
创建eventfd:
_event_fd = CreateEventFd()static int CreateEventFd() {int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0) {ERR_LOG("CREATE EVENTFD FAILED!!");abort();}return efd;}
- 创建一个eventfd用于线程间通信,设置为非阻塞模式
创建event_channel:
_event_channel = new Channel(this, _event_fd)
- 创建一个Channel对象管理eventfd的事件
创建Poller对象:
_poller() // 默认构造函数
- 创建Poller对象用于事件监控
创建TimerWheel对象
_timer_wheel(this)
- 创建定时器管理对象
设置eventfd回调:
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
- 设置eventfd的读事件回调函数
启用eventfd读事件监控:
_event_channel->EnableRead();
- 启用eventfd的读事件监控,这会将eventfd添加到Poller中
Poller创建过程
当EventLoop构造函数中初始化_poller成员变量时,会创建Poller对象:
Poller::Poller() {_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();}
}
Channel与Poller的关联
当调用Channel::EnableRead()等方法时,会将Channel添加到Poller中:
Acceptor创建过程:
- 传入&_baseloop和port构造Acceptor
- Acceptor构造函数内部用CreateServer(port)创建监听套接字:
Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) {_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
}
- CreateServer 内部调用:
int CreateServer(int port) {bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();
}
- Socket::CreateServer 执行五个关键操作:
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {//1. 创建套接字if (Create() == false) return false;//2. 设置非阻塞(如果需要)if (block_flag) NonBlock();//3. 绑定地址if (Bind(ip, port) == false) return false;//4. 开始监听(默认最大连接队列1024)if (Listen() == false) return false;//5. 启动地址端口重用ReuseAddress();return true;
}
- 创建Channel对象管理监听套接字,设置读事件回调函数为 Acceptor::HandleRead
Channel构造:
Channel(EventLoop *loop, int fd): _fd(fd), _events(0), _revents(0), _loop(loop) {}
- 将监听套接字的文件描述符与Channel关联
- 初始化事件标志和关联的EventLoop
- 此时还未设置任何监控事件(events = 0)
设置回调函数
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
- 将Acceptor::HandleRead方法绑定为Channel的读事件回调
- 当监听套接字可读时(有新连接到达),会触发此回调
HandleRead实现
void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);}
- 接受新连接,获取新连接的文件描述符
- 调用预设的_accept_callback处理新连接(即TcpServer::NewConnection)
线程池创建:
- 构造 pool(&_baseloop),传入主事件循环指针
- 此时线程池只是初始化,没有创建工作线程,等待后续 SetThreadCount 和 Create 调用
设置Acceptor回调函数
_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
详细解析:
回调绑定机制:
- 使用 std::bind 创建函数对象,绑定 TcpServer::NewConnection 成员函数
- this 指针作为第一个参数,确保回调能访问到TcpServer对象
- std::placeholders::1 占位符表示将来自Acceptor的新连接fd传给NewConnection
NewConnection函数职责:
void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));
}
启动监听
_acceptor.Listen();
内部实现详解:
Acceptor::Listen内部:
void Listen() { _channel.EnableRead(); }
- 调用Channel的EnableRead方法启动监听套接字的读事件监控
Channel::EnableRead内部:
void EnableRead() { _events |= EPOLLIN; Update(); }
- 将EPOLLIN标志添加到_events中
- 调用Update更新事件监控
Channel::Update内部:
void Update() { return _loop->UpdateEvent(this); }
- 调用EventLoop的UpdateEvent方法
EventLoop::UpdateEvent内部:
void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
- 调用Poller的UpdateEvent方法
Poller::UpdateEvent内部:
void UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {//不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);
}
- 检查Channel是否已在监控集合中
- 如果不存在, 添加到_channels映射并调用epoll_ctl添加监控
- 如果存在, 调用epoll_ctl修改监控事件
Poller::Update内部:
void Update(Channel *channel, int op) {int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!");}return;
}
- 准备epoll_event结构
- 调用epoll_ctl将监听套接字添加到epoll实例
- 此时监听套接字的可读事件(有新连接)开始被监控
启动服务器完整流程
void Start() {_pool.Create(); // 创建线程池中的工作线程_baseloop.Start(); // 启动主事件循环
}
线程池创建:
void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;
}
- 线程池创建指定数量的LoopThread
- 调用GetLoop获取并保存每个线程的EventLoop指针
- GetLoop会等待线程创建并初始化EventLoop
class LoopThread {
private:std::mutex _mutex; // 互斥锁std::condition_variable _cond; // 条件变量EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread; // EventLoop对应的线程private:void ThreadEntry() {EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);//加锁_loop = &loop;_cond.notify_all();}loop.Start();}public:LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex);//加锁_cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞loop = _loop;}return loop;}
};
线程创建:
LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
- 构造函数初始化_loop为NULL
- 创建线程,给这个线程绑定入口函数为ThreadEntry
- 线程立即开始执行
void ThreadEntry() {EventLoop loop; // 在线程栈上创建EventLoop对象{std::unique_lock<std::mutex> lock(_mutex); // 加锁_loop = &loop; // 将指针指向线程栈上的EventLoop_cond.notify_all(); // 通知等待的线程}loop.Start(); // 开始事件循环}
- 在线程内部创建EventLoop对象
- 通过互斥锁保护设置_loop指针
- 通知可能等待的GetLoop()调用
- 启动事件循环
获取EventLoop指针
EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex); // 加锁_cond.wait(lock, [&](){ return _loop != NULL; }); // loop为NULL就一直阻塞loop = _loop;}return loop;}
- 主线程调用此函数获取工作线程的EventLoop
- 使用条件变量等待EventLoop创建完成
- 返回EventLoop指针
主事件循环启动:
void Start() {while(1) {//1. 事件监控std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件处理for (auto &channel : actives) {channel->HandleEvent();}
- 进入无限循环,监控事件、处理就绪事件、执行任务队列
- 此时服务器完全启动,等待客户端连接
连接建立流程详解
新连接到达流程
新连接到达,监听套接字可读事件触发,
Acceptor::HandleRead被调用
void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);
}
- Poller检测到监听套接字可读(有新连接请求)
- EventLoop的事件循环调用Channel::HandleEvent
- Channel根据事件类型调用Acceptor::HandleRead
- 调用Socket::Accept获取新连接的文件描述符
- 触发_accept_callback即TcpServer::NewConnection
Socket::Accept实现:
int Accept() {// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0) {ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;
}
- 调用系统调用accept接受新连接
- 返回新连接的文件描述符
连接初始化详解
- TcpServer::NewConnection实现:
void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));
}
- 工作线程选择机制:
EventLoop* NextLoop() {if (_thread_count == 0) {return _baseloop;}_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];
}
- 如果没有工作线程,使用主线程的EventLoop
- 否则采用简单的轮询算法选择一个工作线程
- 确保连接均匀分布在各个线程中
- Connection构造过程:
Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
}
- 初始化连接ID和状态
- 创建Socket和Channel对象管理连接
- 设置Channel的各种事件回调
- 非活跃连接释放机制:
void EnableInactiveRelease(int sec) {_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
}void EnableInactiveReleaseInLoop(int sec) {_enable_inactive_release = true;if (_loop->HasTimer(_conn_id)) {return _loop->TimerRefresh(_conn_id);}_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
- 将操作封装为任务,确保在正确的线程中执行
- 添加定时器任务,超时后自动释放连接
- 使用连接ID作为定时器ID
连接就绪详解
- Connection::Established实现:
void Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
}
- 使用RunInLoop确保在连接所属的EventLoop线程中执行
- Connection::EstablishedInLoop实现:
void EstablishedInLoop() {assert(_statu == CONNECTING);_statu = CONNECTED;_channel.EnableRead();if (_connected_callback) _connected_callback(shared_from_this());
}
- 断言确保当前状态为CONNECTING
- 将状态更新为CONNECTED
- 调用Channel::EnableRead启动读事件监控:
void EnableRead() { _events |= EPOLLIN; Update(); }
- 最后调用用户设置的连接建立回调
- shared_from_this机制:
_connected_callback(shared_from_this())
- Connection继承自std::enable_shared_from_this
- shared_from_this()返回管理当前对象的shared_ptr
- 确保回调中使用的Connection对象生命周期安全
- 回调函数调用:
- 回调函数是在连接分配到的工作线程中执行的
- 这确保了每个连接的所有操作都在同一个线程中进行
- 避免了多线程并发访问导致的竞态条件
- 连接保存到管理表:
_conns.insert(std::make_pair(_next_id, conn));
- 将连接保存到TcpServer的_conns哈希表中
- 使用连接ID作为键,便于后续查找和管理
数据收发流程详解
数据接收流程
1. 可读事件触发,Connection::HandleRead被调用
当客户端发送数据到服务器时,连接对应的socket变为可读状态,触发事件处理:
// Channel::HandleEvent内部逻辑(事件分发)
void HandleEvent() {if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {if (_read_callback) _read_callback();}// ...其他事件处理
}
- Poller检测到连接socket可读
- EventLoop调用对应Channel的HandleEvent
- Channel根据事件类型调用Connection::HandleRead
Connection::HandleRead实现详解
void HandleRead() {// 1. 接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if (ret < 0) {// 出错了,不能直接关闭连接return ShutdownInLoop();}// 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动_in_buffer.WriteAndPush(buf, ret);// 2. 调用message_callback进行业务处理if (_in_buffer.ReadAbleSize() > 0) {// shared_from_this--从当前对象自身获取自身的shared_ptr管理对象return _message_callback(shared_from_this(), &_in_buffer);}
}
Socket::NonBlockRecv实现
ssize_t NonBlockRecv(void *buf, size_t len) {return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
}ssize_t Recv(void *buf, size_t len, int flag = 0) {// ssize_t recv(int sockfd, void *buf, size_t len, int flag);ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0) {// EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误// EINTR 表示当前socket的阻塞等待,被信号打断了if (errno == EAGAIN || errno == EINTR) {return 0; // 表示这次接收没有接收到数据}ERR_LOG("SOCKET RECV FAILED!!");return -1;}return ret; // 实际接收的数据长度
}
Buffer::WriteAndPush实现
void WriteAndPush(const void *data, uint64_t len) {Write(data, len);MoveWriteOffset(len);
}void Write(const void *data, uint64_t len) {// 1. 保证有足够空间,2. 拷贝数据进去if (len == 0) return;EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WritePosition());
}
调用用户消息回调
_message_callback(shared_from_this(), &_in_buffer);
- 传递连接管理对象和输入缓冲区
- 在回显服务器示例中,对应EchoServer::OnMessage
- 用户在回调中可以读取缓冲区数据并进行处理
数据发送流程
用户调用Connection::Send发送数据
void Send(const char *data, size_t len) {Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
}
- 创建临时Buffer对象存储数据
- 使用std::move优化性能,避免拷贝
- 通过RunInLoop确保在正确的线程中执行SendInLoop
Connection::SendInLoop实现
void SendInLoop(Buffer &buf) {if (_statu == DISCONNECTED) return;_out_buffer.WriteBufferAndPush(buf);if (_channel.WriteAble() == false) {_channel.EnableWrite();}
}
- 检查连接状态,断开则不发送
- 将数据写入输出缓冲区
- 如果未启动写事件监控,则启动
Channel::EnableWrite实现
void EnableWrite() { _events |= EPOLLOUT; Update(); }
- 添加EPOLLOUT事件到监控
- 通过Update更新事件监控
连接可写事件触发,Connection::HandleWrite被调用
void HandleWrite() {// _out_buffer中保存的数据就是要发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret < 0) {// 发送错误就该关闭连接了if (_in_buffer.ReadAbleSize() > 0) {_message_callback(shared_from_this(), &_in_buffer);}return Release(); // 这时候就是实际的关闭释放操作了}_out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动if (_out_buffer.ReadAbleSize() == 0) {_channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控// 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放if (_statu == DISCONNECTING) {return Release();}}return;
}
Socket::NonBlockSend实现
ssize_t NonBlockSend(void *buf, size_t len) {if (len == 0) return 0;return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞
}ssize_t Send(const void *buf, size_t len, int flag = 0) {// ssize_t send(int sockfd, void *data, size_t len, int flag);ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0) {if (errno == EAGAIN || errno == EINTR) {return 0;}ERR_LOG("SOCKET SEND FAILED!!");return -1;}return ret; // 实际发送的数据长度
}
处理发送结果
- 如果发送出错(ret < 0):
- 处理剩余输入数据
- 调用Release关闭连接
- 如果发送成功:
- 更新缓冲区读偏移(MoveReadOffset(ret))
- 如果缓冲区为空,关闭写事件监控(DisableWrite())
- 如果连接状态为DISCONNECTING且所有数据已发送,释放连接
Channel::DisableWrite实现
void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
- 从事件标志中移除EPOLLOUT
- 通过Update更新事件监控
关于Http模块的流程
HTTP请求的接收流程
客户端发送HTTP请求
- 数据通过TCP连接到达服务器。
数据进入内核接收缓冲区
- 操作系统内核接收数据,暂存在socket的接收缓冲区。
muduo通过read()读取数据到in_buffer
- muduo检测到socket可读事件,调用read()把数据读到in_buffer。
HTTP解析器处理in_buffer
- muduo的HTTP模块会不断检查in_buffer,尝试解析出完整的HTTP请求(包括请求行,请求头,请求体)。
- 如果数据不完整(比如POST体还没收全),就继续等待数据到来。
解析出完整请求后,调用用户回调
- 一旦解析出完整的HTTP请求,muduo会调用你注册的HTTP请求处理回调(如onRequest),把HttpRequest对象传给你的业务代码。
HTTP响应的发送流程
你的业务代码生成HttpResponse
- 你在回调里构造HttpResponse对象,设置响应内容、状态码、头部等。
muduo把HttpResponse序列化到out_buffer
- muduo会把HttpResponse对象序列化成HTTP协议格式的字符串,写入out_buffer。
尝试write()到socket
- muduo尝试把out_buffer的数据写入socket(内核发送缓冲区)。
- 如果一次写不完,剩余数据继续留在out_buffer,等待下次socket可写时再发。
客户端收到HTTP响应
- 数据通过网络传输到客户端,客户端解析HTTP响应。
首先是会创建个TCPserver对象_server,然后就是然后这个_server成员列表会创建个_baseloop对象,_acceptor对象,_pool对象,会先执行它们的默认构造函数,base_loop对象的成员列表中会创建poller对象,channel对象,timer_wheel对象
模块初始化都干了什么事情
Poller对象的创建
EventLoop对象的创建
Channel对象的创建
Socket对象的创建
Acceptor对象的创建
TcpServer对象的创建
LoopThreadPool对象的创建
LoopThread对象的创建
Timer_wheel对象的创建
Connection对象的创建
1. 连接触发监听套接字的可读事件
客户端发起连接请求(connect),操作系统内核接收到SYN包,完成三次握手后,监听套接字变为可读状态(有新连接待接受)。
2. Poller检测到事件
在主事件循环(baseloop)中:
- _poller.Poll(&actives)检测到监听套接字的可读事件
- 将监听套接字对应的Channel添加到活跃Channel列表中
3. EventLoop处理就绪事件
在主事件循环的事件处理阶段:
for (auto &channel : actives) {channel->HandleEvent();
}
- 调用监听套接字Channel的HandleEvent方法
- Channel根据就绪事件类型调用相应的回调函数
4. Acceptor的HandleRead方法被调用
监听套接字的可读事件触发了Channel的读回调,即Acceptor::HandleRead:
void Acceptor::HandleRead() {int newfd = _socket.Accept(); // 接受新连接if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd); // 调用接受连接回调
}
5. TcpServer::NewConnection方法被调用
Acceptor的接受连接回调是TcpServer::NewConnection:
void TcpServer::NewConnection(int fd) {_next_id++; // 生成新的连接ID// 选择一个EventLoop来处理这个连接(负载均衡)EventLoop* loop = _pool.NextLoop();// 创建Connection对象管理这个连接PtrConnection conn(new Connection(loop, _next_id, fd));// 设置Connection的各种回调函数conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));// 如果启用了非活跃连接超时释放,设置超时时间if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);// 完成连接初始化conn->Established();// 将连接添加到管理表_conns.insert(std::make_pair(_next_id, conn));
}
6. Connection对象创建
当执行new Connection(loop, _next_id, fd)时:
- 初始化成员变量:连接ID、文件描述符、EventLoop指针等
- 创建Socket对象包装文件描述符
- 创建Channel对象关联到EventLoop和连接套接字
- 设置Channel的各种事件回调函数
- 连接状态设置为CONNECTING
7. Connection::Established方法执行
void Connection::Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
}
- 将EstablishedInLoop任务放入连接所属的EventLoop的任务队列
- 这确保了连接的初始化在正确的线程中执行
8. Connection::EstablishedInLoop方法执行
在连接所属的EventLoop线程中:
void Connection::EstablishedInLoop() {assert(_statu == CONNECTING); // 确保当前状态是CONNECTING_statu = CONNECTED; // 修改连接状态为CONNECTED// 启动读事件监控_channel.EnableRead();// 调用连接建立回调函数if (_connected_callback) _connected_callback(shared_from_this());
}
9. 启动读事件监控
当执行_channel.EnableRead()时:
void Channel::EnableRead() { _events |= EPOLLIN; Update();
}void Channel::Update() {// 通过EventLoop更新事件监控_loop->UpdateEvent(this);
}
10. 更新Poller中的事件监控
EventLoop::UpdateEvent调用Poller::UpdateEvent:
void Poller::UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);
}void Update(Channel *channel, int op) {int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!!");}return;
}
- 将连接套接字添加到epoll的监控列表中
- 设置监控的事件类型(EPOLLIN表示可读事件)
11. 调用连接建立回调函数
执行用户设置的连接建立回调函数:
if (_connected_callback) _connected_callback(shared_from_this());
- 将当前Connection的shared_ptr传递给回调函数
- 用户可以在回调函数中处理连接建立事件
12. 连接就绪,等待数据交互
至此,新连接的处理流程完成:
- 连接已被接受并分配了唯一ID
- 连接被分配给了一个EventLoop进行管理
- 连接的读事件监控已启动
- 连接状态已设置为CONNECTED
- 连接已添加到TcpServer的连接管理表中
- 用户的连接建立回调函数已被调用
连接现在处于就绪状态,可以进行数据收发。当有数据到达时,会触发连接套接字的可读事件,从而调用Connection::HandleRead方法处理数据。