当前位置: 首页 > backend >正文

C++负载均衡远程调用学习之消息路分发机制

目录

 

1.LARV0.5-TCP_server链接管理的功能实现及测试

2.LARV0.6

3.LARV0.6

4.LARV0.6

5.LARV0.6-tcp_server集成

6.LARV0.6-tcp_server集成消息路由分发机制总结

7.LARV0.6回顾


 

1.LARV0.5-TCP_server链接管理的功能实现及测试

### 16.2 完成Lars Reactor V0.12开发

### server端

```c
#include "tcp_server.h"
#include <string>
#include <string.h>
#include "config_file.h"

tcp_server *server;

//回显业务的回调函数
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("callback_busi ...\n");
    //直接回显
    conn->send_message(data, len, msgid);

    printf("conn param = %s\n", (const char *)conn->param);
}

//打印信息回调函数
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("recv client: [%s]\n", data);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


//新客户端创建的回调
void on_client_build(net_connection *conn, void *args)
{
    int msgid = 101;
    const char *msg = "welcome! you online..";

    conn->send_message(msg, strlen(msg), msgid);

    //将当前的net_connection 绑定一个自定义参数,供我们开发者使用
    const char *conn_param_test = "I am the conn for you!";
    conn->param = (void*)conn_param_test;
}

//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args)
{
    printf("connection is lost !\n");
}


int main() 
{
    event_loop loop;

    //加载配置文件
    config_file::setPath("./serv.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 8888);

    printf("ip = %s, port = %d\n", ip.c_str(), port);

    server = new tcp_server(&loop, ip.c_str(), port);

    //注册消息业务路由
    server->add_msg_router(1, callback_busi);
    server->add_msg_router(2, print_busi);

    //注册链接hook回调
    server->set_conn_start(on_client_build);
    server->set_conn_close(on_client_lost);


    loop.event_process();

    return 0;
}
```

2.LARV0.6消息路由分发机制msg_router的定义

### client端

```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>


//客户端业务
void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
{
    //得到服务端回执的数据 
    char *str = NULL;
    
    str = (char*)malloc(len+1);
    memset(str, 0, len+1);
    memcpy(str, data, len);
    printf("recv server: [%s]\n", str);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


//客户端销毁的回调
void on_client_build(net_connection *conn, void *args)
{
    int msgid = 1; 
    const char *msg = "Hello Lars!";

    conn->send_message(msg, strlen(msg), msgid);
}

//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args) 
{
    printf("on_client_lost...\n");
    printf("Client is lost!\n");
}



int main() 
{

    event_loop loop;

    //创建tcp客户端
    tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");


    //注册消息路由业务
    client.add_msg_router(1, busi);
    client.add_msg_router(101, busi);


    //设置hook函数
    client.set_conn_start(on_client_build);
    client.set_conn_close(on_client_lost);


    //开启事件监听
    loop.event_process();

    return 0;
}
```

​        和之前的client无任何改变。



### 运行结果

3.LARV0.6抽象链接类Net_connection定义

* 服务端:

```c
$ ./server 
msg_router init...
ip = 127.0.0.1, port = 7777
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
callback_busi ...
conn param = I am the conn for you!
```

​        会发现我们是可以在callback中拿到conn的属性



# 四、Lars-DNS Service开发

## **1) 简介**

​        负责接收各agent对某modid、cmdid的请求并返回该modid、cmdid下的所有节点,即为agent提供获取路由服务

### 1.1 架构

![3-Lars-dnsserver](./pictures/3-Lars-dnsserver.png)

### **1.2 网络模块**

​    DnsService服务模型采用了one loop per thread TCP服务器,主要是基于Lars-Reactor:

- 主线程Accepter负责接收连接(agent端连接)
- Thread loop们负责处理连接的请求、回复;(agent端发送查询请求,期望获取结果)



### **1.3 双map模型** 

​    DnsServer使用两个map存储路由数据(key = `modid<<32 + cmdid` , value = set of `ip<<32 + port`)

- 一个`RouterDataMap_A`:主数据,查询请求在此map执行
- 另一个`RouterDataMap_B`:后台线程周期性重加载路由到此map,作为最新数据替换掉上一个map

这两个map分别由指针`data_pointer`与`temp_pointer`指向.



### 1.4 Backend Thread守护线程

**dns service还有个业务线程:** 

1、负责周期性(default:1s)检查`RouteVersion`表版本号,如有变化,说明`RouteData`有变更,则重加载`RouteData`表内容;然后将`RouteChange`表中被变更的`modid`取出,根据订阅列表查出`modid`被哪些连接订阅后,向所有工作线程发送任务:要求订阅这些`modid`的连接推送`modid`路由到agent

2、此外,还负责周期性(default:8s)重加载`RouteData`表内容

4.LARV0.6-tcp_server集成路由消息分发机制

### **主业务**

1. 服务启动时,`RouteData`表被加载到`data_pointer`指向的`RouterDataMap_A`中, `temp_pointer`指向的`RouterDataMap_B`为空

          2. 服务启动后,agent发来Query for 请求某`modid/cmdid`,到其所在Thread Loop上,上读锁查询`data_pointer`指向的`RouterDataMap_A`,返回查询结果;
          3. 如果此`modid/cmdid`不存在,则把`agent ip+port`+`moid/cmdid`发送到Backend thread loop1的队列,让其记录到ClientMap

后台线程Backend thread每隔10s清空`temp_pointer`指向的`RouterDataMap_B`,再加载`RouteData`表内容到`temp_pointer`指向的`RouterDataMap_B`,加载成功后交换指针`data_pointer`与`temp_pointer`指针内容,于是完成了路由数据的更新.

## **2) 数据库创建**

* 表`RouteData`: 保存了所有mod路由信息.

| 字段       | 数据类型         | 是否可以为空 | 主键 | 默认 | 附加   | 说明         |
| ---------- | ---------------- | ------------ | ---- | ---- | ------ | ------------ |
| id         | int(10) unsigned | No           | 是   | NULL | 自增长 | 该条数据ID   |
| modid      | int(10) unsigned | No           |      | NULL |        | 模块ID       |
| cmdid      | int(10) unsigned | No           |      | NULL |        | 指令ID       |
| serverip   | int(10) unsigned | No           |      | NULL |        | 服务器IP地址 |
| serverport | int(10) unsigned | No           |      | NULL |        | 服务器端口   |



* 表`RouteVersion`: 当前`RouteData`路由版本号,每次管理端修改某mod的路由,`RouteVersion`表中的版本号都被更新为当前时间戳

| 字段    | 数据类型         | 是否可以为空 | 主键 | 默认 | 附加   |
| ------- | ---------------- | ------------ | ---- | ---- | ------ |
| id      | int(10) unsigned | No           | 是   | NULL | 自增长 |
| version | int(10) unsigned | No           |      | NULL |        |

5.LARV0.6-tcp_server集成

* 表`RouteChange`: 每次管理端修改某mod的路由,会记录本次对哪个mod进行修改(增、删、改),以便指示最新的`RouteData`路由有哪些mod变更了。

| 字段    | 数据类型            | 是否可以为空 | 主键 | 默认 | 附加   |
| ------- | ------------------- | ------------ | ---- | ---- | ------ |
| id      | int(10) unsigned    | No           | 是   | NULL | 自增长 |
| modid   | int(10) unsigned    | No           |      | NULL |        |
| cmdid   | int(10) unsigned    | No           |      | NULL |        |
| version | bigint(20) unsigned | No           |      | NULL |        |



相关创建表格的sql语句如下`lars_dns.sql`

```sql
DROP DATABASE if exists lars_dns;
CREATE DATABASE lars_dns;
USE lars_dns;

DROP TABLE IF EXISTS `RouteData`;
CREATE TABLE `RouteData` (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `modid` int(10) unsigned NOT NULL,
    `cmdid` int(10) unsigned NOT NULL,
    `serverip` int(10) unsigned NOT NULL,
    `serverport` int(10) unsigned NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=116064 DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `RouteVersion`;
CREATE TABLE RouteVersion (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `version` int(10) unsigned NOT NULL,
    PRIMARY KEY (`id`)
);
INSERT INTO RouteVersion(version) VALUES(0);

DROP TABLE IF EXISTS `RouteChange`;
CREATE TABLE RouteChange (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `modid` int(10) unsigned NOT NULL,
    `cmdid` int(10) unsigned NOT NULL,
    `version` bigint(20) unsigned NOT NULL,
    PRIMARY KEY (`id`)
);
```

6.LARV0.6-tcp_server集成消息路由分发机制总结

 

7.LARV0.6回顾

 

 

 

 

http://www.xdnf.cn/news/3474.html

相关文章:

  • python创建Directory和python package的区别
  • 【分享】数据恢复大师6.10[特殊字符]恢复手机误删的数据[特殊字符]
  • 运维工作中,Ansible常用模块有哪些?
  • 【云备份】服务端工具类实现
  • 解决 Oracle EXPDP 無法鎖定 NFS 相關錯誤: ORA-27086 flock: No locks available
  • ActiveMQ 性能优化与网络配置实战(一)
  • 2025MathorCup数学应用挑战赛B题
  • 机器视觉开发-打开摄像头
  • GAMES202-高质量实时渲染(Real-time Environment Mapping)
  • 【二】 数字图像的运算 (下)【数字图像处理】
  • Java学习手册:Spring 数据访问
  • 系统架构设计师:设计模式概述
  • Centos7.9 安装mysql5.7
  • 突破zero-RL 困境!LUFFY 如何借离线策略指引提升推理能力?
  • 日语学习-日语知识点小记-构建基础-JLPT-N4阶段(13): ておきます ています & てあります
  • C++11新特性_Lambda 表达式
  • 世纪华通:从财报数据看其在游戏领域的成功与未来
  • 使用Java正则表达式进行分组与匹配文本提取
  • OpenAI最新发布的GPT-4.1系列模型,性能体验如何?
  • Unity 几种主流的热更新方式
  • 【C++】类和对象(中)——默认成员函数详解(万字)
  • 存算一体架构下的新型AI加速范式:从Samsung HBM-PIM看近内存计算趋势
  • Umi-OCR项目(1)
  • 产品设计三板斧与抓住事物本质的关键意义
  • 【iview】icon样式
  • Vue 生命周期全解析:理解组件从创建到销毁的全过程
  • FPGA中级项目8———UART-RAM-TFT
  • 【Android】四大组件之BroadcastReceiver
  • Lucene并不是只有倒排索引一种数据结构,支持多种数据结构
  • react学习笔记3——基于React脚手架