C++负载均衡远程调用学习之集成测试与自动启动脚本
目录
01 Lars-LbAgentV0.7-route_lb获取路由全部主机信息
02 Lars-LbAgentV0.7-API模块注册功能实现和测试工
03 Lars-LbAgentV0.7-项目构建工具
04 Lars-LbAgentV0.7-启动工具脚本实现
05 Lars-有关fd泄露的调试办法
06 Lars-qps性能测试
07 git企业开发基本流程
01 Lars-LbAgentV0.7-route_lb获取路由全部主机信息
#### C. 测试
启动server
```bash
$ ./lars_reactor
begin accept
```
启动client
```bash
$ nc 127.0.0.1 7777
```
客户端输入 文字,效果如下:
服务端:
```bash
ibuf.length() = 21
recv data = hello lars, By Aceld
```
客户端:
```bash
$ nc 127.0.0.1 7777
hello lars, By Aceld
hello lars, By Aceld
```
ok!现在我们的读写buffer机制已经成功的集成到我们的lars网络框架中了。
02 Lars-LbAgentV0.7-API模块注册功能实现和测试工
## 4) 事件触发event_loop
接下来我们要尝试添加多路IO的处理机制,当然linux的平台下, 最优的选择就是使用epoll来做,但是用原生的epoll实际上编程起来扩展性不是很强,那么我们就需要封装一套IO事件处理机制。
### 4.1 io_event基于IO事件封装
我们首先定义一个IO事件类来包括一个时间需要拥有的基本成员信息.
> lars_reactor/include/event_base.h
```cpp
#pragma once
/*
* 定义一些IO复用机制或者其他异常触发机制的事件封装
*
* */
class event_loop;
//IO事件触发的回调函数
typedef void io_callback(event_loop *loop, int fd, void *args);
/*
* 封装一次IO触发实现
* */
struct io_event
{
io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}
int mask; //EPOLLIN EPOLLOUT
io_callback *read_callback; //EPOLLIN事件 触发的回调
io_callback *write_callback;//EPOLLOUT事件 触发的回调
void *rcb_args; //read_callback的回调函数参数
void *wcb_args; //write_callback的回调函数参数
};
```
一个`io_event`对象应该包含 一个epoll的事件标识`EPOLLIN/EPOLLOUT`,和对应事件的处理函数`read_callback`,`write_callback`。他们都应该是`io_callback`类型。然后对应的函数形参。
### 4.2 event_loop事件循环处理机制
接下来我们就要通过event_loop类来实现io_event的基本增删操作,放在原生的`epoll`堆中。
> lars_reactor/include/event_loop.h
```h
#pragma once
/*
*
* event_loop事件处理机制
*
* */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include "event_base.h"
#define MAXEVENTS 10
// map: fd->io_event
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定义指向上面map类型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在监听的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;
class event_loop
{
public:
//构造,初始化epoll堆
event_loop();
//阻塞循环处理事件
void event_process();
//添加一个io事件到loop中
void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
//删除一个io事件从loop中
void del_io_event(int fd);
//删除一个io事件的EPOLLIN/EPOLLOUT
void del_io_event(int fd, int mask);
private:
int _epfd; //epoll fd
//当前event_loop 监控的fd和对应事件的关系
io_event_map _io_evs;
//当前event_loop 一共哪些fd在监听
listen_fd_set listen_fds;
//一次性最大处理的事件
struct epoll_event _fired_evs[MAXEVENTS];
};
```
03 Lars-LbAgentV0.7-项目构建工具
**属性**:
`_epfd`:是epoll原生堆的fd。
`_io_evs`:是一个hash_map对象,主要是方便我们管理`fd`<—>`io_event`的对应关系,方便我们来查找和处理。
`_listen_fds`:记录目前一共有多少个fd正在本我们的`event_loop`机制所监控.
`_fried_evs`:已经通过epoll_wait返回的被激活需要上层处理的fd集合.
**方法**:
`event_loop()`:构造函数,主要初始化epoll.
`event_process()`:永久阻塞,等待触发的事件,去调用对应的函数callback方法。
`add_io_event()`:绑定一个fd和一个`io_event`的关系,并添加对应的事件到`event_loop`中。
`del_io_event()`:从`event_loop`删除该事件。
04 Lars-LbAgentV0.7-启动工具脚本实现
具体实现方法如下:
> lars_reactor/src/event_loop.cpp
```cpp
#include "event_loop.h"
#include <assert.h>
//构造,初始化epoll堆
event_loop::event_loop()
{
//flag=0 等价于epll_craete
_epfd = epoll_create1(0);
if (_epfd == -1) {
fprintf(stderr, "epoll_create error\n");
exit(1);
}
}
//阻塞循环处理事件
void event_loop::event_process()
{
while (true) {
io_event_map_it ev_it;
int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
for (int i = 0; i < nfds; i++) {
//通过触发的fd找到对应的绑定事件
ev_it = _io_evs.find(_fired_evs[i].data.fd);
assert(ev_it != _io_evs.end());
io_event *ev = &(ev_it->second);
if (_fired_evs[i].events & EPOLLIN) {
//读事件,掉读回调函数
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events & EPOLLOUT) {
//写事件,掉写回调函数
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) {
//水平触发未处理,可能会出现HUP事件,正常处理读写,没有则清空
if (ev->read_callback != NULL) {
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (ev->write_callback != NULL) {
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else {
//删除
fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd);
this->del_io_event(_fired_evs[i].data.fd);
}
}
}
}
}
05 Lars-有关fd泄露的调试办法
/*
* 这里我们处理的事件机制是
* 如果EPOLLIN 在mask中, EPOLLOUT就不允许在mask中
* 如果EPOLLOUT 在mask中, EPOLLIN就不允许在mask中
* 如果想注册EPOLLIN|EPOLLOUT的事件, 那么就调用add_io_event() 方法两次来注册。
* */
//添加一个io事件到loop中
void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args)
{
int final_mask;
int op;
//1 找到当前fd是否已经有事件
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
//2 如果没有操作动作就是ADD
//没有找到
final_mask = mask;
op = EPOLL_CTL_ADD;
}
else {
//3 如果有操作董酒是MOD
//添加事件标识位
final_mask = it->second.mask | mask;
op = EPOLL_CTL_MOD;
}
//4 注册回调函数
if (mask & EPOLLIN) {
//读事件回调函数注册
_io_evs[fd].read_callback = proc;
_io_evs[fd].rcb_args = args;
}
else if (mask & EPOLLOUT) {
_io_evs[fd].write_callback = proc;
_io_evs[fd].wcb_args = args;
}
//5 epoll_ctl添加到epoll堆里
_io_evs[fd].mask = final_mask;
//创建原生epoll事件
struct epoll_event event;
event.events = final_mask;
event.data.fd = fd;
if (epoll_ctl(_epfd, op, fd, &event) == -1) {
fprintf(stderr, "epoll ctl %d error\n", fd);
return;
}
//6 将fd添加到监听集合中
listen_fds.insert(fd);
}
06 Lars-qps性能测试
//删除一个io事件从loop中
void event_loop::del_io_event(int fd)
{
//将事件从_io_evs删除
_io_evs.erase(fd);
//将fd从监听集合中删除
listen_fds.erase(fd);
//将fd从epoll堆删除
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
}
//删除一个io事件的EPOLLIN/EPOLLOUT
void event_loop::del_io_event(int fd, int mask)
{
//如果没有该事件,直接返回
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
return ;
}
int &o_mask = it->second.mask;
//修正mask
o_mask = o_mask & (~mask);
if (o_mask == 0) {
//如果修正之后 mask为0,则删除
this->del_io_event(fd);
}
else {
//如果修正之后,mask非0,则修改
struct epoll_event event;
event.events = o_mask;
event.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event);
}
}
```
07 git企业开发基本流程
这里`del_io_event`提供两个重载,一个是直接删除事件,一个是修正事件。
### 4.3 Reactor集成event_loop机制
好了,那么接下来,就让让Lars Reactor框架集成`event_loop`机制。
首先简单修正一个`tcp_server.cpp`文件,对之前的`do_accept()`的调度时机做一下修正。
1. 在`tcp_server`成员新增`event_loop`成员。
> lars_reactor/include/tcp_server.h
```h
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
class tcp_server
{
public:
//server的构造函数
tcp_server(event_loop* loop, const char *ip, uint16_t port);
//开始提供创建链接服务
void do_accept();
//链接对象释放的析构
~tcp_server();
private:
int _sockfd; //套接字
struct sockaddr_in _connaddr; //客户端链接地址
socklen_t _addrlen; //客户端链接地址长度
// ============= 新增 ======================
//event_loop epoll事件机制
event_loop* _loop;
// ============= 新增 ======================
};
```