rabbitmq 03
一、mq的作用和使用场景
MQ的基本作用
MQ(Message Queue,消息队列)是一种应用程序对应用程序的通信方法,主要作用包括:
-
异步处理:解耦生产者和消费者,允许生产者发送消息后立即返回,消费者异步处理
-
应用解耦:降低系统间的直接依赖,通过消息进行间接通信
-
流量削峰:缓冲突发流量,避免系统被压垮
-
消息通信:实现系统间的可靠消息传递
-
最终一致性:支持分布式事务的最终一致性方案
主要使用场景
1. 异步处理
-
用户注册后发送邮件/短信:注册流程快速完成,通知类操作异步处理
-
日志收集:应用将日志发送到MQ,由专门服务异步处理
2. 应用解耦
-
电商订单系统:订单服务生成订单后通过MQ通知库存、物流、支付等系统
-
微服务架构:服务间通过MQ通信而非直接调用
3. 流量削峰
-
秒杀系统:将大量请求先放入MQ,按系统能力逐步处理
-
突发流量处理:应对促销活动等流量高峰
4. 日志处理
-
大数据分析:收集各系统日志到MQ,由大数据平台统一处理
-
实时监控:系统指标通过MQ传输到监控平台
5. 消息通信
-
聊天系统:用户消息通过MQ传递
-
通知系统:系统间的事件通知
适用场景总结表
场景 | 关键技术 | 优势 |
---|---|---|
应用解耦 | 消息队列 | 减少系统间直接依赖 |
异步处理 | 生产者-消费者模型 | 提升响应速度 |
流量削峰 | 队列积压+限速消费 | 保护后端系统 |
跨语言通信 | AMQP 多语言支持 | 统一通信协议 |
发布/订阅 | Exchange(fanout/topic) | 一对多消息广播 |
延迟队列 | TTL + 死信队列 | 实现定时任务 |
二、mq的优点
1. 解耦系统组件
-
生产者和消费者无需相互感知对方的存在
-
系统间通过消息通信而非直接调用,降低耦合度
-
新增消费者不会影响生产者代码
2. 异步处理提升性能
-
生产者发送消息后无需等待消费者处理完成
-
非关键路径操作可异步执行(如发送通知、记录日志)
-
显著减少系统响应时间,提高吞吐量
3. 流量削峰与过载保护
-
缓冲突发流量,避免系统被瞬间高峰压垮
-
消费者可按自身处理能力从队列获取消息
-
特别适合秒杀、促销等瞬时高并发场景
4. 提高系统可靠性
-
消息持久化确保重要数据不丢失
-
重试机制和死信队列处理失败消息
-
网络波动时仍能保证消息最终送达
5. 扩展性强
-
可轻松增加消费者实例提高处理能力
-
天然支持分布式系统架构
-
各组件可独立扩展(生产者、MQ本身、消费者)
6. 顺序保证
-
某些MQ(如Kafka)可保证消息顺序性
-
对需要严格顺序的业务场景非常重要
7. 最终一致性支持
-
实现分布式事务的最终一致性方案
-
通过消息驱动的方式同步系统状态
-
比强一致性方案性能更高
8. 灵活的通信模式
-
支持点对点、发布/订阅等多种模式
-
可实现广播、组播等不同消息分发方式
-
适应各种业务场景需求
9. 系统恢复能力
-
消费者宕机恢复后可从断点继续消费
-
避免数据丢失或重复处理
-
支持消息回溯重新消费
10. 平衡资源利用率
-
平滑系统负载,避免资源闲置或过载
-
提高整体资源使用效率
-
降低系统建设成本(无需按峰值配置资源)
三、mq的缺点
1. 系统复杂度增加
-
引入MQ后,系统架构变得更加复杂,需要额外维护MQ集群
-
需要处理消息的发送、接收、确认、重试等逻辑
-
增加了调试和问题排查的难度(如消息丢失、重复消费等)
2. 消息一致性问题
-
消息丢失:生产者发送失败、MQ宕机、消费者处理失败都可能导致消息丢失
-
消息重复:网络问题或消费者超时可能导致消息被重复消费(需业务层做幂等处理)
-
顺序问题:某些MQ(如Kafka)只能保证分区内有序,全局有序需要额外设计
3. 延迟问题
-
异步处理导致延迟:消息队列的消费通常是异步的,不适合实时性要求极高的场景(如支付交易)
-
堆积时延迟加剧:如果消费者处理速度跟不上,消息堆积会导致延迟越来越高
4. 运维成本高
-
集群管理:MQ本身需要高可用部署(如Kafka的ZooKeeper依赖、RabbitMQ的镜像队列)
-
监控与告警:需监控消息积压、消费延迟、错误率等指标
-
资源占用:MQ集群可能占用较多CPU、内存和磁盘IO
5. 数据一致性与事务问题
-
分布式事务挑战:如果业务涉及数据库和MQ的协同(如扣库存+发消息),需要引入事务消息或本地消息表等方案
-
最终一致性:MQ通常只保证最终一致性,不适合强一致性要求的场景
6. 依赖风险
-
MQ成为单点故障:如果MQ集群崩溃,可能导致整个系统不可用
-
版本兼容性问题:MQ升级可能影响生产者和消费者的兼容性
7. 消息积压风险
-
消费者处理能力不足:如果消费者宕机或处理缓慢,消息会堆积,可能导致MQ存储爆满
-
影响新消息处理:积压严重时,新消息可能被阻塞或丢弃
8. 不适合所有场景
-
低延迟场景:如高频交易、实时游戏,MQ的异步机制可能引入不可接受的延迟
-
小规模系统:如果系统简单,直接调用可能比引入MQ更高效
四、mq相关产品,每种产品的特点
1. RabbitMQ
特点:
-
基于AMQP协议,支持多种客户端语言
-
轻量级,易于部署和管理
-
提供灵活的路由机制(直连/主题/扇出/头交换)
-
支持消息确认、持久化、优先级队列
-
集群部署相对简单
-
社区活跃,文档完善
适用场景:
-
中小规模消息处理
-
需要复杂路由规则的场景
-
企业级应用集成
-
对延迟要求不高的异步任务
2. Kafka
特点:
-
超高吞吐量(百万级TPS)
-
分布式、高可用设计
-
基于发布/订阅模式
-
消息持久化存储(可配置保留时间)
-
支持消息回溯和批量消费
-
水平扩展能力强
-
支持流式处理(Kafka Streams)
适用场景:
-
大数据日志收集与分析
-
实时流处理
-
高吞吐量消息系统
-
事件溯源
-
监控数据聚合
3. RocketMQ
特点:
-
阿里开源,经受双11考验
-
支持事务消息
-
严格的顺序消息
-
支持消息轨迹查询
-
分布式架构,高可用
-
支持定时/延迟消息
-
支持消息过滤
适用场景:
-
电商交易系统
-
金融支付场景
-
需要严格顺序的消息处理
-
分布式事务场景
4. ActiveMQ
特点:
-
支持JMS规范
-
支持多种协议(STOMP、AMQP、MQTT等)
-
提供消息持久化和事务支持
-
支持集群部署
-
相对轻量级
适用场景:
-
传统企业应用集成
-
需要JMS支持的场景
-
中小型消息系统
-
IoT设备通信
5. Pulsar
特点:
-
云原生设计,计算存储分离架构
-
支持多租户
-
低延迟和高吞吐并存
-
支持多种消费模式(独占/共享/故障转移)
-
支持分层存储(热数据+冷数据)
-
内置函数计算能力
适用场景:
-
云原生应用
-
多租户SaaS平台
-
需要统一消息和流处理的场景
-
混合云部署
6. ZeroMQ
特点:
-
无中间件,基于库的方式
-
极高性能(纳秒级延迟)
-
支持多种通信模式(请求-响应/发布-订阅等)
-
轻量级,无消息持久化
-
无broker架构
适用场景:
-
高性能计算
-
低延迟通信
-
进程间通信
-
不需要持久化的场景
7. NATS
特点:
-
极简设计,性能优异
-
无持久化(NATS Streaming提供持久化扩展)
-
支持请求-响应模式
-
轻量级,适合云环境
-
低资源消耗
适用场景:
-
IoT设备通信
-
云原生微服务
-
不需要持久化的实时消息
-
服务发现和配置分发
选型建议对比表
特性 \ MQ | RabbitMQ | Kafka | RocketMQ | Pulsar | ActiveMQ |
---|---|---|---|---|---|
吞吐量 | 中 | 极高 | 高 | 高 | 中 |
延迟 | 低 | 中 | 低 | 低 | 中 |
顺序保证 | 有限 | 分区有序 | 严格有序 | 分区有序 | 有限 |
持久化 | 支持 | 支持 | 支持 | 支持 | 支持 |
事务支持 | 有限 | 支持 | 支持 | 支持 | 支持 |
集群扩展 | 中等 | 容易 | 中等 | 容易 | 中等 |
运维复杂度 | 低 | 高 | 中 | 中 | 低 |
适用规模 | 中小 | 超大 | 中大 | 中大 | 中小 |
五、rabbitmq的搭建过程
Docker安装方式:
# 拉取镜像
docker pull rabbitmq:management
# 运行容器
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:management
Linux安装方式:
# 1. 安装Erlang
sudo apt-get install erlang
# 2. 下载RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server_3.8.9-1_all.deb
# 3. 安装
sudo dpkg -i rabbitmq-server_3.8.9-1_all.deb
# 4. 启动服务
sudo systemctl start rabbitmq-server
# 5. 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management
# 6. 创建用户
sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
六、rabbitmq相关角色
1. 生产者 (Publisher/Producer)
-
职责:创建并发送消息到 RabbitMQ 服务器
-
特点:
-
不直接将消息发送到队列,而是发送到交换器 (Exchange)
-
可以设置消息属性(如持久化、优先级等)
-
通常不知道消息最终会被哪些消费者接收
-
2. 消费者 (Consumer)
-
职责:接收并处理来自队列的消息
-
特点:
-
可以订阅一个或多个队列
-
可以手动或自动确认消息 (ack/nack)
-
可以设置 QoS(服务质量)控制预取数量
-
3. 代理服务器 (Broker)
-
职责:RabbitMQ 服务本身,负责接收、路由和传递消息
-
组成:
-
Exchange(交换器)
-
Queue(队列)
-
Binding(绑定)
-
4. 交换器 (Exchange)
-
类型:
类型 路由规则 典型用途 Direct 精确匹配 Routing Key 点对点精确路由 Fanout 忽略 Routing Key,广播到所有绑定队列 广播通知 Topic 模糊匹配 Routing Key(支持通配符) 多条件路由 Headers 根据消息头属性匹配 复杂路由条件 -
特性:
-
接收生产者发送的消息
-
根据类型和绑定规则将消息路由到队列
-
可以持久化(在服务器重启后仍然存在)
-
5. 队列 (Queue)
-
特性:
-
消息存储的缓冲区
-
可以有多个消费者(竞争消费模式)
-
可配置属性:
-
持久化(Durable)
-
自动删除(Auto-delete)
-
排他性(Exclusive)
-
消息 TTL(存活时间)
-
最大长度等
-
-
6. 绑定 (Binding)
-
作用:连接 Exchange 和 Queue 的规则
-
组成要素:
-
Exchange 名称
-
Queue 名称
-
Routing Key(或用于 Headers Exchange 的匹配参数)
-
7. 通道 (Channel)
-
特点:
-
在 TCP 连接上建立的虚拟连接
-
轻量级,减少 TCP 连接开销
-
每个 Channel 有独立 ID
-
建议每个线程使用独立的 Channel
-
8. 虚拟主机 (Virtual Host)
-
作用:提供逻辑隔离环境
-
特点:
-
类似于命名空间
-
每个 vhost 有独立的 Exchange、Queue 和绑定
-
需要单独配置权限
-
默认 vhost 为 "/"
-
9. 管理员角色 (Administrator)
-
权限:
-
管理用户权限
-
创建/删除 vhost
-
查看所有资源
-
通常通过 rabbitmqctl 工具或管理界面操作
-
10. 插件系统 (Plugins)
-
常见插件:
-
rabbitmq_management
:提供 Web 管理界面 -
rabbitmq_shovel
:跨集群消息转移 -
rabbitmq_federation
:分布式部署支持 -
rabbitmq_delayed_message_exchange
:延迟消息
-
角色交互示意图
+------------+ +---------+ +-------+ +--------+
| Publisher | ----> | Exchange| ====> | Queue | <---- | Consumer|
+------------+ +---------+ +-------+ +--------+(Binding)
七、rabbitmq内部组件
1、ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
2、Channel(信道):消息推送使用的通道。
3、Exchange(交换器):用于接受、分配消息。
4、Queue(队列):用于存储生产者的消息。
5、RoutingKey(路由键):用于把生成者的数据分配到交换器上。
6、BindingKey(绑定键):用于把交换器的消息绑定到队列上。
八、生产者发送消息的过程?
一、建立连接阶段
-
TCP连接建立
-
生产者应用通过AMQP客户端库发起TCP连接
-
默认端口5672(带管理插件时为5672/15672)
-
三次握手完成后建立物理连接
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("rabbitmq-host"); factory.setPort(5672); Connection connection = factory.newConnection();
-
-
认证与vhost选择
-
发送START/START-OK协议帧进行认证
-
选择虚拟主机(vhost),默认"/"
-
认证失败会收到CONNECTION-CLOSE帧
-
二、通道创建阶段
-
通道(Channel)初始化
-
在TCP连接上创建虚拟通道(Channel)
-
每个Channel有唯一ID(从1开始递增)
-
通道参数协商:
-
Frame Max Size(默认128KB)
-
Channel Max(默认2047)
-
Channel channel = connection.createChannel();
-
-
交换器声明(可选)
-
检查目标Exchange是否存在
-
不存在时根据参数自动创建
-
关键参数:
-
type:exchange类型(direct/fanout/topic/headers)
-
durable:是否持久化
-
autoDelete:无绑定时是否自动删除
-
channel.exchangeDeclare("order.exchange", "direct", true);
-
三、消息发布阶段
-
消息构造
-
组成结构:
{"body": "消息内容(二进制)","properties": {"delivery_mode": 2, # 1-非持久化 2-持久化"priority": 0, # 0-9优先级"headers": {}, # 自定义头"timestamp": 1620000000} }
-
-
发布消息到Exchange
-
通过Basic.Publish命令发送
-
关键参数:
-
exchange:目标交换器名称
-
routingKey:路由键
-
mandatory:是否触发Return回调
-
immediate:已废弃参数
-
channel.basicPublish("order.exchange", "order.create", MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes );
-
四、消息路由阶段
-
Exchange路由决策
-
根据Exchange类型处理:
-
Direct:精确匹配routingKey
-
Fanout:忽略routingKey,广播到所有绑定队列
-
Topic:通配符匹配(*匹配一个词,#匹配零或多个词)
-
Headers:匹配header键值对
-
-
-
队列投递
-
成功匹配:消息进入队列内存缓冲区
-
无匹配时处理:
-
设置了alternate-exchange:转到备用交换器
-
未设置备用交换器且mandatory=true:触发Return回调
-
否则丢弃消息
-
-
五、确认阶段
-
Confirm模式(可选)
-
开启方式:
channel.confirmSelect(); // 开启Confirm模式
-
确认机制:
-
单条确认:
waitForConfirms()
-
批量确认:
waitForConfirmsOrDie()
-
异步回调:
channel.addConfirmListener((sequenceNumber, multiple) -> {// 处理ack }, (sequenceNumber, multiple) -> {// 处理nack });
-
-
-
事务模式(可选)
-
事务操作流程:
channel.txSelect(); // 开启事务 try {channel.basicPublish(...);channel.txCommit(); // 提交事务 } catch (Exception e) {channel.txRollback(); // 回滚事务 }
-
六、资源释放阶段
-
通道关闭
-
发送Channel.Close命令
-
处理未确认消息:
-
事务模式:回滚未提交消息
-
Confirm模式:未确认消息会触发nack
-
-
-
连接关闭
-
发送Connection.Close命令
-
服务端释放相关资源
-
客户端等待TCP连接正常关闭
-
九、消费者接收消息过程?
一、连接建立阶段
-
TCP连接初始化
-
消费者客户端与RabbitMQ服务器建立TCP连接(默认端口5672)
-
完成AMQP协议握手:
-
-
通道创建
-
在TCP连接上创建虚拟通道(Channel)
-
每个Channel独立维护消息流状态
-
关键参数设置:
Channel channel = connection.createChannel(); channel.basicQos(10); // 设置prefetch count
-
二、队列订阅阶段
-
队列声明与检查
-
检查目标队列是否存在
-
自动创建队列(如果不存在且允许):
channel.queueDeclare("order.queue", true, false, false, null);
-
队列参数解析:
-
durable:是否持久化
-
exclusive:是否排他队列
-
autoDelete:无消费者时是否自动删除
-
arguments:扩展参数(TTL、死信等)
-
-
-
消费者注册
-
向Broker注册消费者标签(consumer tag)
-
选择消费模式:
-
推模式(Push API):服务端主动推送
-
拉模式(Basic.Get):客户端主动拉取
-
-
三、消息接收阶段
-
消息推送机制
-
Broker按照QoS设置推送消息:
while (unacked_count < prefetch_count) and (queue.has_messages):message = queue.next_message()send_to_consumer(message)unacked_count += 1
-
消息帧结构:
Basic.Deliver(consumer-tag,delivery-tag,redelivered,exchange,routing-key ) Message Body
-
-
消息处理流程
-
消费者接收消息后的处理步骤:
-
反序列化消息体
-
验证消息完整性
-
执行业务逻辑
-
发送ack/nack
-
处理异常情况
-
-
四、确认与反馈阶段
-
消息确认机制
-
自动确认(autoAck=true):
-
消息发出即视为成功
-
高风险(消息可能处理失败但已确认)
-
-
手动确认(autoAck=false):
// 成功处理 channel.basicAck(deliveryTag, false); // 处理失败(requeue=true重新入队) channel.basicNack(deliveryTag, false, true);
-
关键参数:
-
deliveryTag:消息唯一标识
-
multiple:是否批量操作
-
requeue:是否重新入队
-
-
-
拒绝消息处理
-
三种拒绝方式对比:
方法 是否批量 是否重入队列 适用场景 basicReject 否 可配置 单条消息处理失败 basicNack 是 可配置 批量消息处理异常 basicRecover - 是 重新投递未ack消息
-
五、流量控制机制
-
QoS预取设置
-
作用:限制未确认消息数量
-
全局 vs 通道级:
// 单通道限制 channel.basicQos(10); // 全局限制(所有通道总和) channel.basicQos(10, true);
-
最佳实践值:
-
高吞吐场景:100-300
-
高延迟任务:5-10
-
-
-
流控(Flow Control)
-
当消费者处理能力不足时:
-
Broker暂停发送新消息
-
触发Channel.Flow命令
-
消费者处理积压后恢复流动
-
-
六、异常处理阶段
-
连接中断处理
-
自动恢复机制:
factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000);
-
恢复过程:
-
重建TCP连接
-
恢复所有Channel
-
重新注册消费者
-
恢复未ack消息(根据redelivered标记)
-
-
-
死信处理
-
触发条件:
-
消息被拒绝且requeue=false
-
消息TTL过期
-
队列达到长度限制
-
-
死信队列配置:
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); channel.queueDeclare("order.queue", true, false, false, args);
-
消费者最佳实践
-
幂等性设计
// 使用消息ID实现幂等 if (processedMessageIds.contains(messageId)) {channel.basicAck(tag, false);return; }
-
批量确认优化
// 每处理100条消息批量确认一次 if (messageCount % 100 == 0) {channel.basicAck(lastTag, true); }
-
死信监控
// 监听死信队列 channel.basicConsume("dlx.queue", false, (tag, msg) -> {log.error("死信消息: {}", msg.getBody());channel.basicAck(tag, false); });
-
消费者标签管理
// 优雅关闭消费者 void shutdown() {channel.basicCancel(consumerTag);// 等待处理中的消息完成while (inProgressCount > 0) {Thread.sleep(100);} }
十、springboot项目中如何使用mq?
十一、如何保障消息不丢失?
1、发送阶段:发送阶段保障消息到达交换机 事务机制|confirm确认机制
2、存储阶段:持久化机制 交换机持久化、队列的持久化、消息内容的持久化
3、消费阶段:消息的确认机制 自动ack|手动ack
接收方消息确认机制
自动ack|手动ack
spring:rabbitmq:host: 1.94.230.82port: 5672username: adminpassword: 123456virtual-host: /yan3listener:simple:acknowledge-mode: manualdirect:acknowledge-mode: manual
package com.hl.rabbitmq01.web;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
@RestController
@RequestMapping("/c")
public class ConsumerController {
@RabbitListener(queues = {"topicQueue01"})public void receive(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println(msg);//业务逻辑 比如传入订单id,根据订单id,减少库存、支付等,// 如果操作成功,确认消息(从队列移除),如果操作失败,手动拒绝消息if(msg.length() >= 5){//确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{//拒绝消息 not ack// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}
}
}
消息的持久化机制
交换机的持久化
队列的持久化
消息内容的持久化
package com.hl.rabbitmq01.direct;
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
生产者 javaSE方式简单测试
发布订阅-------direct模型
生产者----消息队列----消费者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接Connection connection = MQUtil.getConnection();//2、基于连接,创建信道Channel channel = connection.createChannel();//3、基于信道,创建队列/*参数:1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建2. durable:是否持久化,当mq重启之后,消息还在3. exclusive:* 是否独占。只能有一个消费者监听这队列4。当Connection关闭时,是否删除队列autoDelete:是否自动删除。当没有Consumer时,自动删除掉5. arguments:参数。*/channel.queueDeclare("directQueue01", true, false, false, null);channel.queueDeclare("directQueue02", false, false, false, null);/*声明交换机参数1:交换机名称参数2:交换机类型*/channel.exchangeDeclare("directExchange01", BuiltinExchangeType.DIRECT,true);/*绑定交换机和队列参数1:队列名参数2:交换机名称参数3:路由key 广播模型 不支持路由key ""*/channel.queueBind("directQueue01","directExchange01","error");channel.queueBind("directQueue02","directExchange01","error");channel.queueBind("directQueue02","directExchange01","info");channel.queueBind("directQueue02","directExchange01","trace");//发送消息到消息队列/*参数:1. exchange:交换机名称。简单模式下交换机会使用默认的 ""2. routingKey:路由名称,简单模式下路由名称使用消息队列名称3. props:配置信息4. body:发送消息数据*/
channel.basicPublish("directExchange01","user", MessageProperties.PERSISTENT_TEXT_PLAIN,("Hello World ").getBytes());
//4、关闭信道,断开连接channel.close();connection.close();}
}
package com.hl.rabbitmq01.web;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("/p")
public class ProducerController {@Autowiredprivate AmqpTemplate amqpTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;
@RequestMapping("/send")public void send(@RequestParam(defaultValue = "user") String key,@RequestParam(defaultValue = "hello") String msg) throws IOException {//amqpTemplate.convertAndSend("topicExchange", key, msg);
// rabbitTemplate.convertAndSend("topicExchange",key,msg);Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); //false 非事务模式运行 无需手动提交channel.basicPublish("topicExchange", key,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}
}
/*
创建交换机*/
@Bean
public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange("topicExchange").durable(true) //是否支持持久化机制.build();
}
/*
创建队列*/
@Bean
public Queue queue(){return QueueBuilder.durable("topicQueue01").build();
}
发送方的消息确认机制
1、事务机制
消耗资源
RabbitMQ中与事务有关的主要有三个方法:
-
txSelect() 开始事务
-
txCommit() 提交事务
-
txRollback() 回滚事务
txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。
当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。
示例
@RestController
public class RabbitMQController {
@Autowiredprivate RabbitTemplate rabbitTemplate;
@RequestMapping("/send")public String sendMessage(String message){rabbitTemplate.setChannelTransacted(true); //开启事务操作rabbitTemplate.execute(channel -> {try {channel.txSelect();//开启事务
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
int i = 5/0;
channel.txCommit();//没有问题提交事务}catch (Exception e){e.printStackTrace();channel.txRollback();//有问题回滚事务}
return null;});
return "success";}
}
消费者没有任何变化。
通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。
2、confirm确认机制
推荐
同步通知
channel.confirmSelect(); //开始confirm操作
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
if (channel.waitForConfirms()){System.out.println("发送成功");
}else{//进行消息重发System.out.println("消息发送失败,进行消息重发");
}
异步通知
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {//消息正确到达broker,就会发送一条ack消息@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("发送消息成功");}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("发送消息失败,重新发送消息");}
});
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
十二、死信交换机和死信队列
在实际开发项目时,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。
死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新发送到死信交换机,然后再发送给使用死信的消息队列。
死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key,来指向死信交换机
RabbitMQ规定消息符合以下某种情况时,将会成为死信
-
队列消息长度到达限制(队列消息个数限制);
-
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
-
原队列存在消息过期设置,消息到达超时时间未被消费;
死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没有配置死信队列,则消息会被丢弃。
Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","deadExchange");//当前队列和死信交换机绑定map.put("x-dead-letter-routing-key","user.#");//当前队列和死信交换机绑定的路由规则
// map.put("x-max-length",2);//队列长度map.put("x-message-ttl",10000);//队列消息过期时间,时间ms
// return QueueBuilder.durable("topicQueue01").build();return QueueBuilder.durable("topicQueue").withArguments(map).build();
十三、延迟队列简介
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现
ttl+死信队列
ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。
延迟插件
人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
十四、RabbitMQ消息重复消费
RabbitMQ消息重复消费问题_rabbitmq重复消费的问题解决-CSDN博客
业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。
比如,用户到银行取钱后会收到扣款通知短信,如果用户收到多条扣款信息通知则会有困惑。
解决方法一:send if not exist 首先将 RabbitMQ 的消息自动确认机制改为手动确认,然后每当有一条消息消费成功了,就把该消息的唯一ID记录在Redis 上,然后每次发送消息时,都先去 Redis 上查看是否有该消息的 ID,如果有,表示该消息已经消费过了,不再处理,否则再去处理。
2.1 利用数据库唯一约束实现幂等
解决方法二:insert if not exist 可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据
解决方法三:sql的乐观锁 比如给用户发送短信,变成如果该用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,每修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。
十五、RabbitMQ消息积压
RabbitMq——消息积压分析和解决思路_rabbitmq消息积压-CSDN博客
消息积压产生的原因 正常而言,一般的消息从消息产生到消息消费需要经过以下几种阶段。
以Direct模式为例:
消息由生产者产生,比如新订单的创建等,经过交换机,将消息发送至指定的队列中,然后提供给对应的消费者进行消费。
在这个链路中,存在消息积压的原因大致分为以下几种:
1、消费者宕机,导致消息队列中的消息无法及时被消费,出现积压。 2、消费者没有宕机,但因为本身逻辑处理数据耗时,导致消费者消费能力不足,引起队列消息积压。 3、消息生产方单位时间内产生消息过多,比如“双11大促活动”,导致消费者处理不过来。 消息积压问题解决 针对上面消息积压问题的出现,大致进行了分析,那么根据分析则能制定相关的应对方法。如下所示:
1、大促活动等,导致生产者流量过大,引起积压问题。
提前增加服务器的数量,增加消费者数目,提升消费者针对指定队列消息处理的效率。
2、上线更多的消费者,处理消息队列中的数据。(和1中的大致类似)
3、如果成本有限,则可以专门针对这个队列,编写一个另类的消费者。
当前另类消费者,不进行复杂逻辑处理,只将消息从队列中取出,存放至数据库中,然后basicAck反馈给消息队列。
十六、消息入库(消息补偿)
如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,这样也不太好进行处理。所以为了避免RabbitMQ持久化失败而导致数据丢失,我们自己也要做一些消息补偿机制,以应对一些极端情况。
在使用消息队列(Message Queue)时,消息的补偿机制是一种处理消息处理失败或异常情况的方法。当消息消费者无法成功处理消息时,补偿机制允许系统将消息重新发送或执行其他操作,以确保消息的可靠传递和处理。
补偿机制通常涉及以下几个方面:
-
重试机制:当消息处理失败时,补偿机制会尝试重新发送消息给消费者,以便重新处理。重试间隔和重试次数可以根据具体情况进行配置,以避免重复投递导致的消息处理失败。
-
延时队列:补偿机制还可以使用延时队列来处理无法立即处理的消息。当某个消息处理失败时,可以将该消息放入到延时队列中,在一定的延时之后再次尝试发送给消费者进行处理。
-
死信队列:当消息无法被成功处理时,可以将这些无法处理的消息发送到死信队列(Dead Letter Queue)。死信队列通常用于存储无法被消费者处理的消息,以便后续进行排查和处理。
-
可视化监控和报警:补偿机制还可以包括对消息队列的监控和报警功能,以便及时发现和处理异常情况。通过可视化监控工具可以实时查看消息队列的状态和处理情况,及时发现问题并采取相应的补救措施。
补偿机制的设计和实现密切依赖于具体的消息中间件和使用场景,不同的消息队列系统可能提供不同的补偿机制。因此,在选择和使用消息队列时,需要根据自身的需求和系统特点来选择适合的消息补偿机制。