【仿muduo库实现并发服务器】Connection模块
仿muduo库实现并发服务器
- 一.Connection模块
- 二.成员变量
- 1.连接唯一ID(连接管理)
- 2.Socket对象(套接字操作管理)
- 3.Channel对象(连接事件管理)
- 4.Buffer对象(缓冲区管理)
- 5.ConnStatus对象(连接状态管理)
- 6.EventLoop对象(连接监控/定时任务管理)
- 7.Any对象(上下文管理)
- 8.是否启动非活跃超时连接销毁标志位
- 9.5个阶段性回调函数
- 三.成员函数
- 1.私有成员函数(为内部提供的接口)
- 1.1对连接的操作要都放入对应的线程中运行
- EnableInactiveReleaseInLoop()
- CancelInactiveReleaseInLoop()
- EstablishedInLoop()
- SendInLoop()
- ShutdownInLoop()
- ReleaseInLoop()
- UpgradeInLoop()
- 1.2描述符的就绪事件回调函数构建
- HandleRead()
- HandleWrite()
- HandleClose()
- HandleError()
- HandleEvent()
- 2.公有成员函数(为外部提供的接口)
- 2.1构造函数
- 2.2对连接操作的函数
- 2.3对阶段性回调函数设置的函数
- 2.4对成员变量操作的函数
一.Connection模块
Connection模块封装了Socket模块,Buffer模块,Channel模块,EventLoop模块等。
它实现对一个通信套接字的整体管理,每一个进行数据通信的套接字(也就是通过accept获取到的新连接)都会通过Connection进行管理。
主要的处理流程:
accept获取到新连接后就通过Connection进行管理,在Connection创建之处,该套接字对应channel的5个事件回调函数就已经被设置进去。然后启动非活跃超时连接销毁任务。最后通过eventloop中的poller启动该连接的读事件监控。
一旦客户端发送数据,就会触发读事件就绪,就会执行读事件的回调函数,读回调的主要工作就是从Socket中读取到对端发送的数据,并放入用户态的接受缓冲区中。
最后调用用户传入进行的业务处理函数,对数据进行处理。
在业务处理函数中当数据被处理完后,就会调用Connection给用户提供的Send发送接口,将数据放入用户态的发送缓冲区中,然后启动读事件监控。
读事件一旦就绪,就会将用户态的发送缓冲区中的数据通过Socket发送出去。
二.成员变量
// 一.成员变量
private:uint64_t _conn_id; // 1.连接的唯一标识,用来管理和查找连接// uint64_t _timer_id;定时器的唯一id,这里简单使用连接的id来代替int _sockfd; // 2.连接关联的描述符Socket _socket; // 3.对套接字的操作管理Channel _channel; // 4.对连接的事件管理ConnStatus _status; // 5.连接的状态EventLoop *_loop; // 6.连接所关联的一个loopBuffer _in_buffer; // 7.输入缓冲区,存放的是从socket中读取到的数据Buffer _out_buffer; // 8.输出缓冲区,存放的是要发送给对端的数据Any _context; // 9.连接的上下文bool _inactive_release; // 10.用来是否启动非活跃超时销毁任务的标志位,默认是关闭的false;// 5个阶段性回调函数using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;ConnectedCallback _connected_callback; // 11.当连接完全建立成功时要执行的阶段性回调函数,在Established()中会执行MessageCallback _message_callback; // 12.当输入缓冲区中存在数据时要执行的阶段性回调函数,在HandleRead()中会执行ClosedCallback _closed_callback; // 13.当连接关闭时要执行的阶段性回调函数,在Release()中会执行AnyEventCallback _anyevent_callback; // 14.当连接任意事件时要执行的阶段性回调函数ClosedCallback _svr_closed_callback; // 15.当服务器要关闭连接时要执行的回调函数
1.连接唯一ID(连接管理)
这个ID,用于连接的管理和查找,每一个连接都有唯一的ID。
还要注意连接要需要一个定时任务的id,这个定时器id,就是说我们如果要启动一个非活跃连接的销毁任务。那么,当添加定时任务时你就得给我一个指定的定时任务的id。和一个超时时间。所以我添加一个定时任务里边必须得设置一个timerid。
因为当一个连接,有事件触发了,它就要去刷新这个连接的活跃度。刷新连接的活跃度就是要延迟这个连接的一个定时销毁任务的执行,
所以呢,延迟定时任务首先是需要找到该定时任务的,怎么找呢?根据定时任务的id在哈希表中找,所以呢这个定时id它也是必须是唯一的。
这里直接用connid来表示定时器的id了,后面操作都会变得特别的方便。
2.Socket对象(套接字操作管理)
Socket对象是对套件字进行各种操作。从socket中读取数据,从socket中发送数据等操作。
3.Channel对象(连接事件管理)
那什么时候对socket操作呢?那肯定是某种事件就绪了,可读事件就绪就去对套接字进行读取操作
写事件就绪了就对套接字进行一个写入数据操作。所以呢还需要一个channel对象来对各种事件进行管理。
4.Buffer对象(缓冲区管理)
连接共有两个用户态的缓冲区,一个是接受缓冲区in_buffer,一个是发送缓冲区out_buffer。
输入缓冲区中存储的从socket读取到的数据。
输出缓冲区中存储的是服务器处理完要发送给对端的数据。
输入缓冲存放我们的是从socket中读取到的数据,socket正常会直接把读取到的数据放到我们的内核缓冲区里面来,然后让用户去进行一个处理,但是处理的时候呢,会出现这个数据不是一个完整的数据,如果不是一个完整数据,我就先不处理,就放入接受缓冲区中存着。
案例:假设有一个客户端给我们发送了一个请求。这个请求超过了内核接收缓冲区大小。
如果用户不把socket接收缓冲的数据都读出来,那socket就无法再继续接收新的数据了,但如果读取出来,也不是完整的数据,那你这个请求永远都不可能完整。
所以我们要把socket接收到的数据呢,全部要拿出来放到我们的缓冲区里边缓存起来。
服务器处理完数据后,就要将响应发送出去,发送数据的时候并不是直接来通过sock进行发送而是先把数据放到用户态发送缓冲区,等到我们的socket描述符事件可写了然后再去写入,否则的话,如果你直接进行数据发送就会导致一个情况那就是如果发送缓冲区socket的内核发送缓冲区里面数据已经满了。那你数据放不进去了,你发送是不是就会阻塞在这里边呢?那你那么阻塞的话,程序就卡在这了。
他就必须有一个输入缓冲区把你的数据你先放到这个缓冲区里面来等到可写事件触发了能写了再去写这么一个功能
5.ConnStatus对象(连接状态管理)
存在一个问题,一个连接如果我现在要去关闭,那我应该直接关闭连接吗?
不是的!
应该是在连接,接受缓冲区没有数据可处理了,发送缓冲区里边也没有数据再发送了这时候才应该去关闭连接。
所以这时候呢?就应该有一个状态。它标记连接当前它是否处于一个正常的通信状态或者一个待关闭状态或者一个关闭状态。
typedef enum
{DISCONNECTED, // 连接已经关闭了的状态CONNECTING, // 连接创建出来,但各种属性还没有设置的状态,待处理状态CONNECTED, // 连接完全创建成功,可以通信的状态DISCONNECTING // 待关闭状态
} ConnStatus;
6.EventLoop对象(连接监控/定时任务管理)
connection提供的操作,其实都是给组件使用者所提供的一个接口,而这些接口呢
在进行使用的时候,并不能让用户直接去进行操作,因为,要解决线程安全的一个问题。
所以我们就必须把对连接的所有的操作都放到我们的一个eventloop线程里边去执行。所以将connection呢绑定到固定的一个eventloop线程上去。
那么就相当于在创建一个连接之后呢,就给它分配了一个loop那么,这个连接就永远工作在它上边。
eventloop中的poller模块用于对通信套接字进行监听。
eventloop中的timerwheel模块用于添加定时任务。
7.Any对象(上下文管理)
8.是否启动非活跃超时连接销毁标志位
如果要启动或者取消非活跃连接的超时销毁任务,么所以他必须得有一个标志位能够去控制启动取消,默认是取消。
9.5个阶段性回调函数
这5个阶段性回调函数其实它就是连接在不同的阶段应该干的事
情,而不是某种事件触发,它所干的事情。一定要把这个要区分开了。
事件触发指是触发了可读事件把数据读取出来放到接收缓冲区,但是阶段性回调函数是什么呢?是接收到数据放到接收缓冲区之后,那么处于这个阶段的时候,我应该调用人家的业务处理函数是不是来进行应用处理,它是一个某个阶段的处理函数。
这个阶段性回调函数它是提供给外界来进行对连接的某种操作的,所以呢,对外的操作都使用智能指针所管理的对象来进行操作。
三.成员函数
1.私有成员函数(为内部提供的接口)
1.1对连接的操作要都放入对应的线程中运行
给用户提供一个发送数据的操作,其实并不是直接去发送一个数据,而是要先放入对应的任务队列中,在线程里边执行。
关闭连接那也并不是直接去关闭,而是是放到我们的一个线程当中再去进行执行关闭。所以这些接口,其实内部它们都要去调用我们的一个loop的一个runinloop,将它们加入到我们的一个任务队列当中去。
对于连接的操作操作呢,放到我们的一个loop线程里边去进行
EnableInactiveReleaseInLoop()
真正执行启动非活跃超时连接销毁的操作。
首先将标志位设置为true,表明启动。
然后判断时间轮定时器中是否存在该任务,如果已经存在,那么再启动就是刷新该任务,如果不存在就添加一个定时器。
绑定的销毁任务就是Release函数
// 3.真正的启动非活跃超时销毁任务void EnableInactiveReleaseInLoop(int sec){// ①设置标志位为true ②判断是否存在任务,如果存在任务则刷新延迟一下,如果不存在则添加任务_inactive_release = true;if (_loop->HasTaskE(_conn_id)) // 存在了就直接刷新延迟一下return _loop->RefreshTaskE(_conn_id);// 不存在就添加_loop->AddTaskE(_conn_id, sec, std::bind(&Connection::Release, this));}
CancelInactiveReleaseInLoop()
真正的取消非活跃超时销毁任务的操作。
首先将标准位设置为fasle,表示取消。
判断该任务是否存在,如果存在就取消该任务。
最终也是根据任务对象中的标志位来决定,是否执行销毁任务。
// 4.真正的取消非活跃超时销毁任务void CancelInactiveReleaseInLoop(){_inactive_release = false;if (_loop->HasTaskE(_conn_id)){return _loop->CancelTaskE(_conn_id);}}
当取消非活跃超时销毁任务时,canceled就会设置为true。就不会执行销毁任务。
// 析构,当对象释放时执行定时任务,并且从timerwheel中移除该定时任务的信息~TimerTask(){if (!_canceled)//当外界取消了定时任务,cancel的值就会被设置为true,cancel为true时就不去执行销毁任务{_task_cb();}// 定时任务销毁后,就会将对应的信息从时间轮中去除_release_cb();}
EstablishedInLoop()
这个接口是专门向外提供的一个用于连接建立完成之后的状态,那么所做的一个处理
比如连接获取之后,所处的状态下要进行各种设置,channel要设置它的几种事件回调,以及启动读监控。然后呢调用我们的一个连接完成阶段性回调函数都就都在这个接口里边。
就是一个连接一旦已经创建出来了,那我接下来要干什么呢?
就要设置channel的各个回调函数,然后把它添加到我们的可读监控里面去开始监听,以及调用建立成功阶段的回调函数。
当前的状态必须是上层的半连接状态CONNECTING,也就是连接还没有完全的设置完毕,连接的各种操作还并没有完全,在构造函数里边我们到时候会给他去那么进行各种构造,设置channel的回调函数。只有在完成调用该接口之后,那么它才算是连接真正完成。
因为启动读事件监控这一步是不能放到构造函数里面完成的,因为存在一开始就启动读事件监控了之后就有可能立刻有事件触发就绪,它这时候就会去有去刷新连接的活跃度去延迟定时任务的执行,那问题就来了:如果放到构造函数里边的话,那这时候呢?还没有添加定时任务呢,那到时候再去找这个找这个定时任务的时候铁定是找不着的所以逻辑是有问题的。
所以呢,就是启动读事件监控,他应该是在连接设置了非活跃连接销毁任务之后来进行。
所以启动读事件就绪需要单独进行,然后呢这块完了之后呢,我们的一个连接才是完全处理完毕了,才可以正常通信了。
// 5.连接创建初,还需要各种设置:①启动写事件监控 ②调用connected_callback回调函数void EstablishedInLoop(){// ①.设置状态为Connected ②启动读事件监控 ③调用connected_callback阶段性回调函数assert(_status == CONNECTING); // 确保当前连接的状态为CONNECTING。_status = CONNECTED; // 调用完这个,连接才算真正创建完毕_channel.EnableRead(); // 启动读事件监控// shared_from_this()是返回当前对象的shared_ptr对象if (_connected_callback)_connected_callback(shared_from_this());}
SendInLoop()
这个接口呢并不是实际的发送接口,只是把服务器处理完的数据放到了用户态发送缓冲区,然后启动可写事件监控。
实际的发送数据是在我们的一个Handlewrite里边来完成的,也就触发了可写事件之后才去那么进行一个发送的操作。
// 1.真正的将数放入到发送缓冲区中 ①将要发送给对端的数据放入到输出缓冲区中 ②启动写事件监控void SendInLoop(Buffer &buf){// 在业务处理函数中,对数据处理完后,就要调用send将响应放入到输出缓冲区中if (_status == DISCONNECTED)return;_out_buffer.WriteBufferAndPop(buf); // 将响应放入输出缓冲区中if (_channel.WriteAble() == false)_channel.EnableWrite(); // 启动写事件监控}
ShutdownInLoop()
这个接口也不是实际的一个释放连接操作,其实它只是需要去判断用户态接受缓冲区中是否还有数据待处理,用户态发送缓冲区中是否还有数据待发送。
当调用该接口时,连接就需要设置为我们的一个半关闭状态,就是没有数据在处理了就去关闭它。
有可能会存在一种情况就是我们的接收缓冲区里面是有数据的,但是,它数据可能不完整数据,上层组件使用者人家可能对接收缓冲区的数据根本就不处理。 人家确实就是有数据,但是我不想处理,那要不要关闭连接?是要关闭连接的。
就只管说我让你处理一下,你自己处理完了,不管你有没有处理完,我接下来都要去释放连接了的。
// 2.并不直接关闭,真正的先检测判断输入缓冲区中是否还有数据待处理,输出缓冲区中是否还有数据待发送void ShutdownInLoop(){// 首先将连接的状态设置为DISCONNECTING,待关闭状态_status = DISCONNECTING;// 先检测输入缓冲区中是否还有数据if (_in_buffer.AvailableReadSize() > 0){if(_message_callback)_message_callback(shared_from_this(), &_in_buffer);}// 再检测输出缓冲区中是否还有数据if (_out_buffer.AvailableReadSize() > 0){// 如果有数据就要启动写事件监控if (_channel.WriteAble() == false)_channel.EnableWrite();}// 如果最后都没有了数据就可以释放连接了if (_out_buffer.AvailableReadSize() == 0)return Release();}
ReleaseInLoop()
这个接口才是真正的释放连接的操作,非活跃超时销毁任务中的任务就是该函数。
真正的释放连接主要做一下步骤:
0.首先将连接状态置为已关闭DISCONNECTED状态
1.移除该连接在poller中所有的监控
2.关闭该连接的套接字
3.在关闭时如果时间轮定时器中还有该连接的任务,就要取消任务。
4.最后先执行关闭阶段要执行的阶段性回调函数,以及服务器真正要关闭连接的操作回调函数(就是将该连接从管理连接表中移除,将最后一个计数器减为0,这个连接才真正的被释放掉)
// 6.真正实际释放连接的操作void ReleaseInLoop(){// ①设置连接的状态为DISCONNECTED_status = DISCONNECTED;// ②移除连接的所有监控_channel.Remove();// ③关闭套接字_socket.Close();// ④如果这个连接还有非活跃定时销毁任务,就要取消定时任务,否则第二次销毁会报错if (_loop->HasTaskE(_conn_id))CancelInactiveReleaseInLoop();// ⑤执行阶段性的关闭回调函数// 首先要执行的用户级别的关闭阶段性回调函数,不能先执行服务器关闭阶段的回调函数,因为第二个是真正的关闭连接。服务器中存储着最终的连接的shared_ptr对象的一个计数// 如果先执行第二个就会使第一个回调函数访问连接释放,野指针if (_closed_callback)_closed_callback(shared_from_this());if (_svr_closed_callback)_svr_closed_callback(shared_from_this());}
这里有一个细节,当时写的时候出现bug,调试了才发现,就是最后一个行,没有加判断条件的话会段错误。
因为当定时销毁任务执行时,它会判断当前连接是否还有其他定时任务,而这个时候该连接的定时任务对象以及被销毁了,不然怎么会执行析构函数呢,而下面又想要访问该对象,如果不加以判断是否存在,就会发生错误。
void CancelTimerInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end())return;PtrTask pt = _timers[id].lock(); // 获取weakptr保存的shared_ptr,//当执行Release的时候说明任务对象已经被释放了,只有释放了才会调用析构函数执行里面的任务。//而该对象已经都被释放掉了,获取到该对象的shared_ptr也就为空,就不会再区访问该对象if (pt) pt->canceled();}
UpgradeInLoop()
1.2描述符的就绪事件回调函数构建
描述符的各种事件怎么处理都是由连接设置的,只有连接知道当读事件就绪时要干嘛,当写事件就绪时要干嘛。
所以要在连接内部设置好读/写/挂断/错误/任意5个事件的回调函数。这里先构建出来,然后在连接的构造里面,会进行设置。
HandleRead()
客户端发送数据过来,就会触发读事件就绪,读事件就绪应该干什么呢?
首先从socket中读取对方发送来的数据,然后放入到用户态的接受缓存区中。
最后调用阶段性回调函数即业务处理函数。
细节:
如果recv读取小于零,就是socket读取数据过程出错了怎么办?
应该直接关闭连接吗?
明显不是,出错了不能直接关闭连接,它应该这时候干什么呢?它这时候应该做的一个事情就看一下我们的发送缓冲区还有没有数据待发送。我们的接收缓冲区有没有数据待处理。
所以我们应该直接返回调用shutdowninloop();
为什么呢?因为服务器这边的读取出错,但它之前可能读取过数据,所以接受缓冲区中可能存在数据,发送缓冲区中也可能存在数据没有发送完,所以关闭连接是不好的。
void HandleRead(){char buf[65535];ssize_t ret = _socket.NonBlockRecv(buf, 65535); // 从socket中读取数据if (ret < 0){// ret<0说明读取失败,就要关闭连接,但关闭之前要判断缓冲区是否还有数据return ShutdownInLoop();}// ret>0说明读取成功_in_buffer.WriteAndPop(buf, ret); // 将读取的数据放入接受缓冲区中,并移动位置// 调用业务处理函数if (_in_buffer.AvailableReadSize() > 0)return _message_callback(shared_from_this(), &_in_buffer);}
HandleWrite()
当业务处理函数处理完数据之后,就会调用Send函数将响应放入到用户态的发送缓冲区中,并启动写事件监控,这个时候一旦内核发送缓冲区有空间,就会触发写事件监控,就会调用该接口。
该接口主要是将用户态的发送缓冲区中的数据通过socket发送给对端。而因为写事件是常就绪的,我们需要按需设置写监控,当将用户态的发送缓冲区中的数据全部发送出去后,就关闭写事件监控,防止它持续就绪,等待下次用户态中发送缓冲区中有数据时再触发。
细节1:
当send发送时出错了呢,怎么办?
不过如果错误是因为对端内核发送缓冲区里边已经没有空间的放进去数据了,或者它被信号打断了。我们就返回一个零代表一个数据数据都没发出去,这是可以原谅的。
但是呢?就是说,如果返回的是一个小于零是一个负1,那就代表是真的错了,真的是不可原谅的错误。那这时候,连接是不是就该关闭了。那连接能直接释放吗?
不能!
我们还得考虑一个情况:发送是发送不过去了,但是用户态的接受缓冲区中如果还存在数据待处理呢?这不影响呀,所以在关闭之前我们还需要判断一下接受缓冲区中是否还有数据,如果有则直接调用业务处理函数,处理一下。然后直接真正的释放连接。
细节2:
发送数据可能一次没有发送完,这样就会发送多次,直到用户态的发送缓冲区中没有数据了才会停止,将写事件监控取消。
触发调用HandleWrite的接口有多个,一个是正常通信中业务处理完会调用Send将数据放入发送缓冲区,然后启动写事件监控。
一个是要组件使用者主动释放连接时调用Shutdown,它会判断发送缓冲区中是否还有数据待发送,来决定启动写事件监控,当还有数据时,就要发送,这个已经是相当于还剩下最后一口气了,发送完就要释放连接了,不能再干啥了。
所以当状态为DISCONNECTING就表明是调用Shutdown,触发的,发送完数据就要调用Release释放连接。
// 9.当连接触发写事件时要调用的接口。①将发送缓冲区的数据通过socket发送出去void HandleWrite(){ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPostion(), _out_buffer.AvailableReadSize());if (ret < 0){//判断接受缓冲区中是否还有数据待处理if (_in_buffer.AvailableReadSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}// ret<0说明发送失败,后面再发送也会失败,所以关闭连接之前只需要关心输入缓存区中是否还有数据。return Release();}// ret>0说明发送成功,要注意将输出缓冲区的读位置移动_out_buffer.MoveRead(ret);// 有可能socket的缓冲区空间不足,发送一次后_out_buffer中还有数据,就会再次触发写事件,直到_out_buffer中没有数据if (_out_buffer.AvailableReadSize() == 0){// 说明输出缓冲区中的数据已经全部通过socket发送给对端了,这个时候就需要关闭写事件监控了// 因为写事件监控总是就绪的,所以按需要来设置监控_channel.DisableWrite();// 能触发调用HandleWrite的接口有好几个,有Send,Shutdown,当调用Shutdown接口时,也会启动写事件监控,执行完后就要释放连接if (_status == DISCONNECTING)return Release();}return;}
HandleClose()
当对端挂掉了,就会触发该接口,
对方挂掉了,服务器肯定无法发送了,但不影响它接受缓冲区中可能还有数据待处理,所以在关闭连接之前需要判断一下。
// 10.当连接触发关闭事件时要调用的接口。void HandleClose(){// 对端关闭了,服务器就无法再发送数据过去,但是还可以处理输入缓冲区中的数据if (_in_buffer.AvailableReadSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return Release();}
HandleError()
同上
// 11.当连接触发错误事件时要调用的接口void HandleError(){return HandleClose();}
HandleEvent()
任意事件主要是用来刷新连接时长的,也就是延迟连接的销毁任务。
不过要注意,刷新的前提是已经有任务,任务已经启动了才可以对它刷新,所以根据标志位来确定是否启动了非活跃超时销毁任务。
最后还要执行一下阶段性回调函数,操作什么根据外界决定。
// 12.当连接触发任意事件时要调用的接口void HandleEvent(){// ①刷新连接时长,也就是延迟定时任务的执行 ②执行用户级的阶段性回调函数// 如果已经启动了非活跃超时连接销毁任务,则标志位一定为trueif (_inactive_release == true){_loop->RefreshTaskE(_conn_id);}if (_anyevent_callback){_anyevent_callback(shared_from_this());}}
2.公有成员函数(为外部提供的接口)
这些接口都是将对应真正的操作封装一遍,压入到对应loop中的任务队列中的。
而任务队列中的操作才是真正的操作。
2.1构造函数
一个Connection创建时,需要一个唯一的连接ID,对应的通信套接字,还有绑定一个eventloop线程。
在 构造函数中实现连接的一些基本设置,比如设置连接channel的5个事件回调。但要注意启动读事件监控不能放在构造函数中。
// 构造函数Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id),_sockfd(sockfd), _socket(_sockfd), _channel(loop, _sockfd), _status(CONNECTING),_loop(loop), _inactive_release(false){// 连接刚创建时,就将channel的5个事件回调函数设置好了_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));// 启动读事件监控不能放在构造函数里面进行,因为怕一设置,就有事件就绪,这个时候还没添加定时任务,更无法刷新延迟任务了// 所以启动读事件监控的操作一定要在启动非活跃销毁任务之后进行}
2.2对连接操作的函数
对连接的操作全部封装一遍,通过调用runinloop将这些真正对连接的操作压入到任务队列中执行。
不过有一个注意点,就是Send()函数,将服务器处理完的数据放入到用户态的发送缓冲区中,这个外界传入的data数据可能是一个临时的数据,而Send函数只是将发送操作压入到任务队列中,有可能没有被立刻执行,所以当真正执行时,存储这个data数据的空间已经被释放了,就无法再去访问了,所以在Send()函数还没有将操作压入到任务队列中时,先将这个空间的数据拷贝出来,用一个buffer对象存储,然后传入传入给SendInLoop,SendInLoop访问的就是buffer一个拷贝对象。这样就不怕data空间释放了。
/*对连接的操作:*/// 1.将数据放入到发送缓冲区中// 外界传入的data可能是一个临时的数据,而Send也只是将发送操作压入到任务队列中,有可能没有被立刻执行// 当执行发送操作时,存储数据的空间可能释放了,就无法访问了,所以重新构造一个容器变量存储该数据,这样访问// 访问的是一个拷贝变量而不是空间地址,就不怕了void Send(const char *data, size_t len){Buffer buf;buf.WriteAndPop(data, len);return _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, buf));}// 2.给组件使用者使用的关闭接口,并不是直接关闭,而是先检测判断缓冲区中是否还有数据void Shutdown(){return _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}// 3.启动非活跃超时销毁任务,当连接超过一定时间后没有操作就释放掉void EnableInactiveRelease(int sec){return _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}// 4.取消非活跃超时销毁任务void CancelInactiveRelease(){return _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}// 5.连接创建后要对其进行后续属性设置,channel事件回调函数设置,读事件监控启动,调用connected_callback函数void Established(){return _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}// 6.释放连接的操作是在这里执行,但又不是直接在这里执行而是在ReleaseInLoop中执行void Release(){return _loop->RunInLoop(std::bind(&Connection::ReleaseInLoop, this));}// 7.更新协议,本质就是替换连接的上下文以及阶段性回调函数void Upgrade(const Any &context,const ConnectedCallback &cb1,const MessageCallback &cb2,const ClosedCallback &cb3,const AnyEventCallback &cb4,const ClosedCallback &cb5);
2.3对阶段性回调函数设置的函数
向外提供还有就是设置连接阶段性回调函数设置,这些设置函数提供出来,用不用由用户决定,就是将上层用户定义的函数,设置进去。等到对应的阶段就会去执行。
/*5个阶段性回调函数的设置*/// 8.设置连接完全创建时要执行的阶段性回调函数void SetConnectedCallback(const ConnectedCallback &cb){_connected_callback = cb;}// 9.设置连接输入缓冲区中有数据时要执行的阶段性回调函数void SetMessageCallback(const MessageCallback &cb){_message_callback = cb;}// 10.设置连接关闭时,要执行的阶段性回调函数void SetClosedCallback(const ClosedCallback &cb){_closed_callback = cb;}// 11.设置连接任意阶段要执行的阶段性回调函数void SetAnyEventCallback(const AnyEventCallback &cb){_anyevent_callback = cb;}// 12.设置服务器要关闭连接时要执行的阶段性回调函数void SetSvrClosedCallback(const ClosedCallback &cb){_svr_closed_callback = cb;}
2.4对成员变量操作的函数
/*对成员变量的操作*/// 13.获取该连接的套接字int Fd() { return _sockfd; }// 14.获取该连接的唯一标识uint64_t Id() { return _conn_id; }// 15.判断当前阶段是否是连接完全创建成功,即可正常通信阶段bool IsConnected() { return _status == CONNECTED; }// 16.设置上下文void SetContext(const Any &context);// 17获取上下文Any *GetContext();