当前位置: 首页 > ds >正文

手撕基于AMQP协议的简易消息队列-4(项目需求分析)

需求分析

核心概念需求

  1. ⽣产者 (Producer)
  2. 消费者 (Consumer)
  3. 中间⼈ (Broker)
  4. 发布 (Publish)
  5. 订阅 (Subscribe)

整体模型需求

  • 一个生产者一个消费者
    在这里插入图片描述

  • N个⽣产者, N个消费者

    在这里插入图片描述

  • 不难看出Broker Server是最核⼼的部分, 负责消息的存储和转发

Broker Server模块需求

Broker Server模块概览
  • 为了实现AMQP(Advanced Message Queuing Protocol)-⾼级消息队列协议模型,对于消息中间件服务器Broker中,又需要一下模块:

    • 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合。⼀个 BrokerServer 上可以存在多个 VirtualHost
    • 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue队列 (Queue): 真正⽤来存储消息的部分, 每个消费者决定⾃⼰从哪个 Queue 上读取消息
    • 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多"关系,使⽤⼀个关联表就可以把这两个概念联系起来
    • 消息 (Message): 传递的内容
  • BrokerServer结构图

    • 大体上是BrokerServer中包含多个虚拟机,但本项目为了简化代码,仅支持定义一个虚拟机,同志们可以在这做拓展

    在这里插入图片描述

  • VirtualHost结构图

    • 对于生产客户端来说,其生产的消息应该交给交换机进行处理,由交换机根据交换机类型决定该消息应该推送到哪些队列当中去,而不是由生产客户端直接推送到队列当中
    • 对于消费客户端来说,其要消费得消息应该直接从队列里面进行拿取。

    在这里插入图片描述

Broker Server模块核心API
  • Broker Server需要提供相应的接口来给客户端进行操作
  1. 创建交换机 (exchangeDeclare)
  2. 销毁交换机 (exchangeDelete)
  3. 创建队列 (queueDeclare)
  4. 销毁队列 (queueDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)
  10. 取消订阅 (basicCancel)

交换机类型需求

  1. Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名

  2. Fanout: ⽣产者发送的消息会被 到该交换机的所有队列中

  3. Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey。发送消息指定⼀个字符串为routingKey。当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列

持久化需求

  • 需要持久化的数据:
    1. Exchange:交换机数据
    2. Queue:队列数据
    3. Binding:绑定数据,消费者与队列与交换机之间的关系
    4. Message:未被消费得数据

网络通信需求

  • ⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通信。
  • 在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
  • 客户端需要提供的api:
    1. 创建 Connection
    2. 关闭 Connection
    3. 创建 Channel
    4. 关闭 Channel
    5. 创建队列 (queueDeclare)
    6. 销毁队列 (queueDelete)
    7. 创建交换机 (exchangeDeclare)
    8. 销毁交换机 (exchangeDelete)
    9. 创建绑定 (queueBind)解除绑定 (queueUnbind)
    10. 发布消息 (basicPublish)
    11. 订阅消息 (basicConsume)
    12. 确认消息 (basicAck)
    13. 取消订阅(basicCancel)

消息应答种类需求

  • ⾃动应答: 消费者只要消费了消息, 就算应答完毕了,Broker 直接删除这个消息

  • ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息

模块划分

三层处理架构

在这里插入图片描述

项目模块关系图

在这里插入图片描述

服务端模块

服务器的整体结构图

在这里插入图片描述

  • _server:Muduo库提供的⼀个通⽤TCP服务器, 我们可以封装这个服务器进⾏TCP通信
  • _baseloop:主事件循环器, ⽤于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件
  • _codec: ⼀个protobuf编解码器, 我们在TCP服务器上设计了⼀层应⽤层协议, 这个编解码器主要就是负责实现应⽤层协议的解析和封装, 下边具体讲解
  • _dispatcher:⼀个消息分发器, 当Socket接收到⼀个报⽂消息后, 我们需要按照消息的类型, 即上⾯提到的typeName进⾏消息分发, 会将不同类型的消息分发相对应的的处理函数中,下边具体讲解
  • _consumer: 服务器中的消费者信息管理句柄。
  • _threadpool: 异步⼯作线程池,主要⽤于队列消息的推送⼯作。
  • _connections: 连接管理句柄,管理当前服务器上的所有已经建⽴的通信连接。
  • _virtual_host:服务器持有的虚拟主机。 队列、交换机 、绑定、消息等数据都是通过虚拟主机管理
关键组件协作

在这里插入图片描述

持久化数据管理中⼼模块
  • 在数据管理模块中管理交换机,队列,队列绑定,消息等部分数据数据。
  1. 交换机管理:

    • 管理信息:名称,类型,是否持久化标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数

    • 管理操作:恢复历史信息,声明,删除,获取,判断是否存在

  2. 队列管理:

    • 管理信息:名称,是否持久化标志,是否独有标志,是否(⽆⼈使⽤时)⾃动删除标志,其他参数…

    • 管理操作:恢复历史信息,声明,删除,获取,判断是否存在

  3. 绑定管理:

  • 管理信息:交换机名称,队列名称,绑定主题

  • 管理操作:恢复历史信息,绑定,解绑,解除交换机关联绑定信息,解除队列关联绑定信息,获取交换机关联绑定信息

  1. 消息管理:

    • 管理信息:

      1. 属性消息ID, 路由主题,持久化模式标志
      2. 消息内容
      3. 持久化有效标志
      4. 持久化位置
      5. 持久化消息长度
    • 管理操作:恢复历史消息、向指定队列新增消息,获取指定队列队首消息,确认移除消息。以上消息都应该在内存和硬盘中存储。

      • 以内存存储为主,主要是保证快速查找信息进⾏处理

      • 以硬盘存储为辅,主要是保证服务器重启之后,之前的信息都可以正常保持

虚拟机管理模块
  • 因为交换机/队列/绑定都是以虚拟机为单元整体进⾏操作的,因此虚拟机是对以上数据管理模块整合模块

  • 虚拟机管理信息:

    • 交换机数据管理模块句柄

    • 队列数据管理模块句柄

    • 绑定数据管理模块句柄

    • 消息数据管理模块句柄

  • 虚拟机对外的接口

    • 提供虚拟机内交换机声明/删除操作

    • 提供虚拟机内队列声明/删除操作

    • 提供虚拟机内交换机-队列绑定/解绑操作

    • 获取交换机的相关绑定信息

  • 对虚拟机的管理操作

    • 创建虚拟机

    • 查询虚拟机

    • 删除虚拟机

交换路由模块
  • 当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?答案是由交换路由模块来决定

  • 在绑定信息中有⼀个binding_key,⽽每条发布的消息中有⼀个routing_key,能否⼊队取决于两个要素:交换机类型和key

  • 不同交换机类型对应的路由操作:

    1. ⼴播:将消息⼊队到该交换机的所有绑定队列中
    2. 直接:将消息⼊队到绑定信息中binding_key与消息routing_key⼀致的队列中
    3. 主题:将消息⼊队到绑定信息中binding_key与routing_key是匹配成功的队列中
  • binding_key:

    • 是由数字字⺟下划线构成的, 并且使⽤ . 分成若⼲部分,比如:news.music.#
    • ⽀持 * 和 # 两种通配符, 但是 * # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤,比如:
      • ⽐如 a.*.b 是合法的, a.*a.b 是不合法的
      • * 可以匹配任意⼀个单词(注意是单词不是字⺟)
      • # 可以匹配任意零个或者多个单词(注意是单词不是字⺟)
  • routing_key:

    • 是由数据、字⺟和下划线构成, 并且使⽤ . 划分成若⼲部分,如:news.music.pop
  • 路由匹配算法详解

    这个路由匹配算法用于消息队列系统中,根据交换机类型和路由键来决定消息如何路由到队列。算法主要处理三种交换机类型:DIRECT(直接)、FANOUT(扇出)和TOPIC(主题)。

  • 算法概述

    • 该算法实现了AMQP协议中的路由匹配规则,特别是对TOPIC交换机的复杂模式匹配。算法采用动态规划方法解决主题交换机中的通配符匹配问题。

    • 三种交换机类型的处理

      1. DIRECT(直接交换机)
    if (type == ExchangeType::DIRECT) {return (routing_key == binding_key);
    }
    

    ​ 特点:

    • 精确匹配路由键和绑定键

    • 只有当routing_key完全等于binding_key时才匹配成功

    • 时间复杂度:O(1)

    示例:

    • routing_key: “order.create”

    • 匹配的binding_key: “order.create”

    • 不匹配的binding_key: “order.delete”

    1. FANOUT(扇出交换机)
    else if (type == ExchangeType::FANOUT) {return true;
    }
    

    特点:

    • 无条件匹配所有绑定

    • 消息会被路由到所有绑定的队列

    • 不考虑路由键和绑定键

    • 时间复杂度:O(1)

    示例:

    • 无论routing_key是什么,都会匹配所有binding_key
    1. TOPIC(主题交换机)
    else if (type == ExchangeType::TOPIC) {// 详细匹配逻辑...
    }
    

    特点:

    • 支持通配符匹配

    • 使用动态规划算法处理复杂模式

    • 时间复杂度:O(m*n),其中m和n分别是binding_key和routing_key的分段数

    TOPIC交换机匹配算法详解

    1. 键分割
    std::vector<std::string> bkeys, rkeys;
    int count_binding_key = StrHelper::split(binding_key, ".", bkeys);
    int count_routing_key = StrHelper::split(routing_key, ".", rkeys);
    
    • 将binding_key和routing_key按"."分割成多个部分

    • 例如:“news.music.pop” → [“news”, “music”, “pop”]

    1. 动态规划表初始化
    std::vector<std::vector<bool>> dp(count_binding_key + 1, std::vector<bool>(count_routing_key + 1, false));
    dp[0][0] = true;
    
    • 创建(m+1)×(n+1)的二维布尔数组dp

    • dp[i][j]表示binding_key前i部分能否匹配routing_key前j部分

    • 初始状态:空键匹配空键(dp[0][0] = true)

    1. 处理"#"开头的binding_key
    for (int i = 1; i <= count_binding_key; i++) {if (bkeys[i - 1] == "#") {dp[i][0] = true;continue;}break;
    }
    
    • "#"表示匹配零个或多个单词

    • 如果binding_key以"#"开头,则能匹配空routing_key

    • 例如binding_key: "#.news"可以匹配routing_key: “news”

    1. 动态规划填充
    for (int i = 1; i <= count_binding_key; i++) {for (int j = 1; j <= count_routing_key; j++) {// 情况1:当前单词相同或binding_key为"*"if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*") {dp[i][j] = dp[i - 1][j - 1];}// 情况2:当前binding_key为"#"else if (bkeys[i - 1] == "#") {dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}}
    }
    

    匹配规则:

    1. 精确匹配或"*"通配符:

      • "*"匹配单个单词

      • 如果当前binding_key部分等于routing_key部分,或binding_key部分为"*"

      • 继承左上角的结果(dp[i][j] = dp[i-1][j-1])

    2. "#"通配符:

      • "#"匹配零个或多个单词

      • 可以从三个方向继承结果:

        • 左上(dp[i-1][j-1]):"#"匹配当前单词

        • 左(dp[i][j-1]):"#"匹配更多单词

        • 上(dp[i-1][j]):"#"匹配零个单词

    3. 返回结果

    return dp[count_binding_key][count_routing_key];
    
    • 返回动态规划表右下角的值

    • 表示整个binding_key能否匹配整个routing_key

    示例分析

    示例1:

    • binding_key: “news.*.pop”

    • routing_key: “news.music.pop”

    匹配过程:

    1. 分割:

      • bkeys: [“news”, “*”, “pop”]

      • rkeys: [“news”, “music”, “pop”]

    2. 动态规划表:

      • dp[1][1] = true (“news” == “news”)

      • dp[2][2] = true ("*“匹配"music”)

      • dp[3][3] = true (“pop” == “pop”)

    3. 结果:true

    示例2:

    • binding_key: “news.#”

    • routing_key: “news.music.pop.jazz”

    匹配过程:

    1. 分割:

      • bkeys: [“news”, “#”]

      • rkeys: [“news”, “music”, “pop”, “jazz”]

    2. 动态规划表:

      • dp[1][1] = true (“news” == “news”)

      • dp[2][2] = true ("#“匹配"music”)

      • dp[2][3] = true ("#“继续匹配"pop”)

      • dp[2][4] = true ("#“继续匹配"jazz”)

    3. 结果:true

    示例3:

    • binding_key: “news.music.#.jazz”

    • routing_key: “news.music.pop.jazz”

    匹配过程:

    1. 分割:

      • bkeys: [“news”, “music”, “#”, “jazz”]

      • rkeys: [“news”, “music”, “pop”, “jazz”]

    2. 动态规划表:

      • dp[1][1] = true (“news” == “news”)

      • dp[2][2] = true (“music” == “music”)

      • dp[3][3] = true ("#“匹配"pop”)

      • dp[4][4] = true (“jazz” == “jazz”)

    3. 结果:true

消费者管理模块
  • 消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅⼀个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。
  • 因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出⼀个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的⽤法)
  • 需要管理的消费者信息:
    • 标识
    • 订阅队列名称
    • ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)
    • 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)
  • 对消费者管理的操作:
    • 添加
    • 删除
    • 轮询获取指定队列的消费者
    • 移除队列所有消费者
信道管理模块
  • 在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection,但同一个Connection的Channel之间相互独⽴

  • 需要管理的信道信息

    • 信道ID
    • 信道关联的消费者
    • 信道关联的连接
    • 信道关联的虚拟机
    • ⼯作线程池(⼀条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
  • 需要管理的操作:

    • 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)

    • 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)

    • 提供绑定&解绑队列操作

    • 提供订阅&取消订阅队列消息操作

    • 提供发布&确认消息操作

链接管理模块
  • 链接模块的典型工作流程

    在这里插入图片描述

  • 管理结构图

    在这里插入图片描述

  • 虽然Muduo库里面提供了链接模块,但不能完全满足我们的要求,所以需要进行二次封装。

  • 除了基础的增删查链接外,还需要能够操作信道

  • 需要管理的链接信息:

    • 链接关联的信道
    • 链接关联的Muduo库Connection
  • 需要管理的操作:

    • 新增链接
    • 删除链接
    • 获取链接
    • 打开信道
    • 关闭信道
Broker服务器模块
  • 整合以上所有模块,并搭建⽹络通信服务器,实现与客⼾端⽹络通信,能够识别客⼾端请求,并提供客⼾端请求的处理服务
  • 需要管理的服务器信息:
    • 虚拟机管理模块句柄
    • 消费者管理模块句柄
    • 连接管理模块句柄
    • ⼯作线程池句柄
    • muduo库通信所需元素

客户端模块

消费者管理
  • 消费者在客⼾端的存在感⽐较低,因为在⽤⼾的使⽤⻆度中,只要创建⼀个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传⼊了⼀个消费者标识,且当前的简单实现也仅仅是⼀个信道只能创建订阅⼀个队列,也就是只能创建⼀个消费者,它们⼀⼀对应,因此更是弱化了消费者的存在

  • 需要管理的消费者信息

    • 标识

      b. 订阅队列名称

      c. ⾃动应答标志(决定了⼀条消息推送给消费者后,是否需要等待收到确认后再删除消息)

      d. 消息处理回调函数指针(⼀个消息发布后调⽤回调,选择消费者进⾏推送)

  • 需要管理的消费者操作:

    • 添加
    • 删除
    • 轮询获取指定队列的消费者
    • 移除队列所有消费者等操作
信道请求模块
  • 与服务端的信道类似,客⼾端这边在AMQP模型中,也是除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴

  • 需要管理的信道信息:

    • 信道ID

    • 信道关联的通信连接

    • 信道关联的消费者

    • 请求对应的响应信息队列(这⾥队列使⽤hash表,以便于查找指定的响应)

    • 互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应

      来进⾏同步)

  • 需要管理的信道操作:

    • 提供创建信道操作
    • 提供删除信道操作
    • 提供声明交换机操作(强断⾔-有则OK,没有则创建)
    • 提供删除交换机
    • 提供创建队列操作(强断⾔-有则OK,没有则创建)
    • 提供删除队列操作
    • 提供交换机-队列绑定操作
http://www.xdnf.cn/news/4984.html

相关文章:

  • 现代健康养生新范式:多维度守护身心活力
  • TypeScript 中,属性修饰符
  • pytest自动化测试框架搭建,并生成allure测试报告
  • 【C语言干货】一维数组传参本质
  • 如何用LOTO示波器测量变压器带宽?
  • 一篇文章讲清楚mysql的聚簇索引、非聚簇索引、辅助索引
  • BGA底部填充胶固化异常延迟或不固化原因分析及解决方案
  • 垃圾回收的三色标记算法
  • <el-cascader中多选多层级点击节点也选中
  • Harmonyos-属性修改器和更新器
  • 低代码云MES、轻量级部署、让智造更简单
  • 探索大语言模型(LLM):词袋法(Bag of Words)原理与实现
  • 参考文献怎么对齐操作
  • Python 基础知识
  • 网络流量分析 | Snort
  • LeetCode 216.组合总和 III:回溯算法实现与剪枝优化
  • SpringBoot快速入门WebSocket(​​JSR-356附Demo源码)
  • 为何Google广告频繁拒登?常见原因与解决方法
  • 图表制作-折线图堆叠
  • 允许别的电脑连接我电脑wsl下5001、5002端口
  • 枚举 · 例13-【模板】双指针
  • 《Scala基础》
  • DeepSeek 赋能金融:从智能分析到高效服务的全链路革新
  • WHAT - react-query(TanStack Query) vs swr 请求
  • VUE——自定义指令
  • LabVIEW 2019 与 NI VISA 20.0 安装及报错处理
  • IEEE PRMVAI Workshop 17 | 智能医疗数据分析与应用
  • Baklib云中台赋能企业内容智管
  • Kubernetes外部访问服务全攻略:生产级方案详解
  • 12.hbase 源码构建