C++负载均衡远程调用学习之异步消息任务功能与连接属性
目录
1.LarV0.11-异步消息机制的event_loop增添属性分析
2.LARS
3.LarV0.11异步消息发送机制的实现及测试
4.LarV0.11异步消息任务机制bug修复和效果演示
5.LarV0.12链接参数属性的绑定
1.LarV0.11-异步消息机制的event_loop增添属性分析
## 4) 事件触发event_loop
接下来我们要尝试添加多路IO的处理机制,当然linux的平台下, 最优的选择就是使用epoll来做,但是用原生的epoll实际上编程起来扩展性不是很强,那么我们就需要封装一套IO事件处理机制。
2.LARS
### 4.1 io_event基于IO事件封装
我们首先定义一个IO事件类来包括一个时间需要拥有的基本成员信息.
> lars_reactor/include/event_base.h
```cpp
#pragma once
/*
* 定义一些IO复用机制或者其他异常触发机制的事件封装
*
* */
class event_loop;
//IO事件触发的回调函数
typedef void io_callback(event_loop *loop, int fd, void *args);
/*
* 封装一次IO触发实现
* */
struct io_event
{
io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}
int mask; //EPOLLIN EPOLLOUT
io_callback *read_callback; //EPOLLIN事件 触发的回调
io_callback *write_callback;//EPOLLOUT事件 触发的回调
void *rcb_args; //read_callback的回调函数参数
void *wcb_args; //write_callback的回调函数参数
};
```
一个`io_event`对象应该包含 一个epoll的事件标识`EPOLLIN/EPOLLOUT`,和对应事件的处理函数`read_callback`,`write_callback`。他们都应该是`io_callback`类型。然后对应的函数形参。
3.LarV0.11异步消息发送机制的实现及测试
## 15) 异步消息任务机制
我们之前在`include/task_msg.h`中, 其中task的消息类型我们只是实现了`NEW_CONN`,目的是`thread_pool`选择一个线程,让一个线程里的`thread_queue`去创建一个连接对象。但是并没有对`NEW_TASK`的任务类型进行定义。这种类型是允许服务端去执行某项具体的业务。并不是根据客户端来消息去被动回复的业务,而是服务端主动发送的业务给到客户端。
4.LarV0.11异步消息任务机制bug修复和效果演示
### 15.1 任务函数类型
我们先定义task的回调函数类型
> lars_reactor/include/event_loop.h
```c
//...
//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);
//...
```
为了防止循环头文件引用,我们把typedef定义在`event_loop.h`中。
> lars_reactor/include/task_msg.h
```c
#pragma once
#include "event_loop.h"
//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);
struct task_msg
{
enum TASK_TYPE
{
NEW_CONN, //新建链接的任务
NEW_TASK, //一般的任务
};
TASK_TYPE type; //任务类型
//任务的一些参数
union {
//针对 NEW_CONN新建链接任务,需要传递connfd
int connfd;
//针对 NEW_TASK 新建任务,
//可以给一个任务提供一个回调函数
struct {
task_func task_cb; //注册的任务函数
void *args; //任务函数对应的形参
};
};
};
```
`task_func`是我们定义的一个任务的回调函数类型,第一个参数当然就是让哪个loop机制去执行这个task任务。很明显,一个loop是对应一个thread线程的。也就是让哪个thread去执行这个task任务。args是`task_func`的函数形参。
5.LarV0.12链接参数属性的绑定
### 15.2 event_loop模块添加task任务机制
我们知道,task绑定一个loop,很明显,一个`event_loop`应该拥有需要被执行的task集合。
在这里,我们将event_loop加上已经就绪的task任务的属性
> lars_reactor/include/event_loop.h
```c
#pragma once
/*
*
* event_loop事件处理机制
*
* */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include <vector>
#include "event_base.h"
#include "task_msg.h"
#define MAXEVENTS 10
// map: fd->io_event
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定义指向上面map类型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在监听的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;
//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);
class event_loop
{
public:
//构造,初始化epoll堆
event_loop();
//阻塞循环处理事件
void event_process();
//添加一个io事件到loop中
void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
//删除一个io事件从loop中
void del_io_event(int fd);
//删除一个io事件的EPOLLIN/EPOLLOUT
void del_io_event(int fd, int mask);
// ===========================================
//获取全部监听事件的fd集合
void get_listen_fds(listen_fd_set &fds) {
fds = listen_fds;
}
//=== 异步任务task模块需要的方法 ===
//添加一个任务task到ready_tasks集合中
void add_task(task_func func, void *args);
//执行全部的ready_tasks里面的任务
void execute_ready_tasks();
// ===========================================
private:
int _epfd; //epoll fd
//当前event_loop 监控的fd和对应事件的关系
io_event_map _io_evs;
//当前event_loop 一共哪些fd在监听
listen_fd_set listen_fds;
//一次性最大处理的事件
struct epoll_event _fired_evs[MAXEVENTS];
// ===========================================
//需要被执行的task集合
typedef std::pair<task_func, void*> task_func_pair;
std::vector<task_func_pair> _ready_tasks;
// ===========================================
};
```
添加了两个属性:
`task_func_pair`: 回调函数和参数的键值对.
`_ready_tasks`: 所有已经就绪的待执行的任务集合。
同时添加了两个主要方法:
`void add_task(task_func func, void *args)`: 添加一个任务到_ready_tasks中.
`void execute_ready_tasks()`:执行全部的_ready_tasks任务。
将这两个方法实现如下:
> lars_reactor/src/event_loop.cpp
```c
//...
//添加一个任务task到ready_tasks集合中
void event_loop::add_task(task_func func, void *args)
{
task_func_pair func_pair(func, args);
_ready_tasks.push_back(func_pair);
}
//执行全部的ready_tasks里面的任务
void event_loop::execute_ready_tasks()
{
std::vector<task_func_pair>::iterator it;
for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) {
task_func func = it->first;//任务回调函数
void *args = it->second;//回调函数形参
//执行任务
func(this, args);
}
//全部执行完毕,清空当前的_ready_tasks
_ready_tasks.clear();
}
//...
```