当前位置: 首页 > news >正文

手撕基于AMQP协议的简易消息队列-7(客户端模块的编写)

在MQClient中编写客户端模块代码

在MQClient中编写makefile文件来编译客户端模块
.PHONY:all
all:PublichClient ConsumeClient
PublichClient : PublichClient.cpp ../MQCommon/request.pb.cc ../MQCommon/message.pb.cc ../ThirdLib/lib/include/muduo/protobuf/codec.ccg++ -g -std=c++11 $^ -o $@ -I../ThirdLib/lib/include -L../ThirdLib/lib/lib  -lmuduo_net -lmuduo_base -pthread -lprotobuf -lz
ConsumeClient : ConsumeClient.cpp  ../MQCommon/request.pb.cc ../MQCommon/message.pb.cc ../ThirdLib/lib/include/muduo/protobuf/codec.ccg++ -g -std=c++11 $^ -o $@ -I../ThirdLib/lib/include -L../ThirdLib/lib/lib  -lmuduo_net -lmuduo_base -pthread -lprotobuf -lz
在MQClient中编写Subscriber.hpp文件实现订阅者模块
  • 与服务端,并⽆太⼤差别

  • 在该文件中,应该实现的功能:

    • 订阅者信息:

      1. 订阅者标识
      2. 订阅队列名
      3. 是否⾃动确认标志
      4. 回调处理函数(收到消息后该如何处理的回调函数对象)
    • 实现函数

      #ifndef __M_Subscriber_H__
      #define __M_Subscriber_H__
      #include "../MQCommon/Helper.hpp"
      #include "../MQCommon/Logger.hpp"
      #include "../MQCommon/message.pb.h"
      #include <google/protobuf/map.h>
      #include <iostream>
      #include <memory>
      #include <mutex>
      #include <unordered_map>namespace MQ
      {using SubscriberCallback = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;class Subscriber{public:using ptr = std::shared_ptr<Subscriber>;// 构造函数Subscriber() {}Subscriber(const std::string &consumer_tag, const std::string &subscribe_queue_name, bool auto_ack, const SubscriberCallback &callback): _auto_ack(auto_ack),_subscribe_queue_name(subscribe_queue_name),_subscribe_queue_tag(consumer_tag),_callback(callback){}// 析构函数virtual ~Subscriber() {}public:// 自动应答标志bool _auto_ack;// 订阅的队列名称std::string _subscribe_queue_name;// 消费者标识std::string _subscribe_queue_tag;// 消费者回调函数SubscriberCallback _callback;};}
      #endif
      
在MQClient中编写Channel.hpp文件实现信道管理模块
  • 同样的,客⼾端也有信道,其功能与服务端⼏乎⼀致,或者说不管是客⼾端的channel还是服务端的channel都是为了⽤⼾提供具体服务⽽存在的,只不过服务端是为客⼾端的对应请求提供服务,⽽客⼾端的接⼝服务是为了⽤⼾具体需要服务,也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求,获取请求的服务

  • 在该文件中,应该实现的功能:

    • 信道信息:

      1. 信道ID
      2. 信道关联的⽹络通信连接对象
      3. protobuf协议处理对象
      4. 信道关联的消费者
      5. 请求对应的响应信息队列(这⾥队列使⽤<请求ID,响应>hash表,以便于查找指定的响应)
      6. 互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步)
    • 信道操作:

      1. 提供创建信道操作
      2. 提供删除信道操作
      3. 提供声明交换机操作(强断⾔-有则OK,没有则创建)
      4. 提供删除交换机
      5. 提供创建队列操作(强断⾔-有则OK,没有则创建)
      6. 提供删除队列操作
      7. 提供交换机-队列绑定操作
      8. 提供交换机-队列解除绑定操作
      9. 提供添加订阅操作
      10. 提供取消订阅操作
      11. 提供发布消息操作
      12. 提供确认消息操作
    • 信道管理:

      1. 创建信道
      2. 查询信道
      3. 删除信道
  • 实现代码

    #ifndef __M_Channel_H__
    #define __M_Channel_H__#include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/ThreadPool.hpp"
    #include "../MQCommon/message.pb.h"
    #include "../MQCommon/request.pb.h"
    #include "Subscriber.hpp"
    #include "muduo/net/TcpConnection.h"
    #include "muduo/protobuf/codec.h"
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <unordered_map>namespace MQ
    {using MessagePtr = std::shared_ptr<google::protobuf::Message>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;class Channel{public:using ptr = std::shared_ptr<Channel>;// 构造函数Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec): _channel_id(UUIDHelper::uuid()),_connection_ptr(conn),_codec_ptr(codec){}// 析构函数~Channel(){basicCancel();}std::string cid(){return _channel_id;}bool openChannel(){std::string rid = UUIDHelper::uuid();openChannelRequest req;req.set_rid(rid);req.set_cid(_channel_id);_codec_ptr->send(_connection_ptr, req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void closeChannel(){std::string rid = UUIDHelper::uuid();closeChannelRequest req;req.set_rid(rid);req.set_cid(_channel_id);_codec_ptr->send(_connection_ptr, req);waitResponse(rid);return;}bool declareExchange(const std::string &name,ExchangeType type,bool durable,bool auto_delete,google::protobuf::Map<std::string, std::string> &args){// 构造一个声明虚拟机的请求对象,std::string rid = UUIDHelper::uuid();declareExchangeRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_exchange_name(name);req.set_exchange_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);// 然后向服务器发送请求_codec_ptr->send(_connection_ptr, req);// 等待服务器的响应basicCommonResponsePtr resp = waitResponse(rid);// 返回return resp->ok();}void deleteExchange(const std::string &name){std::string rid = UUIDHelper::uuid();deleteExchangeRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_exchange_name(name);_codec_ptr->send(_connection_ptr, req);waitResponse(rid);return;}bool declareQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string, std::string> &qargs){std::string rid = UUIDHelper::uuid();declareQueueRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_queue_name(qname);req.set_durable(qdurable);req.set_auto_delete(qauto_delete);req.set_exclusive(qexclusive);req.mutable_args()->swap(qargs);_codec_ptr->send(_connection_ptr, req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void deleteQueue(const std::string &qname){std::string rid = UUIDHelper::uuid();deleteQueueRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_queue_name(qname);_codec_ptr->send(_connection_ptr, req);waitResponse(rid);return;}bool queueBind(const std::string &ename,const std::string &qname,const std::string &key){std::string rid = UUIDHelper::uuid();queueBindRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);_codec_ptr->send(_connection_ptr, req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void queueUnBind(const std::string &ename, const std::string &qname){std::string rid = UUIDHelper::uuid();queueUnBindRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_exchange_name(ename);req.set_queue_name(qname);_codec_ptr->send(_connection_ptr, req);waitResponse(rid);return;}void basicPublish(const std::string &ename,const BasicProperties *bp,const std::string &body){std::string rid = UUIDHelper::uuid();basicPublishRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_body(body);req.set_exchange_name(ename);if (bp != nullptr){req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}_codec_ptr->send(_connection_ptr, req);waitResponse(rid);return;}void basicAck(const std::string &msgid){if (_subscriber_ptr.get() == nullptr){DLOG("消息确认时,找不到消费者信息!");return;}std::string rid = UUIDHelper::uuid();basicAckRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_queue_name(_subscriber_ptr->_subscribe_queue_name);req.set_message_id(msgid);_codec_ptr->send(_connection_ptr, req);waitResponse(rid);return;}void basicCancel(){if (_subscriber_ptr.get() == nullptr){return;}std::string rid = UUIDHelper::uuid();basicCancelRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_queue_name(_subscriber_ptr->_subscribe_queue_name);req.set_consumer_tag(_subscriber_ptr->_subscribe_queue_tag);_codec_ptr->send(_connection_ptr, req);waitResponse(rid);_subscriber_ptr.reset();return;}bool basicConsume(const std::string &consumer_tag,const std::string &queue_name,bool auto_ack,const SubscriberCallback &cb){if (_subscriber_ptr.get() != nullptr){DLOG("当前信道已订阅其他队列消息!");return false;}std::string rid = UUIDHelper::uuid();basicConsumeRequest req;req.set_rid(rid);req.set_cid(_channel_id);req.set_queue_name(queue_name);req.set_consumer_tag(consumer_tag);req.set_auto_ack(auto_ack);_codec_ptr->send(_connection_ptr, req);basicCommonResponsePtr resp = waitResponse(rid);if (resp->ok() == false){DLOG("添加订阅失败!");return false;}DLOG("添加订阅成功!订阅者:%s,订阅队列:%s", consumer_tag.c_str(), queue_name.c_str());_subscriber_ptr = std::make_shared<Subscriber>(consumer_tag, queue_name, auto_ack, cb);return true;}public:// 连接收到基础响应后,向hash_map中添加响应void putBasicResponse(const basicCommonResponsePtr &resp){std::unique_lock<std::mutex> lock(_mutex);_basic_resp.insert(std::make_pair(resp->rid(), resp));_cv.notify_all();}// 连接收到消息推送后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理void consume(const basicConsumeResponsePtr &resp){if (_subscriber_ptr.get() == nullptr){DLOG("消息处理时,未找到订阅者信息!");return;}if (_subscriber_ptr->_subscribe_queue_tag != resp->consumer_tag()){DLOG("收到的推送消息中的消费者标识,与当前信道消费者标识不一致!");return;}_subscriber_ptr->_callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());}private:basicCommonResponsePtr waitResponse(const std::string &rid){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock, [&rid, this](){ return _basic_resp.find(rid) != _basic_resp.end(); });// while(condition()) _cv.wait();basicCommonResponsePtr basic_resp = _basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}private:std::string _channel_id;muduo::net::TcpConnectionPtr _connection_ptr;ProtobufCodecPtr _codec_ptr;Subscriber::ptr _subscriber_ptr;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp;};class ChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {}Channel::ptr create(const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec){std::unique_lock<std::mutex> lock(_mutex);auto channel = std::make_shared<Channel>(conn, codec);_channels.insert(std::make_pair(channel->cid(), channel));return channel;}void remove(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr get(const std::string &cid){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(cid);if (it == _channels.end()){return Channel::ptr();}return it->second;}private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;};
    }#endif
    
在MQClient中编写Worker.hpp文件实现异步⼯作线程模块
  • 客⼾端这边存在两个异步⼯作线程,

    • ⼀个是muduo库中客⼾端连接的异步循环线程EventLoopThread,

    • ⼀个是当收到消息后进⾏异步处理的⼯作线程池。

  • 这两项都不是以连接为单元进⾏创建的,⽽是创建后,可以⽤以多个连接中,因此单独进⾏封装。

  • 实现代码:

    #ifndef __M_WORKER_H__
    #define __M_WORKER_H__
    #include "../MQCommon/Helper.hpp"
    #include "../MQCommon/Logger.hpp"
    #include "../MQCommon/ThreadPool.hpp"
    #include "muduo/net/EventLoopThread.h"namespace MQ
    {class AsyncWorker{public:using ptr = std::shared_ptr<AsyncWorker>;muduo::net::EventLoopThread loopthread;MQ::ThreadPool pool;};
    }#endif
    
    在MQClient中编写Connection.hpp文件实现连接管理模块
    • 在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。

    • 这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务

    • 实现代码

      #ifndef __M_CONNECTION_H__
      #define __M_CONNECTION_H__
      #include "muduo/base/CountDownLatch.h"
      #include "muduo/base/Logging.h"
      #include "muduo/base/Mutex.h"
      #include "muduo/net/EventLoop.h"
      #include "muduo/net/EventLoopThread.h"
      #include "muduo/net/TcpClient.h"
      #include "muduo/protobuf/codec.h"
      #include "muduo/protobuf/dispatcher.h"#include "Channel.hpp"
      #include "Subscriber.hpp"
      #include "Worker.hpp"namespace MQ
      {class Connection{public:using ptr = std::shared_ptr<Connection>;Connection(const std::string &sip, int sport, const AsyncWorker::ptr &worker): _latch(1), _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),_dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<basicCommonResponse>(std::bind(&Connection::basicResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<basicConsumeResponse>(std::bind(&Connection::consumeResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));_client.connect();_latch.wait(); // 阻塞等待,直到连接建立成功}// 打开信道Channel::ptr openChannel(){Channel::ptr channel = _channel_manager->create(_conn, _codec);bool ret = channel->openChannel();if (ret == false){DLOG("打开信道失败!");return Channel::ptr();}return channel;}// 关闭信道void closeChannel(const Channel::ptr &channel){channel->closeChannel();_channel_manager->remove(channel->cid());}private:void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp){// 1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr){DLOG("未找到信道信息!");return;}// 2. 将得到的响应对象,添加到信道的基础响应hash_map中channel->putBasicResponse(message);}void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp){// 1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr){DLOG("未找到信道信息!");return;}// 2. 封装异步任务(消息处理任务),抛入线程池_worker->pool.push([channel, message](){ channel->consume(message); });}void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_latch.countDown(); // 唤醒主线程中的阻塞_conn = conn;}else{// 连接关闭时的操作_conn.reset();}}private:muduo::CountDownLatch _latch;       // 实现同步的muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接muduo::net::TcpClient _client;      // 客户端ProtobufDispatcher _dispatcher;     // 请求分发器ProtobufCodecPtr _codec;            // 协议处理器AsyncWorker::ptr _worker;             // 异步工作线程ChannelManager::ptr _channel_manager; // 信道管理器};
      }
      #endif
      
在MQClient中编写ConsumeClient.cpp文件和PublichClient.cpp文件实现消费者客户端与发布者客户端
  • 消费者客户端的实现

    #include "Connection.hpp"void cb(MQ::Channel::ptr &channel, const std::string consumer_tag, const MQ::BasicProperties *bp, const std::string &body)
    {std::cout << consumer_tag << "消费了消息:" << body << std::endl;channel->basicAck(bp->id());
    }
    int main(int argc, char *argv[])
    {if (argc != 2) {std::cout << "usage: ./consume_client queue1\n";return -1;}//1. 实例化异步工作线程对象MQ::AsyncWorker::ptr awp = std::make_shared<MQ::AsyncWorker>();//2. 实例化连接对象MQ::Connection::ptr conn = std::make_shared<MQ::Connection>("127.0.0.1", 8085, awp);//3. 通过连接创建信道MQ::Channel::ptr channel = conn->openChannel();//4. 通过信道提供的服务完成所需//  1. 声明一个交换机exchange1, 交换机类型为广播模式google::protobuf::Map<std::string, std::string> tmp_map;channel->declareExchange("exchange1", MQ::ExchangeType::DIRECT, true, false, tmp_map);//  2. 声明一个队列queue1channel->declareQueue("queue1", true, false, false, tmp_map);//  3. 声明一个队列queue2channel->declareQueue("queue2", true, false, false, tmp_map);//  4. 绑定queue1-exchange1,且binding_key设置为queue1channel->queueBind("exchange1", "queue1", "queue1");//  5. 绑定queue2-exchange1,且binding_key设置为news.music.#channel->queueBind("exchange1", "queue2", "news.music.#");auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel->basicConsume("consumer1", argv[1], false, functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));conn->closeChannel(channel);return 0;
    }
    
  • 发布者客户端的实现

    #include "Connection.hpp"int main()
    {//1. 实例化异步工作线程对象MQ::AsyncWorker::ptr awp = std::make_shared<MQ::AsyncWorker>();//2. 实例化连接对象MQ::Connection::ptr conn = std::make_shared<MQ::Connection>("127.0.0.1", 8085, awp);//3. 通过连接创建信道MQ::Channel::ptr channel = conn->openChannel();//4. 通过信道提供的服务完成所需//  1. 声明一个交换机exchange1, 交换机类型为广播模式google::protobuf::Map<std::string, std::string> tmp_map;channel->declareExchange("exchange1", MQ::ExchangeType::DIRECT, true, false, tmp_map);//  2. 声明一个队列queue1channel->declareQueue("queue1", true, false, false, tmp_map);//  3. 声明一个队列queue2channel->declareQueue("queue2", true, false, false, tmp_map);//  4. 绑定queue1-exchange1,且binding_key设置为queue1channel->queueBind("exchange1", "queue1", "queue1");//  5. 绑定queue2-exchange1,且binding_key设置为news.music.#channel->queueBind("exchange1", "queue2", "news.music.#");//5. 循环向交换机发布消息for (int i = 0; i < 10; i++) {MQ::BasicProperties bp;bp.set_id(UUIDHelper::uuid());bp.set_delivery_mode(MQ::DeliveryMode::DURABLE);bp.set_routing_key("queue1");channel->basicPublish("exchange1", &bp, "Hello World-" + std::to_string(i));}MQ::BasicProperties bp;bp.set_id(UUIDHelper::uuid());bp.set_delivery_mode(MQ::DeliveryMode::DURABLE);bp.set_routing_key("queue1");channel->basicPublish("exchange1", &bp, "Hello Bite");bp.set_routing_key("news.sport");channel->basicPublish("exchange1", &bp, "Hello chileme?");//6. 关闭信道conn->closeChannel(channel);return 0;
    }
    
http://www.xdnf.cn/news/338923.html

相关文章:

  • 数字孪生技术中端渲染与流渲染的架构对比
  • linux中的常用命令(一)
  • STM32智能刷卡消费系统(uC/OS-III)
  • commonmark.js 源码阅读(一) - Block Parser
  • ComfyUI 学习笔记,案例 6 :FLUX 模型文生图
  • 【Linux系列】目录大小查看
  • 【Python 日期和时间】
  • 【redis】集群模式
  • Windows命令行软件管理器:Chocolatey
  • 多级路由器如何避免IP冲突
  • 使用JAVA对接Deepseek API实现首次访问和提问
  • Linux网络编程day7 线程池
  • 因子分析——数学原理及R语言代码
  • flinksql bug : Max aggregate function does not support type: CHAR
  • 援外培训项目冈比亚数字政府能力建设研修班莅临麒麟信安参观考察
  • Ubuntu每次开机IP都是127.0.0.1
  • Debian系统详解
  • linux命令行与shell脚本大全——学习笔记(9-10章)
  • ABP vNext + Dapr 实现云原生微服务治理
  • Sass @import rules are deprecated and will be removed in Dart Sass 3.0.0.
  • C++类和对象:构造函数、析构函数、拷贝构造函数
  • 服务器配置llama-factory问题解决
  • javaer快速从idea转战vscode
  • 【Java ee 初阶】文件操作和IO(上)
  • 经验:从CAN到以太网为主的车载网络架构升级
  • 实时云渲染——比像素流送节省80%精力的UE程序推流技术
  • 网盘解析工具更新,支持UC网盘!!
  • 配置VS的DLL文件引用目录(两种方案,无需每次生成dll后手动将其复制到exe目录下)
  • vue2 两种路由跳转方式
  • window 显示驱动开发-处理内存段(二)