C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(二)
目录
交换机数据管理
交换机数据类
交换机数据持久化类
交换机数据管理类
测试
交换机数据管理
- 定义交换机数据类
- 交换机名称
- 交换机类型
- 是否持久化标志
- 是否自动删除标志
- 其他参数
- 定义交换机数据持久化类(数据持久化的 sqlite3 数据库中)
- 创建/删除交换机数据表
- 新增交换机数据
- 移除交换机数据
- 查询所有交换机数据
- 查询指定交换机数据(根据名称)
- 定义交换机数据管理类
- 声明交换机,并添加管理(存在则 OK,不存在则创建)
- 删除交换机
- 获取指定交换机
- 销毁所有交换机数据
交换机数据类
struct Exchange{using ptr = std::shared_ptr<Exchange>;// 交换机名称std::string name;// 交换机类型ExchangeType type;// 持久化标志bool durable;// 自动删除标志bool auto_delete;// 其他参数std::unordered_map<std::string, std::string> args;Exchange() {}Exchange(const std::string &ename,ExchangeType etype,bool edurable,bool eauto_delete,const std::unordered_map<std::string, std::string> &eargs): name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs){}void setArgs(const std::string &str_args){// key=val&key=val.....std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &arg : sub_args){size_t pos = arg.find("=");std::string key = arg.substr(0, pos);std::string val = arg.substr(pos + 1);args.insert(std::make_pair(key, val));}}std::string getArgs(){if (args.empty())return "";std::string result;for (auto &arg : args){result += arg.first + "=" + arg.second + "&";}result.pop_back();return result;}};
需要说明的有:
- 对于其他参数,我们用一个unordered_map容器存储,与相应的字符串转化的格式是:key=val&key=val&key=val....
- 在getArgs函数中必须添加对args的判空操作,否则,args为空时,result会为空,调用pop_back函数会发生段错误。
交换机数据持久化类
class ExchangeMapper{public:ExchangeMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();}void createTable(){
#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);if (!ret){ERROR("创建交换机数据库表失败");abort();}}void removeTable(){
#define DROP_TABLE "drop table if exists exchange_table;"bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);if (!ret){ERROR("删除交换机数据库表失败");abort();}}void insert(Exchange::ptr &exchange){std::stringstream ss;ss << "insert into exchange_table values('"<< exchange->name << "',"<< exchange->type << ","<< exchange->durable << ","<< exchange->auto_delete << ","<< "'" << exchange->getArgs() << "');";bool ret = _sql_helper.exec(ss.str(), nullptr, nullptr);if (!ret)return;}void remove(const std::string &name){std::stringstream ss;ss << "delete from exchange_table where name = "<< "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}using ExchangMap = std::unordered_map<std::string, Exchange::ptr>;ExchangMap recovery(){ExchangMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){ExchangMap *result = (ExchangMap *)arg;auto exp = std::make_shared<Exchange>();exp->name = row[0];exp->type = (ExchangeType)std::stoi(row[1]);exp->durable = (bool)std::stoi(row[2]);exp->auto_delete = (bool)std::stoi(row[3]);if (row[4])exp->setArgs(row[4]);result->insert(std::make_pair(exp->name, exp));return 0;}private:SqliteHelper _sql_helper;};
说明:
- 在构造对象时就把相应的数据库打开,如果没有就创建,但要注意一定要先创建好数据库所在目录,在创建表(如果不存在)。
- 前四个函数就是对sql语言的一个使用。
- recovery方法中我们将查询出来的结果以交换机名称为键值,存储在一个unordered_map容器中,以拿到所有交换机。
交换机数据管理类
class ExchangeManager{public:using ptr = std::shared_ptr<ExchangeManager>;ExchangeManager(const std::string &dbfile) : _mapper(dbfile){_exchanges = _mapper.recovery();}// 声明交换机void declareExchange(const std::string &name,ExchangeType type,bool durable,bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it != _exchanges.end())return;// 新增交换机auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);_exchanges.insert(std::make_pair(name, exp));if (durable)_mapper.insert(exp);}// 删除交换机void deleteExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return;if (_exchanges[name]->durable)_mapper.remove(name);_exchanges.erase(name);}// 获取指定交换机Exchange::ptr selectExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return nullptr;return it->second;}// 判断交换机是否存在bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return false;return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}// 清理所有交换机数据void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_exchanges.clear();}private:std::mutex _mutex;ExchangeMapper _mapper;std::unordered_map<std::string, Exchange::ptr> _exchanges;};
说明:
- 此类是真正要让用户使用的类,包含有几个成员属性:
- 互斥锁:支持多线程同时访问。
- ExchangeMapper对象:对数据库进行操作,获取所有交换机。
- _exchanges:存储所有交换机,方便用户获取。
- 在构造时就调用_mapper的recovery方法拿到所有交换机。
- 所有方法对于判断交换机是否存在的操作必须独自进行,不能复用exists方法,否则会导致死锁。
测试
#include "../mqserver/exchange.hpp"
#include <gtest/gtest.h>jiuqi::ExchangeManager::ptr emp;class ExchangeTest : public testing::Environment
{
public:virtual void SetUp() override{emp = std::make_shared<jiuqi::ExchangeManager>("./data/meta.db");}virtual void TearDown() override{// emp->clear();}
};TEST(ExchangeTest, insert_test)
{std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};std::unordered_map<std::string, std::string> map_empty;emp->declareExchange("exchange1", jiuqi::ExchangeType::DIRECT, true, false, map);emp->declareExchange("exchange2", jiuqi::ExchangeType::TOPIC, true, false, map);emp->declareExchange("exchange3", jiuqi::ExchangeType::FANOUT, true, false, map);emp->declareExchange("exchange4", jiuqi::ExchangeType::FANOUT, true, false, map_empty);emp->declareExchange("exchange5", jiuqi::ExchangeType::FANOUT, true, false, map_empty);emp->declareExchange("exchange6", jiuqi::ExchangeType::FANOUT, true, false, map_empty);ASSERT_EQ(emp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::Exchange::ptr exp = emp->selectExchange("exchange3");ASSERT_EQ(exp->name, "exchange3");ASSERT_EQ(exp->type, jiuqi::ExchangeType::FANOUT);ASSERT_EQ(exp->durable, true);ASSERT_EQ(exp->auto_delete, false);ASSERT_EQ(exp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}TEST(ExchangeTest, delete_test)
{emp->deleteExchange("exchange1");jiuqi::Exchange::ptr exp = emp->selectExchange("exchange1");ASSERT_EQ(exp.get(), nullptr);ASSERT_EQ(emp->exists("exchange1"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new ExchangeTest);return RUN_ALL_TESTS();
}
select测试可能会失败,原因是在交换机中我们使用unordered_map存储其他参数,由于顺序随机,所有使用getArgs获取的字符串中键值对的顺序也是随机的,但是不影响功能。