C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(三)
目录
队列数据管理
代码实现
测试代码
绑定信息(交换机-队列)管理
代码实现
测试代码
队列数据管理
当前队列数据的管理,本质上是队列描述信息的管理,描述当前服务器上有哪些队列。
- 定义队列描述数据类
- 队列名称
- 是否持久化标志
是否独占标志是否自动删除标志其他参数
- 定义队列数据持久化类(数据持久化的 sqlite3 数据库中)
- 创建/删除队列数据表
- 新增队列数据
- 移除队列数据
- 查询所有队列数据
- 定义队列数据管理类
- 创建队列,并添加管理(存在则 OK,不存在则创建)
- 删除队列
- 获取指定队列
- 获取所有队列
- 判断指定队列是否存在
- 获取队列数量
- 销毁所有队列数据
代码实现
与交换机数据管理的实现非常相似,只需要修改表结构即可
#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>namespace jiuqi
{struct MsgQueue{using ptr = std::shared_ptr<MsgQueue>;std::string name;bool durable;bool exclusive;bool auto_delete;std::unordered_map<std::string, std::string> args;MsgQueue() {}MsgQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,const std::unordered_map<std::string, std::string> qargs): name(qname), durable(qdurable), exclusive(qexclusive), auto_delete(qauto_delete), args(qargs) {}void setArgs(const std::string &str_args){// key=val&key=val.....std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &arg : sub_args){size_t pos = arg.find("=");std::string key = arg.substr(0, pos);std::string val = arg.substr(pos + 1);args.insert(std::make_pair(key, val));}}std::string getArgs(){if (args.empty())return "";std::string result;for (auto &arg : args){result += arg.first + "=" + arg.second + "&";}result.pop_back();return result;}};using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;class QueueMapper{public:QueueMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path); assert(_sql_helper.open());createTable();}void createTable(){std::stringstream ss;ss << "create table if not exists queue_table("<< "name varchar(32) primary key, "<< "durable int, "<< "exclusive int, "<< "auto_delete int, "<< "args varchar(128));";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("创建队列数据库表失败");abort();}}void removeTable(){std::string sql = "drop table if exists queue_table;";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("删除交换机数据库表失败");abort();}}bool insert(MsgQueue::ptr &queue){std::stringstream ss;ss << "insert into queue_table values('"<< queue->name << "', "<< queue->durable << ", "<< queue->exclusive << ", "<< queue->auto_delete << ", '"<< queue->getArgs() << "');";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool remove(const std::string &name){std::stringstream ss;ss << "delete from queue_table where "<< "name = " << "'" << name << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}QueueMap recovery(){QueueMap result;std::string sql = "select name, durable, exclusive, auto_delete, args from queue_table";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){QueueMap *result = (QueueMap *)arg;MsgQueue::ptr mqp = std::make_shared<MsgQueue>();mqp->name = row[0];mqp->durable = (bool)std::stoi(row[1]);mqp->exclusive = (bool)std::stoi(row[2]);mqp->auto_delete = (bool)std::stoi(row[3]);if (row[4])mqp->setArgs(row[4]);result->insert(std::make_pair(mqp->name, mqp));return 0;}private:SqliteHelper _sql_helper;};class QueueManager{public:using ptr = std::shared_ptr<QueueManager>;QueueManager(const std::string &dbfile) : _mapper(dbfile){_queues = _mapper.recovery();}void declareQueue(const std::string &name,bool durable,bool exclusive,bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it != _queues.end())return;auto queue = std::make_shared<MsgQueue>(name, durable, exclusive, auto_delete, args);_queues.insert(std::make_pair(name, queue));if (durable)_mapper.insert(queue);}void deleteQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return;if (it->second->durable)_mapper.remove(name);_queues.erase(name);}MsgQueue::ptr selectQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return nullptr;return it->second;}QueueMap allQueue(){std::unique_lock<std::mutex> lock(_mutex);return _queues;}bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return false;return true; }size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _queues.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_queues.clear();}private:std::mutex _mutex;QueueMapper _mapper;QueueMap _queues;};
}
测试代码
#include "../mqserver/queue.hpp"
#include <gtest/gtest.h>jiuqi::QueueManager::ptr qmp;class ExchangeTest : public testing::Environment
{
public:virtual void SetUp() override{qmp = std::make_shared<jiuqi::QueueManager>("./data/queue.db");}virtual void TearDown() override{qmp->clear();}
};TEST(ExchangeTest, insert_test)
{std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};std::unordered_map<std::string, std::string> map_empty;qmp->declareQueue("queue1", true, false, false, map);qmp->declareQueue("queue2", true, false, false, map);qmp->declareQueue("queue3", true, false, false, map);qmp->declareQueue("queue4", true, false, false,map_empty);qmp->declareQueue("queue5", true, false, false,map_empty);qmp->declareQueue("queue6", true, false, false,map_empty);ASSERT_EQ(qmp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::MsgQueue::ptr mqp = qmp->selectQueue("queue3");ASSERT_EQ(mqp->name, "queue3");ASSERT_EQ(mqp->durable, true);ASSERT_EQ(mqp->exclusive, false);ASSERT_EQ(mqp->auto_delete, false);ASSERT_EQ(mqp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}TEST(ExchangeTest, delete_test)
{qmp->deleteQueue("queue1");jiuqi::MsgQueue::ptr mqp = qmp->selectQueue("queue1");ASSERT_EQ(mqp.get(), nullptr);ASSERT_EQ(qmp->exists("queue1"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new ExchangeTest);return RUN_ALL_TESTS();
}
绑定信息(交换机-队列)管理
绑定信息,本质上就是一个交换机关联了哪些队列的描述。
- 定义绑定信息类
- 交换机名称
- 队列名称
- binding_key(分发匹配规则-决定了哪些数据能被交换机放入队列)
- 定义绑定信息数据持久化类(数据持久化的 sqlite3 数据库中)
- 创建/删除绑定信息数据表
- 新增绑定信息数据
- 移除指定绑定信息数据
- 移除指定交换机相关绑定信息数据:移除交换机的时候会被调用
- 移除指定队列相关绑定信息数据:移除队列的时候会被调用f. 查询所有绑定信息数据:用于重启服务器时进行历史数据恢复
- 定义绑定信息数据管理类
- 创建绑定信息,并添加管理(存在则 OK,不存在则创建)
- 解除指定的绑定信息
- 删除指定队列的所有绑定信息
- 删除交换机相关的所有绑定信息
- 获取交换机相关的所有绑定信息:交换机收到消息后,需要分发给自己关联的队列
- 判断指定绑定信息是否存在
- 获取当前绑定信息数量
- 销毁所有绑定信息数据
代码实现
同样与上述类的实现类似
#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>namespace jiuqi
{struct Binding{using ptr = std::shared_ptr<Binding>;std::string exchangeName;std::string queueName;std::string bindingKey;Binding() {}Binding(const std::string &ename, const std::string &qname, const std::string &key): exchangeName(ename), queueName(qname), bindingKey(key){}};using QueueBindingMap = std::unordered_map<std::string, Binding::ptr>;using BindingMap = std::unordered_map<std::string, QueueBindingMap>;class BindingMapper{public:BindingMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);_sql_helper.open();createTable();}void createTable(){std::string sql = "create table if not exists binding_table(exchangeName varchar(32), queueName varchar(32), bindingKey varchar(128), PRIMARY KEY (exchangeName, queueName));";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("创建绑定信息数据库表失败");abort();}}void removeTable(){std::string sql = "drop table if exists binding_table;";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("删除绑定信息数据库表失败");abort();}}bool insert(Binding::ptr &binding){std::stringstream ss;ss << "insert into binding_table values('"<< binding->exchangeName << "', '"<< binding->queueName << "', '"<< binding->bindingKey << "');";std::string sql = ss.str();if (!_sql_helper.exec(sql, nullptr, nullptr)){ERROR("插入绑定记录失败");return false;}return true;}bool remove(const std::string &ename, const std::string &qname){std::stringstream ss;ss << "delete from binding_table where "<< "exchangeName = '" << ename << "' "<< "and queueName = '" << qname << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool removeExchangeBindings(const std::string &ename){std::stringstream ss;ss << "delete from binding_table where "<< "exchangeName = '" << ename << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool removeQueueBindings(const std::string &qname){std::stringstream ss;ss << "delete from binding_table where "<< "queueName = '" << qname << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}BindingMap recovery(){BindingMap result;std::string sql = "select exchangeName, queueName, bindingKey from binding_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){BindingMap *result = (BindingMap *)arg;Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);// 为了防止交换机相关绑定信息已经存在,不能直接创建队列映射// 因此要先获取交换机对应的映射对象// 使用引用的好处, 如果不存在就会创建QueueBindingMap &qbm = (*result)[bp->exchangeName];qbm.insert(std::make_pair(bp->queueName, bp));return 0;}private:SqliteHelper _sql_helper;};class BindingManager{public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string &dbfile) : _mapper(dbfile){_bindings = _mapper.recovery();}bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit != _bindings.end() && eit->second.find(qname) != eit->second.end())return true;Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);if (durable){if(!_mapper.insert(bp))return false;}QueueBindingMap &qbm = _bindings[ename];qbm.insert(std::make_pair(qname, bp));return true;}void unbind(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return;auto qit = eit->second.find(qname);if (qit == eit->second.end())return;_bindings[ename].erase(qname);if (eit->second.empty())_bindings.erase(ename);_mapper.remove(ename, qname);}void removeExchangeBinding(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return;_mapper.removeExchangeBindings(ename);_bindings.erase(ename);}void removeQueueBinding(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeQueueBindings(qname);// 遍历所有交换机的绑定for (auto it = _bindings.begin(); it != _bindings.end();){// 删除该队列在当前交换机的绑定it->second.erase(qname);// 如果当前交换机的绑定为空,则删除该交换机的条目if (it->second.empty()){it = _bindings.erase(it); // erase返回下一个有效的迭代器}else{++it;}}}QueueBindingMap getExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto it = _bindings.find(ename);if (it == _bindings.end())return QueueBindingMap();return _bindings[ename];}Binding::ptr getBinding(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return nullptr;auto qit = eit->second.find(qname);if (qit == eit->second.end())return nullptr;return qit->second;}bool exists(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return false;auto qit = eit->second.find(qname);if (qit == eit->second.end())return false;return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);size_t size = 0;for (auto start = _bindings.begin(); start != _bindings.end(); start++)size += start->second.size();return size;}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_bindings.clear();}private:std::mutex _mutex;BindingMapper _mapper;BindingMap _bindings;};
}
值得注意的是:在使用unordered_map保存绑定信息的时候,插入和删除的方式与之前不同,具体在注释中给出了解释。
测试代码
#include "../mqserver/binding.hpp"
#include <gtest/gtest.h>jiuqi::BindingManager::ptr bmp;class BindingTest : public testing::Environment
{
public:virtual void SetUp() override{bmp = std::make_shared<jiuqi::BindingManager>("./data/binding.db");}virtual void TearDown() override{bmp->clear();}
};TEST(ExchangeTest, insert_test)
{bmp->bind("exchange1", "queue1", "651", 1);bmp->bind("exchange1", "queue2", "651", 1);bmp->bind("exchange1", "queue3", "651", 1);bmp->bind("exchange2", "queue1", "651", 1);bmp->bind("exchange2", "queue2", "651", 1);bmp->bind("exchange2", "queue3", "651", 1);ASSERT_EQ(bmp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::Binding::ptr bp = bmp->getBinding("exchange2", "queue1");ASSERT_EQ(bp->exchangeName, "exchange2");ASSERT_EQ(bp->queueName, "queue1");ASSERT_EQ(bp->bindingKey, "651");
}TEST(ExchangeTest, delete_test)
{bmp->unbind("exchange1", "queue3");jiuqi::Binding::ptr bp = bmp->getBinding("exchange1", "queue3");ASSERT_EQ(bp.get(), nullptr);ASSERT_EQ(bmp->exists("exchange1", "queue3"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new BindingTest);return RUN_ALL_TESTS();
}
在测试的过程中,发现了一种错误,就是创建数据库表时发生了out of memory错误,开始还以为是系统内存不足,后来发现在构造mapper时忘记了打开数据库,所以得知如果没有打开数据库就创建表就会发生out of memory错误。