当前位置: 首页 > news >正文

手撕基于AMQP协议的简易消息队列-6(服务端模块的编写)

在MQServer中编写服务端模块代码

在MQServer中编写makefile文件来编译服务端模块
.PHONY: server
CFLAG= -I../ThirdLib/lib/include
LFLAG= -L../ThirdLib/lib/lib -lgtest -lprotobuf -lsqlite3 -pthread -lmuduo_net -lmuduo_base -lz
server:server.cpp ../MQCommon/message.pb.cc ../MQCommon/request.pb.cc ../ThirdLib/lib/include/muduo/protobuf/codec.ccg++ -g -std=c++11 $(CFLAG) $^ -o $@  $(LFLAG)
在MQServer中编写Exchange.hpp文件实现交换机数据管理
  • 在此文件中要实现的功能:

    1. 定义交换机数据类

      • 交换机名称

      • 交换机类型

      • 是否持久化标志

      • 是否⾃动删除标志

      • 其他参数

    2. 定义交换机数据持久化类(数据持久化的sqlite3数据库中)

      • 创建/删除交换机数据表
      • 新增交换机数据
      • 移除交换机数据
      • 查询所有交换机数据
      • 查询指定交换机数据(根据名称)
    3. 定义交换机数据管理类

      • 声明交换机,并添加管理(存在则OK,不存在则创建)
      • 删除交换机
      • 获取指定交换机
      • 销毁所有交换机数据
  • 实现代码

    #ifndef __M_EXCHANGE_H__
    #define __M_EXCHANGE_H__
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/Helper.hpp"
    #include "../MQCommon/message.pb.h"
    #include <google/protobuf/map.h>
    #include <iostream>
    #include <unordered_map>
    #include <mutex>
    #include <memory>
    namespace MQ {//1. 定义交换机类struct Exchange {using ptr = std::shared_ptr<Exchange>;//1. 交换机名称std::string _name;//2. 交换机类型MQ::ExchangeType _type;//3. 交换机持久化标志bool _durable;//4. 是否自动删除标志bool _auto_delete;//5. 其他参数google::protobuf::Map<std::string, std::string> _args;Exchange() {}Exchange(const std::string &ename, MQ::ExchangeType etype, bool edurable,bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs):_name(ename), _type(etype), _durable(edurable), _auto_delete(eauto_delete), _args(eargs) {}//args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=val&key=val....//内部解析str_args字符串,将内容存储到成员中void setArgs(const std::string &str_args) {//key=val&key=val&std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &str : sub_args) {size_t pos = str.find("=");std::string key = str.substr(0, pos);std::string val = str.substr(pos + 1);_args[key] = val;}}//将args中的内容进行序列化后,返回一个字符串std::string getArgs() {std::string result;for (auto start = _args.begin(); start != _args.end(); ++start) {result += start->first + "=" + start->second + "&";}return result;}};using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;//2. 定义交换机数据持久化管理类--数据存储在sqlite数据库中class ExchangeMapper {public:ExchangeMapper(const std::string &dbfile):_sql_helper(dbfile) {std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();}void createTable() {#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);if (ret == false) {DLOG("创建交换机数据库表失败!!");abort();//直接异常退出程序}}void removeTable() {#define DROP_TABLE "drop table if exists exchange_table;"bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);if (ret == false) {DLOG("删除交换机数据库表失败!!");abort();//直接异常退出程序}}bool insert(Exchange::ptr &exp) {std::stringstream ss;ss << "insert into exchange_table values(";ss << "'" << exp->_name << "', ";ss << exp->_type << ", ";ss << exp->_durable << ", ";ss << exp->_auto_delete << ", ";ss << "'" << exp->getArgs() << "');";return _sql_helper.exec(ss.str(), nullptr, nullptr);}void remove(const std::string &name) {std::stringstream ss;ss << "delete from exchange_table where name=";ss << "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}ExchangeMap recovery() {ExchangeMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields) {ExchangeMap *result = (ExchangeMap*)arg;auto exp = std::make_shared<Exchange>();exp->_name = row[0];exp->_type = ( MQ::ExchangeType)std::stoi(row[1]);exp->_durable = (bool)std::stoi(row[2]);exp->_auto_delete = (bool)std::stoi(row[3]);if (row[4]) exp->setArgs(row[4]);result->insert(std::make_pair(exp->_name, exp));return 0;}private:SqliteHelper _sql_helper;};//3. 定义交换机数据内存管理类 class ExchangeManager {public:using ptr = std::shared_ptr<ExchangeManager>;ExchangeManager(const std::string &dbfile) : _mapper(dbfile){_exchanges = _mapper.recovery();}//声明交换机bool declareExchange(const std::string &name,MQ::ExchangeType type, bool durable, bool auto_delete,const google::protobuf::Map<std::string, std::string> &args) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it != _exchanges.end()) {//如果交换机已经存在,那就直接返回,不需要重复新增。return true;}auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);if (durable == true) {bool ret = _mapper.insert(exp);if (ret == false) return false;}_exchanges.insert(std::make_pair(name, exp));return true;}//删除交换机void deleteExchange(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) {return;}if(it->second->_durable == true) _mapper.remove(name);_exchanges.erase(name);}//获取指定交换机对象Exchange::ptr selectExchange(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) {return Exchange::ptr();}return it->second;}//判断交换机是否存在bool exists(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) {return false;}return true;}size_t size() {std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}//清理所有交换机数据void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_exchanges.clear();}private:std::mutex _mutex;ExchangeMapper _mapper;ExchangeMap _exchanges;};
    }
    #endif
    
在MQServer中编写Queue.hpp文件实现队列数据管理
  • 在此文件中要实现的功能:

    1. 定义队列描述数据类

      • 队列名称
      • 是否持久化标志
      • 是否独占标志
      • 是否⾃动删除标志
    2. 定义队列数据持久化类(数据持久化的sqlite3数据库中)

      • 创建/删除队列数据表
      • 新增队列数据
      • 移除队列数据
      • 查询所有队列数据
    3. 定义队列数据管理类

      • 创建队列,并添加管理(存在则OK,不存在则创建)
      • 删除队列
      • 获取指定队列
      • 获取所有队列
      • 判断指定队列是否存在
      • 获取队列数量
      • 销毁所有队列数据
  • 实现代码

    #ifndef __M_Queue_H__
    #define __M_Queue_H__
    #include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/message.pb.h"
    #include <google/protobuf/map.h>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <unordered_map>namespace MQ
    {
    //基础队列结构struct Queue{using ptr = std::shared_ptr<Queue>;std::string _name;bool _durable;bool _exclusive;bool _auto_delete;google::protobuf::Map<std::string, std::string> _args;Queue() {}Queue(const std::string &name, bool durable, bool exclusive, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args): _name(name),_durable(durable),_exclusive(exclusive),_auto_delete(auto_delete),_args(args){}bool setArgs(const std::string &str_args){std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &str : sub_args){auto pos = str.find("=");if (pos == std::string::npos){ELOG("Invalid args: ");return false;}std::string key = str.substr(0, pos);std::string value = str.substr(pos + 1);_args[key] = value;}return true;}std::string getArgs(){std::string str_args;for (auto &arg : _args){str_args += arg.first + "=" + arg.second + "&";}return str_args;}};using QueueMap = std::unordered_map<std::string, std::shared_ptr<Queue>>;//队列数据持久化类class QueueMapper{public:QueueMapper() {}QueueMapper(const std::string &db_file) : _sqliteHelper(db_file){// 首先创建数据库文件的父级目录// 随后打开数据库文件,创建数据库表std::string path = FileHelper::parentDirectory(db_file);FileHelper::createDirectory(path);assert(_sqliteHelper.open());createTable();}// 创建队列表bool createTable(){// create table if not exists queue_table(name varchar(255) not null primary key,durable int,exclusive int,auto_delete int,args varchar(255));std::stringstream ss;ss << "create table if not exists queue_table(";ss << "name varchar(255) not null primary key,";ss << "durable int,";ss << "exclusive int,";ss << "auto_delete int,";ss << "args varchar(255));";if (!_sqliteHelper.exec(ss.str(), nullptr, nullptr)){ELOG("创建表: queue_table 失败");abort(); // 如果创建表失败,则直接退出程序,并报异常错误return false;}return true;}bool removeTable(){// drop table if exists queue_table;std::string sql = "drop table if exists queue_table;";if (!_sqliteHelper.exec(sql, nullptr, nullptr)){ELOG("删除表: queue_table 失败");return false;}return true;}// 向数据库中插入一个队列bool insert(const Queue::ptr &queue){// insert into queue_table(name,durable,exclusive,auto_delete,args);std::stringstream ss;ss << "insert into queue_table values(";ss << "'" << queue->_name << "',";ss << queue->_durable << ",";ss << queue->_exclusive << ",";ss << queue->_auto_delete << ",";ss << "'" << queue->getArgs() << "');";if (!_sqliteHelper.exec(ss.str(), nullptr, nullptr)){ELOG("插入队列:%s  失败", queue->_name.c_str());return false;}return true;}// 根据名字删除一个队列bool deleteQueue(const std::string &name){// delete from queue_table where name = 'name';std::stringstream ss;ss << "delete from queue_table where name = '";ss << name << "';";if (!_sqliteHelper.exec(ss.str(), nullptr, nullptr)){ELOG("删除队列:%s  失败", name.c_str());return false;}return true;}QueueMap recovery(){// select name, durable, exclusive, auto_delete, args from queue_table;QueueMap result;std::string sql = "select name, durable, exclusive, auto_delete, args from queue_table;";_sqliteHelper.exec(sql, selectCallback, &result);return result;}~QueueMapper(){_sqliteHelper.close();}private:// 查询时传递的回调函数static int selectCallback(void *arg, int numcol, char **row, char **fields){QueueMap *result = (QueueMap *)arg;Queue::ptr queue = std::make_shared<Queue>();queue->_name = row[0];queue->_durable = (bool)std::stoi(row[1]);queue->_exclusive = (bool)std::stoi(row[2]);queue->_auto_delete = (bool)std::stoi(row[3]);if (row[4])queue->setArgs(row[4]);result->insert(std::make_pair(queue->_name, queue));return 0;}private:SqliteHelper _sqliteHelper;};class QueueManager{public:using ptr = std::shared_ptr<QueueManager>;QueueManager(const std::string &db_file) : _queueMapper(db_file){_queues = _queueMapper.recovery();}bool declareQueue(const std::string &name,bool durable,bool exclusive,bool auto_delete,const google::protobuf::Map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);// 查找该队列是否存在auto it = _queues.find(name);if (it != _queues.end()){return true;}// 如果该队列不存在,则创建该队列Queue::ptr pqueue = std::make_shared<Queue>(name, durable, exclusive, auto_delete, args);// 如果标记为持久化存储,这里需要将该队列插入数据库if (durable == true){bool ret = _queueMapper.insert(pqueue);if (ret == false)return false;}// 将该队列插入到队列管理器中_queues.insert(std::make_pair(name, pqueue));return true;}bool deleteQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);// 如果没找到,说明无需删除if (it == _queues.end()){return true;}// 如果该队列是持久化的,则在数据库中也删除该队列if (it->second->_durable){_queueMapper.deleteQueue(name);}// 删除该队列_queues.erase(it);return true;}Queue::ptr selectQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end()){return Queue::ptr();}return it->second;}bool exist(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end()){return false;}return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _queues.size();}QueueMap allQueues(){std::unique_lock<std::mutex> lock(_mutex);return _queues;}void clear(){std::unique_lock<std::mutex> lock(_mutex);_queues.clear();_queueMapper.removeTable();}private:std::mutex _mutex;QueueMap _queues;QueueMapper _queueMapper;};
    }
    #endif
    
在MQServer中编写Binding.hpp文件实现绑定信息(交换机-队列)管理
  • 在此文件中要实现的功能:

    1. 定义绑定信息类

      • 交换机名称
      • 队列名称
      • binding_key(分发匹配规则-决定了哪些数据能被交换机放⼊队列)
    2. 定义绑定信息数据持久化类(数据持久化的sqlite3数据库中)

      • 创建/删除绑定信息数据表
      • 新增绑定信息数据
      • 移除指定绑定信息数据
      • 移除指定交换机相关绑定信息数据:移除交换机的时候会被调⽤
      • 移除指定队列相关绑定信息数据:移除队列的时候会被调⽤
      • 查询所有绑定信息数据:⽤于重启服务器时进⾏历史数据恢复
    3. 定义绑定信息数据管理类

      • 创建绑定信息,并添加管理(存在则OK,不存在则创建)
      • 解除指定的绑定信息
      • 删除指定队列的所有绑定信息
      • 删除交换机相关的所有绑定信息
      • 获取交换机相关的所有绑定信息:交换机收到消息后,需要分发给⾃⼰关联的队列
      • 判断指定绑定信息是否存在
      • 获取当前绑定信息数量
      • 销毁所有绑定信息数据
  • 实现代码

    #ifndef __M_Bingding_H__
    #define __M_Bingding_H__
    #include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/message.pb.h"
    #include <google/protobuf/map.h>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <unordered_map>namespace MQ
    {struct Binding{using ptr = std::shared_ptr<Binding>;Binding(const std::string &name_exchange, const std::string &name_queue, const std::string &binding_key): name_exchange(name_exchange),name_queue(name_queue),binding_key(binding_key){}std::string name_exchange;std::string name_queue;std::string binding_key;};// 队列与绑定关系的映射using QueueBindingMap = std::unordered_map<std::string, Binding::ptr>;// 交换机与队列的映射,方便后续删除绑定信息using ExchangeQueueMap = std::unordered_map<std::string, QueueBindingMap>;class BindingMapper{public:BindingMapper(const std::string &db_file_path): _sqlite_helper(db_file_path){std::string parent_path = FileHelper::parentDirectory(db_file_path);FileHelper::createDirectory(parent_path);_sqlite_helper.open();creatTable();}bool creatTable(){std::stringstream ss;ss << "create table if not exists binding_table(";ss << "name_exchange varchar(255) not null,";ss << " name_queue varchar(255) not null,";ss << " binding_key varchar(255) not null);";if (!_sqlite_helper.exec(ss.str(), nullptr, nullptr)){// 如果创建表失败就直接报异常错误ELOG("创建binding_table失败");abort();}return true;}bool removeTable(){std::stringstream ss;ss << "drop table if exists binding_table";if (!_sqlite_helper.exec(ss.str(), nullptr, nullptr)){ELOG("删除binding_table失败");return false;}return true;}bool insert(const Binding::ptr &binding){// insert into binding_table values(name_exchange,name_queue,binding_key)std::stringstream ss;ss << "insert into binding_table values(";ss << "'" << binding->name_exchange << "',";ss << "'" << binding->name_queue << "',";ss << "'" << binding->binding_key << "');";if (!_sqlite_helper.exec(ss.str(), nullptr, nullptr)){ELOG("插入binding_table失败");return false;}return true;}bool remove(const std::string &name_exchange, const std::string &name_queue){// delete from binding_table where name_exchange=xxx and name_queue=xxxstd::stringstream ss;ss << "delete from binding_table where name_exchange='" << name_exchange << "' and name_queue='" << name_queue << "';";if (!_sqlite_helper.exec(ss.str(), nullptr, nullptr)){// 删除绑定信息失败ELOG("remove删除绑定信息失败");return false;}return true;}bool removeByExchange(const std::string &name_exchange){std::stringstream ss;ss << "delete from binding_table where name_exchange='" << name_exchange << "' ;";if (!_sqlite_helper.exec(ss.str(), nullptr, nullptr)){// 删除绑定信息失败ELOG("removeByExchange删除绑定信息失败");return false;}return true;}bool removeByQueue(const std::string &name_queue){std::stringstream ss;ss << "delete from binding_table where name_queue='" << name_queue << "';";if (!_sqlite_helper.exec(ss.str(), nullptr, nullptr)){// 删除绑定信息失败ELOG("removeByQueue删除绑定信息失败");return false;}return true;}ExchangeQueueMap recover(){// select name_exchange,name_queue,binding_key from binding_tableExchangeQueueMap result;std::stringstream ss;ss << "select name_exchange,name_queue,binding_key from binding_table";_sqlite_helper.exec(ss.str(), selectCallback, &result);return result;}~BindingMapper() {}static int selectCallback(void *arg, int numcol, char **row, char **fields){ExchangeQueueMap *result = (ExchangeQueueMap *)arg;Binding::ptr binding = std::make_shared<Binding>(row[0], row[1], row[2]);// 防止交换机信息已经存在,导致原信息被覆盖// 所以利用unordered_map的[]运算符重载,有则拿出对应交换机所对应的队列哈希表,若没有则创建QueueBindingMap &queue_binding_map = (*result)[binding->name_exchange];queue_binding_map.insert(std::make_pair(binding->name_queue, binding));return 0;}private:SqliteHelper _sqlite_helper;};class BindingManager{public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string &db_file_path): _binding_mapper(db_file_path){_exchange_queue_map = _binding_mapper.recover();}bool bind(const std::string &name_exchange, const std::string &name_queue, const std::string &binding_key, bool durable){std::unique_lock<std::mutex> lock(_mutex);auto binding = std::make_shared<Binding>(name_exchange, name_queue, binding_key);auto exchange_it = _exchange_queue_map.find(name_exchange);// 如果绑定信息存在,则不用绑定// 先查交换机是否存在,再查队列是否存在if (exchange_it != _exchange_queue_map.end()){auto queue_it = exchange_it->second.find(name_queue);if (queue_it != exchange_it->second.end()){DLOG("binding信息已经存在,不用绑定");return true;}}QueueBindingMap &queue_bingding_map = _exchange_queue_map[binding->name_exchange];queue_bingding_map.insert(std::make_pair(binding->name_queue, binding));if (durable)_binding_mapper.insert(binding);return true;}void unbind(const std::string &name_exchange, const std::string &name_queue){std::unique_lock<std::mutex> lock(_mutex);auto exchange_it = _exchange_queue_map.find(name_exchange);if (exchange_it == _exchange_queue_map.end()){return;}auto queue_it = exchange_it->second.find(name_queue);if (queue_it == exchange_it->second.end()){return;}_exchange_queue_map[name_exchange].erase(name_queue);_binding_mapper.remove(name_exchange, name_queue);return;}void unbindByExchange(const std::string &name_exchange){std::unique_lock<std::mutex> lock(_mutex);auto exchange_it = _exchange_queue_map.find(name_exchange);if (exchange_it == _exchange_queue_map.end()){return;}_exchange_queue_map.erase(name_exchange);_binding_mapper.removeByExchange(name_exchange);return;}void unbindByQueue(const std::string &name_queue){std::unique_lock<std::mutex> lock(_mutex);for (auto &exchange_queue_binding : _exchange_queue_map){exchange_queue_binding.second.erase(name_queue);_binding_mapper.removeByQueue(name_queue);}}bool exist(const std::string &name_exchange, const std::string &name_queue){std::unique_lock<std::mutex> lock(_mutex);auto exchange_it = _exchange_queue_map.find(name_exchange);if (exchange_it == _exchange_queue_map.end()){return false;}auto queue_it = exchange_it->second.find(name_queue);if (queue_it == exchange_it->second.end()){return false;}return true;}void clear(){_binding_mapper.removeTable();_exchange_queue_map.clear();}size_t size(){std::unique_lock<std::mutex> lock(_mutex);size_t count = 0;for(auto &exchange_queue_binding : _exchange_queue_map){count += exchange_queue_binding.second.size();}return count;}Binding::ptr getBinding(const std::string &name_exchange, const std::string &name_queue){std::unique_lock<std::mutex> lock(_mutex);auto exchange_it = _exchange_queue_map.find(name_exchange);if (exchange_it == _exchange_queue_map.end()){return Binding::ptr();}auto queue_it = exchange_it->second.find(name_queue);if (queue_it == exchange_it->second.end()){return Binding::ptr();}return queue_it->second;}QueueBindingMap getExchangeBindings(const std::string &name_exchange){std::unique_lock<std::mutex> lock(_mutex);auto exchange_it = _exchange_queue_map.find(name_exchange);if(exchange_it == _exchange_queue_map.end()){ELOG("交换机不存在");return QueueBindingMap();}return exchange_it->second;}~BindingManager(){}private:std::mutex _mutex;ExchangeQueueMap _exchange_queue_map;BindingMapper _binding_mapper;};
    }
    #endif
    
在MQServer中编写Message.hpp文件实现绑定消息管理
  • 需要在此文件中实现的功能:

    1. 消息的持久化管理

      • 管理数据

        1. 队列消息⽂件存储的路径
        2. 队列消息的存储⽂件名
        3. 队列消息的临时交换⽂件名
      • 管理操作

        1. ⽇志消息存储在⽂件中(4B⻓度+(属性+内容+有效位)序列化消息,连续存储即可)
        2. 提供队列消息⽂件创建/删除功能
        3. 提供队列消息的新增持久化/删除持久化
        4. 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新⽣成新的消息存储⽂件)
    2. 消息的管理(以队列为单位进⾏管理)

      • 队列消息管理数据

        1. 队列名称
        2. 待推送消息链表
        3. 持久化消息hash
        4. 待确认消息hash
        5. 有效消息数量
        6. 已经持久化消息总量
        7. 持久化管理句柄
      • 队列管理操作

        1. 新增消息
        2. 获取队⾸消息(获取的同时将消息加⼊待确认队列)
        3. 移除指定待确认消息
        4. 获取队列待消费&待确认消息数量
        5. 恢复队列历史消息。
        6. 销毁队列所有消息
        7. 判断队列消息是否为空
    3. 消息的总体对外管理

      • 管理的操作
        1. 初始化新建队列的消息管理结构,并创建消息存储⽂件
        2. 删除队列的消息管理结构,以及消息存储⽂件
        3. 向指定队列新增消息
        4. 获取指定队列队⾸消息
        5. 确认指定队列待确认消息(删除)
        6. 判断指定队列消息是否为空
  • 实现代码

    #ifndef __M_MESSAGE_H__
    #define __M_MESSAGE_H__
    #include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/message.pb.h"
    #include <google/protobuf/map.h>
    #include <iostream>
    #include <list>
    #include <memory>
    #include <mutex>
    #include <unordered_map>namespace MQ
    {
    #define DATAFILE_SUBFIX ".message_data"
    #define TMPFILE_SUBFIX ".message_data.tmp"using MessagePtr = std::shared_ptr<MQ::Message>;class MessageMapper{public:MessageMapper(std::string &path, const std::string &queue_name): _name_queue(queue_name){if (path.back() != '/')path += '/';_name_data_file = path + queue_name + DATAFILE_SUBFIX;_name_temp_file = path + queue_name + TMPFILE_SUBFIX;if (!FileHelper(path).exists()){assert(FileHelper::createDirectory(path));}if (!FileHelper(_name_data_file).exists())creatFile();}bool insertDataFile(MessagePtr &message){return insert(_name_data_file, message);}bool insertTempFile(MessagePtr &message){return insert(_name_temp_file, message);}bool remove(MessagePtr &message){std::string filename = _name_data_file;size_t offset = message->offset();size_t length = message->length();// 修改文件的有效标志message->mutable_payload()->set_valid("0");std::string new_load = message->payload().SerializeAsString();// 判断新的载荷与原载荷长度是否相同,来防止覆盖掉别的消息if (new_load.size() != length){ELOG("消息载荷长度与原载荷长度不同");return false;}// 将修改合法标志的数据写入文件(覆盖原载荷的位置)FileHelper file_helper(filename);file_helper.write(new_load.c_str(), offset, new_load.size());return true;}std::list<MessagePtr> garbageCollection(){bool ret;std::list<MessagePtr> result;ret = load(result);if (ret == false){DLOG("加载有效数据失败!\n");return result;}// DLOG("垃圾回收,得到有效消息数量:%d", result.size());// 2. 将有效数据,进行序列化存储到临时文件中FileHelper::createFile(_name_temp_file);for (auto &msg : result){DLOG("向临时文件写入数据: %s", msg->payload().body().c_str());ret = insert(_name_temp_file, msg);if (ret == false){DLOG("向临时文件写入消息数据失败!!");return result;}}// DLOG("垃圾回收后,向临时文件写入数据完毕,临时文件大小: %ld", FileHelper(_name_temp_file).size());// 3. 删除源文件ret = FileHelper::removeFile(_name_data_file);if (ret == false){DLOG("删除源文件失败!");return result;}// 4. 修改临时文件名,为源文件名称ret = FileHelper(_name_temp_file).rename(_name_data_file);if (ret == false){DLOG("修改临时文件名称失败!");return result;}// 5. 返回新的有效数据return result;}void removeFile(){FileHelper::removeFile(_name_data_file);FileHelper::removeFile(_name_temp_file);}private:bool load(std::list<MessagePtr> &result){// 挑选出有效信息FileHelper file_helper(_name_data_file);size_t offset = 0, length = 0;while (offset < file_helper.size()){// 读取消息长度file_helper.read((char *)&length, offset, sizeof(size_t));offset += sizeof(size_t);// 读取消息std::string load_str(length, '\0');file_helper.read(&load_str[0], offset, length);offset += length;// 反序列化消息MessagePtr message = std::make_shared<MQ::Message>();message->mutable_payload()->ParseFromString(load_str);// 判断消息是否有效if (message->payload().valid() == std::string("0")){DLOG("该消息无效,不用插入队列");continue;}// 将有效消息插入队列result.push_back(message);}return true;}bool creatFile(){if (FileHelper(_name_data_file).exists()){return true;}return FileHelper::createFile(_name_data_file);}bool insert(const std::string &filename, MessagePtr &message){bool ret = true;// 将消息中的消息载荷序列化std::string load = message->payload().SerializeAsString();// 找到消息位置的偏移量FileHelper file_helper(filename);// 偏移量size_t offset = file_helper.size();// 消息的长度size_t length = load.size();// 先将消息的长度写入文件ret = file_helper.write((char *)&length, offset, sizeof(size_t));if (ret == false){ELOG("写入消息长度失败");return false;}// 将消息插入文件ret = file_helper.write(load.c_str(), offset + sizeof(size_t), length);if (ret == false){ELOG("写入消息失败");return false;}// 设置message的偏移量和长度message->set_offset(offset + sizeof(size_t));message->set_length(length);return true;}private:std::string _name_data_file;std::string _name_temp_file;std::string _name_queue;};// 队列消息类,主要是负责消息与队列之间的关系class QueueMessage{public:using ptr = std::shared_ptr<QueueMessage>;QueueMessage(std::string &path, const std::string &qname) : _qname(qname), _valid_count(0), _total_count(0), _mapper(path, qname){}~QueueMessage() {}// 传入队列消息的属性、消息体、是否持久化bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable){// 1. 构造消息对象MessagePtr msg = std::make_shared<MQ::Message>();msg->mutable_payload()->set_body(body);// 如果消息属性不为空,则使用传入的属性,设置,否则使用默认属性if (bp != nullptr){DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(bp->id());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());}else{DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key("");}std::unique_lock<std::mutex> lock(_mutex);// 2. 判断消息是否需要持久化if (msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE){msg->mutable_payload()->set_valid("1"); // 在持久化存储中表示数据有效// 3. 进行持久化存储bool ret = _mapper.insertDataFile(msg);if (ret == false){DLOG("持久化存储消息:%s 失败了!", body.c_str());return false;}_valid_count += 1; // 持久化信息中的数据量+1_total_count += 1;_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}// 4. 内存的管理_msgs.push_back(msg);return true;}bool remove(const std::string &msg_id){std::unique_lock<std::mutex> lock(_mutex);// 1. 从待确认队列中查找消息auto it = _waitack_msgs.find(msg_id);if (it == _waitack_msgs.end()){DLOG("没有找到要删除的消息:%s!", msg_id.c_str());return true;}// 2. 根据消息的持久化模式,决定是否删除持久化信息if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE){// 3. 删除持久化信息_mapper.remove(it->second);_durable_msgs.erase(msg_id);_valid_count -= 1;   // 持久化文件中有效消息数量 -1garbageCollection(); // 内部判断是否需要垃圾回收,需要的话则回收一下}// 4. 删除内存中的信息_waitack_msgs.erase(msg_id);// DLOG("确认消息后,删除消息的管理成功:%s", it->second->payload().body().c_str());return true;}bool recovery(){// 恢复历史消息std::unique_lock<std::mutex> lock(_mutex);_msgs = _mapper.garbageCollection();for (auto &msg : _msgs){_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}_valid_count = _total_count = _msgs.size();return true;}// 从队首取出消息MessagePtr front(){std::unique_lock<std::mutex> lock(_mutex);if (_msgs.empty()){return MessagePtr();}// 获取一条队首消息:从_msgs中取出数据MessagePtr msg = _msgs.front();_msgs.pop_front();// 将该消息对象,向待确认的hash表中添加一份,等到收到消息确认后进行删除_waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));return msg;}size_t getAbleCount(){std::unique_lock<std::mutex> lock(_mutex);return _msgs.size();}size_t getTotalCount(){std::unique_lock<std::mutex> lock(_mutex);return _total_count;}size_t getDurableCount(){std::unique_lock<std::mutex> lock(_mutex);return _durable_msgs.size();}size_t getWaitackCount(){std::unique_lock<std::mutex> lock(_mutex);return _waitack_msgs.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeFile();_msgs.clear();_durable_msgs.clear();_waitack_msgs.clear();_valid_count = 0;_total_count = 0;}private:bool garbageCollectionCheck(){// 持久化的消息总量大于2000, 且其中有效比例低于50%则需要持久化if (_total_count > 2000 && _valid_count * 10 / _total_count < 5){return true;}return false;}void garbageCollection(){// 1. 进行垃圾回收,获取到垃圾回收后,有效的消息信息链表if (garbageCollectionCheck() == false)return;std::list<MessagePtr> msgs = _mapper.garbageCollection();for (auto &msg : msgs){auto it = _durable_msgs.find(msg->payload().properties().id());if (it == _durable_msgs.end()){DLOG("垃圾回收后,有一条持久化消息,在内存中没有进行管理!");_msgs.push_back(msg); /// 做法:重新添加到推送链表的末尾_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));continue;}// 2. 更新每一条消息的实际存储位置it->second->set_offset(msg->offset());it->second->set_length(msg->length());}// 3. 更新当前的有效消息数量 & 总的持久化消息数量_valid_count = _total_count = msgs.size();}private:std::mutex _mutex;std::string _qname;size_t _valid_count;size_t _total_count;//文件里的总数据量,内存里的不计入其中MessageMapper _mapper;std::list<MessagePtr> _msgs;                               // 待推送消息std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息hashstd::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息hash};class MessageManager{public:using ptr = std::shared_ptr<MessageManager>;MessageManager(const std::string &basedir) : _basedir(basedir) {}~MessageManager() {}void initQueueMessage(const std::string &qname){QueueMessage::ptr qmp;{// 查找是否已经存在该队列的消息管理对象std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it != _queue_msgs.end()){return;}// 如果没找到,说明要新增qmp = std::make_shared<QueueMessage>(_basedir, qname);_queue_msgs.insert(std::make_pair(qname, qmp));}// 恢复历史消息qmp->recovery();}void clear(){std::unique_lock<std::mutex> lock(_mutex);for (auto &qmsg : _queue_msgs){qmsg.second->clear();}}void destroyQueueMessage(const std::string &qname){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);// 没找到,说明无需删除if (it == _queue_msgs.end()){return;}qmp = it->second;_queue_msgs.erase(it);}// 销毁消息管理对象qmp->clear();}bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool queue_is_durable){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){ELOG("向队列%s新增消息失败:没有找到消息管理句柄!", qname.c_str());return false;}qmp = it->second;}return qmp->insert(bp, body, queue_is_durable);}MessagePtr front(const std::string &qname){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("获取队列%s队首消息失败:没有找到消息管理句柄!", qname.c_str());return MessagePtr();}qmp = it->second;}return qmp->front();}// 确认消息,实际上就是确认消息之后,删除待确认消息里的对应消息void ack(const std::string &qname, const std::string &msg_id){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){ELOG("确认队列%s消息%s失败:没有找到消息管理句柄!", qname.c_str(), msg_id.c_str());return;}qmp = it->second;}qmp->remove(msg_id);return;}size_t getAbleCount(const std::string &qname){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("获取队列%s待推送消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->getAbleCount();}size_t getTotalCount(const std::string &qname){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("获取队列%s总持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->getTotalCount();}size_t getDurableCount(const std::string &qname){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("获取队列%s有效持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->getDurableCount();}size_t getWaitAckCount(const std::string &qname){QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()){DLOG("获取队列%s待确认消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->getWaitackCount();}private:std::mutex _mutex;std::string _basedir;std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;};
    }#endif
    
在MQServer中编写VirtualHost.hpp文件实现虚拟机管理
  • 虚拟机模块是对上述几个数据管理模块的整合,并基于数据之间的关联关系进⾏联合操作。

  • 在此文件要实现的功能有:

    • 虚拟机需要管理的数据:

      1. 交换机数据管理模块句柄
      2. 队列数据管理模块句柄
      3. 绑定数据管理模块句柄
      4. 消息数据管理模块句柄
    • 虚拟机需要管理的操作:

      1. 提供声明交换机的功能(存在则OK,不存在则创建)
      2. 提供删除交换机的功能(删除交换机的同时删除关联绑定信息)
      3. 提供声明队列的功能(存在则OK,不存在则创建,创建的同时创建队列关联消息管理对象)
      4. 提供删除队列的功能(删除队列的同时删除关联绑定信息,删除关联消息管理对象及队列所有消息)
      5. 提供交换机-队列绑定的功能
      6. 提供交换机-队列解绑的功能
      7. 提供获取交换机相关的所有绑定信息功能
      8. 提供新增消息的功能
      9. 提供获取指定队列队⾸消息的功能
      10. 提供消息确认删除的功能
  • 实现代码

    #ifndef __M_Virtualhost_H__
    #define __M_Virtualhost_H__#include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "Binding.hpp"
    #include "Exchange.hpp"
    #include "Message.hpp"
    #include "Queue.hpp"
    #include <google/protobuf/map.h>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <unordered_map>namespace MQ
    {class VirtualHost{public:using ptr = std::shared_ptr<VirtualHost>;VirtualHost(const std::string &host_name, const std::string &base_dir, const std::string &db_file): _host_name(host_name),_exchange_manager_pointer(std::make_shared<ExchangeManager>(db_file)),_queue_manager_pointer(std::make_shared<QueueManager>(db_file)),_message_manager_pointer(std::make_shared<MessageManager>(base_dir)),_binding_manager_pointer(std::make_shared<BindingManager>(db_file)){QueueMap queue_map = _queue_manager_pointer->allQueues();for (auto &queue_pair : queue_map){_message_manager_pointer->initQueueMessage(queue_pair.first);}}bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,const google::protobuf::Map<std::string, std::string> &args){return _exchange_manager_pointer->declareExchange(name, type, durable, auto_delete, args);}void deleteExchange(const std::string &name){// 删除交换机的时候,需要将交换机相关的绑定信息也删除掉。_binding_manager_pointer->unbindByExchange(name);return _exchange_manager_pointer->deleteExchange(name);}bool existExchange(const std::string &name){return _exchange_manager_pointer->exists(name);}Exchange::ptr selectExchange(const std::string &ename){return _exchange_manager_pointer->selectExchange(ename);}bool declareQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs){// 初始化队列的消息句柄(消息的存储管理)// 队列的创建_message_manager_pointer->initQueueMessage(qname);return _queue_manager_pointer->declareQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}bool deleteQueue(const std::string &name){// 删除的时候队列相关的数据有两个:队列的消息,队列的绑定信息_message_manager_pointer->destroyQueueMessage(name);_binding_manager_pointer->unbindByQueue(name);return _queue_manager_pointer->deleteQueue(name);}bool existQueue(const std::string &name){return _queue_manager_pointer->exist(name);}QueueMap allQueues(){return _queue_manager_pointer->allQueues();}bool bind(const std::string &ename, const std::string &qname, const std::string &key){Exchange::ptr ep = _exchange_manager_pointer->selectExchange(ename);if (ep.get() == nullptr){DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());return false;}Queue::ptr mqp = _queue_manager_pointer->selectQueue(qname);if (mqp.get() == nullptr){DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());return false;}return _binding_manager_pointer->bind(ename, qname, key, ep->_durable && mqp->_durable);}void unBind(const std::string &ename, const std::string &qname){return _binding_manager_pointer->unbind(ename, qname);}QueueBindingMap exchangeBindings(const std::string &ename){return _binding_manager_pointer->getExchangeBindings(ename);}bool existBinding(const std::string &ename, const std::string &qname){return _binding_manager_pointer->exist(ename, qname);}bool basicPublish(const std::string &qname, BasicProperties *bp, const std::string &body){Queue::ptr mqp = _queue_manager_pointer->selectQueue(qname);if (mqp.get() == nullptr){DLOG("发布消息失败,队列%s不存在!", qname.c_str());return false;}return _message_manager_pointer->insert(qname, bp, body, mqp->_durable);}MessagePtr basicConsume(const std::string &qname){return _message_manager_pointer->front(qname);}void basicAck(const std::string &qname, const std::string &msgid){return _message_manager_pointer->ack(qname, msgid);}void clear(){_exchange_manager_pointer->clear();_queue_manager_pointer->clear();_binding_manager_pointer->clear();_message_manager_pointer->clear();}~VirtualHost() {}private:std::string _host_name;ExchangeManager::ptr _exchange_manager_pointer;QueueManager::ptr _queue_manager_pointer;MessageManager::ptr _message_manager_pointer;BindingManager::ptr _binding_manager_pointer;};
    }
    #endif
    
在MQServer中编写Route.hpp文件实现交换机路由管理
  • 路由模块负责判断生产者发布的消息应该被推送到那个队列中区

  • 在此文件需要实现的功能:

    • 键验证功能:验证RoutingKey和BindingKey是否合法
    • 路由匹配功能:通过比较RoutingKey和BindingKey来进行分配
  • 实现代码

    #ifndef __M_Route_H__
    #define __M_Route_H__#include "../MQCommon/Helper.hpp"
    #include "../MQCommon/message.pb.h"
    #include <string>
    #include <vector>
    namespace MQ
    {class RouteManager{public:// 判断是否是合法的routingkey,routingkey是设置在交换机里的static bool isValidRoutingKey(const std::string &routing_key){// routing_key:只需要判断是否包含有非法字符即可, 合法字符( a~z, A~Z, 0~9, ., _)for (int i = 0; i < routing_key.size(); i++){if ((routing_key[i] >= 'a' && routing_key[i] <= 'z') ||(routing_key[i] >= 'A' && routing_key[i] <= 'Z') ||(routing_key[i] >= '0' && routing_key[i] <= '9') ||(routing_key[i] == '_') ||(i - 1 >= 0 && routing_key[i] == '.' && routing_key[i - 1] != routing_key[i])){continue;}return false;}return true;}// 判断是否是合法的bindingkey,bindingkey是发布客户端给的static bool isValidBindingKey(const std::string &binding_key){// 1. 判断是否包含有非法字符, 合法字符:a~z, A~Z, 0~9, ., _, *, #for (auto &ch : binding_key){if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.') ||(ch == '*' || ch == '#')){continue;}return false;}// 2. *和#必须独立存在:  news.music#.*.#std::vector<std::string> sub_words;StrHelper::split(binding_key, ".", sub_words);for (std::string &word : sub_words){if (word.size() > 1 &&(word.find("*") != std::string::npos ||word.find("#") != std::string::npos)){return false;}}// 3. *和#不能连续出现for (int i = 1; i < sub_words.size(); i++){if (sub_words[i] == "#" && sub_words[i - 1] == "*"){return false;}if (sub_words[i] == "#" && sub_words[i - 1] == "#"){return false;}if (sub_words[i] == "*" && sub_words[i - 1] == "#"){return false;}}return true;}static bool route(ExchangeType type, const std::string &routing_key, const std::string &binding_key){// 如果是直接交换机,只需要判断routing_key是否与binding_key相同if (type == ExchangeType::DIRECT){return (routing_key == binding_key);}else if (type == ExchangeType::FANOUT) // 如果是广播交换机,则无需操作{return true;}else if (type == ExchangeType::TOPIC) // 主题交换:要进行模式匹配    news.#   &   news.music.pop{// 1. 将binding_key与routing_key进行字符串分割,得到各个的单词数组std::vector<std::string> bkeys, rkeys;int count_binding_key = StrHelper::split(binding_key, ".", bkeys);int count_routing_key = StrHelper::split(routing_key, ".", rkeys);// 2. 定义标记数组,并初始化[0][0]位置为true,其他位置为falsestd::vector<std::vector<bool>> dp(count_binding_key + 1, std::vector<bool>(count_routing_key + 1, false));dp[0][0] = true;// 3. 如果binding_key以#起始,则将#对应行的第0列置为1.for (int i = 1; i <= count_binding_key; i++){if (bkeys[i - 1] == "#"){dp[i][0] = true;continue;}break;}// 4. 使用routing_key中的每个单词与binding_key中的每个单词进行匹配并标记数组for (int i = 1; i <= count_binding_key; i++){for (int j = 1; j <= count_routing_key; j++){// 如果当前bkey是个*,或者两个单词相同,表示单词匹配成功,则从左上方继承结果if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*"){dp[i][j] = dp[i - 1][j - 1];}else if (bkeys[i - 1] == "#"){// 如果当前bkey是个#,则需要从左上,左边,上边继承结果dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}}}return dp[count_binding_key][count_routing_key];}}};
    }#endif
    
在MQServer中编写Consumer.hpp文件实现队列消费者/订阅者管理
  • 消费者管理–以队列为单元进⾏管理-队列消费者管理结构

  • 需要管理的操作:

    1. 新增消费者:信道提供的服务是订阅队列消息的时候创建
    2. 删除消费者:取消订阅 / 信道关闭 / 连接关闭 的时候删除
    3. 获取消费者:从队列所有的消费者中按序取出⼀个消费者进⾏消息的推送
    4. 判断队列消费者是否为空
    5. 判断指定消费者是否存在
    6. 清理队列所有消费者
  • 需要管理的数据

    1. 消费者管理结构:vector
    2. 轮转序号:⼀个队列可能会有多个消费者,但是⼀条消息只需要被⼀个消费者消费即可,因此采⽤RR轮转
    3. 互斥锁:保证线程安全
    4. 队列名称
  • 对消费者进⾏统⼀管理结构

    1. 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)
    2. 向指定队列新增消费者(客⼾端订阅指定队列消息的时候):新增完成的时候返回消费者对象
    3. 从指定队列移除消费者(客⼾端取消订阅的时候)
    4. 移除指定队列的所有消费者(队列被删除时销毁):删除消费者的队列管理单元对象
    5. 从指定队列获取⼀个消费者(轮询获取-消费者轮换消费起到负载均衡的作⽤)
    6. 判断队列中消费者是否为空
    7. 判断队列中指定消费者是否存在
    8. 清理所有消费者
  • 实现代码

    #ifndef __M_CONSUMER_H__
    #define __M_CONSUMER_H__
    #include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/message.pb.h"
    #include <google/protobuf/map.h>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <unordered_map>namespace MQ
    {// 回调函数// 第一个参数为消息标识,第二个参数为消息属性,第三个参数为要处理的消息体using ConsumerCallback = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;struct Consumer{// 自动应答标志bool _auto_ack;// 订阅的队列名称std::string _subscribe_queue_name;// 消费者标识std::string _consumer_tag;// 消费者回调函数ConsumerCallback _callback;// 指针using ptr = std::shared_ptr<Consumer>;// 构造函数Consumer() {}Consumer(const std::string &consumer_tag, const std::string &subscribe_queue_name, bool auto_ack, const ConsumerCallback &callback): _auto_ack(auto_ack),_subscribe_queue_name(subscribe_queue_name),_consumer_tag(consumer_tag),_callback(callback){}// 析构函数virtual ~Consumer() {}};// 以队列为单元的消费者管理结构class QueueConsumer{public:using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0) {}// 队列新增消费者Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag,  ConsumerCallback& cb){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);// 2. 判断消费者是否重复for (auto &consumer : _consumers){if (consumer->_consumer_tag == ctag){return Consumer::ptr();}}// 3. 没有重复则新增--构造对象auto consumer = std::make_shared<Consumer>(ctag, queue_name, ack_flag, cb);// 4. 添加管理后返回对象_consumers.push_back(consumer);return consumer;}// 队列移除消费者void remove(const std::string &ctag){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);// 2. 遍历查找-删除for (auto it = _consumers.begin(); it != _consumers.end(); ++it){if ((*it)->_consumer_tag == ctag){_consumers.erase(it);return;}}return;}// 队列获取消费者:RR轮转获取Consumer::ptr choose(){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);if (_consumers.size() == 0){return Consumer::ptr();}// 2. 获取当前轮转到的下标int idx = _rr_seq % _consumers.size();_rr_seq++;// 3. 获取对象,返回return _consumers[idx];}// 是否为空bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _consumers.size() == 0;}// 判断指定消费者是否存在bool exists(const std::string &ctag){std::unique_lock<std::mutex> lock(_mutex);// 2. 遍历查找for (auto it = _consumers.begin(); it != _consumers.end(); ++it){if ((*it)->_consumer_tag == ctag){return true;}}return false;}// 清理所有消费者void clear(){std::unique_lock<std::mutex> lock(_mutex);_consumers.clear();_rr_seq = 0;}private:std::string _qname;std::mutex _mutex;uint64_t _rr_seq; // 轮转序号std::vector<Consumer::ptr> _consumers;};class ConsumerManager{public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager() {}void initQueueConsumer(const std::string &qname){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);// 2. 重复判断auto it = _qconsumers.find(qname);if (it != _qconsumers.end()){return;}// 3. 新增auto qconsumers = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qconsumers));}void destroyQueueConsumer(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);_qconsumers.erase(qname);}Consumer::ptr createConsumer(const std::string &ctag, const std::string &queue_name, bool ack_flag,  ConsumerCallback cb){// 获取队列的消费者管理单元句柄,通过句柄完成新建QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->create(ctag, queue_name, ack_flag, cb);}void removeConsumer(const std::string &ctag, const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return;}qcp = it->second;}return qcp->remove(ctag);}Consumer::ptr chooseConsumer(const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->choose();}bool isEmpty(const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->empty();}bool isExist(const std::string &ctag, const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->exists(ctag);}void clear(){std::unique_lock<std::mutex> lock(_mutex);_qconsumers.clear();}private:std::mutex _mutex;std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;};
    }
    #endif
    
在MQServer中编写Channel.hpp文件实现队列信道管理
  • 在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴

  • ⽽信道模块就是再次将上述模块进⾏整合提供服务的模块

  • 在该文件中要实现的功能:

    • 信道管理的信息:

      1. 信道ID:信道的唯⼀标识
      2. 信道关联的消费者:⽤于消费者信道在关闭的时候取消订阅,删除订阅者信息
      3. 信道关联的连接:⽤于向客⼾端发送数据(响应,推送的消息)
      4. protobuf协议处理句柄:⽹络通信前的协议处理
      5. 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息
      6. 虚拟机句柄:交换机/队列/绑定/消息数据管理
      7. ⼯作线程池句柄(⼀条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
    • 信道管理的操作:

      1. 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)
      2. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
      3. 提供绑定&解绑队列操作
      4. 提供订阅&取消订阅队列消息操作
      5. 提供发布&确认消息操作
  • 实现代码

    #ifndef __M_Channel_H__
    #define __M_Channel_H__#include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/ThreadPool.hpp"
    #include "../MQCommon/message.pb.h"
    #include "../MQCommon/request.pb.h"
    #include "Consumer.hpp"
    #include "Route.hpp"
    #include "VirtualHost.hpp"
    #include "muduo/net/TcpConnection.h"
    #include "muduo/protobuf/codec.h"namespace MQ
    {// 指针的定义using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;// 开/关信道请求using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;// 声明/删除交换机请求using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;// 声明/删除队列请求using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;// 绑定/解绑队列请求using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;// 发布/确认/消费/取消消息请求using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;class Channel{public:using ptr = std::shared_ptr<Channel>;Channel(const std::string &id_channel,const VirtualHost::ptr &virtualhost_ptr,const ConsumerManager::ptr &consumer_manager_ptr,const ProtobufCodecPtr &codec_ptr,const muduo::net::TcpConnectionPtr &connection_ptr,const ThreadPool::ptr &threadpool_ptr): _id_channel(id_channel),_virtualhost_ptr(virtualhost_ptr),_consumer_manager_ptr(consumer_manager_ptr),_connection_ptr(connection_ptr),_threadpool_ptr(threadpool_ptr){DLOG("new Channel: %p", this);}~Channel(){if (_consumer_ptr.get() != nullptr){_consumer_manager_ptr->removeConsumer(_consumer_ptr->_consumer_tag, _consumer_ptr->_subscribe_queue_name);}DLOG("del Channel: %p", this);}// 交换机的声明与删除void declareExchange(const declareExchangeRequestPtr &req){bool ret = _virtualhost_ptr->declareExchange(req->exchange_name(),req->exchange_type(), req->durable(),req->auto_delete(), req->args());return basicResponse(ret, req->rid(), req->cid());}void deleteExchange(const deleteExchangeRequestPtr &req){_virtualhost_ptr->deleteExchange(req->exchange_name());return basicResponse(true, req->rid(), req->cid());}// 队列的声明与删除void declareQueue(const declareQueueRequestPtr &req){bool ret = _virtualhost_ptr->declareQueue(req->queue_name(),req->durable(), req->exclusive(),req->auto_delete(), req->args());if (ret == false){return basicResponse(false, req->rid(), req->cid());}_consumer_manager_ptr->initQueueConsumer(req->queue_name()); // 初始化队列的消费者管理句柄return basicResponse(true, req->rid(), req->cid());}void deleteQueue(const deleteQueueRequestPtr &req){_consumer_manager_ptr->destroyQueueConsumer(req->queue_name());_virtualhost_ptr->deleteQueue(req->queue_name());return basicResponse(true, req->rid(), req->cid());}// 队列的绑定与解除绑定void queueBind(const queueBindRequestPtr &req){bool ret = _virtualhost_ptr->bind(req->exchange_name(),req->queue_name(), req->binding_key());return basicResponse(ret, req->rid(), req->cid());}void queueUnBind(const queueUnBindRequestPtr &req){_virtualhost_ptr->unBind(req->exchange_name(), req->queue_name());return basicResponse(true, req->rid(), req->cid());}// 消息的发布void basicPublish(const basicPublishRequestPtr &req){// 1. 判断交换机是否存在auto ep = _virtualhost_ptr->selectExchange(req->exchange_name());if (ep.get() == nullptr){return basicResponse(false, req->rid(), req->cid());}// 2. 进行交换路由,判断消息可以发布到交换机绑定的哪个队列中QueueBindingMap mqbm = _virtualhost_ptr->exchangeBindings(req->exchange_name());BasicProperties *properties = nullptr;std::string routing_key;if (req->has_properties()){properties = req->mutable_properties();routing_key = properties->routing_key();}for (auto &binding : mqbm){if (RouteManager::route(ep->_type, routing_key, binding.second->binding_key)){// 3. 将消息添加到队列中(添加消息的管理)_virtualhost_ptr->basicPublish(binding.first, properties, req->body());// 4. 向线程池中添加一个消息消费任务(向指定队列的订阅者去推送消息--线程池完成)auto task = std::bind(&Channel::consume, this, binding.first);_threadpool_ptr->push(task);}}return basicResponse(true, req->rid(), req->cid());}// 消息的确认void basicAck(const basicAckRequestPtr &req){_virtualhost_ptr->basicAck(req->queue_name(), req->message_id());return basicResponse(true, req->rid(), req->cid());}// 订阅队列消息void basicConsume(const basicConsumeRequestPtr &req){// 1. 判断队列是否存在bool ret = _virtualhost_ptr->existQueue(req->queue_name());if (ret == false){return basicResponse(false, req->rid(), req->cid());}// 2. 创建队列的消费者auto callback_func = std::bind(&Channel::callback, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3);// 创建了消费者之后,当前的channel角色就是个消费者_consumer_ptr = _consumer_manager_ptr->createConsumer(req->consumer_tag(), req->queue_name(), req->auto_ack(), callback_func);return basicResponse(true, req->rid(), req->cid());}// 取消订阅void basicCancel(const basicCancelRequestPtr &req){_consumer_manager_ptr->removeConsumer(req->consumer_tag(), req->queue_name());return basicResponse(true, req->rid(), req->cid());}private:void callback(const std::string tag, const BasicProperties *base_properties, const std::string &body){// 针对参数组织出推送消息请求,将消息推送给channel对应的客户端basicConsumeResponse resp;resp.set_cid(_id_channel);resp.set_body(body);resp.set_consumer_tag(tag);if (base_properties){resp.mutable_properties()->set_id(base_properties->id());resp.mutable_properties()->set_delivery_mode(base_properties->delivery_mode());resp.mutable_properties()->set_routing_key(base_properties->routing_key());}_codec_ptr->send(_connection_ptr, resp);}void consume(const std::string &qname){// 指定队列消费消息// 1. 从队列中取出一条消息MessagePtr mp = _virtualhost_ptr->basicConsume(qname);if (mp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());return;}// 2. 从队列订阅者中取出一个订阅者Consumer::ptr cp = _consumer_manager_ptr->chooseConsumer(qname);if (cp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());return;}// 3. 调用订阅者对应的消息处理函数,实现消息的推送cp->_callback(cp->_consumer_tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());// 4. 判断如果订阅者是自动确认---不需要等待确认,直接删除消息,否则需要外部收到消息确认后再删除if (cp->_auto_ack)_virtualhost_ptr->basicAck(qname, mp->payload().properties().id());}void basicResponse(bool ok, const std::string &rid, const std::string &cid){basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec_ptr->send(_connection_ptr, resp);}private:// 信道IDstd::string _id_channel;// 消费者指针Consumer::ptr _consumer_ptr;// 链接指针muduo::net::TcpConnectionPtr _connection_ptr;// 协议处理器ProtobufCodecPtr _codec_ptr;// 消费者管理器ConsumerManager::ptr _consumer_manager_ptr;// 虚拟主机VirtualHost::ptr _virtualhost_ptr;// 线程池ThreadPool::ptr _threadpool_ptr;};class ChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {}bool openChannel(const std::string &id,const VirtualHost::ptr &host,const ConsumerManager::ptr &cmp,const ProtobufCodecPtr &codec,const muduo::net::TcpConnectionPtr &conn,const ThreadPool::ptr &pool){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it != _channels.end()){DLOG("信道:%s 已经存在!", id.c_str());return false;}auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);_channels.insert(std::make_pair(id, channel));return true;}void closeChannel(const std::string &id){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(id);}Channel::ptr getChannel(const std::string &id){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it == _channels.end()){return Channel::ptr();}return it->second;}private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;};
    }#endif
在MQServer中编写Connection.hpp文件实现链接管理
  • 向⽤⼾提供⼀个⽤于实现⽹络通信的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与客⼾端进⾏⽹络通信

  • 在该文件中要实现的功能:

    • 需要管理的信息:

      1. 连接关联的信道管理句柄(实现信道的增删查)
      2. 连接关联的实际⽤于通信的muduo::net::Connection连接
      3. protobuf协议处理的句柄(ProtobufCodec对象)
      4. 消费者管理句柄
      5. 虚拟机句柄
      6. 异步⼯作线程池句柄
    • 连接操作:

      1. 提供创建Channel信道的操作
      2. 提供删除Channel信道的操作
  • 实现代码

    #ifndef __M_Connection_H__
    #define __M_Connection_H__
    #include "Channel.hpp"namespace MQ {/*** @class Connection* @brief 表示单个客户端连接的管理类* * 负责管理连接生命周期内的信道操作,包括:* - 信道创建/关闭* - 请求响应处理* - 与虚拟主机、消费者管理器的交互*/
    class Connection
    {
    public:using ptr = std::shared_ptr<Connection>;/*** @brief 构造函数(依赖注入核心组件)* @param host 所属虚拟主机管理接口* @param cmp 消费者管理器* @param codec Protobuf编解码器* @param conn 底层TCP连接对象* @param pool 线程池(用于异步任务处理)*/Connection(const VirtualHost::ptr &host,const ConsumerManager::ptr &cmp,const ProtobufCodecPtr &codec,const muduo::net::TcpConnectionPtr &conn,const ThreadPool::ptr &pool): _host_ptr(host),_consumer_manager_ptr(cmp),_codec_ptr(codec),_tcp_connection_ptr(conn),_threadpool_ptr(pool),_channels_ptr(std::make_shared<ChannelManager>()) // 初始化信道管理器{}/*** @brief 打开新信道(线程安全)* @param req 包含信道ID的请求对象* 流程:* 1. 检查信道ID唯一性* 2. 通过ChannelManager创建信道* 3. 发送操作响应*/void openChannel(const openChannelRequestPtr &req){bool ret = _channels_ptr->openChannel(req->cid(), _host_ptr, _consumer_manager_ptr, _codec_ptr, _tcp_connection_ptr, _threadpool_ptr);if (!ret) {DLOG("信道创建失败:重复的CID %s", req->cid().c_str());return basicResponse(false, req->rid(), req->cid());}DLOG("信道创建成功 CID: %s", req->cid().c_str());basicResponse(true, req->rid(), req->cid());}/*** @brief 关闭指定信道* @param req 包含目标信道ID的请求* 注意:立即发送操作确认响应*/void closeChannel(const closeChannelRequestPtr &req){_channels_ptr->closeChannel(req->cid());basicResponse(true, req->rid(), req->cid());}/*** @brief 获取指定信道对象* @param cid 信道标识符* @return 信道智能指针(不存在时返回空指针)*/Channel::ptr getChannel(const std::string &cid){return _channels_ptr->getChannel(cid);}private:/*** @brief 发送通用响应消息* @param ok 操作是否成功* @param rid 请求ID(用于请求-响应匹配)* @param cid 连接ID*/void basicResponse(bool ok, const std::string &rid, const std::string &cid){basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec_ptr->send(_tcp_connection_ptr, resp); // 通过编解码器发送响应}private:// 核心组件依赖muduo::net::TcpConnectionPtr _tcp_connection_ptr; // 底层TCP连接ProtobufCodecPtr _codec_ptr;        // 协议编解码器ConsumerManager::ptr _consumer_manager_ptr; // 消费者管理VirtualHost::ptr _host_ptr;         // 所属虚拟主机ThreadPool::ptr _threadpool_ptr;    // 异步任务线程池// 信道管理ChannelManager::ptr _channels_ptr;  // 多信道管理容器
    };/*** @class ConnectionManager* @brief 全局连接管理类(单例模式)* * 功能:* - 维护所有活跃连接* - 提供连接查找功能* - 保证线程安全的连接操作*/
    class ConnectionManager
    {
    public:using ptr = std::shared_ptr<ConnectionManager>;ConnectionManager() = default;/*** @brief 创建并注册新连接* @param host 虚拟主机实例* @param cmp 消费者管理器* @param codec 编解码器* @param conn 新建立的TCP连接* @param pool 线程池* 线程安全:通过互斥锁保证注册操作的原子性*/void newConnection(const VirtualHost::ptr &host,const ConsumerManager::ptr &cmp,const ProtobufCodecPtr &codec,const muduo::net::TcpConnectionPtr &conn,const ThreadPool::ptr &pool){std::unique_lock<std::mutex> lock(_mutex);if (!_connections.contains(conn)) {auto connection = std::make_shared<Connection>(host, cmp, codec, conn, pool);_connections.emplace(conn, connection);}}/*** @brief 删除指定连接* @param conn 要移除的TCP连接*/void delConnection(const muduo::net::TcpConnectionPtr &conn){std::unique_lock<std::mutex> lock(_mutex);_connections.erase(conn);}/*** @brief 获取连接对象* @param conn TCP连接指针* @return 关联的Connection对象(不存在时返回空指针)*/Connection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn){std::unique_lock<std::mutex> lock(_mutex);auto it = _connections.find(conn);return (it != _connections.end()) ? it->second : nullptr;}private:std::mutex _mutex; // 连接映射表互斥锁std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _connections;
    };} // namespace MQ
    #endif
    
在MQServer中编写Broker.hpp文件实现服务器模块
  • BrokerServer模块是对整体服务器所有模块的整合,接收客⼾端的请求,并提供服务

  • 实现代码

    #ifndef __M_Broker_H__
    #define __M_Broker_H__#include "../MQCommon/Logger.hpp"
    #include "../MQCommon/ThreadPool.hpp"
    #include "../MQCommon/message.pb.h"
    #include "../MQCommon/request.pb.h"
    #include "Connection.hpp"
    #include "Consumer.hpp"
    #include "VirtualHost.hpp"
    #include "muduo/base/Logging.h"
    #include "muduo/base/Mutex.h"
    #include "muduo/net/EventLoop.h"
    #include "muduo/net/TcpServer.h"
    #include "muduo/protobuf/codec.h"
    #include "muduo/protobuf/dispatcher.h"namespace MQ
    {
    #define DBFILE "/meta.db"
    #define HOSTNAME "MyVirtualHost"class BrokerServer{public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;// 构造函数BrokerServer(int port, const std::string &basedir): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort), // muduo服务器对象初始化_dispatcher(std::bind(&BrokerServer::onUnknownMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3)),                                                                                                          // 请求分发器对象初始化_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), // protobuf协议处理器对象初始化_virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),                                                                                             // 虚拟主机对象初始化_consumer_manager(std::make_shared<ConsumerManager>()),                                                                                                                        // 消费者管理器对象初始化_connection_manager(std::make_shared<ConnectionManager>()),                                                                                                                    // 连接管理器对象初始化_threadpool(std::make_shared<ThreadPool>())                                                                                                                                    // 线程池对象初始化{// 针对历史消息中的所有队列,别忘了,初始化队列的消费者管理结构QueueMap queue_map = _virtual_host->allQueues();for (auto &q : queue_map){_consumer_manager->initQueueConsumer(q.first);DLOG("%s",q.first.c_str());}// 注册业务请求处理函数_dispatcher.registerMessageCallback<MQ::openChannelRequest>(std::bind(&BrokerServer::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::closeChannelRequest>(std::bind(&BrokerServer::onCloseChannel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::declareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::deleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::declareQueueRequest>(std::bind(&BrokerServer::onDeclareQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::deleteQueueRequest>(std::bind(&BrokerServer::onDeleteQueue, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::queueBindRequest>(std::bind(&BrokerServer::onQueueBind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::queueUnBindRequest>(std::bind(&BrokerServer::onQueueUnBind, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::basicPublishRequest>(std::bind(&BrokerServer::onBasicPublish, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::basicAckRequest>(std::bind(&BrokerServer::onBasicAck, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::basicConsumeRequest>(std::bind(&BrokerServer::onBasicConsume, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<MQ::basicCancelRequest>(std::bind(&BrokerServer::onBasicCancel, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&BrokerServer::onConnection, this, std::placeholders::_1));}// 服务器启动void start(){_server.start();_baseloop.loop();}private:// 请求处理的逻辑:由连接管理器先找到对应的链接,然后找到对应的信道,然后调用信道的相应处理函数//  打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("打开信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->openChannel(message);}// 关闭信道void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("关闭信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->closeChannel(message);}// 声明交换机void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("声明交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("声明交换机时,没有找到信道!");return;}return cp->declareExchange(message);}// 删除交换机void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("删除交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("删除交换机时,没有找到信道!");return;}return cp->deleteExchange(message);}// 声明队列void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("声明队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("声明队列时,没有找到信道!");return;}return cp->declareQueue(message);}// 删除队列void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("删除队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("删除队列时,没有找到信道!");return;}return cp->deleteQueue(message);}// 队列绑定void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("队列绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("队列绑定时,没有找到信道!");return;}return cp->queueBind(message);}// 队列解绑void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("队列解除绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("队列解除绑定时,没有找到信道!");return;}return cp->queueUnBind(message);}// 消息发布void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("发布消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("发布消息时,没有找到信道!");return;}return cp->basicPublish(message);}// 消息确认void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("确认消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("确认消息时,没有找到信道!");return;}return cp->basicAck(message);}// 队列消息订阅void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("队列消息订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("队列消息订阅时,没有找到信道!");return;}return cp->basicConsume(message);}// 队列消息取消订阅void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr &message, muduo::Timestamp){Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr){DLOG("队列消息取消订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr){DLOG("队列消息取消订阅时,没有找到信道!");return;}return cp->basicCancel(message);}// 处理未知消息void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}// 处理新连接void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _threadpool);}else{_connection_manager->delConnection(conn);}}private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;  // 服务器对象ProtobufDispatcher _dispatcher; // 请求分发器对象--要向其中注册请求处理函数ProtobufCodecPtr _codec;        // protobuf协议处理器--针对收到的请求数据进行protobuf协议处理VirtualHost::ptr _virtual_host;ConsumerManager::ptr _consumer_manager;ConnectionManager::ptr _connection_manager;ThreadPool::ptr _threadpool;};
    }
    #endif
    
在MQServer中编写Server.cpp文件实现服务器示例
  • 实现代码

    #include "Broker.hpp"int main()
    {MQ::BrokerServer server(8085, "./data/");server.start();return 0;
    }
    
http://www.xdnf.cn/news/349759.html

相关文章:

  • 云计算运维
  • vue实现半圆转盘旋转(门户网页上)
  • 企业级UI测试的“双保险”:TestComplete的智能对象识别与详细报告功能
  • 二叉搜索树的插入操作(递归遍历)
  • 力扣-142.环形链表II
  • 引文索引数据库在科研中的应用
  • 问题 | 低空经济未来发展前景机遇及挑战
  • BFS算法的学习
  • 腾讯云:数字世界的“量子熔炉”与硅基文明引擎​
  • 数据结构-堆排序
  • Houdini 深圳实操交流会!即将开幕
  • 代码随想录第39天:单调栈
  • VBA经典应用69例应用8:利用VBA,完成自动运行任务的预设
  • xiaopiu原型设计工具笔记
  • Windows 环境变量完全指南:系统变量、用户变量与 PATH 详解
  • 在不同环境下部署和运行基于后量子密码的轻量级通信协议的详细指南
  • pm2如何执行脚本批量启动多个服务
  • 认识守卫-以及简单的示例和装饰器
  • Java网络编程:理解URI、URL和URN
  • python线上学习进度报告
  • Android13 权限管理机制整理
  • 308.旅行终点站
  • 正点原子IMX6U开发板移植Qt时出现乱码
  • 什么是死信队列?死信队列是如何导致的?
  • TLS 1.3:一把打不开旧锁的新钥匙,为何难成主流?
  • Blind SSRF with Shellshock exploitation过关
  • [人机交互]以用户为中心的交互设计
  • 基于译码器和锁存器的运行逻辑的简易算法
  • 算法解密:轮转数组问题全解析
  • 多源地震资料处理中的震源信号分离算法资料