当前位置: 首页 > ops >正文

rabbitmq 03

一、mq的作用和使用场景

MQ的基本作用

MQ(Message Queue,消息队列)是一种应用程序对应用程序的通信方法,主要作用包括:

  1. 异步处理:解耦生产者和消费者,允许生产者发送消息后立即返回,消费者异步处理

  2. 应用解耦:降低系统间的直接依赖,通过消息进行间接通信

  3. 流量削峰:缓冲突发流量,避免系统被压垮

  4. 消息通信:实现系统间的可靠消息传递

  5. 最终一致性:支持分布式事务的最终一致性方案

主要使用场景

1. 异步处理

  • 用户注册后发送邮件/短信:注册流程快速完成,通知类操作异步处理

  • 日志收集:应用将日志发送到MQ,由专门服务异步处理

2. 应用解耦

  • 电商订单系统:订单服务生成订单后通过MQ通知库存、物流、支付等系统

  • 微服务架构:服务间通过MQ通信而非直接调用

3. 流量削峰

  • 秒杀系统:将大量请求先放入MQ,按系统能力逐步处理

  • 突发流量处理:应对促销活动等流量高峰

4. 日志处理

  • 大数据分析:收集各系统日志到MQ,由大数据平台统一处理

  • 实时监控:系统指标通过MQ传输到监控平台

5. 消息通信

  • 聊天系统:用户消息通过MQ传递

  • 通知系统:系统间的事件通知

适用场景总结表

场景关键技术优势
应用解耦消息队列减少系统间直接依赖
异步处理生产者-消费者模型提升响应速度
流量削峰队列积压+限速消费保护后端系统
跨语言通信AMQP 多语言支持统一通信协议
发布/订阅Exchange(fanout/topic)一对多消息广播
延迟队列TTL + 死信队列实现定时任务

二、mq的优点

1. 解耦系统组件

  • 生产者和消费者无需相互感知对方的存在

  • 系统间通过消息通信而非直接调用,降低耦合度

  • 新增消费者不会影响生产者代码

2. 异步处理提升性能

  • 生产者发送消息后无需等待消费者处理完成

  • 非关键路径操作可异步执行(如发送通知、记录日志)

  • 显著减少系统响应时间,提高吞吐量

3. 流量削峰与过载保护

  • 缓冲突发流量,避免系统被瞬间高峰压垮

  • 消费者可按自身处理能力从队列获取消息

  • 特别适合秒杀、促销等瞬时高并发场景

4. 提高系统可靠性

  • 消息持久化确保重要数据不丢失

  • 重试机制和死信队列处理失败消息

  • 网络波动时仍能保证消息最终送达

5. 扩展性强

  • 可轻松增加消费者实例提高处理能力

  • 天然支持分布式系统架构

  • 各组件可独立扩展(生产者、MQ本身、消费者)

6. 顺序保证

  • 某些MQ(如Kafka)可保证消息顺序性

  • 对需要严格顺序的业务场景非常重要

7. 最终一致性支持

  • 实现分布式事务的最终一致性方案

  • 通过消息驱动的方式同步系统状态

  • 比强一致性方案性能更高

8. 灵活的通信模式

  • 支持点对点、发布/订阅等多种模式

  • 可实现广播、组播等不同消息分发方式

  • 适应各种业务场景需求

9. 系统恢复能力

  • 消费者宕机恢复后可从断点继续消费

  • 避免数据丢失或重复处理

  • 支持消息回溯重新消费

10. 平衡资源利用率

  • 平滑系统负载,避免资源闲置或过载

  • 提高整体资源使用效率

  • 降低系统建设成本(无需按峰值配置资源)

三、mq的缺点

1. 系统复杂度增加

  • 引入MQ后,系统架构变得更加复杂,需要额外维护MQ集群

  • 需要处理消息的发送、接收、确认、重试等逻辑

  • 增加了调试和问题排查的难度(如消息丢失、重复消费等)

2. 消息一致性问题

  • 消息丢失:生产者发送失败、MQ宕机、消费者处理失败都可能导致消息丢失

  • 消息重复:网络问题或消费者超时可能导致消息被重复消费(需业务层做幂等处理)

  • 顺序问题:某些MQ(如Kafka)只能保证分区内有序,全局有序需要额外设计

3. 延迟问题

  • 异步处理导致延迟:消息队列的消费通常是异步的,不适合实时性要求极高的场景(如支付交易)

  • 堆积时延迟加剧:如果消费者处理速度跟不上,消息堆积会导致延迟越来越高

4. 运维成本高

  • 集群管理:MQ本身需要高可用部署(如Kafka的ZooKeeper依赖、RabbitMQ的镜像队列)

  • 监控与告警:需监控消息积压、消费延迟、错误率等指标

  • 资源占用:MQ集群可能占用较多CPU、内存和磁盘IO

5. 数据一致性与事务问题

  • 分布式事务挑战:如果业务涉及数据库和MQ的协同(如扣库存+发消息),需要引入事务消息或本地消息表等方案

  • 最终一致性:MQ通常只保证最终一致性,不适合强一致性要求的场景

6. 依赖风险

  • MQ成为单点故障:如果MQ集群崩溃,可能导致整个系统不可用

  • 版本兼容性问题:MQ升级可能影响生产者和消费者的兼容性

7. 消息积压风险

  • 消费者处理能力不足:如果消费者宕机或处理缓慢,消息会堆积,可能导致MQ存储爆满

  • 影响新消息处理:积压严重时,新消息可能被阻塞或丢弃

8. 不适合所有场景

  • 低延迟场景:如高频交易、实时游戏,MQ的异步机制可能引入不可接受的延迟

  • 小规模系统:如果系统简单,直接调用可能比引入MQ更高效

四、mq相关产品,每种产品的特点

1. RabbitMQ

特点

  • 基于AMQP协议,支持多种客户端语言

  • 轻量级,易于部署和管理

  • 提供灵活的路由机制(直连/主题/扇出/头交换)

  • 支持消息确认、持久化、优先级队列

  • 集群部署相对简单

  • 社区活跃,文档完善

适用场景

  • 中小规模消息处理

  • 需要复杂路由规则的场景

  • 企业级应用集成

  • 对延迟要求不高的异步任务

2. Kafka

特点

  • 超高吞吐量(百万级TPS)

  • 分布式、高可用设计

  • 基于发布/订阅模式

  • 消息持久化存储(可配置保留时间)

  • 支持消息回溯和批量消费

  • 水平扩展能力强

  • 支持流式处理(Kafka Streams)

适用场景

  • 大数据日志收集与分析

  • 实时流处理

  • 高吞吐量消息系统

  • 事件溯源

  • 监控数据聚合

3. RocketMQ

特点

  • 阿里开源,经受双11考验

  • 支持事务消息

  • 严格的顺序消息

  • 支持消息轨迹查询

  • 分布式架构,高可用

  • 支持定时/延迟消息

  • 支持消息过滤

适用场景

  • 电商交易系统

  • 金融支付场景

  • 需要严格顺序的消息处理

  • 分布式事务场景

4. ActiveMQ

特点

  • 支持JMS规范

  • 支持多种协议(STOMP、AMQP、MQTT等)

  • 提供消息持久化和事务支持

  • 支持集群部署

  • 相对轻量级

适用场景

  • 传统企业应用集成

  • 需要JMS支持的场景

  • 中小型消息系统

  • IoT设备通信

5. Pulsar

特点

  • 云原生设计,计算存储分离架构

  • 支持多租户

  • 低延迟和高吞吐并存

  • 支持多种消费模式(独占/共享/故障转移)

  • 支持分层存储(热数据+冷数据)

  • 内置函数计算能力

适用场景

  • 云原生应用

  • 多租户SaaS平台

  • 需要统一消息和流处理的场景

  • 混合云部署

6. ZeroMQ

特点

  • 无中间件,基于库的方式

  • 极高性能(纳秒级延迟)

  • 支持多种通信模式(请求-响应/发布-订阅等)

  • 轻量级,无消息持久化

  • 无broker架构

适用场景

  • 高性能计算

  • 低延迟通信

  • 进程间通信

  • 不需要持久化的场景

7. NATS

特点

  • 极简设计,性能优异

  • 无持久化(NATS Streaming提供持久化扩展)

  • 支持请求-响应模式

  • 轻量级,适合云环境

  • 低资源消耗

适用场景

  • IoT设备通信

  • 云原生微服务

  • 不需要持久化的实时消息

  • 服务发现和配置分发

选型建议对比表

特性 \ MQRabbitMQKafkaRocketMQPulsarActiveMQ
吞吐量极高
延迟
顺序保证有限分区有序严格有序分区有序有限
持久化支持支持支持支持支持
事务支持有限支持支持支持支持
集群扩展中等容易中等容易中等
运维复杂度
适用规模中小超大中大中大中小

五、rabbitmq的搭建过程

Docker安装方式

# 拉取镜像
docker pull rabbitmq:management
​
# 运行容器
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:management

Linux安装方式

# 1. 安装Erlang
sudo apt-get install erlang
​
# 2. 下载RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server_3.8.9-1_all.deb
​
# 3. 安装
sudo dpkg -i rabbitmq-server_3.8.9-1_all.deb
​
# 4. 启动服务
sudo systemctl start rabbitmq-server
​
# 5. 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management
​
# 6. 创建用户
sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

六、rabbitmq相关角色

1. 生产者 (Publisher/Producer)

  • 职责:创建并发送消息到 RabbitMQ 服务器

  • 特点

    • 不直接将消息发送到队列,而是发送到交换器 (Exchange)

    • 可以设置消息属性(如持久化、优先级等)

    • 通常不知道消息最终会被哪些消费者接收

2. 消费者 (Consumer)

  • 职责:接收并处理来自队列的消息

  • 特点

    • 可以订阅一个或多个队列

    • 可以手动或自动确认消息 (ack/nack)

    • 可以设置 QoS(服务质量)控制预取数量

3. 代理服务器 (Broker)

  • 职责:RabbitMQ 服务本身,负责接收、路由和传递消息

  • 组成

    • Exchange(交换器)

    • Queue(队列)

    • Binding(绑定)

4. 交换器 (Exchange)

  • 类型

    类型路由规则典型用途
    Direct精确匹配 Routing Key点对点精确路由
    Fanout忽略 Routing Key,广播到所有绑定队列广播通知
    Topic模糊匹配 Routing Key(支持通配符)多条件路由
    Headers根据消息头属性匹配复杂路由条件
  • 特性

    • 接收生产者发送的消息

    • 根据类型和绑定规则将消息路由到队列

    • 可以持久化(在服务器重启后仍然存在)

5. 队列 (Queue)

  • 特性

    • 消息存储的缓冲区

    • 可以有多个消费者(竞争消费模式)

    • 可配置属性:

      • 持久化(Durable)

      • 自动删除(Auto-delete)

      • 排他性(Exclusive)

      • 消息 TTL(存活时间)

      • 最大长度等

6. 绑定 (Binding)

  • 作用:连接 Exchange 和 Queue 的规则

  • 组成要素

    • Exchange 名称

    • Queue 名称

    • Routing Key(或用于 Headers Exchange 的匹配参数)

7. 通道 (Channel)

  • 特点

    • 在 TCP 连接上建立的虚拟连接

    • 轻量级,减少 TCP 连接开销

    • 每个 Channel 有独立 ID

    • 建议每个线程使用独立的 Channel

8. 虚拟主机 (Virtual Host)

  • 作用:提供逻辑隔离环境

  • 特点

    • 类似于命名空间

    • 每个 vhost 有独立的 Exchange、Queue 和绑定

    • 需要单独配置权限

    • 默认 vhost 为 "/"

9. 管理员角色 (Administrator)

  • 权限

    • 管理用户权限

    • 创建/删除 vhost

    • 查看所有资源

    • 通常通过 rabbitmqctl 工具或管理界面操作

10. 插件系统 (Plugins)

  • 常见插件

    • rabbitmq_management:提供 Web 管理界面

    • rabbitmq_shovel:跨集群消息转移

    • rabbitmq_federation:分布式部署支持

    • rabbitmq_delayed_message_exchange:延迟消息

角色交互示意图

+------------+       +---------+       +-------+       +--------+
| Publisher  | ----> | Exchange| ====> | Queue | <---- | Consumer|
+------------+       +---------+       +-------+       +--------+(Binding)

七、rabbitmq内部组件

1、ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。

2、Channel(信道):消息推送使用的通道。

3、Exchange(交换器):用于接受、分配消息。

4、Queue(队列):用于存储生产者的消息。

5、RoutingKey(路由键):用于把生成者的数据分配到交换器上。

6、BindingKey(绑定键):用于把交换器的消息绑定到队列上。

八、生产者发送消息的过程?

一、建立连接阶段

  1. TCP连接建立

    • 生产者应用通过AMQP客户端库发起TCP连接

    • 默认端口5672(带管理插件时为5672/15672)

    • 三次握手完成后建立物理连接

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("rabbitmq-host");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
  2. 认证与vhost选择

    • 发送START/START-OK协议帧进行认证

    • 选择虚拟主机(vhost),默认"/"

    • 认证失败会收到CONNECTION-CLOSE帧

二、通道创建阶段

  1. 通道(Channel)初始化

    • 在TCP连接上创建虚拟通道(Channel)

    • 每个Channel有唯一ID(从1开始递增)

    • 通道参数协商:

      • Frame Max Size(默认128KB)

      • Channel Max(默认2047)

    Channel channel = connection.createChannel();

  2. 交换器声明(可选)

    • 检查目标Exchange是否存在

    • 不存在时根据参数自动创建

    • 关键参数:

      • type:exchange类型(direct/fanout/topic/headers)

      • durable:是否持久化

      • autoDelete:无绑定时是否自动删除

    channel.exchangeDeclare("order.exchange", "direct", true);

三、消息发布阶段

  1. 消息构造

    • 组成结构:

      {"body": "消息内容(二进制)","properties": {"delivery_mode": 2,  # 1-非持久化 2-持久化"priority": 0,       # 0-9优先级"headers": {},       # 自定义头"timestamp": 1620000000}
      }
  2. 发布消息到Exchange

    • 通过Basic.Publish命令发送

    • 关键参数:

      • exchange:目标交换器名称

      • routingKey:路由键

      • mandatory:是否触发Return回调

      • immediate:已废弃参数

    channel.basicPublish("order.exchange", "order.create", MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes
    );

四、消息路由阶段

  1. Exchange路由决策

    • 根据Exchange类型处理:

      • Direct:精确匹配routingKey

      • Fanout:忽略routingKey,广播到所有绑定队列

      • Topic:通配符匹配(*匹配一个词,#匹配零或多个词)

      • Headers:匹配header键值对

  2. 队列投递

    • 成功匹配:消息进入队列内存缓冲区

    • 无匹配时处理:

      • 设置了alternate-exchange:转到备用交换器

      • 未设置备用交换器且mandatory=true:触发Return回调

      • 否则丢弃消息

五、确认阶段

  1. Confirm模式(可选)

    • 开启方式:

      channel.confirmSelect();  // 开启Confirm模式

    • 确认机制:

      • 单条确认:waitForConfirms()

      • 批量确认:waitForConfirmsOrDie()

      • 异步回调:

        channel.addConfirmListener((sequenceNumber, multiple) -> {// 处理ack
        }, (sequenceNumber, multiple) -> {// 处理nack
        });

  2. 事务模式(可选)

    • 事务操作流程:

      channel.txSelect();  // 开启事务
      try {channel.basicPublish(...);channel.txCommit();  // 提交事务
      } catch (Exception e) {channel.txRollback(); // 回滚事务
      }

六、资源释放阶段

  1. 通道关闭

    • 发送Channel.Close命令

    • 处理未确认消息:

      • 事务模式:回滚未提交消息

      • Confirm模式:未确认消息会触发nack

  2. 连接关闭

    • 发送Connection.Close命令

    • 服务端释放相关资源

    • 客户端等待TCP连接正常关闭

九、消费者接收消息过程?

一、连接建立阶段

  1. TCP连接初始化

    • 消费者客户端与RabbitMQ服务器建立TCP连接(默认端口5672)

    • 完成AMQP协议握手:

  2. 通道创建

    • 在TCP连接上创建虚拟通道(Channel)

    • 每个Channel独立维护消息流状态

    • 关键参数设置:

      Channel channel = connection.createChannel();
      channel.basicQos(10); // 设置prefetch count

二、队列订阅阶段

  1. 队列声明与检查

    • 检查目标队列是否存在

    • 自动创建队列(如果不存在且允许):

      channel.queueDeclare("order.queue", true, false, false, null);
    • 队列参数解析:

      • durable:是否持久化

      • exclusive:是否排他队列

      • autoDelete:无消费者时是否自动删除

      • arguments:扩展参数(TTL、死信等)

  2. 消费者注册

    • 向Broker注册消费者标签(consumer tag)

    • 选择消费模式:

      • 推模式(Push API):服务端主动推送

      • 拉模式(Basic.Get):客户端主动拉取

三、消息接收阶段

  1. 消息推送机制

    • Broker按照QoS设置推送消息:

      while (unacked_count < prefetch_count) and (queue.has_messages):message = queue.next_message()send_to_consumer(message)unacked_count += 1
    • 消息帧结构:

      Basic.Deliver(consumer-tag,delivery-tag,redelivered,exchange,routing-key
      )
      Message Body
  2. 消息处理流程

    • 消费者接收消息后的处理步骤:

      1. 反序列化消息体

      2. 验证消息完整性

      3. 执行业务逻辑

      4. 发送ack/nack

      5. 处理异常情况

四、确认与反馈阶段

  1. 消息确认机制

    • 自动确认(autoAck=true)

      • 消息发出即视为成功

      • 高风险(消息可能处理失败但已确认)

    • 手动确认(autoAck=false)

      // 成功处理
      channel.basicAck(deliveryTag, false); 
      // 处理失败(requeue=true重新入队)
      channel.basicNack(deliveryTag, false, true);
    • 关键参数:

      • deliveryTag:消息唯一标识

      • multiple:是否批量操作

      • requeue:是否重新入队

  2. 拒绝消息处理

    • 三种拒绝方式对比:

      方法是否批量是否重入队列适用场景
      basicReject可配置单条消息处理失败
      basicNack可配置批量消息处理异常
      basicRecover-重新投递未ack消息

五、流量控制机制

  1. QoS预取设置

    • 作用:限制未确认消息数量

    • 全局 vs 通道级:

      // 单通道限制
      channel.basicQos(10); 
      // 全局限制(所有通道总和)
      channel.basicQos(10, true);
    • 最佳实践值:

      • 高吞吐场景:100-300

      • 高延迟任务:5-10

  2. 流控(Flow Control)

    • 当消费者处理能力不足时:

      1. Broker暂停发送新消息

      2. 触发Channel.Flow命令

      3. 消费者处理积压后恢复流动

六、异常处理阶段

  1. 连接中断处理

    • 自动恢复机制:

      factory.setAutomaticRecoveryEnabled(true);
      factory.setNetworkRecoveryInterval(5000);
    • 恢复过程:

      1. 重建TCP连接

      2. 恢复所有Channel

      3. 重新注册消费者

      4. 恢复未ack消息(根据redelivered标记)

  2. 死信处理

    • 触发条件:

      • 消息被拒绝且requeue=false

      • 消息TTL过期

      • 队列达到长度限制

    • 死信队列配置:

      Map<String, Object> args = new HashMap<>();
      args.put("x-dead-letter-exchange", "dlx.exchange");
      channel.queueDeclare("order.queue", true, false, false, args);

消费者最佳实践

  1. 幂等性设计

    // 使用消息ID实现幂等
    if (processedMessageIds.contains(messageId)) {channel.basicAck(tag, false);return;
    }
  2. 批量确认优化

    // 每处理100条消息批量确认一次
    if (messageCount % 100 == 0) {channel.basicAck(lastTag, true);
    }
  3. 死信监控

    // 监听死信队列
    channel.basicConsume("dlx.queue", false, (tag, msg) -> {log.error("死信消息: {}", msg.getBody());channel.basicAck(tag, false);
    });
  4. 消费者标签管理

    // 优雅关闭消费者
    void shutdown() {channel.basicCancel(consumerTag);// 等待处理中的消息完成while (inProgressCount > 0) {Thread.sleep(100);}
    }

十、springboot项目中如何使用mq?

十一、如何保障消息不丢失?

1、发送阶段:发送阶段保障消息到达交换机 事务机制|confirm确认机制

2、存储阶段:持久化机制 交换机持久化、队列的持久化、消息内容的持久化

3、消费阶段:消息的确认机制 自动ack|手动ack

接收方消息确认机制

自动ack|手动ack

spring:rabbitmq:host: 1.94.230.82port: 5672username: adminpassword: 123456virtual-host: /yan3listener:simple:acknowledge-mode: manualdirect:acknowledge-mode: manual
package com.hl.rabbitmq01.web;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
import java.io.IOException;
​
@RestController
@RequestMapping("/c")
public class ConsumerController {
​@RabbitListener(queues = {"topicQueue01"})public void receive(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println(msg);//业务逻辑 比如传入订单id,根据订单id,减少库存、支付等,// 如果操作成功,确认消息(从队列移除),如果操作失败,手动拒绝消息if(msg.length() >= 5){//确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{//拒绝消息 not ack// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}
​
​}
}

消息的持久化机制

交换机的持久化

队列的持久化

消息内容的持久化

package com.hl.rabbitmq01.direct;
​
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/*
生产者  javaSE方式简单测试
发布订阅-------direct模型
生产者----消息队列----消费者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接Connection connection = MQUtil.getConnection();//2、基于连接,创建信道Channel channel = connection.createChannel();//3、基于信道,创建队列/*参数:1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建2. durable:是否持久化,当mq重启之后,消息还在3. exclusive:* 是否独占。只能有一个消费者监听这队列4。当Connection关闭时,是否删除队列autoDelete:是否自动删除。当没有Consumer时,自动删除掉5. arguments:参数。*/channel.queueDeclare("directQueue01", true, false, false, null);channel.queueDeclare("directQueue02", false, false, false, null);/*声明交换机参数1:交换机名称参数2:交换机类型*/channel.exchangeDeclare("directExchange01", BuiltinExchangeType.DIRECT,true);/*绑定交换机和队列参数1:队列名参数2:交换机名称参数3:路由key 广播模型 不支持路由key  ""*/channel.queueBind("directQueue01","directExchange01","error");channel.queueBind("directQueue02","directExchange01","error");channel.queueBind("directQueue02","directExchange01","info");channel.queueBind("directQueue02","directExchange01","trace");//发送消息到消息队列/*参数:1. exchange:交换机名称。简单模式下交换机会使用默认的 ""2. routingKey:路由名称,简单模式下路由名称使用消息队列名称3. props:配置信息4. body:发送消息数据*/
​channel.basicPublish("directExchange01","user", MessageProperties.PERSISTENT_TEXT_PLAIN,("Hello World ").getBytes());
​
​//4、关闭信道,断开连接channel.close();connection.close();}
}
package com.hl.rabbitmq01.web;
​
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
​
import java.io.IOException;
import java.nio.charset.StandardCharsets;
​
@RestController
@RequestMapping("/p")
public class ProducerController {@Autowiredprivate AmqpTemplate amqpTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;
​
​@RequestMapping("/send")public void send(@RequestParam(defaultValue = "user") String key,@RequestParam(defaultValue = "hello") String msg) throws IOException {//amqpTemplate.convertAndSend("topicExchange", key, msg);
//        rabbitTemplate.convertAndSend("topicExchange",key,msg);Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); //false 非事务模式运行 无需手动提交channel.basicPublish("topicExchange", key,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}
}
​
/*
创建交换机*/
@Bean
public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange("topicExchange").durable(true)  //是否支持持久化机制.build();
}
/*
创建队列*/
@Bean
public Queue queue(){return QueueBuilder.durable("topicQueue01").build();
}

发送方的消息确认机制

1、事务机制

消耗资源

RabbitMQ中与事务有关的主要有三个方法:

  • txSelect() 开始事务

  • txCommit() 提交事务

  • txRollback() 回滚事务

txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。

当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。

示例

@RestController
public class RabbitMQController {
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​@RequestMapping("/send")public String sendMessage(String message){rabbitTemplate.setChannelTransacted(true); //开启事务操作rabbitTemplate.execute(channel -> {try {channel.txSelect();//开启事务
​channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
​int i = 5/0;
​channel.txCommit();//没有问题提交事务}catch (Exception e){e.printStackTrace();channel.txRollback();//有问题回滚事务}
​return null;});
​return "success";}
​
}

消费者没有任何变化。

通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。

2、confirm确认机制

推荐

同步通知
channel.confirmSelect(); //开始confirm操作
​
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
​
if (channel.waitForConfirms()){System.out.println("发送成功");
}else{//进行消息重发System.out.println("消息发送失败,进行消息重发");
}

异步通知
channel.confirmSelect();
​
channel.addConfirmListener(new ConfirmListener() {//消息正确到达broker,就会发送一条ack消息@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("发送消息成功");}
​//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("发送消息失败,重新发送消息");}
});
​
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
​

十二、死信交换机和死信队列

在实际开发项目时,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。

死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新发送到死信交换机,然后再发送给使用死信的消息队列。

死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key,来指向死信交换机

RabbitMQ规定消息符合以下某种情况时,将会成为死信

  • 队列消息长度到达限制(队列消息个数限制);

  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  • 原队列存在消息过期设置,消息到达超时时间未被消费;

死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没有配置死信队列,则消息会被丢弃。

Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","deadExchange");//当前队列和死信交换机绑定map.put("x-dead-letter-routing-key","user.#");//当前队列和死信交换机绑定的路由规则
//        map.put("x-max-length",2);//队列长度map.put("x-message-ttl",10000);//队列消息过期时间,时间ms
​
//        return QueueBuilder.durable("topicQueue01").build();return QueueBuilder.durable("topicQueue").withArguments(map).build();

十三、延迟队列简介

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现

ttl+死信队列

ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。

延迟插件

人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

十四、RabbitMQ消息重复消费

RabbitMQ消息重复消费问题_rabbitmq重复消费的问题解决-CSDN博客

业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。

比如,用户到银行取钱后会收到扣款通知短信,如果用户收到多条扣款信息通知则会有困惑。

解决方法一:send if not exist 首先将 RabbitMQ 的消息自动确认机制改为手动确认,然后每当有一条消息消费成功了,就把该消息的唯一ID记录在Redis 上,然后每次发送消息时,都先去 Redis 上查看是否有该消息的 ID,如果有,表示该消息已经消费过了,不再处理,否则再去处理。

2.1 利用数据库唯一约束实现幂等

解决方法二:insert if not exist 可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据

解决方法三:sql的乐观锁 比如给用户发送短信,变成如果该用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,每修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。

十五、RabbitMQ消息积压

RabbitMq——消息积压分析和解决思路_rabbitmq消息积压-CSDN博客

消息积压产生的原因 正常而言,一般的消息从消息产生到消息消费需要经过以下几种阶段。

以Direct模式为例:

消息由生产者产生,比如新订单的创建等,经过交换机,将消息发送至指定的队列中,然后提供给对应的消费者进行消费。

在这个链路中,存在消息积压的原因大致分为以下几种:

1、消费者宕机,导致消息队列中的消息无法及时被消费,出现积压。 2、消费者没有宕机,但因为本身逻辑处理数据耗时,导致消费者消费能力不足,引起队列消息积压。 3、消息生产方单位时间内产生消息过多,比如“双11大促活动”,导致消费者处理不过来。 消息积压问题解决 针对上面消息积压问题的出现,大致进行了分析,那么根据分析则能制定相关的应对方法。如下所示:

1、大促活动等,导致生产者流量过大,引起积压问题。

提前增加服务器的数量,增加消费者数目,提升消费者针对指定队列消息处理的效率。

2、上线更多的消费者,处理消息队列中的数据。(和1中的大致类似)

3、如果成本有限,则可以专门针对这个队列,编写一个另类的消费者。

当前另类消费者,不进行复杂逻辑处理,只将消息从队列中取出,存放至数据库中,然后basicAck反馈给消息队列。

十六、消息入库(消息补偿)

如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,这样也不太好进行处理。所以为了避免RabbitMQ持久化失败而导致数据丢失,我们自己也要做一些消息补偿机制,以应对一些极端情况。

在使用消息队列(Message Queue)时,消息的补偿机制是一种处理消息处理失败或异常情况的方法。当消息消费者无法成功处理消息时,补偿机制允许系统将消息重新发送或执行其他操作,以确保消息的可靠传递和处理。

补偿机制通常涉及以下几个方面:

  1. 重试机制:当消息处理失败时,补偿机制会尝试重新发送消息给消费者,以便重新处理。重试间隔和重试次数可以根据具体情况进行配置,以避免重复投递导致的消息处理失败。

  2. 延时队列:补偿机制还可以使用延时队列来处理无法立即处理的消息。当某个消息处理失败时,可以将该消息放入到延时队列中,在一定的延时之后再次尝试发送给消费者进行处理。

  3. 死信队列:当消息无法被成功处理时,可以将这些无法处理的消息发送到死信队列(Dead Letter Queue)。死信队列通常用于存储无法被消费者处理的消息,以便后续进行排查和处理。

  4. 可视化监控和报警:补偿机制还可以包括对消息队列的监控和报警功能,以便及时发现和处理异常情况。通过可视化监控工具可以实时查看消息队列的状态和处理情况,及时发现问题并采取相应的补救措施。

补偿机制的设计和实现密切依赖于具体的消息中间件和使用场景,不同的消息队列系统可能提供不同的补偿机制。因此,在选择和使用消息队列时,需要根据自身的需求和系统特点来选择适合的消息补偿机制。

http://www.xdnf.cn/news/16022.html

相关文章:

  • HTTP 协议常见字段(请求头/响应头)
  • 按键精灵脚本:自动化利刃的双面性 - 从技术原理到深度实践与反思
  • 大型语言模型(Large Language Models,LLM)
  • 循环神经网络--NLP基础
  • LINUX 722 逻辑卷快照
  • 单细胞转录组学+空间转录组的整合及思路
  • MySQL 学习二 MVCC
  • Python -- logging --日志模块
  • VUE2 项目学习笔记 ? 语法 v-if/v-show
  • 使用docker(ubuntu)搭建web环境(php,apahce2)
  • 无人机吊舱与遥控器匹配技术解析
  • LeetCode 热题100:42.接雨水
  • 如何在 Windows 10 下部署多个 PHP 版本7.4,8.2
  • 从零搭建 OpenCV 项目(新手向)--第一天初识OpenCV与图像基础
  • javaweb小案例1
  • 开源AI智能客服、AI智能名片与S2B2C商城小程序在客户复购与转介绍中的协同效应研究
  • 在腾讯云上安装gitlab
  • Qt开发环境搭建全攻略(Windows+Linux+macOS)
  • 【Altium Designer2025】电子设计自动化(EDA)软件——Altium Designer25版保姆级下载安装详细图文教程(附安装包)
  • 基于JAVA实现基于“obj--html--pdf” 的PDF格式文本生成
  • linux内核与GNU之间的联系和区别
  • 【QT常用技术讲解】QSystemTrayIcon系统托盘
  • 大模型——Data Agent:超越 BI 与 AI 的边界
  • 跨境企业破局国际市场:海外媒体发稿如何为品牌声誉赋能?
  • 算法笔记之堆排序
  • Docker实战:使用Docker部署TeamMapper思维导图工具
  • haproxy七层代理新手入门详解
  • EasyMan 数字人服务全面焕新,交互型AI数字人助推孪生体验全新升级
  • 大模型——上下文工程如何重塑智能体的“思考方式”
  • 【接口自动化】掌握接口自动化:核心概念讲解(理论知识)