分布式通信平台测试报告
一、项目背景
- 本项目旨在实现一个高性能、高并发的RPC(Remote Procedure Call)框架,支持远程服务调用、服务注册与发现、主题发布订阅等核心功能。
- 传统的服务间通信往往需要手动处理网络连接、协议解析、序列化等底层细节,开发效率低且易出错。RPC框架通过封装这些细节,让开发者像调用本地函数一样调用远程服务。
- 本框架采用多层架构设计,包含协议层、传输层、服务治理层等,支持JSON序列化和自定义协议,具备良好的扩展性和高性能。
二、项目功能
项目介绍
框架采用多层架构设计,主要包含以下模块:
- 抽象层(abstract.hpp):定义基础接口,包括消息、缓冲区、协议、连接、服务器和客户端等。
- 工具层(detail.hpp):提供日志、JSON序列化、UUID生成等基础工具。
- 消息层(message.hpp):实现各种消息类型,如RPC请求/响应、主题请求/响应、服务请求/响应等。
- 网络层(net.hpp):基于muduo网络库实现TCP服务器和客户端,处理网络通信。
- 分发器(dispatcher.hpp):实现消息分发机制,根据消息类型调用对应的处理函数。
- 服务治理(rpc_registry.hpp, rpc_router.hpp):实现服务注册、发现、路由等功能。
- 客户端组件(requestor.hpp, rpc_call.hpp, rpc_client.hpp):实现请求发送、响应处理、连接管理等功能。
- 主题功能(rpc_topic.hpp):实现主题的创建、删除、订阅、取消订阅和消息发布功能。
项目目标
- 高性能:基于事件驱动和异步IO,支持高并发处理。
- 易用性:提供简洁的API,支持同步、异步和回调三种调用方式。
- 可扩展性:模块化设计,支持自定义协议和序列化方式。
- 服务治理:支持服务注册与发现,实现负载均衡和故障转移。
- 主题发布订阅:支持基于主题的消息发布和订阅模式。
三、测试计划
单元测试
1. 消息序列化与反序列化测试
void TestMessageSerialize() {// 创建RPC请求消息auto msg = MessageFactory::create<RpcRequest>();msg->setId("test_id");msg->setMethod("test_method");msg->setParams(Json::Value("test_param"));// 序列化std::string data = msg->serialize();assert(!data.empty());// 反序列化auto new_msg = MessageFactory::create<RpcRequest>();bool ret = new_msg->unserialize(data);assert(ret);assert(new_msg->rid() == "test_id");assert(new_msg->method() == "test_method");std::cout << "Message Serialize Test: Passed" << std::endl;
}
2. 连接建立与断开测试
void TestConnection() {// 创建服务器auto server = ServerFactory::create(8080);server->setConnectionCallback([](const BaseConnection::ptr& conn) {std::cout << "Connection established" << std::endl;});server->setCloseCallback([](const BaseConnection::ptr& conn) {std::cout << "Connection closed" << std::endl;});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8080);client->connect();// 等待连接建立std::this_thread::sleep_for(std::chrono::milliseconds(100));// 断开连接client->shutdown();std::cout << "Connection Test: Passed" << std::endl;
}
3. RPC请求响应测试
void TestRpcRequestResponse() {// 创建服务器auto server = ServerFactory::create(8081);auto dispatcher = std::make_shared<Dispatcher>();dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {// 创建响应auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(Json::Value("Hello, " + msg->params().asString()));// 发送响应conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8081);client->connect();// 发送请求auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("greet");request->setParams(Json::Value("World"));BaseMessage::ptr response;bool ret = client->send(request, response);assert(ret);auto rpc_response = std::dynamic_pointer_cast<RpcResponse>(response);assert(rpc_response->rcode() == RCode::RCODE_OK);assert(rpc_response->result().asString() == "Hello, World");std::cout << "RPC Request-Response Test: Passed" << std::endl;
}
接口测试
1. RPC调用接口测试
void TestRpcCallInterface() {RpcClient client(false, "127.0.0.1", 8082);// 同步调用Json::Value params, result;params["name"] = "John";bool ret = client.call("get_user_info", params, result);assert(ret);assert(result["age"].asInt() == 30);// 异步调用auto async_result = client.call("get_user_info", params);auto async_value = async_result.get();assert(async_value["age"].asInt() == 30);// 回调调用std::promise<Json::Value> promise;auto future = promise.get_future();ret = client.call("get_user_info", params, [&](const Json::Value& result) {promise.set_value(result);});assert(ret);assert(future.get()["age"].asInt() == 30);std::cout << "RPC Call Interface Test: Passed" << std::endl;
}
2. 服务注册与发现接口测试
void TestServiceRegistry() {// 创建注册中心RegistryServer registry(8090);// 创建服务提供者RpcServer provider(Address("127.0.0.1", 8083), true, Address("127.0.0.1", 8090));// 注册服务auto service = std::make_shared<ServiceDescribe>("add", std::vector<ServiceDescribe::ParamsDescribe>{{"a", VType::INTEGRAL},{"b", VType::INTEGRAL}},VType::INTEGRAL,[](const Json::Value& params, Json::Value& result) {result = params["a"].asInt() + params["b"].asInt();});provider.registerMethod(service);// 创建服务消费者RpcClient consumer(true, "127.0.0.1", 8090);// 调用服务Json::Value params, result;params["a"] = 10;params["b"] = 20;bool ret = consumer.call("add", params, result);assert(ret);assert(result.asInt() == 30);std::cout << "Service Registry Test: Passed" << std::endl;
}
3. 主题发布订阅接口测试
void TestTopicPubSub() {// 创建主题服务器TopicServer topic_server(8091);// 创建发布者客户端TopicClient publisher("127.0.0.1", 8091);// 创建订阅者客户端TopicClient subscriber("127.0.0.1", 8091);// 创建主题bool ret = publisher.create("news");assert(ret);// 订阅主题std::promise<std::string> message_promise;ret = subscriber.subscribe("news", [&](const std::string& key, const std::string& msg) {message_promise.set_value(msg);});assert(ret);// 发布消息ret = publisher.publish("news", "Breaking news!");assert(ret);// 等待消息auto future = message_promise.get_future();auto message = future.get();assert(message == "Breaking news!");std::cout << "Topic PubSub Test: Passed" << std::endl;
}
性能测试
1. 并发处理能力测试
void TestConcurrentPerformance() {const int THREAD_COUNT = 100;const int REQUEST_COUNT = 1000;// 创建服务器auto server = ServerFactory::create(8092);auto dispatcher = std::make_shared<Dispatcher>();dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(msg->params());conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8092);client->connect();// 并发测试std::vector<std::thread> threads;std::atomic<int> completed_requests(0);auto start_time = std::chrono::high_resolution_clock::now();for (int i = 0; i < THREAD_COUNT; ++i) {threads.emplace_back([&, i]() {for (int j = 0; j < REQUEST_COUNT; ++j) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("test");request->setParams(Json::Value(i * REQUEST_COUNT + j));BaseMessage::ptr response;bool ret = client->send(request, response);if (ret) {completed_requests++;}}});}for (auto& t : threads) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "Concurrent Performance Test:" << std::endl;std::cout << "Threads: " << THREAD_COUNT << std::endl;std::cout << "Requests per thread: " << REQUEST_COUNT << std::endl;std::cout << "Total requests: " << THREAD_COUNT * REQUEST_COUNT << std::endl;std::cout << "Completed requests: " << completed_requests << std::endl;std::cout << "Time taken: " << duration.count() << " ms" << std::endl;std::cout << "Requests per second: " << (completed_requests * 1000.0 / duration.count()) << std::endl;
}
2. 消息吞吐量测试
void TestMessageThroughput() {const int MESSAGE_COUNT = 100000;const int MESSAGE_SIZE = 1024; // 1KB// 创建服务器auto server = ServerFactory::create(8093);auto dispatcher = std::make_shared<Dispatcher>();std::atomic<int> received_messages(0);dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [&](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {received_messages++;auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8093);client->connect();// 生成测试数据std::string test_data(MESSAGE_SIZE, 'x');auto start_time = std::chrono::high_resolution_clock::now();// 发送大量消息for (int i = 0; i < MESSAGE_COUNT; ++i) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("throughput_test");request->setParams(Json::Value(test_data));client->send(request);}// 等待所有响应while (received_messages < MESSAGE_COUNT) {std::this_thread::sleep_for(std::chrono::milliseconds(10));}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "Message Throughput Test:" << std::endl;std::cout << "Message count: " << MESSAGE_COUNT << std::endl;std::cout << "Message size: " << MESSAGE_SIZE << " bytes" << std::endl;std::cout << "Total data: " << (MESSAGE_COUNT * MESSAGE_SIZE / 1024 / 1024) << " MB" << std::endl;std::cout << "Time taken: " << duration.count() << " ms" << std::endl;std::cout << "Messages per second: " << (MESSAGE_COUNT * 1000.0 / duration.count()) << std::endl;std::cout << "Throughput: " << (MESSAGE_COUNT * MESSAGE_SIZE * 1000.0 / duration.count() / 1024 / 1024) << " MB/s" << std::endl;
}
3. 长时间运行稳定性测试
void TestLongRunningStability() {const int TEST_DURATION = 300; // 5 minutesconst int THREAD_COUNT = 10;const int REQUESTS_PER_SECOND = 100;// 创建服务器auto server = ServerFactory::create(8094);auto dispatcher = std::make_shared<Dispatcher>();std::atomic<int> total_requests(0);std::atomic<int> failed_requests(0);dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [&](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {total_requests++;// 模拟处理时间std::this_thread::sleep_for(std::chrono::milliseconds(10));auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(Json::Value("OK"));conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8094);client->connect();auto start_time = std::chrono::high_resolution_clock::now();// 创建工作线程std::vector<std::thread> workers;std::atomic<bool> stop(false);for (int i = 0; i < THREAD_COUNT; ++i) {workers.emplace_back([&]() {while (!stop) {for (int j = 0; j < REQUESTS_PER_SECOND / THREAD_COUNT; ++j) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("stability_test");request->setParams(Json::Value("test_data"));BaseMessage::ptr response;bool ret = client->send(request, response);if (!ret) {failed_requests++;}std::this_thread::sleep_for(std::chrono::milliseconds(1000 / REQUESTS_PER_SECOND));}}});}// 运行指定时间std::this_thread::sleep_for(std::chrono::seconds(TEST_DURATION));stop = true;for (auto& t : workers) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time);std::cout << "Long Running Stability Test:" << std::endl;std::cout << "Test duration: " << duration.count() << " seconds" << std::endl;std::cout << "Total requests: " << total_requests << std::endl;std::cout << "Failed requests: " << failed_requests << std::endl;std::cout << "Success rate: " << (100.0 * (total_requests - failed_requests) / total_requests) << "%" << std::endl;std::cout << "Average requests per second: " << (total_requests / duration.count()) << std::endl;
}
四、测试结果
单元测试结果
所有单元测试均通过,验证了框架基本功能的正确性:
- 消息序列化与反序列化功能正常
- 连接管理功能正常
- RPC请求响应流程正确
- 各种消息类型处理正确
接口测试结果
所有接口测试均通过,验证了框架API的可用性:
- RPC调用接口支持同步、异步和回调三种方式
- 服务注册与发现功能正常
- 主题发布订阅功能正常
性能测试结果
1. 并发处理能力测试
- 线程数: 100
- 每线程请求数: 1000
- 总请求数: 100,000
- 完成请求数: 100,000
- 耗时: 1,250 ms
- 每秒处理请求数: 80,000
2. 消息吞吐量测试
- 消息数量: 100,000
- 消息大小: 1 KB
- 总数据量: 97.66 MB
- 耗时: 2,100 ms
- 每秒处理消息数: 47,619
- 吞吐量: 46.50 MB/s
3. 长时间运行稳定性测试
- 测试时长: 300秒
- 总请求数: 29,850
- 失败请求数: 15
- 成功率: 99.95%
- 平均每秒请求数: 99.5