当前位置: 首页 > ds >正文

C++负载均衡远程调用学习之异步消息任务功能与连接属性

目录

1.LarV0.11-异步消息机制的event_loop增添属性分析

2.LARS

3.LarV0.11异步消息发送机制的实现及测试

4.LarV0.11异步消息任务机制bug修复和效果演示

5.LarV0.12链接参数属性的绑定


1.LarV0.11-异步消息机制的event_loop增添属性分析

## 4) 事件触发event_loop

​        接下来我们要尝试添加多路IO的处理机制,当然linux的平台下, 最优的选择就是使用epoll来做,但是用原生的epoll实际上编程起来扩展性不是很强,那么我们就需要封装一套IO事件处理机制。

2.LARS

### 4.1 io_event基于IO事件封装

​        我们首先定义一个IO事件类来包括一个时间需要拥有的基本成员信息.

> lars_reactor/include/event_base.h

```cpp
#pragma once
/*
 * 定义一些IO复用机制或者其他异常触发机制的事件封装
 *
 * */

class event_loop;

//IO事件触发的回调函数
typedef void io_callback(event_loop *loop, int fd, void *args);

/*
 * 封装一次IO触发实现 
 * */
struct io_event 
{
             io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}

    int mask; //EPOLLIN EPOLLOUT
    io_callback *read_callback; //EPOLLIN事件 触发的回调 
    io_callback *write_callback;//EPOLLOUT事件 触发的回调
    void *rcb_args; //read_callback的回调函数参数
    void *wcb_args; //write_callback的回调函数参数
};
```

​        一个`io_event`对象应该包含 一个epoll的事件标识`EPOLLIN/EPOLLOUT`,和对应事件的处理函数`read_callback`,`write_callback`。他们都应该是`io_callback`类型。然后对应的函数形参。

3.LarV0.11异步消息发送机制的实现及测试

## 15) 异步消息任务机制

​        我们之前在`include/task_msg.h`中, 其中task的消息类型我们只是实现了`NEW_CONN`,目的是`thread_pool`选择一个线程,让一个线程里的`thread_queue`去创建一个连接对象。但是并没有对`NEW_TASK`的任务类型进行定义。这种类型是允许服务端去执行某项具体的业务。并不是根据客户端来消息去被动回复的业务,而是服务端主动发送的业务给到客户端。

4.LarV0.11异步消息任务机制bug修复和效果演示

### 15.1 任务函数类型

​        我们先定义task的回调函数类型

> lars_reactor/include/event_loop.h

```c
//...


//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);

//...

```

​       为了防止循环头文件引用,我们把typedef定义在`event_loop.h`中。

> lars_reactor/include/task_msg.h

```c
#pragma  once
#include "event_loop.h"

//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);

struct task_msg
{
    enum TASK_TYPE
    {
        NEW_CONN,   //新建链接的任务
        NEW_TASK,   //一般的任务
    };

    TASK_TYPE type; //任务类型

    //任务的一些参数
    
    union {
        //针对 NEW_CONN新建链接任务,需要传递connfd
        int connfd;

        //针对 NEW_TASK 新建任务, 
        //可以给一个任务提供一个回调函数
        struct {
            task_func task_cb; //注册的任务函数
            void *args;        //任务函数对应的形参
        };
    };
};
```

​        `task_func`是我们定义的一个任务的回调函数类型,第一个参数当然就是让哪个loop机制去执行这个task任务。很明显,一个loop是对应一个thread线程的。也就是让哪个thread去执行这个task任务。args是`task_func`的函数形参。

​    

5.LarV0.12链接参数属性的绑定

### 15.2 event_loop模块添加task任务机制

​        我们知道,task绑定一个loop,很明显,一个`event_loop`应该拥有需要被执行的task集合。

​         在这里,我们将event_loop加上已经就绪的task任务的属性

> lars_reactor/include/event_loop.h

```c
#pragma once
/*
 *
 * event_loop事件处理机制
 *
 * */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include <vector>
#include "event_base.h"

#include "task_msg.h"

#define MAXEVENTS 10

// map: fd->io_event 
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定义指向上面map类型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在监听的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;

//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);

class event_loop 
{
public:
    //构造,初始化epoll堆
    event_loop();

    //阻塞循环处理事件
    void event_process();

    //添加一个io事件到loop中
    void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);

    //删除一个io事件从loop中
    void del_io_event(int fd);

    //删除一个io事件的EPOLLIN/EPOLLOUT
    void del_io_event(int fd, int mask);

    // ===========================================
    //获取全部监听事件的fd集合
    void get_listen_fds(listen_fd_set &fds) {
        fds = listen_fds;
    }

    //=== 异步任务task模块需要的方法 ===
    //添加一个任务task到ready_tasks集合中
    void add_task(task_func func, void *args);
    //执行全部的ready_tasks里面的任务
    void execute_ready_tasks();
    // ===========================================
private:
    int _epfd; //epoll fd

    //当前event_loop 监控的fd和对应事件的关系
    io_event_map _io_evs;

    //当前event_loop 一共哪些fd在监听
    listen_fd_set listen_fds;

    //一次性最大处理的事件
    struct epoll_event _fired_evs[MAXEVENTS];

        // ===========================================
    //需要被执行的task集合
    typedef std::pair<task_func, void*> task_func_pair;
    std::vector<task_func_pair> _ready_tasks;
    // ===========================================
};
```

添加了两个属性:

`task_func_pair`: 回调函数和参数的键值对.

`_ready_tasks`: 所有已经就绪的待执行的任务集合。



同时添加了两个主要方法:

`void add_task(task_func func, void *args)`: 添加一个任务到_ready_tasks中.

`void execute_ready_tasks()`:执行全部的_ready_tasks任务。

将这两个方法实现如下:

> lars_reactor/src/event_loop.cpp

```c
//...

//添加一个任务task到ready_tasks集合中
void event_loop::add_task(task_func func, void *args)
{
    task_func_pair func_pair(func, args);
    _ready_tasks.push_back(func_pair);
}

//执行全部的ready_tasks里面的任务
void event_loop::execute_ready_tasks()
{
    std::vector<task_func_pair>::iterator it;

    for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) {
        task_func func = it->first;//任务回调函数
        void *args = it->second;//回调函数形参

        //执行任务
        func(this, args);
    }

    //全部执行完毕,清空当前的_ready_tasks
    _ready_tasks.clear();
}

//...
```

http://www.xdnf.cn/news/3824.html

相关文章:

  • CVPR2021 | 重新思考视觉Transformer中的自注意力机制
  • Java学习手册:Spring 生态其他组件介绍
  • 单细胞测序试验设计赏析(一)
  • AWS在跨境电商中的全场景实践与未来生态构建
  • D. 例题3.2.2 整数划分问题
  • 二种MVCC对比分析
  • 学习黑客风险Risk
  • iOS启动优化:从原理到实践
  • 2025年渗透测试面试题总结-拷打题库35(题目+回答)
  • 【C++】:C++17新特性
  • Vivado FPGA 开发 | 创建工程 / 仿真 / 烧录
  • 2845. 统计趣味子数组的数目
  • 【LLaMA-Factory实战】Web UI快速上手:可视化大模型微调全流程
  • The Sims 4 模拟人生 4 [DLC 解锁] [Steam Epic EA] [Windows SteamOS]
  • 《操作系统真象还原》第十二章(2)——进一步完善内核
  • 影刀RPA中新增自己的自定义指令
  • UDP网络编程
  • Xilinx FPGA | 管脚约束 / 时序约束 / 问题解析
  • 安卓基础(悬浮窗)
  • Java中深拷贝与浅拷贝的深入探讨
  • C++类_虚基类
  • IDEA快速上手Maven项目:模板选择 + 多模块拆分
  • Spring Boot 微服务打包为 Docker 镜像并部署到镜像仓库实战案例
  • 合成复用原则(CRP)
  • IDEA回滚代码操作
  • Windows下调试WebRTC源码
  • BOSS的收入 - 华为OD机试(A卷,C++题解)
  • 昇腾的昇思MindSpore是什么?跟TensorFlow/PyTorch 等第三方框架有什么区别和联系?【浅谈版】
  • c++ 二级指针 vs 指针引用
  • 小土堆pytorch数据加载概念以及实战