C++负载均衡远程调用学习之实时监测与自动发布功能
目录
1.LarsDns-V0.3BackenThread后端实时监控线程流程
2.LarsDns-V0.3加载当前Route版本号方法实现
3.LarsDns-V0.3加载RouteChange修改表的信息
4.LarsDns-V0.3实现实时监控流程线程业务
5.LarsDnsV0.3编译bug修改和功能测试
6.Lars Web管理界面的集成
7.LarsDnsV0.3 Dns模块总结
1.LarsDns-V0.3BackenThread后端实时监控线程流程
### 4.4 完成Lars Reactor V0.3开发
我们将lars_reactor/example/lars_reactor_0.2的代码复制一份到 lars_reactor/example/lars_reactor_0.3中。
> lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp
```cpp
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
```
2.LarsDns-V0.3加载当前Route版本号方法实现
编译。
启动服务器
```bash
$ ./lars_reactor
```
分别启动2个客户端
client1
```bash
$ nc 127.0.0.1 7777
hello Iam client1
hello Iam client1 回显
```
client2
```bash
$ nc 127.0.0.1 7777
hello Iam client2
hello Iam client2 回显
```
3.LarsDns-V0.3加载RouteChange修改表的信息
服务端打印
```bash
$ ./lars_reactor
begin accept
ibuf.length() = 18
recv data = hello Iam client1
begin accept
ibuf.length() = 18
recv data = hello Iam client2
```
目前我们已经成功将`event_loop`机制加入到reactor中了,接下来继续添加功能。
4.LarsDns-V0.3实现实时监控流程线程业务
## 5) tcp链接与Message消息封装
好了,现在我们来将服务器的连接做一个简单的封装,在这之前,我们要将我我们所发的数据做一个规定,采用TLV的格式,来进行封装。目的是解决TCP传输的粘包问题。
### 5.1 Message消息封装

先创建一个message.h头文件
> lars_reactor/include/message.h
```h
#pragma once
//解决tcp粘包问题的消息头
struct msg_head
{
int msgid;
int msglen;
};
//消息头的二进制长度,固定数
#define MESSAGE_HEAD_LEN 8
//消息头+消息体的最大长度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
```
5.LarsDnsV0.3编译bug修改和功能测试
### 5.2 创建一个tcp_conn连接类
> lars_reactor/include/tcp_conn.h
```h
#pragma once
#include "reactor_buf.h"
#include "event_loop.h"
//一个tcp的连接信息
class tcp_conn
{
public:
//初始化tcp_conn
tcp_conn(int connfd, event_loop *loop);
//处理读业务
void do_read();
//处理写业务
void do_write();
//销毁tcp_conn
void clean_conn();
//发送消息的方法
int send_message(const char *data, int msglen, int msgid);
private:
//当前链接的fd
int _connfd;
//该连接归属的event_poll
event_loop *_loop;
//输出buf
output_buf obuf;
//输入buf
input_buf ibuf;
};
```
6.Lars Web管理界面的集成
简单说明一下里面的成员和方法:
**成员**:
`_connfd`:server刚刚accept成功的套接字
`_loop`:当前链接所绑定的事件触发句柄.
`obuf`:链接输出缓冲,向对端写数据
`ibuf`:链接输入缓冲,从对端读数据
**方法**:
`tcp_client()`:构造,主要在里面实现初始化及创建链接链接的connect过程。
`do_read()`:读数据处理业务,主要是EPOLLIN事件触发。
`do_write()`:写数据处理业务,主要是EPOLLOUT事件触发。
`clean_conn()`:清空链接资源。
`send_message()`:将消息打包成TLV格式发送给对端。
接下来,实现以下`tcp_conn`类.
>lars_reactor/src/tcp_conn.cpp
```cpp
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include "tcp_conn.h"
#include "message.h"
//回显业务
void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
{
conn->send_message(data, len, msgid);
}
//连接的读事件回调
static void conn_rd_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_read();
}
//连接的写事件回调
static void conn_wt_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_write();
}
//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
_connfd = connfd;
_loop = loop;
//1. 将connfd设置成非阻塞状态
int flag = fcntl(_connfd, F_GETFL, 0);
fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
//2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟
int op = 1;
setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
//3. 将该链接的读事件让event_loop监控
_loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
//4 将该链接集成到对应的tcp_server中
//TODO
}
7.LarsDnsV0.3 Dns模块总结
//处理读业务
void tcp_conn::do_read()
{
//1. 从套接字读取数据
int ret = ibuf.read_data(_connfd);
if (ret == -1) {
fprintf(stderr, "read data from socket\n");
this->clean_conn();
return ;
}
else if ( ret == 0) {
//对端正常关闭
printf("connection closed by peer\n");
clean_conn();
return ;
}
//2. 解析msg_head数据
msg_head head;
//[这里用while,可能一次性读取多个完整包过来]
while (ibuf.length() >= MESSAGE_HEAD_LEN) {
//2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN
memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
this->clean_conn();
break;
}
if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
//缓存buf中剩余的数据,小于实际上应该接受的数据
//说明是一个不完整的包,应该抛弃
break;
}
//2.2 再根据头长度读取数据体,然后针对数据体处理 业务
//TODO 添加包路由模式
//头部处理完了,往后偏移MESSAGE_HEAD_LEN长度
ibuf.pop(MESSAGE_HEAD_LEN);
//处理ibuf.data()业务数据
printf("read data: %s\n", ibuf.data());
//回显业务
callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
//消息体处理完了,往后便宜msglen长度
ibuf.pop(head.msglen);
}
ibuf.adjust();
return ;
}