当前位置: 首页 > backend >正文

Spring Boot with RabbitMQ:四大核心模式指南

在现代分布式系统和微服务架构中,消息队列(Message Queue)是不可或缺的组件。它能实现服务间的异步通信、应用解耦和流量削峰。RabbitMQ 作为最受欢迎的消息队列中间件之一,以其稳定性、可靠性和灵活的路由模式而著称。

本文将带领你使用 Spring Boot (spring-boot-starter-amqp),通过清晰的代码实例和详尽的解释,深入理解并通过代码demo实践 RabbitMQ 的四种核心工作模式:

  • Work Queue (工作队列模式)
  • Direct Exchange (直连交换机模式)
  • Fanout Exchange (扇出/广播交换机模式)
  • Topic Exchange (主题交换机模式)
    java原生的操作方式请看这边

核心概念速览

在深入代码之前,我们先快速了解几个 RabbitMQ 的核心概念:

  • Producer (生产者):发送消息的一方。
  • Consumer (消费者):接收并处理消息的一方。
  • Queue (队列):存储消息的缓冲区,位于内存或磁盘。
  • Exchange (交换机):接收来自生产者的消息,并根据特定规则(类型和路由键)将消息路由到一个或多个队列。
  • Binding (绑定):建立 Exchange 和 Queue 之间的关联关系。
  • Routing Key (路由键):生产者在发送消息给 Exchange 时指定的“地址”或“标签”,Exchange 根据它来决定消息的去向。

模式一:Work Queue (工作队列)

这是最简单的模式,用于将一个耗时任务分发给多个消费者并行处理,从而提高整体处理效率。

特点:一个生产者将消息发送到一个特定队列,多个消费者共同监听这同一个队列。消息会以轮询(Round-Robin)的方式被分发给消费者,即一条消息只会被一个消费者处理。

1. 生产者配置

在 Work 模式下,我们只需要定义一个队列即可。消息将直接发送到这个队列。

// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {// 为了方便管理,我们将队列、交换机、路由键的名称统一定义在常量类中public static final String SPRING_WORK_QUEUE = "spring.work.queue";@Beanpublic Queue workQueue() {// 创建一个持久化的队列return QueueBuilder.durable(SPRING_WORK_QUEUE).build();}
}

2. 生产者接口

我们创建一个接口,循环发送 10 条消息到工作队列。

// ProducerController.java
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/work")public String sendWorkMessages() {for (int i = 1; i <= 10; i++) {String message = "Hello, Spring Work Queue message " + i;// 第一个参数是交换机名,这里为空字符串表示使用默认交换机// 第二个参数是路由键,对于工作队列模式,通常就是队列名rabbitTemplate.convertAndSend("", RabbitMQConfig.SPRING_WORK_QUEUE, message);}return "10 work messages sent successfully.";}
}

3. 消费者代码

我们创建两个消费者,它们监听同一个 SPRING_WORK_QUEUE 队列,以模拟任务竞争。

// WorkListener.java
@Component
public class WorkListener {@RabbitListener(queues = RabbitMQConfig.SPRING_WORK_QUEUE)public void listen1(String message) throws InterruptedException {System.out.println("[Work Consumer 1] received: " + message);// 模拟耗时任务Thread.sleep(100); }@RabbitListener(queues = RabbitMQConfig.SPRING_WORK_QUEUE)public void listen2(String message) throws InterruptedException {System.out.println("[Work Consumer 2] received: " + message);Thread.sleep(150);}
}

4. 测试与结果

访问 http://localhost:8080/producer/work。你会看到控制台交替打印输出来自两个消费者的日志,这表明 10 条消息被两个消费者“瓜分”了。

[Work Consumer 1] received: Hello, Spring Work Queue message 1
[Work Consumer 2] received: Hello, Spring Work Queue message 2
[Work Consumer 1] received: Hello, Spring Work Queue message 3
[Work Consumer 2] received: Hello, Spring Work Queue message 4
[Work Consumer 1] received: Hello, Spring Work Queue message 5
... (交替输出)

注意:默认情况下,分发策略是公平轮询。可以配置 prefetch 等参数实现更复杂的负载均衡。


模式二:Direct Exchange (直连交换机)

Direct Exchange 会将消息路由到 Routing Key 与 Binding Key 完全匹配的队列。这是一种精确的、点对点的路由方式。

特点:一对一路由。你可以将多个队列用不同的 Binding Key 绑定到同一个 Direct Exchange 上,实现消息的精确投递。

Direct Exchange 模型图

1. 生产者配置

我们定义一个 Direct Exchange,两个队列,以及三个绑定关系:

  • queue1 绑定 orange
  • queue2 绑定 black
  • queue2 也绑定 green
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {// ... 其他常量public static final String SPRING_DIRECT_EXCHANGE = "spring.direct.exchange";public static final String SPRING_DIRECT_QUEUE_1 = "spring.direct.queue1";public static final String SPRING_DIRECT_QUEUE_2 = "spring.direct.queue2";// 声明 Direct Exchange@Beanpublic DirectExchange directExchange() {return new DirectExchange(SPRING_DIRECT_EXCHANGE);}// 声明 Queue 1@Beanpublic Queue directQueue1() {return new Queue(SPRING_DIRECT_QUEUE_1);}// 声明 Queue 2@Beanpublic Queue directQueue2() {return new Queue(SPRING_DIRECT_QUEUE_2);}// 绑定关系1: queue1 -> exchange, with routingKey "orange"@Beanpublic Binding directBinding1(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("orange");}// 绑定关系2: queue2 -> exchange, with routingKey "black"@Beanpublic Binding directBinding2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("black");}// 绑定关系3: queue2 -> exchange, with routingKey "green"@Beanpublic Binding directBinding3(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("green");}
}

2. 生产者接口

// ProducerController.java
// ...
@GetMapping("/direct")
public String sendDirectMessage(String routingKey) {String message = "Hello, Spring Direct Exchange with routingKey: " + routingKey;rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_DIRECT_EXCHANGE, routingKey, message);return "Direct message sent with routingKey: " + routingKey;
}

3. 消费者代码

// DirectListener.java
@Component
public class DirectListener {@RabbitListener(queues = RabbitMQConfig.SPRING_DIRECT_QUEUE_1)public void listenQueue1(String message) {System.out.println("[" + RabbitMQConfig.SPRING_DIRECT_QUEUE_1 + "] received: " + message);}@RabbitListener(queues = RabbitMQConfig.SPRING_DIRECT_QUEUE_2)public void listenQueue2(String message) {System.out.println("[" + RabbitMQConfig.SPRING_DIRECT_QUEUE_2 + "] received: " + message);}
}

4. 测试与结果

  • 发送 orange: http://localhost:8080/producer/direct?routingKey=orange

    • 输出: [spring.direct.queue1] received: Hello, Spring Direct Exchange with routingKey: orange
    • 分析: orange 精确匹配 directBinding1,消息进入 queue1
  • 发送 black: http://localhost:8080/producer/direct?routingKey=black

    • 输出: [spring.direct.queue2] received: Hello, Spring Direct Exchange with routingKey: black
    • 分析: black 精确匹配 directBinding2,消息进入 queue2
  • 发送 green: http://localhost:8080/producer/direct?routingKey=green

    • 输出: [spring.direct.queue2] received: Hello, Spring Direct Exchange with routingKey: green
    • 分析: green 精确匹配 directBinding3,消息同样进入 queue2
  • 发送 blue: http://localhost:8080/producer/direct?routingKey=blue

    • 输出: (无任何输出)
    • 分析: blue 路由键在所有绑定关系中都找不到匹配项,消息被交换机丢弃。

模式三:Fanout Exchange (扇出/广播)

Fanout Exchange 是最简单的交换机类型。它会忽略 Routing Key,将收到的所有消息广播给所有绑定到该交换机上的队列。

特点:一对多广播。适用于需要将同一消息通知给所有订阅者的场景,如系统通知、配置更新等。

1. 生产者配置

我们定义一个 Fanout Exchange 和两个队列,并将这两个队列都绑定到交换机上。
在这里插入图片描述

// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {// ... 其他常量public static final String SPRING_FANOUT_EXCHANGE = "spring.fanout.exchange";public static final String SPRING_FANOUT_QUEUE_1 = "spring.fanout.queue1";public static final String SPRING_FANOUT_QUEUE_2 = "spring.fanout.queue2";@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(SPRING_FANOUT_EXCHANGE);}@Beanpublic Queue fanoutQueue1() {return new Queue(SPRING_FANOUT_QUEUE_1);}@Beanpublic Queue fanoutQueue2() {return new Queue(SPRING_FANOUT_QUEUE_2);}// 绑定 Queue1 到 Fanout Exchange@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 绑定 Queue2 到 Fanout Exchange@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

注意:Fanout 类型的绑定不需要 .with(routingKey)

2. 生产者接口

// ProducerController.java
// ...
@GetMapping("/fanout")
public String sendFanoutMessage() {String message = "Hello, this is a broadcast message!";// Fanout Exchange 忽略路由键,所以第二个参数可以为空字符串rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_FANOUT_EXCHANGE, "", message);return "Broadcast message sent successfully.";
}

3. 消费者代码

// FanoutListener.java
@Component
public class FanoutListener {@RabbitListener(queues = RabbitMQConfig.SPRING_FANOUT_QUEUE_1)public void listenQueue1(String message) {System.out.println("[" + RabbitMQConfig.SPRING_FANOUT_QUEUE_1 + "] received: " + message);}@RabbitListener(queues = RabbitMQConfig.SPRING_FANOUT_QUEUE_2)public void listenQueue2(String message) {System.out.println("[" + RabbitMQConfig.SPRING_FANOUT_QUEUE_2 + "] received: " + message);}
}

4. 测试与结果

访问 http://localhost:8080/producer/fanout

  • 输出:

    [spring.fanout.queue1] received: Hello, this is a broadcast message!
    [spring.fanout.queue2] received: Hello, this is a broadcast message!
    
  • 分析: 一条消息被成功广播到了所有绑定的队列,两个消费者都收到了同样的消息。


模式四:Topic Exchange (主题交换机)

Topic Exchange 是最灵活的交换机。它通过模式匹配来路由消息,Routing Key 是一个由点(.)分隔的单词列表,而 Binding Key 可以使用通配符。

  • * (星号): 匹配一个单词。
  • # (井号): 匹配零个或多个单词。

特点:灵活的、多对多的路由。非常适合用于实现基于内容的多维度订阅/发布系统。

![[Pasted image 20250705160511.png]]

1. 生产者配置

我们定义一个 Topic Exchange,两个队列,以及三个绑定关系,来演示通配符的用法:

  • queue1 绑定 *.orange.* (匹配中间是 orange 的三个单词的 key)
  • queue2 绑定 *.*.rabbit (匹配结尾是 rabbit 的三个单词的 key)
  • queue2 也绑定 lazy.# (匹配以 lazy. 开头的所有 key)
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {// ... 其他常量public static final String SPRING_TOPIC_EXCHANGE = "spring.topic.exchange";public static final String SPRING_TOPIC_QUEUE_1 = "spring.topic.queue1";public static final String SPRING_TOPIC_QUEUE_2 = "spring.topic.queue2";@Beanpublic TopicExchange topicExchange() {return new TopicExchange(SPRING_TOPIC_EXCHANGE);}@Beanpublic Queue topicQueue1() {return new Queue(SPRING_TOPIC_QUEUE_1);}@Beanpublic Queue topicQueue2() {return new Queue(SPRING_TOPIC_QUEUE_2);}// 绑定1: *.orange.*@Beanpublic Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");}// 绑定2: *.*.rabbit@Beanpublic Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");}// 绑定3: lazy.#@Beanpublic Binding topicBinding3(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("lazy.#");}
}

2. 生产者接口

// ProducerController.java
// ...
@GetMapping("/topic")
public String sendTopicMessage(String routingKey) {String message = "Hello, Spring Topic Exchange with routingKey: " + routingKey;rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_TOPIC_EXCHANGE, routingKey, message);return "Topic message sent with routingKey: " + routingKey;
}

3. 消费者代码

// TopicListener.java
@Component
public class TopicListener {@RabbitListener(queues = RabbitMQConfig.SPRING_TOPIC_QUEUE_1)public void listenQueue1(String message) {System.out.println("[" + RabbitMQConfig.SPRING_TOPIC_QUEUE_1 + "] received: " + message);}@RabbitListener(queues = RabbitMQConfig.SPRING_TOPIC_QUEUE_2)public void listenQueue2(String message) {System.out.println("[" + RabbitMQConfig.SPRING_TOPIC_QUEUE_2 + "] received: " + message);}
}

4. 测试与结果

  • 发送 quick.orange.rabbit: http://localhost:8080/producer/topic?routingKey=quick.orange.rabbit

    • 输出:
[spring.topic.queue1] received: Hello, Spring Topic Exchange with routingKey: quick.orange.rabbit
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: quick.orange.rabbit
  • 分析: quick.orange.rabbit 同时匹配 *.orange.* (queue1) 和 *.*.rabbit (queue2),所以两个队列都收到了消息。

  • 发送 lazy.orange.elephant: http://localhost:8080/producer/topic?routingKey=lazy.orange.elephant

    • 输出:
[spring.topic.queue1] received: Hello, Spring Topic Exchange with routingKey: lazy.orange.elephant
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: lazy.orange.elephant
  • 分析: lazy.orange.elephant 同时匹配 *.orange.* (queue1) 和 lazy.# (queue2)。

  • 发送 quick.brown.fox: http://localhost:8080/producer/topic?routingKey=quick.brown.fox

    • 输出: (无任何输出)
    • 分析: 该 routing key 不匹配任何绑定规则,消息被丢弃。
  • 发送 lazy.fox: http://localhost:8080/producer/topic?routingKey=lazy.fox

    • 输出:
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: lazy.fox
  • 分析: 该 routing key 仅匹配 lazy.# (queue2)。

总结:如何选择合适的模式?

模式交换机类型Routing Key核心特点适用场景
Work Queue默认 (空字符串)必须是队列名任务分发,竞争消费耗时任务处理、资源密集型操作,如视频转码、日志分析。
DirectDirect精确匹配点对点精确路由需要将消息准确发送到特定处理者的场景,如按地区、按类型分发任务。
FanoutFanout忽略广播向所有订阅者发送相同消息,如系统通知、配置更新、实时聊天室。
TopicTopic通配符模式匹配灵活的、多维度的订阅/发布基于内容的多条件订阅,如新闻系统(*.sports.basketball)、日志系统(error.critical.#)。
http://www.xdnf.cn/news/16570.html

相关文章:

  • python-网络编程
  • PCIE4.0/5.0/DDR4/DDR5使用以及布局布线规则-集萃
  • RHCE综合项目:分布式LNMP私有博客服务部署
  • 【Lua】题目小练4
  • 【保姆级 - 大模型应用开发】DeepSeek R1 本地部署全攻略:Ollama + vLLM + PyTorch 多选方案
  • 【图像处理基石】如何对遥感图像进行实例分割?
  • 【LeetCode 热题 100】34. 在排序数组中查找元素的第一个和最后一个位置——二分查找
  • 宇树 G1 部署(九)——遥操作控制脚本 teleop_hand_and_arm.py 分析与测试部署
  • Go 客户端玩转 ES|QL API 直连与 Mapping Helpers 实战详解
  • 11、read_object_model_3d 读取点云
  • 预装Windows 11系统的新电脑怎么跳过联网验机
  • 预过滤环境光贴图制作教程:第四阶段 - Lambert 无权重预过滤(Stage 3)
  • 三、Linux用户与权限管理详解
  • Redis内存使用耗尽情况分析
  • 编辑距离:理论基础、算法演进与跨领域应用
  • Windows使用Powershell自动安装SqlServer2025服务器与SSMS管理工具
  • css3之三维变换详说
  • Qt 多线程界面更新策略
  • 如何在Windows操作系统上通过conda 安装 MDAnalysis
  • 激光雷达/相机一体机 时间同步和空间标定(1)
  • 自然语言处理NLP(3)
  • leetcode 74. 搜索二维矩阵
  • 柔性生产前端动态适配:小批量换型场景下的参数配置智能切换技术
  • 汇总10个高质量免费AI生成论文网站,支持GPT4.0和DeepSeek-R1
  • cpolar 内网穿透 ubuntu 使用石
  • 2025年06月 C/C++(二级)真题解析#中国电子学会#全国青少年软件编程等级考试
  • go install报错: should be v0 or v1, not v2问题解决
  • 【自制组件库】从零到一实现属于自己的 Vue3 组件库!!!
  • P2910 [USACO08OPEN] Clear And Present Danger S
  • 四、Linux核心工具:Vim, 文件链接与SSH