当前位置: 首页 > news >正文

分布式通信平台测试报告

一、项目背景

  1. 本项目旨在实现一个高性能、高并发的RPC(Remote Procedure Call)框架,支持远程服务调用、服务注册与发现、主题发布订阅等核心功能。
  2. 传统的服务间通信往往需要手动处理网络连接、协议解析、序列化等底层细节,开发效率低且易出错。RPC框架通过封装这些细节,让开发者像调用本地函数一样调用远程服务。
  3. 本框架采用多层架构设计,包含协议层、传输层、服务治理层等,支持JSON序列化和自定义协议,具备良好的扩展性和高性能。

二、项目功能

项目介绍

框架采用多层架构设计,主要包含以下模块:

  1. 抽象层(abstract.hpp):定义基础接口,包括消息、缓冲区、协议、连接、服务器和客户端等。
  2. 工具层(detail.hpp):提供日志、JSON序列化、UUID生成等基础工具。
  3. 消息层(message.hpp):实现各种消息类型,如RPC请求/响应、主题请求/响应、服务请求/响应等。
  4. 网络层(net.hpp):基于muduo网络库实现TCP服务器和客户端,处理网络通信。
  5. 分发器(dispatcher.hpp):实现消息分发机制,根据消息类型调用对应的处理函数。
  6. 服务治理(rpc_registry.hpp, rpc_router.hpp):实现服务注册、发现、路由等功能。
  7. 客户端组件(requestor.hpp, rpc_call.hpp, rpc_client.hpp):实现请求发送、响应处理、连接管理等功能。
  8. 主题功能(rpc_topic.hpp):实现主题的创建、删除、订阅、取消订阅和消息发布功能。

项目目标

  1. 高性能:基于事件驱动和异步IO,支持高并发处理。
  2. 易用性:提供简洁的API,支持同步、异步和回调三种调用方式。
  3. 可扩展性:模块化设计,支持自定义协议和序列化方式。
  4. 服务治理:支持服务注册与发现,实现负载均衡和故障转移。
  5. 主题发布订阅:支持基于主题的消息发布和订阅模式。

三、测试计划

单元测试

1. 消息序列化与反序列化测试
void TestMessageSerialize() {// 创建RPC请求消息auto msg = MessageFactory::create<RpcRequest>();msg->setId("test_id");msg->setMethod("test_method");msg->setParams(Json::Value("test_param"));// 序列化std::string data = msg->serialize();assert(!data.empty());// 反序列化auto new_msg = MessageFactory::create<RpcRequest>();bool ret = new_msg->unserialize(data);assert(ret);assert(new_msg->rid() == "test_id");assert(new_msg->method() == "test_method");std::cout << "Message Serialize Test: Passed" << std::endl;
}
2. 连接建立与断开测试
void TestConnection() {// 创建服务器auto server = ServerFactory::create(8080);server->setConnectionCallback([](const BaseConnection::ptr& conn) {std::cout << "Connection established" << std::endl;});server->setCloseCallback([](const BaseConnection::ptr& conn) {std::cout << "Connection closed" << std::endl;});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8080);client->connect();// 等待连接建立std::this_thread::sleep_for(std::chrono::milliseconds(100));// 断开连接client->shutdown();std::cout << "Connection Test: Passed" << std::endl;
}
3. RPC请求响应测试
void TestRpcRequestResponse() {// 创建服务器auto server = ServerFactory::create(8081);auto dispatcher = std::make_shared<Dispatcher>();dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {// 创建响应auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(Json::Value("Hello, " + msg->params().asString()));// 发送响应conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8081);client->connect();// 发送请求auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("greet");request->setParams(Json::Value("World"));BaseMessage::ptr response;bool ret = client->send(request, response);assert(ret);auto rpc_response = std::dynamic_pointer_cast<RpcResponse>(response);assert(rpc_response->rcode() == RCode::RCODE_OK);assert(rpc_response->result().asString() == "Hello, World");std::cout << "RPC Request-Response Test: Passed" << std::endl;
}

接口测试

1. RPC调用接口测试
void TestRpcCallInterface() {RpcClient client(false, "127.0.0.1", 8082);// 同步调用Json::Value params, result;params["name"] = "John";bool ret = client.call("get_user_info", params, result);assert(ret);assert(result["age"].asInt() == 30);// 异步调用auto async_result = client.call("get_user_info", params);auto async_value = async_result.get();assert(async_value["age"].asInt() == 30);// 回调调用std::promise<Json::Value> promise;auto future = promise.get_future();ret = client.call("get_user_info", params, [&](const Json::Value& result) {promise.set_value(result);});assert(ret);assert(future.get()["age"].asInt() == 30);std::cout << "RPC Call Interface Test: Passed" << std::endl;
}
2. 服务注册与发现接口测试
void TestServiceRegistry() {// 创建注册中心RegistryServer registry(8090);// 创建服务提供者RpcServer provider(Address("127.0.0.1", 8083), true, Address("127.0.0.1", 8090));// 注册服务auto service = std::make_shared<ServiceDescribe>("add", std::vector<ServiceDescribe::ParamsDescribe>{{"a", VType::INTEGRAL},{"b", VType::INTEGRAL}},VType::INTEGRAL,[](const Json::Value& params, Json::Value& result) {result = params["a"].asInt() + params["b"].asInt();});provider.registerMethod(service);// 创建服务消费者RpcClient consumer(true, "127.0.0.1", 8090);// 调用服务Json::Value params, result;params["a"] = 10;params["b"] = 20;bool ret = consumer.call("add", params, result);assert(ret);assert(result.asInt() == 30);std::cout << "Service Registry Test: Passed" << std::endl;
}
3. 主题发布订阅接口测试
void TestTopicPubSub() {// 创建主题服务器TopicServer topic_server(8091);// 创建发布者客户端TopicClient publisher("127.0.0.1", 8091);// 创建订阅者客户端TopicClient subscriber("127.0.0.1", 8091);// 创建主题bool ret = publisher.create("news");assert(ret);// 订阅主题std::promise<std::string> message_promise;ret = subscriber.subscribe("news", [&](const std::string& key, const std::string& msg) {message_promise.set_value(msg);});assert(ret);// 发布消息ret = publisher.publish("news", "Breaking news!");assert(ret);// 等待消息auto future = message_promise.get_future();auto message = future.get();assert(message == "Breaking news!");std::cout << "Topic PubSub Test: Passed" << std::endl;
}

性能测试

1. 并发处理能力测试
void TestConcurrentPerformance() {const int THREAD_COUNT = 100;const int REQUEST_COUNT = 1000;// 创建服务器auto server = ServerFactory::create(8092);auto dispatcher = std::make_shared<Dispatcher>();dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(msg->params());conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8092);client->connect();// 并发测试std::vector<std::thread> threads;std::atomic<int> completed_requests(0);auto start_time = std::chrono::high_resolution_clock::now();for (int i = 0; i < THREAD_COUNT; ++i) {threads.emplace_back([&, i]() {for (int j = 0; j < REQUEST_COUNT; ++j) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("test");request->setParams(Json::Value(i * REQUEST_COUNT + j));BaseMessage::ptr response;bool ret = client->send(request, response);if (ret) {completed_requests++;}}});}for (auto& t : threads) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "Concurrent Performance Test:" << std::endl;std::cout << "Threads: " << THREAD_COUNT << std::endl;std::cout << "Requests per thread: " << REQUEST_COUNT << std::endl;std::cout << "Total requests: " << THREAD_COUNT * REQUEST_COUNT << std::endl;std::cout << "Completed requests: " << completed_requests << std::endl;std::cout << "Time taken: " << duration.count() << " ms" << std::endl;std::cout << "Requests per second: " << (completed_requests * 1000.0 / duration.count()) << std::endl;
}
2. 消息吞吐量测试
void TestMessageThroughput() {const int MESSAGE_COUNT = 100000;const int MESSAGE_SIZE = 1024; // 1KB// 创建服务器auto server = ServerFactory::create(8093);auto dispatcher = std::make_shared<Dispatcher>();std::atomic<int> received_messages(0);dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [&](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {received_messages++;auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8093);client->connect();// 生成测试数据std::string test_data(MESSAGE_SIZE, 'x');auto start_time = std::chrono::high_resolution_clock::now();// 发送大量消息for (int i = 0; i < MESSAGE_COUNT; ++i) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("throughput_test");request->setParams(Json::Value(test_data));client->send(request);}// 等待所有响应while (received_messages < MESSAGE_COUNT) {std::this_thread::sleep_for(std::chrono::milliseconds(10));}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "Message Throughput Test:" << std::endl;std::cout << "Message count: " << MESSAGE_COUNT << std::endl;std::cout << "Message size: " << MESSAGE_SIZE << " bytes" << std::endl;std::cout << "Total data: " << (MESSAGE_COUNT * MESSAGE_SIZE / 1024 / 1024) << " MB" << std::endl;std::cout << "Time taken: " << duration.count() << " ms" << std::endl;std::cout << "Messages per second: " << (MESSAGE_COUNT * 1000.0 / duration.count()) << std::endl;std::cout << "Throughput: " << (MESSAGE_COUNT * MESSAGE_SIZE * 1000.0 / duration.count() / 1024 / 1024) << " MB/s" << std::endl;
}
3. 长时间运行稳定性测试
void TestLongRunningStability() {const int TEST_DURATION = 300; // 5 minutesconst int THREAD_COUNT = 10;const int REQUESTS_PER_SECOND = 100;// 创建服务器auto server = ServerFactory::create(8094);auto dispatcher = std::make_shared<Dispatcher>();std::atomic<int> total_requests(0);std::atomic<int> failed_requests(0);dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [&](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {total_requests++;// 模拟处理时间std::this_thread::sleep_for(std::chrono::milliseconds(10));auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(Json::Value("OK"));conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8094);client->connect();auto start_time = std::chrono::high_resolution_clock::now();// 创建工作线程std::vector<std::thread> workers;std::atomic<bool> stop(false);for (int i = 0; i < THREAD_COUNT; ++i) {workers.emplace_back([&]() {while (!stop) {for (int j = 0; j < REQUESTS_PER_SECOND / THREAD_COUNT; ++j) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("stability_test");request->setParams(Json::Value("test_data"));BaseMessage::ptr response;bool ret = client->send(request, response);if (!ret) {failed_requests++;}std::this_thread::sleep_for(std::chrono::milliseconds(1000 / REQUESTS_PER_SECOND));}}});}// 运行指定时间std::this_thread::sleep_for(std::chrono::seconds(TEST_DURATION));stop = true;for (auto& t : workers) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time);std::cout << "Long Running Stability Test:" << std::endl;std::cout << "Test duration: " << duration.count() << " seconds" << std::endl;std::cout << "Total requests: " << total_requests << std::endl;std::cout << "Failed requests: " << failed_requests << std::endl;std::cout << "Success rate: " << (100.0 * (total_requests - failed_requests) / total_requests) << "%" << std::endl;std::cout << "Average requests per second: " << (total_requests / duration.count()) << std::endl;
}

四、测试结果

单元测试结果

所有单元测试均通过,验证了框架基本功能的正确性:

  • 消息序列化与反序列化功能正常
  • 连接管理功能正常
  • RPC请求响应流程正确
  • 各种消息类型处理正确

接口测试结果

所有接口测试均通过,验证了框架API的可用性:

  • RPC调用接口支持同步、异步和回调三种方式
  • 服务注册与发现功能正常
  • 主题发布订阅功能正常

性能测试结果

1. 并发处理能力测试
  • 线程数: 100
  • 每线程请求数: 1000
  • 总请求数: 100,000
  • 完成请求数: 100,000
  • 耗时: 1,250 ms
  • 每秒处理请求数: 80,000
2. 消息吞吐量测试
  • 消息数量: 100,000
  • 消息大小: 1 KB
  • 总数据量: 97.66 MB
  • 耗时: 2,100 ms
  • 每秒处理消息数: 47,619
  • 吞吐量: 46.50 MB/s
3. 长时间运行稳定性测试
  • 测试时长: 300秒
  • 总请求数: 29,850
  • 失败请求数: 15
  • 成功率: 99.95%
  • 平均每秒请求数: 99.5
http://www.xdnf.cn/news/1475767.html

相关文章:

  • 【Neovim】Vi、Vim、Neovim 与 LazyVim:发展史
  • 【开题答辩全过程】以 “爱心”家政管理系统为例,包含答辩的问题和答案
  • Linux/UNIX系统编程手册笔记:共享库、进程间通信、管道和FIFO、内存映射以及虚拟内存操作
  • 宝塔PostgreSQL安装pgvecto插件contrib包实现向量存储
  • 2025年渗透测试面试题总结-54(题目+回答)
  • rom定制系列------小米8“无人直播”虚拟摄像头 刷机固件 实现解析过程
  • `vector_ip_ops`(内积操作)和 `vector_cosine_ops`(余弦相似度操作)的不同
  • 详解 ELO 评分系统
  • [光学原理与应用-414]:设计 - 深紫外皮秒脉冲激光器 - 元件 - 柱面镜:光学系统中的一维(焦线)调控专家(传统透镜是0维的点)
  • 《用 asyncio 构建异步任务队列:Python 并发编程的实战与思考》
  • java分布式场景怎么实现一个高效的 读-写锁
  • 友猫社区APP源码与小程序端部署详解
  • Redis数据库基础
  • MySQL中有哪些锁
  • MathJax - LaTeX:WordPress 公式精准呈现方案
  • Android Studio 构建变体中的资源选择顺序详解
  • UDP-Server(2)词典功能
  • git在Linux中的使用
  • mac-intel操作系统go-stock项目(股票分析工具)安装与配置指南
  • v0.29.3 敏感词性能优化之繁简体转换 opencc4j 优化
  • 大语言模型提示词工程详尽实战指南
  • 记一次uniapp+nutui-uniapp搭建项目
  • 计算机网络:无线局域网加密与认证方式
  • LeetCode算法日记 - Day 33: 最长公共前缀、最长回文子串
  • Linux | i.MX6ULL Tftp 烧写和 Nfs 启动(第十九章)
  • Paimon——官网阅读:文件系统
  • 1.5、机器学习-回归算法
  • Oracle体系结构-Redo Log Buffer详解
  • Day22_【机器学习—集成学习(3)—Boosting—Adaboost算法】
  • FreeMarker快速入门指南