当前位置: 首页 > java >正文

SpringBoot3.x入门到精通系列:3.2 整合 RabbitMQ 详解

🎯 RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。它是使用Erlang语言编写的,并且基于AMQP协议。

核心概念

  • Producer: 消息生产者,发送消息的应用
  • Consumer: 消息消费者,接收消息的应用
  • Queue: 消息队列,存储消息的缓冲区
  • Exchange: 交换机,负责接收消息并路由到队列
  • Routing Key: 路由键,Exchange根据它来决定消息路由到哪个队列
  • Binding: 绑定,Exchange和Queue之间的连接关系

交换机类型

  • Direct: 直连交换机,完全匹配路由键
  • Topic: 主题交换机,支持通配符匹配
  • Fanout: 扇出交换机,广播到所有绑定的队列
  • Headers: 头交换机,根据消息头属性路由

🚀 快速开始

1. 添加依赖

<dependencies><!-- SpringBoot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- SpringBoot RabbitMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- 测试依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. RabbitMQ配置

spring:# RabbitMQ配置rabbitmq:# RabbitMQ服务器地址host: localhost# RabbitMQ服务器端口port: 5672# 用户名username: guest# 密码password: guest# 虚拟主机virtual-host: /# 连接配置connection-timeout: 15000# 生产者配置publisher-confirm-type: correlated # 确认模式publisher-returns: true # 开启return机制# 消费者配置listener:simple:# 手动确认模式acknowledge-mode: manual# 并发消费者数量concurrency: 1# 最大并发消费者数量max-concurrency: 10# 每次从队列获取的消息数量prefetch: 1# 重试机制retry:enabled: trueinitial-interval: 1000max-attempts: 3max-interval: 10000multiplier: 1.0# 日志配置
logging:level:org.springframework.amqp: DEBUG

🔧 RabbitMQ配置类

1. 基础配置

package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {// 队列名称常量public static final String DIRECT_QUEUE = "direct.queue";public static final String TOPIC_QUEUE_1 = "topic.queue.1";public static final String TOPIC_QUEUE_2 = "topic.queue.2";public static final String FANOUT_QUEUE_1 = "fanout.queue.1";public static final String FANOUT_QUEUE_2 = "fanout.queue.2";public static final String DELAY_QUEUE = "delay.queue";public static final String DLX_QUEUE = "dlx.queue";// 交换机名称常量public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String DELAY_EXCHANGE = "delay.exchange";public static final String DLX_EXCHANGE = "dlx.exchange";/*** 消息转换器 - 使用JSON格式*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}/*** RabbitTemplate配置*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功: " + correlationData);} else {System.out.println("消息发送失败: " + cause);}});// 设置返回回调rabbitTemplate.setReturnsCallback(returned -> {System.out.println("消息返回: " + returned.getMessage());System.out.println("回复码: " + returned.getReplyCode());System.out.println("回复文本: " + returned.getReplyText());System.out.println("交换机: " + returned.getExchange());System.out.println("路由键: " + returned.getRoutingKey());});return rabbitTemplate;}/*** 监听器容器工厂配置*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter());return factory;}// ========================= Direct Exchange =========================@Beanpublic Queue directQueue() {return QueueBuilder.durable(DIRECT_QUEUE).build();}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE);}@Beanpublic Binding directBinding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");}// ========================= Topic Exchange =========================@Beanpublic Queue topicQueue1() {return QueueBuilder.durable(TOPIC_QUEUE_1).build();}@Beanpublic Queue topicQueue2() {return QueueBuilder.durable(TOPIC_QUEUE_2).build();}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);}@Beanpublic Binding topicBinding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.*.message");}@Beanpublic Binding topicBinding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");}// ========================= Fanout Exchange =========================@Beanpublic Queue fanoutQueue1() {return QueueBuilder.durable(FANOUT_QUEUE_1).build();}@Beanpublic Queue fanoutQueue2() {return QueueBuilder.durable(FANOUT_QUEUE_2).build();}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}@Beanpublic Binding fanoutBinding1() {return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}@Beanpublic Binding fanoutBinding2() {return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}// ========================= 延迟队列 =========================@Beanpublic Queue delayQueue() {return QueueBuilder.durable(DELAY_QUEUE).withArgument("x-message-ttl", 60000) // 消息TTL 60秒.withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 死信交换机.withArgument("x-dead-letter-routing-key", "dlx.routing.key") // 死信路由键.build();}@Beanpublic DirectExchange delayExchange() {return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");}// ========================= 死信队列 =========================@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(DLX_QUEUE).build();}@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE);}@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");}
}

📊 消息实体类

package com.example.demo.dto;import com.fasterxml.jackson.annotation.JsonFormat;
import java.io.Serializable;
import java.time.LocalDateTime;public class MessageDto implements Serializable {private static final long serialVersionUID = 1L;private String id;private String content;private String type;private String sender;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime timestamp;// 构造函数public MessageDto() {this.timestamp = LocalDateTime.now();}public MessageDto(String id, String content, String type, String sender) {this();this.id = id;this.content = content;this.type = type;this.sender = sender;}// Getter和Setter方法public String getId() { return id; }public void setId(String id) { this.id = id; }public String getContent() { return content; }public void setContent(String content) { this.content = content; }public String getType() { return type; }public void setType(String type) { this.type = type; }public String getSender() { return sender; }public void setSender(String sender) { this.sender = sender; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }@Overridepublic String toString() {return "MessageDto{" +"id='" + id + '\'' +", content='" + content + '\'' +", type='" + type + '\'' +", sender='" + sender + '\'' +", timestamp=" + timestamp +'}';}
}

📤 消息生产者

package com.example.demo.service;import com.example.demo.config.RabbitConfig;
import com.example.demo.dto.MessageDto;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送Direct消息*/public void sendDirectMessage(String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"DIRECT","Producer");rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"direct.routing.key",message);System.out.println("发送Direct消息: " + message);}/*** 发送Topic消息*/public void sendTopicMessage(String routingKey, String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"TOPIC","Producer");rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,routingKey,message);System.out.println("发送Topic消息 [" + routingKey + "]: " + message);}/*** 发送Fanout消息*/public void sendFanoutMessage(String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"FANOUT","Producer");rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", // Fanout交换机忽略路由键message);System.out.println("发送Fanout消息: " + message);}/*** 发送延迟消息*/public void sendDelayMessage(String content, int delaySeconds) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"DELAY","Producer");// 设置消息属性rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE,"delay.routing.key",message,msg -> {// 设置消息过期时间msg.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));return msg;});System.out.println("发送延迟消息 [" + delaySeconds + "s]: " + message);}/*** 发送带优先级的消息*/public void sendPriorityMessage(String content, int priority) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"PRIORITY","Producer");rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"direct.routing.key",message,msg -> {msg.getMessageProperties().setPriority(priority);return msg;});System.out.println("发送优先级消息 [" + priority + "]: " + message);}
}

📥 消息消费者

package com.example.demo.service;import com.example.demo.config.RabbitConfig;
import com.example.demo.dto.MessageDto;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class MessageConsumer {/*** 消费Direct消息*/@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE)public void consumeDirectMessage(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("接收到Direct消息: " + message);// 模拟业务处理Thread.sleep(1000);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Direct消息处理完成");} catch (Exception e) {System.err.println("处理Direct消息失败: " + e.getMessage());// 拒绝消息并重新入队channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Topic消息 - 队列1*/@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_1)public void consumeTopicMessage1(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("队列1接收到Topic消息: " + message);// 模拟业务处理Thread.sleep(500);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("队列1 Topic消息处理完成");} catch (Exception e) {System.err.println("队列1处理Topic消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Topic消息 - 队列2*/@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_2)public void consumeTopicMessage2(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("队列2接收到Topic消息: " + message);// 模拟业务处理Thread.sleep(500);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("队列2 Topic消息处理完成");} catch (Exception e) {System.err.println("队列2处理Topic消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Fanout消息 - 队列1*/@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_1)public void consumeFanoutMessage1(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("Fanout队列1接收到消息: " + message);// 模拟业务处理Thread.sleep(300);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Fanout队列1消息处理完成");} catch (Exception e) {System.err.println("Fanout队列1处理消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Fanout消息 - 队列2*/@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_2)public void consumeFanoutMessage2(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("Fanout队列2接收到消息: " + message);// 模拟业务处理Thread.sleep(300);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Fanout队列2消息处理完成");} catch (Exception e) {System.err.println("Fanout队列2处理消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费死信消息*/@RabbitListener(queues = RabbitConfig.DLX_QUEUE)public void consumeDlxMessage(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("接收到死信消息: " + message);// 处理死信消息的业务逻辑// 比如记录日志、发送告警、人工处理等// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("死信消息处理完成");} catch (Exception e) {System.err.println("处理死信消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);}}
}

🎮 Controller层

package com.example.demo.controller;import com.example.demo.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import java.util.HashMap;
import java.util.Map;@RestController
@RequestMapping("/api/rabbitmq")
@CrossOrigin(origins = "*")
public class RabbitMQController {@Autowiredprivate MessageProducer messageProducer;/*** 发送Direct消息*/@PostMapping("/direct")public ResponseEntity<Map<String, String>> sendDirectMessage(@RequestBody Map<String, String> request) {String content = request.get("content");messageProducer.sendDirectMessage(content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Direct消息发送成功");return ResponseEntity.ok(response);}/*** 发送Topic消息*/@PostMapping("/topic")public ResponseEntity<Map<String, String>> sendTopicMessage(@RequestBody Map<String, String> request) {String content = request.get("content");String routingKey = request.getOrDefault("routingKey", "topic.test.message");messageProducer.sendTopicMessage(routingKey, content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Topic消息发送成功");response.put("routingKey", routingKey);return ResponseEntity.ok(response);}/*** 发送Fanout消息*/@PostMapping("/fanout")public ResponseEntity<Map<String, String>> sendFanoutMessage(@RequestBody Map<String, String> request) {String content = request.get("content");messageProducer.sendFanoutMessage(content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Fanout消息发送成功");return ResponseEntity.ok(response);}/*** 发送延迟消息*/@PostMapping("/delay")public ResponseEntity<Map<String, String>> sendDelayMessage(@RequestBody Map<String, Object> request) {String content = (String) request.get("content");Integer delaySeconds = (Integer) request.getOrDefault("delaySeconds", 10);messageProducer.sendDelayMessage(content, delaySeconds);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "延迟消息发送成功");response.put("delaySeconds", delaySeconds.toString());return ResponseEntity.ok(response);}/*** 发送优先级消息*/@PostMapping("/priority")public ResponseEntity<Map<String, String>> sendPriorityMessage(@RequestBody Map<String, Object> request) {String content = (String) request.get("content");Integer priority = (Integer) request.getOrDefault("priority", 0);messageProducer.sendPriorityMessage(content, priority);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "优先级消息发送成功");response.put("priority", priority.toString());return ResponseEntity.ok(response);}
}

📊 最佳实践

1. 消息可靠性

  • 开启生产者确认机制
  • 使用持久化队列和消息
  • 实现消费者手动确认
  • 配置死信队列处理失败消息

2. 性能优化

  • 合理设置预取数量
  • 使用批量操作
  • 优化序列化方式
  • 监控队列长度

3. 高可用性

  • 配置集群模式
  • 使用镜像队列
  • 实现故障转移
  • 监控系统状态

本文关键词: RabbitMQ, 消息队列, AMQP, 异步通信, 微服务, 解耦

http://www.xdnf.cn/news/17094.html

相关文章:

  • Ubuntu系统VScode实现opencv(c++)图像一维直方图
  • Ubuntu系统VScode实现opencv(c++)图像二维直方图
  • 补:《每日AI-人工智能-编程日报》--2025年7月28日
  • 软件设计 VS 软件需求:了解成功软件开发外包的关键差异
  • git操作命令和golang编译脚本
  • 补:《每日AI-人工智能-编程日报》--2025年7月27日
  • 移动端 WebView 视频无法播放怎么办 媒体控件错误排查与修复指南
  • 高精度实战:YOLOv11交叉口目标行为全透视——轨迹追踪×热力图×滞留分析(附完整代码)
  • Linux-Day01.初识Linux和基础指令
  • 基于FAISS和Ollama的法律智能对话系统开发实录-【大模型应用班-第5课 RAG技术与应用学习笔记】
  • Ubuntu 下编译 SQLCipher 4.8.0
  • CMake进阶: 使用FetchContent方法基于gTest的C++单元测试
  • sqli-labs靶场less29~less35
  • Ethereum:拥抱开源,OpenZeppelin 未来的两大基石 Relayers 与 Monitor
  • 互联网医院整体项目套表整理过程文档全流程分析
  • Linux 文件与目录属性管理总结
  • IPIDEA:全球领先的企业级代理 IP 服务商
  • Go语言 逃 逸 分 析
  • JVM(Java虚拟机)运行时数据区
  • 【测试】⾃动化测试概念篇
  • 服务器突然之间特别卡,什么原因?
  • 晨控CK-GW08S与汇川AC系列PLC配置Ethernet/IP通讯连接手册
  • 开疆智能ModbusTCP转Profient网关连接ER机器人配置案例
  • 第二十三天(APP应用产权渠道服务资产通讯抓包静态提取动态调试测试范围)
  • 红队信息收集工具oneforall子域名搜集爆破工具安装使用教程详细过程
  • Python-初学openCV——图像预处理(七)——模板匹配、霍夫变换
  • Nestjs框架: Node.js 多环境配置策略与 dotenv 与 config 库详解
  • Node.js高并发接口下的事件循环卡顿问题与异步解耦优化方案
  • open-webui pipelines报404, ‘Filter pipeline.exporter not found‘
  • MySQL 约束知识体系:八大约束类型详细讲解