当前位置: 首页 > news >正文

C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(三)

目录

队列数据管理

代码实现

测试代码

绑定信息(交换机-队列)管理

代码实现

测试代码


队列数据管理

当前队列数据的管理,本质上是队列描述信息的管理,描述当前服务器上有哪些队列。

  • 定义队列描述数据类
  1. 队列名称
  2. 是否持久化标志
  3. 是否独占标志
  4. 是否自动删除标志
  5. 其他参数
  • 定义队列数据持久化类(数据持久化的 sqlite3 数据库中)
  1. 创建/删除队列数据表
  2. 新增队列数据
  3. 移除队列数据
  4. 查询所有队列数据
  • 定义队列数据管理类
  1. 创建队列,并添加管理(存在则 OK,不存在则创建)
  2. 删除队列
  3. 获取指定队列
  4. 获取所有队列
  5. 判断指定队列是否存在
  6. 获取队列数量
  7. 销毁所有队列数据

代码实现

与交换机数据管理的实现非常相似,只需要修改表结构即可

#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>namespace jiuqi
{struct MsgQueue{using ptr = std::shared_ptr<MsgQueue>;std::string name;bool durable;bool exclusive;bool auto_delete;std::unordered_map<std::string, std::string> args;MsgQueue() {}MsgQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,const std::unordered_map<std::string, std::string> qargs): name(qname), durable(qdurable), exclusive(qexclusive), auto_delete(qauto_delete), args(qargs) {}void setArgs(const std::string &str_args){// key=val&key=val.....std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &arg : sub_args){size_t pos = arg.find("=");std::string key = arg.substr(0, pos);std::string val = arg.substr(pos + 1);args.insert(std::make_pair(key, val));}}std::string getArgs(){if (args.empty())return "";std::string result;for (auto &arg : args){result += arg.first + "=" + arg.second + "&";}result.pop_back();return result;}};using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;class QueueMapper{public:QueueMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);            assert(_sql_helper.open());createTable();}void createTable(){std::stringstream ss;ss << "create table if not exists queue_table("<< "name varchar(32) primary key, "<< "durable int, "<< "exclusive int, "<< "auto_delete int, "<< "args varchar(128));";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("创建队列数据库表失败");abort();}}void removeTable(){std::string sql = "drop table if exists queue_table;";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("删除交换机数据库表失败");abort();}}bool insert(MsgQueue::ptr &queue){std::stringstream ss;ss << "insert into queue_table values('"<< queue->name << "', "<< queue->durable << ", "<< queue->exclusive << ", "<< queue->auto_delete << ", '"<< queue->getArgs() << "');";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool remove(const std::string &name){std::stringstream ss;ss << "delete from queue_table where "<< "name = " << "'" << name << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}QueueMap recovery(){QueueMap result;std::string sql = "select name, durable, exclusive, auto_delete, args from queue_table";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){QueueMap *result = (QueueMap *)arg;MsgQueue::ptr mqp = std::make_shared<MsgQueue>();mqp->name = row[0];mqp->durable = (bool)std::stoi(row[1]);mqp->exclusive = (bool)std::stoi(row[2]);mqp->auto_delete = (bool)std::stoi(row[3]);if (row[4])mqp->setArgs(row[4]);result->insert(std::make_pair(mqp->name, mqp));return 0;}private:SqliteHelper _sql_helper;};class QueueManager{public:using ptr = std::shared_ptr<QueueManager>;QueueManager(const std::string &dbfile) : _mapper(dbfile){_queues = _mapper.recovery();}void declareQueue(const std::string &name,bool durable,bool exclusive,bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it != _queues.end())return;auto queue = std::make_shared<MsgQueue>(name, durable, exclusive, auto_delete, args);_queues.insert(std::make_pair(name, queue));if (durable)_mapper.insert(queue);}void deleteQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return;if (it->second->durable)_mapper.remove(name);_queues.erase(name);}MsgQueue::ptr selectQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return nullptr;return it->second;}QueueMap allQueue(){std::unique_lock<std::mutex> lock(_mutex);return _queues;}bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return false;return true;            }size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _queues.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_queues.clear();}private:std::mutex _mutex;QueueMapper _mapper;QueueMap _queues;};
}

测试代码

#include "../mqserver/queue.hpp"
#include <gtest/gtest.h>jiuqi::QueueManager::ptr qmp;class ExchangeTest : public testing::Environment
{
public:virtual void SetUp() override{qmp = std::make_shared<jiuqi::QueueManager>("./data/queue.db");}virtual void TearDown() override{qmp->clear();}
};TEST(ExchangeTest, insert_test)
{std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};std::unordered_map<std::string, std::string> map_empty;qmp->declareQueue("queue1", true, false, false, map);qmp->declareQueue("queue2", true, false, false, map);qmp->declareQueue("queue3", true, false, false, map);qmp->declareQueue("queue4", true, false, false,map_empty);qmp->declareQueue("queue5", true, false, false,map_empty);qmp->declareQueue("queue6", true, false, false,map_empty);ASSERT_EQ(qmp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::MsgQueue::ptr mqp = qmp->selectQueue("queue3");ASSERT_EQ(mqp->name, "queue3");ASSERT_EQ(mqp->durable, true);ASSERT_EQ(mqp->exclusive, false);ASSERT_EQ(mqp->auto_delete, false);ASSERT_EQ(mqp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}TEST(ExchangeTest, delete_test)
{qmp->deleteQueue("queue1");jiuqi::MsgQueue::ptr mqp = qmp->selectQueue("queue1");ASSERT_EQ(mqp.get(), nullptr);ASSERT_EQ(qmp->exists("queue1"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new ExchangeTest);return RUN_ALL_TESTS();
}

绑定信息(交换机-队列)管理

绑定信息,本质上就是一个交换机关联了哪些队列的描述。

  • 定义绑定信息类
  1. 交换机名称
  2. 队列名称
  3. binding_key(分发匹配规则-决定了哪些数据能被交换机放入队列)
  • 定义绑定信息数据持久化类(数据持久化的 sqlite3 数据库中)
  1. 创建/删除绑定信息数据表
  2. 新增绑定信息数据
  3. 移除指定绑定信息数据
  4. 移除指定交换机相关绑定信息数据:移除交换机的时候会被调用
  5. 移除指定队列相关绑定信息数据:移除队列的时候会被调用f. 查询所有绑定信息数据:用于重启服务器时进行历史数据恢复
  • 定义绑定信息数据管理类
  1. 创建绑定信息,并添加管理(存在则 OK,不存在则创建)
  2. 解除指定的绑定信息
  3. 删除指定队列的所有绑定信息
  4. 删除交换机相关的所有绑定信息
  5. 获取交换机相关的所有绑定信息:交换机收到消息后,需要分发给自己关联的队列
  6. 判断指定绑定信息是否存在
  7. 获取当前绑定信息数量
  8. 销毁所有绑定信息数据

代码实现

同样与上述类的实现类似

 

#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>namespace jiuqi
{struct Binding{using ptr = std::shared_ptr<Binding>;std::string exchangeName;std::string queueName;std::string bindingKey;Binding() {}Binding(const std::string &ename, const std::string &qname, const std::string &key): exchangeName(ename), queueName(qname), bindingKey(key){}};using QueueBindingMap = std::unordered_map<std::string, Binding::ptr>;using BindingMap = std::unordered_map<std::string, QueueBindingMap>;class BindingMapper{public:BindingMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);_sql_helper.open();createTable();}void createTable(){std::string sql = "create table if not exists binding_table(exchangeName varchar(32), queueName varchar(32), bindingKey varchar(128), PRIMARY KEY (exchangeName, queueName));";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("创建绑定信息数据库表失败");abort();}}void removeTable(){std::string sql = "drop table if exists binding_table;";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("删除绑定信息数据库表失败");abort();}}bool insert(Binding::ptr &binding){std::stringstream ss;ss << "insert into binding_table values('"<< binding->exchangeName << "', '"<< binding->queueName << "', '"<< binding->bindingKey << "');";std::string sql = ss.str();if (!_sql_helper.exec(sql, nullptr, nullptr)){ERROR("插入绑定记录失败");return false;}return true;}bool remove(const std::string &ename, const std::string &qname){std::stringstream ss;ss << "delete from binding_table where "<< "exchangeName = '" << ename << "' "<< "and queueName = '" << qname << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool removeExchangeBindings(const std::string &ename){std::stringstream ss;ss << "delete from binding_table where "<< "exchangeName = '" << ename << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool removeQueueBindings(const std::string &qname){std::stringstream ss;ss << "delete from binding_table where "<< "queueName = '" << qname << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}BindingMap recovery(){BindingMap result;std::string sql = "select exchangeName, queueName, bindingKey from binding_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){BindingMap *result = (BindingMap *)arg;Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);// 为了防止交换机相关绑定信息已经存在,不能直接创建队列映射// 因此要先获取交换机对应的映射对象// 使用引用的好处, 如果不存在就会创建QueueBindingMap &qbm = (*result)[bp->exchangeName];qbm.insert(std::make_pair(bp->queueName, bp));return 0;}private:SqliteHelper _sql_helper;};class BindingManager{public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string &dbfile) : _mapper(dbfile){_bindings = _mapper.recovery();}bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit != _bindings.end() && eit->second.find(qname) != eit->second.end())return true;Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);if (durable){if(!_mapper.insert(bp))return false;}QueueBindingMap &qbm = _bindings[ename];qbm.insert(std::make_pair(qname, bp));return true;}void unbind(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return;auto qit = eit->second.find(qname);if (qit == eit->second.end())return;_bindings[ename].erase(qname);if (eit->second.empty())_bindings.erase(ename);_mapper.remove(ename, qname);}void removeExchangeBinding(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return;_mapper.removeExchangeBindings(ename);_bindings.erase(ename);}void removeQueueBinding(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeQueueBindings(qname);// 遍历所有交换机的绑定for (auto it = _bindings.begin(); it != _bindings.end();){// 删除该队列在当前交换机的绑定it->second.erase(qname);// 如果当前交换机的绑定为空,则删除该交换机的条目if (it->second.empty()){it = _bindings.erase(it); // erase返回下一个有效的迭代器}else{++it;}}}QueueBindingMap getExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto it = _bindings.find(ename);if (it == _bindings.end())return QueueBindingMap();return _bindings[ename];}Binding::ptr getBinding(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return nullptr;auto qit = eit->second.find(qname);if (qit == eit->second.end())return nullptr;return qit->second;}bool exists(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return false;auto qit = eit->second.find(qname);if (qit == eit->second.end())return false;return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);size_t size = 0;for (auto start = _bindings.begin(); start != _bindings.end(); start++)size += start->second.size();return size;}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_bindings.clear();}private:std::mutex _mutex;BindingMapper _mapper;BindingMap _bindings;};
}

        值得注意的是:在使用unordered_map保存绑定信息的时候,插入和删除的方式与之前不同,具体在注释中给出了解释。 

测试代码

#include "../mqserver/binding.hpp"
#include <gtest/gtest.h>jiuqi::BindingManager::ptr bmp;class BindingTest : public testing::Environment
{
public:virtual void SetUp() override{bmp = std::make_shared<jiuqi::BindingManager>("./data/binding.db");}virtual void TearDown() override{bmp->clear();}
};TEST(ExchangeTest, insert_test)
{bmp->bind("exchange1", "queue1", "651", 1);bmp->bind("exchange1", "queue2", "651", 1);bmp->bind("exchange1", "queue3", "651", 1);bmp->bind("exchange2", "queue1", "651", 1);bmp->bind("exchange2", "queue2", "651", 1);bmp->bind("exchange2", "queue3", "651", 1);ASSERT_EQ(bmp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::Binding::ptr bp = bmp->getBinding("exchange2", "queue1");ASSERT_EQ(bp->exchangeName, "exchange2");ASSERT_EQ(bp->queueName, "queue1");ASSERT_EQ(bp->bindingKey, "651");
}TEST(ExchangeTest, delete_test)
{bmp->unbind("exchange1", "queue3");jiuqi::Binding::ptr bp = bmp->getBinding("exchange1", "queue3");ASSERT_EQ(bp.get(), nullptr);ASSERT_EQ(bmp->exists("exchange1", "queue3"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new BindingTest);return RUN_ALL_TESTS();
}

        在测试的过程中,发现了一种错误,就是创建数据库表时发生了out of memory错误,开始还以为是系统内存不足,后来发现在构造mapper时忘记了打开数据库,所以得知如果没有打开数据库就创建表就会发生out of memory错误。

http://www.xdnf.cn/news/1181251.html

相关文章:

  • 基于深度学习的图像分类:使用MobileNet实现高效分类
  • Python进阶第三方库之Matplotlib
  • 深度学习(鱼书)day01--感知机
  • LeetCode 23:合并 K 个升序链表
  • 【C++】使用中值滤波算法过滤数据样本中的尖刺噪声
  • rust-方法语法
  • C++STL系列之set和map系列
  • 基于python django的农业可视化系统,以奶牛牧场为例
  • 用 Function Call 让 AI 主动调用函数(超入门级示例)|保姆级大模型应用开发实战
  • SpringBoot航空订票系统的设计与实现
  • 进阶系统策略
  • 技术赋能多元探索:我的技术成长与行业洞察
  • Linux应用开发基础知识——进程学习2(exec函数、system函数、popen函数)(三)
  • 斐波那契数列策略
  • 人形机器人_双足行走动力学:Maxwell模型及在拟合肌腱特性中的应用
  • Java学习----原型模式
  • 使用Claude Code从零到一打造一个现代化的GitHub Star项目管理器
  • day46day47 通道注意力
  • 无源域自适应综合研究【2】
  • C++ 性能优化
  • 力扣 hot100 Day54
  • pytest中使用skip跳过某个函数
  • 无人机速度模块技术要点分析
  • 第三章:掌握 Redis 存储与获取数据的核心命令
  • MNIST 手写数字识别模型分析
  • 秋叶sd-webui频繁出现生成后无反应的问题
  • 【Web APIs】JavaScript 节点操作 ⑧ ( 删除节点 - removeChild 函数 | 删除节点 - 代码示例 | 删除网页评论案例 )
  • 算法竞赛阶段二-数据结构(34)数据结构链表STL vector
  • 【PyTorch】图像二分类项目-部署
  • Spring Boot 3整合Spring AI实战:9轮面试对话解析AI应用开发