基于C++、JsonCpp、Muduo库实现的分布式RPC通信框架

目录
- 项目介绍
- JsonCpp库简单介绍
- Muduo库简单介绍
- C++11异步操作——std::future
- 1. 使用 std::async 关联异步任务
- 2. std::packaged_task 配合 std::future
- 3. std::promise 配合 std::future
- 项目设计
- 理解项目功能
- 服务端模块划分
- Network:网络通信模块
- Protocol:应用层通信协议模块
- Dispatcher:消息分发处理模块
- RpcRouter:远端调用路由功能模块
- Publish-Subscribe:发布订阅功能模块
- Registry-Discovery:服务注册/发现/上线/下线功能模块
- Server:基于以上模块整合而出的服务端模块
- 客户端模块划分
- Requestor:请求管理模块
- RpcCaller:远端调用功能模块
- Publish-Subscribe:发布订阅功能模块
- Registry-Discovery:服务注册/发现/上线/下线功能模块
- Client:基于以上模块整合而出的客户端模块
- 框架设计
- 抽象层
- 具象层
- 业务层
- 请求字段宏定义
- 消息类型定义
- 响应码类型定义
- 抽象层实现
项目介绍
RPC(Remote Procedure Call,远程过程调用 )允许程序调用远程计算机上的服务或函数,而无需显式编写网络通信代码,就像调用本地函数一样方便地调用远程服务的函数。
本项目将基于C++、JsonCpp、muduo网络库实现一个简单、易用的RPC通信框架,它将实现同步调用、异步回调、异步futrue调用、服务注册/发现,服务上线/下线及发布订阅等功能。
我们将实现一个远程调用接口call,然后通过传入函数名参数来调用RPC接口。
JsonCpp库简单介绍
Json是一种数据交换格式,它使用完全独立于编程语言的文本格式来存储和表示数据。
例如表示张三同学的信息:
char *name = "张三";
int age = 18;
double score[3] = {88.8, 99.9, 66.6};
Json格式表示为:
{"姓名" : "张三", "年龄" : 18,"成绩" : [88.8, 99.9, 66.6],"爱好" : {"运行" : "打乒乓球","文学" : "红楼梦"}
}
Json 的数据类型包括对象,数组,字符串,数字。
- 对象:使用花括号 {} 括起来表示一个对象;
- 数组:使用中括号 [] 括起来表示一个数组;
- 字符串:使用常规双引号 “” 括起来表示一个字符串;
- 数字:包括整形和浮点型,直接使用。
Jsoncpp 库主要是用于实现Json 格式数据的序列化和反序列化,它实现了将多个数据对象组织成为Json格式字符串,以及将Json 格式字符串解析得到多个数据对象的功能。
Jsoncpp 库主要借助以下三个类以及其对应的少量成员函数完成序列化及反序列化。
Json::Value
类:中间数据存储类- 如果要将数据对象进行序列化,需要先存储到 Json::Value 对象中;
- 如果要将数据进行反序列化,需要将解析后的数据存到 Json::Value 对象中;
Json::StreamWriter
类:序列化类- Json::StreamWriter::write():序列化函数;
- Json::StreamWriterBuilder类:工厂类,用于生产Json::StreamWriter对象;
Json::CharReader
类:反序列化类- Json::CharReader::parse():反序列化函数;
- Json::CharReaderBuilder类:工厂类,用于生产Json::CharReader对象
Json数据对象类:
class Json::Value
{// Value重载了[]和=,因此所有的赋值和获取数据都可以通过// 简单的⽅式完成val["name"] = "xx";Value &operator=(const Value &other); Value& operator[](const std::string& key); Value& operator[](const char* key);Value removeMember(const char* key); //移除元素const Value& operator[](ArrayIndex index) const; //val["score"][0]Value& append(const Value& value); //添加数组元素val["score"].append(88); ArrayIndex size() const; //获取数组元素个数val["score"].size(); std::string asString() const; //转string string name = val["name"].asString();const char* asCString() const; //转char* char *name = val["name"].asCString();int asInt() const; //转int int age = val["age"].asInt(); float asFloat() const; //转float float weight = val["weight"].asFloat(); bool asBool() const; //转bool bool ok = val["ok"].asBool();
};
序列化接口:
class JSON_API StreamWriter
{virtual int write(Value const& root, std::ostream* sout) = 0;
}
class JSON_API StreamWriterBuilder : public StreamWriter::Factory
{virtual StreamWriter* newStreamWriter() const;
}
反序列化接口:
class JSON_API CharReader
{virtual bool parse(char const* beginDoc, char const* endDoc, Value* root, std::string* errs) = 0;
}
class JSON_API CharReaderBuilder : public CharReader::Factory
{virtual CharReader* newCharReader() const;
}
Json序列化实践测试:
#include <iostream>
#include <string>
#include <memory>
#include <sstream>
#include <jsoncpp/json/json.h>// 实现数据的序列化
bool Serialize(const Json::Value& val, std::string& body)
{std::stringstream ss;// 先实例化一个工厂类对象Json::StreamWriterBuilder swb;// 通过工厂类对象来生产派生类对象std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());// 开始序列化bool ret = sw->write(val, &ss);if (ret != 0){std::cout << "json serialize failed!" << std::endl;return false;}body = ss.str();return true;
}// 实现json字符串的序列化
bool Unserialize(std::string &body, Json::Value &val)
{// 实例化一个工厂类对象Json::CharReaderBuilder crb;// 生产派生类对象std::unique_ptr<Json::CharReader> cr(crb.newCharReader());std::string err;bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &err);if (!ret){std::cout << "json unserialize failed: " << err << std::endl;return false;}return true;
}int main()
{const char* name = "小明";int age = 18;const char* sex = "男";float score[3] = {88, 77.7f, 66};Json::Value student;student["姓名"] = name;student["年龄"] = age;student["性别"] = sex;student["成绩"].append(score[0]);student["成绩"].append(score[1]);student["成绩"].append(score[2]);Json::Value fav;fav["书籍"] = "红楼梦";fav["运动"] = "乒乓球";student["爱好"] = fav;std::string body;if (Serialize(student, body))std::cout << body << std::endl;std::string str = R"({"姓名" : "小黑", "年龄" : 19, "成绩" : [66, 77, 88]})";Json::Value stu;bool ret = Unserialize(str, stu);if (ret){std::cout << "姓名: " << stu["姓名"].asString() << std::endl;std::cout << "年龄: " << stu["年龄"].asInt() << std::endl;for (auto e : stu["成绩"]){std::cout << "成绩: " << e.asFloat() << std::endl;}}return 0;
}
Muduo库简单介绍
Muduo库是由陈硕大佬开发的一个基于非阻塞 IO 和事件驱动的C++高并发TCP网络编程库。
- 事件驱动机制:Muduo使用Reactor模式实现事件驱动机制,通过事件循环和回调函数机制来处理并发连接和请求;
- 多线程支持:Muduo采用多线程模型,并使用线程池来管理多个IO线程和工作线程,这种设计使得Muduo能够实现高并发处理,满足大型服务器应用程序的需求;
- 定时器功能:Muduo提供了定时器功能,用户可以轻松地设置定时任务和超时处理;
- 缓冲区管理:Muduo提供了高效的缓冲区管理机制,包括固定大小的缓冲区池以及自动增长的缓冲区,这有助于优化内存使用和提高数据处理效率;
- 日志系统:Muduo内置了日志系统,支持不同级别的日志记录,并采用异步写入方式以减少对性能的影响,这使得开发者能够方便地追踪和分析系统运行状态。
Muduo是基于主从Reactor模型的网络库,其使用的线程模型是one loop per thread
,它指的是一个线程只能有一个事件循环(EventLoop), 用于响应计时器和IO事件;一个文件描述符只能由一个线程进行读写,也就是一个TCP连接必须归属于某个EventLoop管理。
Muduo库常见接口:
TcpServer
{void start(); // 启动服务器void setConnectionCallback(); // 设置连接建立/关闭时的回调函数void setMessageCallback(); // 设置消息处理回调函数
}EventLoop
{void loop(); // 开始事件循环监控void quit(); // 停止循环Timerld runAfter(delay, cb); // 定时任务
}TcpConnection
{void send(std::string &msg); // 发送数据bool connected(); // 判断当前连接是否正常void shutdown(); // 关闭连接
};Buffer
{size_t readableBytes(); // 获取缓冲区中可读数据大小const char* peek(); // 获取缓冲区中数据的起始地址int32_t peekInt32(); // 尝试从缓冲区中获取4字节数据,// 进行网络字节序的转换,但不从缓冲区中删除 void retrieveInt32(); // 数据读取位置向后偏移4字节,本质就是删除前4字节数据int32_t readInt32(); // peekInt32() + retrieveInt32()string retrieveAllAsString(); // 从缓冲区中取出所有数据,作为字符串返回,并删除string retrieveAsString(size_t len); // 从缓冲区中取出len长度的数据,并删除
}/*
需要注意的是,因为muduo库不管是服务端还是客⼾端都是异步操作,
对于客⼾端来说如果我们在连接还没有完全建⽴成功的时候发送数据,这是不被允许的。
因此我们可以使⽤内置的CountDownLatch类进⾏同步控制
*/TcpClient
{void connect(); // 连接服务器void disconnect(); // 关闭连接TcpConnectionPtr connection(); // 获取客户端对应的TcpConnection连接// muduo库的客户端也是通过EventLoop进行IO事件监控处理的setConnectionCallback(); // 设置连接建立/关闭时的回调函数setMessageCallback(); // 设置消息处理回调函数
}// 做计数同步
CountDownLatch
{void wait(); // 计数大于0则阻塞void countDown(); // 计数--
}
Muduo库快速上手:简单英汉互译。
server:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>class DictServer
{
public:DictServer(int port): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),"DictServer", muduo::net::TcpServer::kReusePort){// 设置连接事件的回调_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));// 设置连接消息的回调_server.setMessageCallback(std::bind(&DictServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void Start(){_server.start();// 先开始监听_baseloop.loop();// 开始死循环事件监控}~DictServer(){}
private:void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){std::cout << "连接建立!" << std::endl;}else {std::cout << "连接断开!" << std::endl;}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){static std::unordered_map<std::string, std::string> dict_map = {{"hello", "你好"},{"world", "世界"},{"apple", "苹果"},{"banana", "香蕉"}};std::string msg = buf->retrieveAllAsString();std::string res;auto iter = dict_map.find(msg);if (iter != dict_map.end()){res = iter->second;}else{res = "未知单词";}}
private:muduo::net::EventLoop _baseloop; // 事件循环监控muduo::net::TcpServer _server; // 通信连接管理
};int main()
{DictServer ds(8888);ds.Start();return 0;
}
client:
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h>
#include <iostream>
#include <string>class DictClient
{
public:DictClient(const std::string &sip, int port): _baseloop(_loopthread.startLoop()), _downlatch(1) // 初始化计数器为1,为0时唤醒, _client(_baseloop, muduo::net::InetAddress(sip, port), "DictClient"){// 设置连接事件的回调_client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));// 设置连接消息的回调_client.setMessageCallback(std::bind(&DictClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 连接服务器_client.connect();_downlatch.wait();}bool Send(const std::string &msg){if (_conn->connected() == false){std::cout << "连接已断开, 发送数据失败!" << std::endl;return false;}_conn->send(msg);return true;}
private:void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){std::cout << "连接建立!" << std::endl;_downlatch.countDown();// --, 计数为0时唤醒阻塞_conn = conn;}else {std::cout << "连接断开!" << std::endl;_conn.reset();}}void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){std::string res = buf->retrieveAllAsString();std::cout << res << std::endl;}
private:muduo::net::TcpConnectionPtr _conn;muduo::CountDownLatch _downlatch;muduo::net::EventLoopThread _loopthread;muduo::net::EventLoop *_baseloop;muduo::net::TcpClient _client;
};int main()
{DictClient client("127.0.0.1", 9090);while (1){std::string s;std::cin >> s;client.Send(s);}return 0;
}
C++11异步操作——std::future
什么是异步操作? 区分异步和同步就看这个任务是否是当前进程或执行流自身完成的,还是别人帮我们完成的。
- 异步:别人帮我们完成的;
- 同步:我们自己完成的没有其他人参与。
如果这个任务是别人帮我们完成的,我们怎么得到结果?
C++11提供了 std::future ,用于管理和获取异步任务的结果,实现异步编程。
std::future
是一个模板类,它表示一个异步操作的结果,或者说可以用于同步保存一个异步任务的结果。在多线程中使用异步操作时,它可以帮我们在需要的时候获取任务的执行结果。std::future
的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。
应⽤场景
- 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future可以用来表示这些异步任务的结果,通过将任务与主线程分离,我们可以实现任务的并行处理,从而提高程序的执行效率;
- 并发控制:在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续操作;
- 结果获取:使用
std::future::get()
函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样做可以确保在调用get()
时一定能获取到所需的结果。
std::future
不是一个异步任务,而是一个帮助我们获取异步任务结果的东西。
std::future
并不能单独使用,需要搭配一些能够执行异步任务的模版类或函数一起使用。
1. 使用 std::async 关联异步任务
std::async
函数模版:创建线程异步执行一个任务,返回一个future对象用于获取函数结果。std::async是否启动一个新线程,或者在等待future时,任务是否同步运行都取决于我们给的参数,这个参数为 std::launch
类型:
launch参数 | 描述 |
---|---|
deferred | 同步策略,获取结果的时候再去执行任务 |
async | 异步策略,内部创建一个线程执行任务,结果通过future获取 |
deferred | async | 内部通过系统等条件自动选择策略 |
| 使用方法:
2. std::packaged_task 配合 std::future
std::packaged_task
类模版:将任务和 std::feature
绑定在一起形成一个任务包,是对一个函数进行二次封装成为一个可调用对象作为任务放到其他线程中执行。任务包封装好了后,可以在任意位置调用,通过关联的 future 获取执行结果。
可以把
std::future
和std::async
看成是分开的,而std::packaged_task
则是一个整体。
std::packaged_task
内部包含了对共享状态(用于存储任务执行结果或异常 )以及被包装任务的管理,因此禁用了拷贝构造和拷贝赋值,只允许移动构造和移动赋值(std::move()
),或使用智能指针等方式。
| 使用方法:
3. std::promise 配合 std::future
std::promise
类模版:实例化的对象可以返回一个future,在其他线程中向promise对象设置数据,其他线程的关联future就可以获取数据。换种说法就是之前 std::future 可以读取一个异步函数的返回值,但是要等待就绪,而 std::promise 提供一种方式手动让 std::future 就绪。
| 使用方法:
项目设计
理解项目功能
简单来说我们要实现的 rpc 思想就是:客户端想要完成某个任务的处理,但是这个处理的过程并不由客户端完成,而是将请求发送到服务器上,让服务器来帮其完成处理过程,并返回结果,客户端拿到结果后返回。
如果单服务器负载过高,一旦服务端宕机则所有客户端不可用。因此我们要实现分布式架构的rpc,其核心组件是增加一个注册中心:
-
服务提供者(节点)注册自身能力(如IP+服务列表)
-
客户端调用前通过注册中心发现可用服务节点
而其次的发布订阅功能,则是依托于多个客户端围绕服务端进行消息的转发。不过单纯的消息转发功能并不能满足大部分场景的需要,因此会在其基础上实现基于主题订阅的转发。
并且我们可以让每一个Server作为备用注册中心形成分布式架构,一旦某个注册中心下线,则可以向备用中心进行注册以及请求,且在此基础上客户端在请求Rpc服务的时候,因为可以有多个rpc-provider可选,因此可以实现简单的负载均衡策略,且基于注册中心可以更简便实现发布订阅的功能。
总结项目的三个主要功能:
- rpc调用;
- 服务的注册与发现以及服务的下线/上线通知;
- 消息的发布订阅。
服务端模块划分
首先我们要清楚服务端的功能需求:基于网络通信接收客户端的请求,提供rpc服务、服务注册与发现,上线&下线通知、提供主题操作(创建/删除/订阅/取消),消息发布等。
Network:网络通信模块
因为网络通信模块也是一个复杂庞大的模块,而本项目主要是实现rpc功能,所以该模块我们将使用muduo库来进行搭建,muduo库的优势:
-
高性能:基于Reactor模式,适合高并发网络通信(如10万+连接)
-
异步IO:避免阻塞业务线程,提升吞吐量
-
成熟稳定:经过大规模项目验证(如知乎早期架构)
Protocol:应用层通信协议模块
该模块主要是解决通信中有可能存在的粘包问题,保证能够获取到一条完整的消息。
解决粘包主要有三种方式:特殊字符间隔、定长、LV格式。本项目中使用LV格式来定义应用层的通信协议格式,其优势有:
-
简单高效:Length-Value格式直接标定数据边界
-
兼容性强:适用于二进制/文本协议(如4字节长度头 + JSON体)
- Length:固定4字节,表示后续消息数据长度;
- MType:固定4字节,表示消息类型;
- IDLength:固定4字节,描述后续ID字段实际长度;
- MID:唯一标识消息,ID字段长度不固定;
- Body:请求或响应的实际内容字段。
Dispatcher:消息分发处理模块
该模块用于区分消息类型,根据不同的类型,调用不同的业务处理函数进行消息处理。
比如前面当muduo库底层通信收到数据后,在onMessage回调函数中对数据进行应用层协议解析,得到一条实际消息后,我们就要判断这条消息代表该客户端的什么请求,以及应该如何处理。
因此,我们需要设计Dispatcher模块作为一个分发模块,这个模块内部会保存有一个hashmap<消息类型,回调函数>,并由使用者来决定哪条消息用哪个业务函数进行处理,当收到消息后,在该模块找到其对应的处理回调函数进行调用即可。
class Dispatcher
{using Handler = std::function<void(const Message&)>;std::unordered_map<MessageType, Handler> handlers;
public:// 注册处理函数void registerHandler(MessageType type, Handler handler) {handlers[type] = handler;}// 分发消息void dispatch(const Message& msg){auto it = handlers.find(msg.type());if (it != handlers.end()) {it->second(msg); // 执行回调} else {// 默认处理}}
};
RpcRouter:远端调用路由功能模块
该模块提供rpc请求的处理回调函数,以及内部所要实现的功能,分辨出客户端请求的服务进行处理得到结果进行响应。
rpc请求中最关键的两个点:请求方法名称和请求对应要处理的参数信息。
在Rpc远端调用过程中,首先将客户端到服务端的通信链路打通,然后将自己所需要调用的服务名称以及参数信息传递给服务端,由服务端进行接收处理,并返回结果。
而不管是客户端要传递给服务端,还是服务端返回的结果,都是在上边Protocol中定义的Body字段中,因此Body字段中就存在了另一层的正文序列化 / 反序列化过程。
序列化方式有很多种,这里我们使用json序列化来进行,定义格式如下:
//RPC-request
{"method" : "Add","parameters" : {"num1" : 11,"num2" : 22}
}
//RPC-response
{"rcode" : OK,"result": 33
}
{"rcode" : ERROR_INVALID_PARAMETERS
}
需要注意的是,在服务端,当接收到这么一条消息后,Dispatcher模块会找到该Rpc请求类型的回调处理函数进行业务处理,但是在进行业务处理的时候,也是只会将param参数字段传入回调函数中进行处理。
而对服务端来说,应该从传入的Json::Value对象中,有什么样的参数,以及参数信息是否符合自己所提供的服务的要求,都应该有一个检测,是否符合要求,符合要求了再取出指定字段的数据进行处理。
这就像一家快递代收点(服务端),客户(客户端)给你寄包裹(RPC请求)。包裹里有一张纸条写明:
{"method": "代收快递","parameters": {"快递单号": "YT123456","收件人姓名": "张三"}
}
服务端需要做三件事:1、检查包裹是否合格(参数校验)2、找到对应的处理人员(路由分发)3、只给专员必要的信息(安全隔离)。
因此,对服务端来说,在进行服务注册的时候,必须有一个服务描述,有了这个描述,在回调函数中就可以先对传入的参数进行校验,没问题了再取出指定字段数据进行处理并返回结果。因此在实现该模块时,应该有以下设计:
- 该模块必须具备一个rpc路由管理,其中包含对于每个服务的参数校验功能;
- 该模块必须具备一个方法名称和方法业务回调的映射;
- 该模块必须向外提供rpc请求的业务处理函数。
在Rpc请求中,可能会有大量不同的Rpc请求,作为服务端,首先需要对自己所能提供的服务进行管理,以便收到请求后,能够明确判断自己能否提供客户端所请求的服务。
| 核心数据结构:
struct ServiceDescriptor
{string method_name;vector<ParamDescriptor> params; // {name, type, required}ReturnType return_type;
};using Handler = function<Json::Value(const Json::Value&)>;
unordered_map<string, pair<ServiceDescriptor, Handler>> service_map;
Publish-Subscribe:发布订阅功能模块
该模块针对发布订阅请求进行处理,提供一个回调函数设置给Dispatcher模块。
发布订阅包含的请求操作有:主题的创建、删除、订阅、取消订阅,以及消息的发布。
本项目是围绕多个客户端和一个服务端来展开的。即任意一个客户端在发布或订阅之前先创建一个主题,比如音乐新闻主题,哪些客户端想收到音乐新闻相关的消息,就订阅这个主题,服务端会建立起该主题与客户端之间的联系。
当某个客户端向服务端发布消息,且发布消息的目标主题是音乐新闻主题,服务端就会找到所有订阅了该主题的客户端,将消息推送出去。
这个过程涉及到网络通信,通信消息的正文可以是:
//Topic-request
{"key" : "music", //主题名称 // 主题操作类型 "optype":TOPIC_CRAETE/TOPIC_REMOVE/TOPIC_SUBSCRIBE/TOPIC_CANCEL/TOPIC_PUBLISH,//只有TOPIC_PUBLISH请求才会包含有message字段 "message" : "Hello World"
}
//Topic-response
{"rcode" : OK,
}
{"rcode" : ERROR_INVALID_PARAMETERS,
}
-
该模块必须具备一个主题管理,且主题中需要保存订阅了该主题的客户端连接(主题收到一条消息,需要将这条消息推送给订阅了该主题的所有客户端);
-
该模块必须具备一个订阅者管理,且每个订阅者描述中都必须保存自己所订阅的主题名称(当一个订阅客户端断开连接时,需要找到订阅信息的关联关系,进行删除);
-
该模块必须向外提供主题创建、销毁、订阅、取消订阅、消息发布等业务处理函数。
Registry-Discovery:服务注册/发现/上线/下线功能模块
该模块针对服务注册与发现请求的处理。
- 服务注册:服务provider告诉中转中心自己能提供哪些服务;
- 服务发现:服务caller询问中转中心,谁能提供指定服务;
- 服务上线:一个provider上线了指定服务后,会通知发现过该服务的客户端有个provider可以提供该服务;
- 服务下线:在一个provider断开连接后,会通知发现过该服务的caller,谁下线了哪个服务。
服务注册模块,该模块主要是为了实现分布式架构而存在,让每一个rpc客户端能够从不同的节点主机上获取自己所需的服务,让业务更具扩展性,系统更具健壮性。
而为了能够让rpc-caller知道有哪些rpc-provider能提供自己所需服务,那么就需要有一个注册中心让这些rpc-provider去注册登记自己的服务,让rpc-caller来发现这些服务。
因此,在我们的服务端功能中,还需实现服务的注册/发现,以及服务的上线/下线功能。
//RD--request
{//SERVICE_REGISTRY-Rpc-provider进⾏服务注册 //SERVICE_DISCOVERY - Rpc-caller进⾏服务发现//SERVICE_ONLINE/SERVICE_OFFLINE 在provider下线后对caller进⾏服务上下线通知 "optype" :SERVICE_REGISTRY/SERVICE_DISCOVERY/SERVICE_ONLINE/SERVICE_OFFLINE,"method" : "Add",//服务注册/上线/下线有host字段,发现则⽆host字段 "host" : {"ip" : "127.0.0.1","port" : 9090}
}
//Registry/Online/Offline-response
{"rcode" : OK,
}
//error-response
{"rcode" : ERROR_INVALID_PARAMETERS,
}
//Discovery-response
{"method" : "Add","host" : [{"ip" : "127.0.0.1","port" : 9090},{"ip" : "127.0.0.2","port" : 8080}]
}
该模块的设计如下:
- 必须具备一个服务发现者的管理:
- 方法与发现者:当一个客户端进行服务发现的时候,进行记录谁发现过该服务,当有一个新的提供者上线的时候,可以通知该发现者;
- 连接与发现者:当一个发现者断开连接了,删除关联关系,往后就不需要通知了。
- 必须具备一个服务提供者的管理:
- 连接与提供者:当一个提供者断开连接的时候,能够通知该提供者提供的服务对应的发现者,该主机的该服务下线了;
- 方法与提供者:能够知道谁的哪些方法下线了,然后通知发现过该方法的客户端。
- 必须向Dispatcher模块提供一个服务注册/发现的业务处理回调函数。
Server:基于以上模块整合而出的服务端模块
客户端模块划分
客户端模块划分和服务端基本一致,其中 Protocol 模块、Network 模块、Dispatcher 模块和服务端是一样的。
Requestor:请求管理模块
该模块是为了解决异步网络通信中的请求–响应匹配问题,确保多线程下能正确关联请求和响应。
而在多线程的网络通信中,多个请求进行响应可能会存在时序的问题,我们无法保证一个线程发送一个请求后,接下来接收到的响应就是针对自己这条请求的响应。
且Muduo库这种异步IO网络通信库,通常IO操作都是异步操作,即发送数据就是把数据放入发送缓冲区,但是什么时候会发送由底层的网络库来进行协调,并且也并不会提供recv接口,而是在连接触发可读事件后,IO读取数据完成后调用处理回调进行数据处理,因此也无法直接在发送请求后去等待该条请求的响应。
针对以上问题,我们创建出当前的请求管理模块来解决,给每一个请求都设定一个请求ID,服务端进行响应的时候标识响应针对的是哪个请求(也就是响应信息中会包含请求ID),因此客户端这边我们不管收到哪条请求的响应,都将数据存储到hash_map中,以请求ID作为映射,并向外提供获取指定请求ID响应的阻塞接口,这样只要在发送请求的时候知道自己的请求ID,那么就能获取到自己想要的响应,而不会出现异常。
问题场景 | 解决方案 |
---|---|
多线程请求乱序 | 通过请求ID精确匹配 |
异步IO无法及时等待响应 | Future/Callback机制 |
响应超时 | 定时清理未完成的Promise |
并且还可以将每个请求进一步封装描述,添加入异步的future控制,或者设置回调函数的方式,不仅可以阻塞获取响应,也可以实现异步获取响应以及回调处理响应。
-
同步阻塞:线程等待指定ID的响应到达
-
异步回调:注册回调函数自动触发
RpcCaller:远端调用功能模块
该模块向用户提供进行rpc调用的模块。
Rpc服务调⽤模块,这个模块相对简单,只需要向外提供⼏个rpc调⽤的接⼝,内部实现向服务端发送
请求,等待获取结果即可,稍微⿇烦⼀些的是Rpc调⽤我们需要提供多种不同⽅式的调⽤:
- 同步调⽤:发起调⽤后,等收到响应结果后返回
- 异步调⽤:发起调⽤后⽴即返回,在想获取结果的时候进⾏获取
- 回调调⽤:发起调⽤的同时设置结果的处理回调,收到响应后⾃动对结果进⾏回调处理
Publish-Subscribe:发布订阅功能模块
该模块向用户提供发布订阅所需的接口,针对推送过来的消息进行处理。
客户端可能是消息的发布者,也可能是消息的订阅者。一个客户端可能会订阅多个主题,每订阅一个主题都会设置一个回调函数,用来处理该主题推送过来的消息。因此该模块应将不同主题的处理回调用 hashmap 管理起来。
Registry-Discovery:服务注册/发现/上线/下线功能模块
服务注册和发现模块需要实现的功能会稍微复杂⼀些,因为分为两个⻆⾊来完成其功能
- 注册者:作为Rpc服务的提供者,需要向注册中⼼注册服务,因此需要实现向服务器注册服务的功
能 - 发现者:作为Rpc服务的调⽤者,需要先进⾏服务发现,也就是向服务器发送请求获取能够提供指
定服务的主机地址,获取地址后需要管理起来留⽤,且作为发现者,需要关注注册中⼼发送过来的
服务上线/下线消息,以及时对已经下线的服务和主机进⾏管理。
Client:基于以上模块整合而出的客户端模块
框架设计
本项目可大致划分为以下三层:
- 抽象层:将底层网络通信、应用层通信协议、请求响应进行抽象,使项目更具扩展性和灵活性;
- 具象层:针对抽象的功能进行具体实现;
- 业务层:基于抽象的框架在上层实现项目所需功能。
抽象层
在本项目中,网络通信部分使用第三方库Muduo库,LV格式的通信协议解决粘包问题,以及Json格式进行序列化和反序列化,而这几方面后续都可能会持续优化,比如用替换JSON为Protobuf,迁移到gRPC框架等。
因此我们可以在设计项目框架的时候,对于底层通信部分相关功能进行抽象,上层业务部分根据抽象层来完成功能,这样做的好处是在具体的底层功能实现部分,我们可以实现插拔式的模块化替换,以此来提高项目的灵活性和可扩展性。
优化场景 | 无抽象层 | 有抽象层 |
---|---|---|
替换JSON为Protobuf | 需修改所有业务代码 | 仅替换序列化模块实现 |
迁移到gRPC框架 | 重构网络通信逻辑 | 实现新抽象层适配器即可 |
增加压缩功能 | 侵入式修改协议处理流程 | 在抽象层插入压缩/解压中间件 |
具象层
针对抽象层的具体实现,从抽象类中派生出具体功能的派生类,然后在内部实现各个接口的功能。
• 基于Muduo库实现⽹络通信部分抽象
• 基于LV通信协议实现Protocol部分抽象
在这一层中,我们需要针对不同的请求,从 BaseMessage 中派生出不同的请求和响应类型,以便在针对指定消息的处理时,能够更加轻松的获取或设置请求及响应中的各项数据元素。
业务层
基于底层通信框架,针对项目中具体的业务功能进行实现,比如rpc请求的处理,发布订阅请求的处理以及服务注册与发现的处理等。
<iomanip> 中的输出流操作符 | 说明 |
---|---|
setw(2) | 设置输出宽度为 2 个字符,如果不足会按设置字符填充 |
setfill(‘0’) | 指定填充字符为 ‘0’ ,配合 setw 使用 |
hex | 将后续输出的整数转换为十六进制格式显示 |
请求字段宏定义
意义:便于后期维护。
#define KEY_METHOD "method" // 方法名称
#define KEY_PARAMS "parameters" // 方法参数
#define KEY_TOPIC_KEY "topic_key" // 主题名称
#define KEY_TOPIC_MSG "topic_msg" // 主题消息
#define KEY_OPTYPE "optype" // 操作类型
#define KEY_HOST "host" // 主机信息
#define KEY_HOST_IP "ip" // IP地址
#define KEY_HOST_PORT "port" // 端口号
#define KEY_RCODE "rcode" // 响应码
#define KEY_RESULT "result" // 调用结果
消息类型定义
enum class MType
{REQ_RPC = 0, // rpc请求RSP_RPC, // rpc响应REQ_TOPIC, // 主题操作请求RSP_TOPIC, // 主题操作响应REQ_SERVICE, // 服务操作请求RSP_SERVICE // 服务操作响应
};
响应码类型定义
enum class RCode
{RCODE_OK = 0, // 成功处理RCODE_PARSE_FAILED, // 消息解析失败RCODE_INVALID_MSG, // 无效信息RCODE_DISCONNECTED, // 连接断开RCODE_INVALID_PARAMS, // 无效的rpc参数RCODE_NOT_FOUND_SERVICE, // 没有找到该服务RCODE_INVALID_OPTYPE, // 无效的操作类型RCODE_NOT_FOUND_TOPIC // 没有找到该主题
};
rpc请求类型定义
- 同步请求:等待收到响应后返回;
- 异步请求:返回异步对象,在需要的时候通过异步对象获取响应结果,还未收到结果会阻塞;
- 回调请求:设置回调函数,通过回调函数对响应进行处理。
enum class RType
{REQ_SYNC = 0, // 同步请求REQ_ASYNC, // 异步请求REQ_CALLBACK // 回调请求
};
主题操作类型定义
enum class TopicOpType
{TOPIC_CREATE = 0, // 主题创建TOPIC_REMOVE, // 主题删除TOPIC_SUBSCRIBE, // 主题订阅TOPIC_CANCEL, // 主题取消订阅TOPIC_PUBLISH // 主题消息发布
};
服务操作类型定义
enum class ServiceOpTyep
{SERVICE_REGISTRY = 0, // 服务注册SERVICE_DISCOVERY, // 服务发现SERVICE_ONLINE, // 服务上线SERVICE_OFFLINE // 服务下线
};
抽象层实现
BaseMessage
class BaseMessage
{
public:using ptr = std::shared_ptr<BaseMessage>;virtual ~BaseMessage(){}virtual void setId(const std::string& id) { _id = id; }virtual std::string getId() { return _id; }virtual void setMType(MType mtype) { _mtype = mtype; }virtual MType getMType() { return _mtype; }virtual std::string serialize() = 0;virtual bool unserialize(const std::string& msg) = 0;virtual bool check() = 0;
private:std::string _id;MType _mtype;
};
BaseBuffer
class BaseBuffer
{
public:using ptr = std::shared_ptr<BaseBuffer>;virtual size_t readableSize() = 0;virtual int32_t peekInt32() = 0; // 尝试取出4字节不删除virtual void retrieveInt32() = 0; // 删除缓冲区中前4字节virtual void readInt32() = 0; // 取出4字节并删除virtual std::string retrieveAsString(size_t len) = 0; // 取出指定长度数据
};
BaseProtocol
class BaseProtocol
{
public:using ptr = std::shared_ptr<BaseProtocol>;virtual bool canProcessed(const BaseBuffer::ptr& buf) = 0;virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) = 0;virtual std::string serialize(const BaseMessage::ptr& msg) = 0;
};
BaseConnection
class BaseConnection
{
public:using ptr = std::shared_ptr<BaseConnection>;virtual void send(const BaseMessage::ptr& msg) = 0;virtual void shutdown() = 0;virtual bool connected() = 0;
};
BaseServer
using ConnectionCallBack = std::function<void(const BaseConnection::ptr&)>;
using CloseCallBack = std::function<void(const BaseConnection::ptr&)>;
using MessageCallBack = std::function<void(const BaseConnection::ptr&, BaseBuffer::ptr&)>;class BaseServer
{
public:using ptr = std::shared_ptr<BaseServer>;virtual void setConnectionCallBack(const ConnectionCallBack& connection){_connection = connection;}virtual void setCloseCallBack(const CloseCallBack& close){_close = close;}virtual void setMessageCallBack(const MessageCallBack& message){_message = message;}virtual void start() = 0;
private:ConnectionCallBack _connection;CloseCallBack _close;MessageCallBack _message;
};
BaseClient
class BaseClient
{
public:using ptr = std::shared_ptr<BaseClient>;virtual void setConnectionCallBack(const ConnectionCallBack& connection){_connection = connection;}virtual void setCloseCallBack(const CloseCallBack& close){_close = close;}virtual void setMessageCallBack(const MessageCallBack& message){_message = message;}virtual void connect() = 0;virtual void shutdown() = 0;virtual void send(const BaseMessage::ptr& msg) = 0;virtual BaseConnection::ptr getConnect() = 0;virtual bool connected() = 0;
private:ConnectionCallBack _connection;CloseCallBack _close;MessageCallBack _message;
};
1、为什么要服务注册,服务注册是要做什么?
服务注册主要是实现分布式的系统,增强系统的健壮性。
一个节点主机,将自己所能提供的服务,在注册中心进行注册登记。
2、为什么要服务发现,服务发现是要做什么?
Rpc调用者需要知道那个节点主机能提供自己需要的服务。
服务发现就是Rpc调用者询问注册中心,谁能为自己提供指定的服务,将节点信息保存起来以待后用。
3、服务下线
当前使用长连接进行服务主机是否在线判断。一旦服务提供方断开连接,就要查询这个主机提供了哪些服务,接着查询哪些调用者对这些服务进行过服务发现,然后对这些调用者进行服务下线通知。
4、服务上线
因为Rpc调用者只会进行一次服务发现,那如果后续又有新的服务上线,这些调用者是不知道的。因此一旦某个服务上线,就需要对发现过这个服务的调用者进行服务上线通知,进而实现负载均衡。
本模块要做的就是:
- 将服务注册、发现功能集合到客户端中;
- 将服务信息管理集合到服务端中。
本篇文章的分享就到这里了,如果您觉得在本文有所收获,还请留下您的三连支持哦~
