当前位置: 首页 > news >正文

C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(二)

目录

交换机数据管理

交换机数据类

交换机数据持久化类

交换机数据管理类

测试

 


交换机数据管理

  • 定义交换机数据类
  1. 交换机名称
  2. 交换机类型
  3. 是否持久化标志
  4. 是否自动删除标志
  5. 其他参数
  •  定义交换机数据持久化类(数据持久化的 sqlite3 数据库中)
  1. 创建/删除交换机数据表
  2. 新增交换机数据
  3. 移除交换机数据
  4. 查询所有交换机数据
  5. 查询指定交换机数据(根据名称)
  • 定义交换机数据管理类
  1. 声明交换机,并添加管理(存在则 OK,不存在则创建)
  2. 删除交换机
  3. 获取指定交换机
  4. 销毁所有交换机数据

交换机数据类

    struct Exchange{using ptr = std::shared_ptr<Exchange>;// 交换机名称std::string name;// 交换机类型ExchangeType type;// 持久化标志bool durable;// 自动删除标志bool auto_delete;// 其他参数std::unordered_map<std::string, std::string> args;Exchange() {}Exchange(const std::string &ename,ExchangeType etype,bool edurable,bool eauto_delete,const std::unordered_map<std::string, std::string> &eargs): name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs){}void setArgs(const std::string &str_args){// key=val&key=val.....std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &arg : sub_args){size_t pos = arg.find("=");std::string key = arg.substr(0, pos);std::string val = arg.substr(pos + 1);args.insert(std::make_pair(key, val));}}std::string getArgs(){if (args.empty())return "";std::string result;for (auto &arg : args){result += arg.first + "=" + arg.second + "&";}result.pop_back();return result;}};

需要说明的有:

  • 对于其他参数,我们用一个unordered_map容器存储,与相应的字符串转化的格式是:key=val&key=val&key=val....
  • 在getArgs函数中必须添加对args的判空操作,否则,args为空时,result会为空,调用pop_back函数会发生段错误。

交换机数据持久化类

    class ExchangeMapper{public:ExchangeMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();}void createTable(){
#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);if (!ret){ERROR("创建交换机数据库表失败");abort();}}void removeTable(){
#define DROP_TABLE "drop table if exists exchange_table;"bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);if (!ret){ERROR("删除交换机数据库表失败");abort();}}void insert(Exchange::ptr &exchange){std::stringstream ss;ss << "insert into exchange_table values('"<< exchange->name << "',"<< exchange->type << ","<< exchange->durable << ","<< exchange->auto_delete << ","<< "'" << exchange->getArgs() << "');";bool ret = _sql_helper.exec(ss.str(), nullptr, nullptr);if (!ret)return;}void remove(const std::string &name){std::stringstream ss;ss << "delete from exchange_table where name = "<< "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}using ExchangMap = std::unordered_map<std::string, Exchange::ptr>;ExchangMap recovery(){ExchangMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){ExchangMap *result = (ExchangMap *)arg;auto exp = std::make_shared<Exchange>();exp->name = row[0];exp->type = (ExchangeType)std::stoi(row[1]);exp->durable = (bool)std::stoi(row[2]);exp->auto_delete = (bool)std::stoi(row[3]);if (row[4])exp->setArgs(row[4]);result->insert(std::make_pair(exp->name, exp));return 0;}private:SqliteHelper _sql_helper;};

说明:

  • 在构造对象时就把相应的数据库打开,如果没有就创建,但要注意一定要先创建好数据库所在目录,在创建表(如果不存在)。
  • 前四个函数就是对sql语言的一个使用。
  • recovery方法中我们将查询出来的结果以交换机名称为键值,存储在一个unordered_map容器中,以拿到所有交换机。

交换机数据管理类

    class ExchangeManager{public:using ptr = std::shared_ptr<ExchangeManager>;ExchangeManager(const std::string &dbfile) : _mapper(dbfile){_exchanges = _mapper.recovery();}// 声明交换机void declareExchange(const std::string &name,ExchangeType type,bool durable,bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it != _exchanges.end())return;// 新增交换机auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);_exchanges.insert(std::make_pair(name, exp));if (durable)_mapper.insert(exp);}// 删除交换机void deleteExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return;if (_exchanges[name]->durable)_mapper.remove(name);_exchanges.erase(name);}// 获取指定交换机Exchange::ptr selectExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return nullptr;return it->second;}// 判断交换机是否存在bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return false;return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}// 清理所有交换机数据void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_exchanges.clear();}private:std::mutex _mutex;ExchangeMapper _mapper;std::unordered_map<std::string, Exchange::ptr> _exchanges;};

说明:

  • 此类是真正要让用户使用的类,包含有几个成员属性:
    • 互斥锁:支持多线程同时访问。
    • ExchangeMapper对象:对数据库进行操作,获取所有交换机。
    • _exchanges:存储所有交换机,方便用户获取。
  • 在构造时就调用_mapper的recovery方法拿到所有交换机。
  • 所有方法对于判断交换机是否存在的操作必须独自进行,不能复用exists方法,否则会导致死锁。

测试

 

#include "../mqserver/exchange.hpp"
#include <gtest/gtest.h>jiuqi::ExchangeManager::ptr emp;class ExchangeTest : public testing::Environment
{
public:virtual void SetUp() override{emp = std::make_shared<jiuqi::ExchangeManager>("./data/meta.db");}virtual void TearDown() override{// emp->clear();}
};TEST(ExchangeTest, insert_test)
{std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};std::unordered_map<std::string, std::string> map_empty;emp->declareExchange("exchange1", jiuqi::ExchangeType::DIRECT, true, false, map);emp->declareExchange("exchange2", jiuqi::ExchangeType::TOPIC, true, false, map);emp->declareExchange("exchange3", jiuqi::ExchangeType::FANOUT, true, false, map);emp->declareExchange("exchange4", jiuqi::ExchangeType::FANOUT, true, false, map_empty);emp->declareExchange("exchange5", jiuqi::ExchangeType::FANOUT, true, false, map_empty);emp->declareExchange("exchange6", jiuqi::ExchangeType::FANOUT, true, false, map_empty);ASSERT_EQ(emp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::Exchange::ptr exp = emp->selectExchange("exchange3");ASSERT_EQ(exp->name, "exchange3");ASSERT_EQ(exp->type, jiuqi::ExchangeType::FANOUT);ASSERT_EQ(exp->durable, true);ASSERT_EQ(exp->auto_delete, false);ASSERT_EQ(exp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}TEST(ExchangeTest, delete_test)
{emp->deleteExchange("exchange1");jiuqi::Exchange::ptr exp = emp->selectExchange("exchange1");ASSERT_EQ(exp.get(), nullptr);ASSERT_EQ(emp->exists("exchange1"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new ExchangeTest);return RUN_ALL_TESTS();
}

        select测试可能会失败,原因是在交换机中我们使用unordered_map存储其他参数,由于顺序随机,所有使用getArgs获取的字符串中键值对的顺序也是随机的,但是不影响功能。

http://www.xdnf.cn/news/1162261.html

相关文章:

  • docker磁盘空间不足解决办法
  • MongoDB 查询时区问题
  • linux定时器使用
  • 3、Spring AI_DeepSeek模型-多轮对话
  • 江苏思必驰科技25Java实习面经
  • HTTP,HTTPS
  • 服务器系统时间不准确怎么办?
  • 图论基本算法
  • 部署Zabbix企业级分布式监控
  • 【Unity基础】Unity中2D和3D项目开发流程对比
  • Unity 插件Resize Pro 最快的 Texture2D 调整大小工具
  • Elasticsearch 是 NVIDIA Enterprise AI Factory 验证设计中推荐的向量数据库
  • 数据结构堆的实现(C语言)
  • Web3.0 能为你带来哪些实质性的 改变与突破
  • Vue 脚手架——render函数
  • 【算法笔记】树状数组
  • Linux学习之Linux系统权限
  • 《C++》函数内联,auto关键字
  • 用基础模型构建应用(第十章)AI Engineering: Building Applications with Foundation Models学习笔记
  • 探索无广告音乐世界:MusicFree 免费播放器
  • 海康威视视觉算法岗位30问及详解
  • BERT 的“池化策略”
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 主页布局实现
  • Three.js 立方体贴图(CubeMap)完全指南:从加载到应用
  • 大模型高效适配:软提示调优 Prompt Tuning
  • Python高效入门指南
  • 深入详解随机森林在放射治疗计划优化中的应用及实现细节
  • 部署 Zabbix 企业级分布式监控
  • Levels checking (filtering) in logging module
  • 大腾智能国产3D CAD软件正式上架华为云云商店