当前位置: 首页 > news >正文

Rabbitmq的五种消息类型介绍,以及集成springboot的使用

交换机类型

Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列

Direct Exchange

直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。

搭建基本环境

1、pom.xml引入的java包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${springboot-version}</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.57</version></dependency></dependencies>

2、yaml配置文件

# 8004是zookeeper服务器的支付服务提供者端口号
server:port: 8004
spring:application:name: cloud-mqrabbitmq:addresses: 192.168.96.133port: 5672username: guestpassword: guestvirtual-host: /#消费者配置listener:#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效simple:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manualretry:enabled: true# 消费失败后,继续消费,然后最多消费5次就不再消费。max-attempts: 5# 消费失败后 ,重试初始间隔时间 2秒initial-interval: 2000# 重试最大间隔时间5秒max-interval: 5000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2direct:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manual#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效retry:enabled: true# 消费失败后,继续消费,然后最多消费3次就不再消费。max-attempts: 3# 消费失败后 ,重试初始间隔时间 3秒initial-interval: 3000# 重试最大间隔时间max-interval: 7000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2# 生产者配置template:retry:# 开启消息发送失败重试机制enabled: true# 生产者 true-开启消息抵达队列的确认publisher-returns: false#simple 配置用于设置 RabbitMQ 消息生产者的消息确认类型为“简单确认”。这意味着当消息被发送到 RabbitMQ 之后,只有在消息成功投递到队列中后,RabbitMQ 才会向生产者发送一个确认(ack)通知。如果消息未能成功投递,则不会收到确认。#该配置通常与 publisher-returns: true 一起使用以启用消息返回机制,但在此配置中 publisher-returns 被设置为 false,表示不启用消息返回功能publisher-confirm-type: simple

3、主启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 10564*/
@SpringBootApplication
public class ApplicationRabbitmq {public static void main(String[] args) {SpringApplication.run(ApplicationRabbitmq.class, args);}
}

五种消息模式

基本模式(Basic Model)工作队列消息类型、,(扇形fanout , 路由direct , 广播主题topic)都属于发布订阅模型

1、RabbitMQ简单模式

也称为基本模式(Basic Model),是RabbitMQ的最简单的消息传递模式,仅涉及到一个生产者和一个消费者。在这个模式中,当我们启动一个程序作为生产者并向RabbitMQ发出消息时,我们希望它直接进入队列中,然后消费者会从队列中获取这个消息并进行处理。简单模式在RabbitMQ中是一个单队列单生产者单消费者的模式,主要适用于单纯的任务处理,消息的生产者和消费者的削峰填谷能力非常高。

1、定义消息队列Queue名称


package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 简单消息队列名称*/public static final String SIMPLE_MQ_NAME = "simpleQueue";
}

2、配置类Configuration


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类* @author 10564*/
@Configuration
public class RabbitmqSimpleConfig {/*** 简单消息队列*/@Beanpublic Queue simpleQueue() {//名字(name):队列的名字,用来区分不同的队列。//是否持久化(durable):如果设置为 true,表示即使服务器重启了,这个队列依然存在。//是否独占(exclusive):如果设置为 true,表示只有创建它的连接才能使用这个队列。//是否自动删除(autoDelete):如果设置为 true,表示当不再有消费者使用这个队列时,服务器会自动删除它。return new Queue(MqConstant.SIMPLE_MQ_NAME,true,false,false);}
}

3、生产者Producer

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class SimpleProducer {private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderSimple(String userModel) {log.info("\n简单生产者发送消息:{}\n", JSONObject.toJSONString(userModel));rabbitTemplate.convertAndSend(MqConstant.SIMPLE_MQ_NAME,userModel);}
}

4、消费者Consumer

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class SimpleConsumer {private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);@RabbitListener(queues = MqConstant.SIMPLE_MQ_NAME)public void receiveHelloQueueMessage(String userModel, Channel channel) {log.info("\n简单消费者接收消息:{}\n", JSONObject.toJSONString(userModel));}
}

5、测试Test

package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.simple.SimpleProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate SimpleProducer simpleProducer;@GetMapping("/simple")public void simpleProducerTest(String message) {simpleProducer.senderSimple(message);}
}

6、测试结果

### simple
GET http://localhost:8004/mq/simple?message=simple111## 结果
2025-06-21 15:19:54.769  INFO 8236 --- [nio-8004-exec-2] o.x.s.messagetype.simple.SimpleProducer  : 
简单生产者发送消息:"simple111"2025-06-21 15:19:54.772  INFO 8236 --- [ntContainer#7-1] o.x.s.messagetype.simple.SimpleConsumer  : 
简单消费者接收消息:"simple111"

2、RabbitMQ工作模式

当消费者处理消息较为耗时的情况下,可能会导致生产消息的速度远大于消费速度,从而造成消息堆积、无法及时处理的问题。为了解决这一问题,可以采用工作队列模型,即让多个消费者绑定到同一个队列上,共同消费该队列中的消息。在这种模型中,队列中的消息一旦被某个消费者成功消费,就会从队列中移除,因此任务不会被重复执行,且同一个消息只会被一个消费者消费

1、定义消息队列Queue名称

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 工作队列消息队列名称*/public static final String WORK_QUEUE_NAME = "workQueue";
}

2、配置类Configuration


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类* @author 10564*/
@Configuration
public class RabbitmqWorkQueueConfig {/*** 工作队列*/@Beanpublic Queue workQueue() {return new Queue(MqConstant.WORK_QUEUE_NAME);}
}

3、生产者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class WorkQueueProducer {private static final Logger log = LoggerFactory.getLogger(WorkQueueProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderWorkQueueMessage(String message) {log.info("\n工作队列生产者发送消息:{}\n",message);rabbitTemplate.convertAndSend(MqConstant.WORK_QUEUE_NAME,message);}
}

4、消费者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** 多个消费者绑定到一个队列,共同消费队列中的消息* @author 10564*/
@Component
public class WorkQueueConsumer {private static final Logger log = LoggerFactory.getLogger(WorkQueueConsumer.class);/*** 消费者one监听队列,并接收消息* @param message 消息内容*/@RabbitListener(queues = MqConstant.WORK_QUEUE_NAME)public void receiveWorkQueueMessageOne(String message) {log.info("\n工作队列消费者One接收消息:{}\n",message);}/*** 消费者two监听队列,并接收消息* @param message 消息内容*/@RabbitListener(queues = MqConstant.WORK_QUEUE_NAME)public void receiveWorkQueueMessageTwo(String message) {log.info("\n工作队列消费者Two接收消息:{}\n",message);}
}

5、测试Test

6、测试结果

### workQueue 连续请求三次达到以下结果
GET http://localhost:8004/mq/workQueue?message=workQueue2## 结果
2025-06-21 15:31:07.726  INFO 8236 --- [nio-8004-exec-3] o.x.s.m.workqueue.WorkQueueProducer      : 
工作队列生产者发送消息:workQueue workQueue22025-06-21 15:31:07.729  INFO 8236 --- [tContainer#11-1] o.x.s.m.workqueue.WorkQueueConsumer      : 
工作队列消费者One接收消息:workQueue workQueue22025-06-21 15:31:22.640  INFO 8236 --- [nio-8004-exec-7] o.x.s.m.workqueue.WorkQueueProducer      : 
工作队列生产者发送消息:workQueue workQueue22025-06-21 15:31:22.643  INFO 8236 --- [tContainer#10-1] o.x.s.m.workqueue.WorkQueueConsumer      : 
工作队列消费者Two接收消息:workQueue workQueue22025-06-21 15:31:24.122  INFO 8236 --- [nio-8004-exec-8] o.x.s.m.workqueue.WorkQueueProducer      : 
工作队列生产者发送消息:workQueue workQueue22025-06-21 15:31:24.124  INFO 8236 --- [tContainer#11-1] o.x.s.m.workqueue.WorkQueueConsumer      : 
工作队列消费者One接收消息:workQueue workQueue2

3、RabbitMQ 订阅发布Pub/Sub(fanout)

发布订阅模式,也叫做“广播(Broadcast)模式。生产者将消息发送到Exchange(交换机)上,
没有RoutingKey以及BindingKey的概念
,Bindings只是简单的将消息与交换机进行了绑定,如果消息进入了交换机中,那么这个消息会被转发到所有与当前交换机进行绑定的所有队列中。

发布订阅模型:允许一个消息向多个消费者投递,fanout , direct , topics都属于发布订阅模型。交换机使用Fanout-广播类型

1、定义消息队列Queue名称

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//订阅发布消息队列1public static final String FANOUT_QUEUE_ONE = "fanoutQueueOne";//订阅发布消息队列2public static final String FANOUT_QUEUE_TWO = "fanoutQueueTwo";//订阅发布消息队列 - 交换机public static final String FANOUT_EXCHANGE_NAME = "fanoutExchange";
}

2、配置类Configuration


package org.xwb.springcloud.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类** @author 10564*/
@Configuration
public class RabbitmqFanoutConfig {@Beanpublic Queue fanoutQueueOne() {return new Queue(MqConstant.FANOUT_QUEUE_ONE);}@Beanpublic Queue fanoutQueueTwo() {return new Queue(MqConstant.FANOUT_QUEUE_TWO);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(MqConstant.FANOUT_EXCHANGE_NAME);}//将订阅发布队列one 与该交换机绑定@Beanpublic Binding fanoutQueueOneBinding() {return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());}//将订阅发布队列two 与该交换机绑定@Beanpublic Binding fanoutQueueTwobinding() {return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());}
}

3、生产者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class FanoutProducer {private static final Logger log = LoggerFactory.getLogger(FanoutProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderFanoutQueue(String message) {log.info("\n订阅发布生产者发送消息:{}\n",message);//参数1:交换机名称//参数2:路由key//参数3:消息// fanout_exchange 广播类型的交换机  不需要指定路由key  所有绑定到该交换机的队列都会收到消息rabbitTemplate.convertAndSend(MqConstant.FANOUT_EXCHANGE_NAME,"",message);}
}

4、消费者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class FanoutConsumer {private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);//监听queue1队列@RabbitListener(queues = MqConstant.FANOUT_QUEUE_ONE)public void receiveFanoutQueueOne(String msg) {log.info("\n订阅发布消费者One接收消息:{}\n",msg);}//监听queue2队列@RabbitListener(queues = MqConstant.FANOUT_QUEUE_TWO)public void receiveFanoutQueueTwo(String msg) {log.info("\n订阅发布消费者two接收消息:{}\n",msg);}
}

5、测试Test

package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.fanout.FanoutProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate FanoutProducer fanoutProducer;@GetMapping("/fanout")public void fanoutProducerTest(String message) {fanoutProducer.senderFanoutQueue("fanout " + message);}
}

6、测试结果

### fanout
GET http://localhost:8004/mq/fanout?message=fanout## 结果
2025-06-21 15:39:33.247  INFO 8236 --- [nio-8004-exec-4] o.x.s.messagetype.fanout.FanoutProducer  : 
订阅发布生产者发送消息:fanout fanout2025-06-21 15:39:33.250  INFO 8236 --- [ntContainer#6-1] o.x.s.messagetype.fanout.FanoutConsumer  : 
订阅发布消费者two接收消息:fanout fanout2025-06-21 15:39:33.250  INFO 8236 --- [ntContainer#5-1] o.x.s.messagetype.fanout.FanoutConsumer  : 
订阅发布消费者One接收消息:fanout fanout

4、RabbitMQ 路由(direct)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费,我们就要用的routing路由模式,这种模式是通过一个routingkey来收发消息。交换机的类型使用direct。

(与发布订阅模式相比,交换机并不是发送给所有绑定的队列,而是在这些绑定队列中,符合routingkey的队列)

1、定义消息队列Queue名称

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//路由队列1public static final String DIRECT_QUEUE_ONE = "directQueueOne";//路由队列2public static final String DIRECT_QUEUE_TWO = "directQueueTwo";//路由交换机public static final String DIRECT_EXCHANGE_NAME = "directExchangeName";//路由键onepublic static final String DIRECT_ROUTING_KEY_ONE = "directRoutingKeyOne";//路由键twopublic static final String DIRECT_ROUTING_KEY_TWO = "directRoutingKeyTwo";
}

2、配置类Configuration


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类** @author 10564*/
@Configuration
public class RabbitmqDirectConfig {@Beanpublic Queue directQueueOne() {return new Queue(MqConstant.DIRECT_QUEUE_ONE);}@Beanpublic Queue directQueueTwo() {return new Queue(MqConstant.DIRECT_QUEUE_TWO);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(MqConstant.DIRECT_EXCHANGE_NAME);}@Beanpublic Binding directExchangeBindOne() {return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(MqConstant.DIRECT_ROUTING_KEY_ONE);}@Beanpublic Binding directExchangeBindTwo() {return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(MqConstant.DIRECT_ROUTING_KEY_TWO);}
}

3、生产者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564* @description: 直连交换机生产者*/
@Component
public class DirectProducer {private static final Logger log = LoggerFactory.getLogger(DirectProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderDirectOneMessage(String message) {log.info("\n路由模式生产者one发送消息:{}\n",message);//参数1:交换机名称//参数2:路由key//参数3:消息//topic_exchange交换机  需要指定路由key  绑定到该交换机且符合路由key的队列都会收到消息rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE_NAME,MqConstant.DIRECT_ROUTING_KEY_ONE,message+"ONE");}public void senderDirectTwoMessage(String message) {log.info("\n路由模式生产者two发送消息:{}\n",message);//参数1:交换机名称//参数2:路由key//参数3:消息//topic_exchange交换机  需要指定路由key  绑定到该交换机且符合路由key的队列都会收到消息rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE_NAME,MqConstant.DIRECT_ROUTING_KEY_TWO,message+"TWO");}
}

4、消费者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class DirectConsumer {private static final Logger log = LoggerFactory.getLogger(DirectConsumer.class);//监听queue1队列@RabbitListener(queues = MqConstant.DIRECT_QUEUE_ONE)public void receiveHelloQueueMessage1(String msg) {log.info("\n路由模式消费者one-监听队列one-路由键one 收到消息:{}\n",msg);}//监听queue2队列@RabbitListener(queues = MqConstant.DIRECT_QUEUE_TWO)public void receiveHelloQueueMessage2(String msg) {log.info("\n路由模式消费者two-监听队列two-路由键two 收到消息:{}\n",msg);}
}

5、测试Test


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.direct.DirectProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate DirectProducer directProducer;@GetMapping("/direct")public void directProducerTest(String message) {directProducer.senderDirectOneMessage("direct one " + message);directProducer.senderDirectTwoMessage("direct two " + message);}
}

6、测试结果

### direct
GET http://localhost:8004/mq/direct?message=direct## 结果
2025-06-21 15:44:34.354  INFO 8236 --- [nio-8004-exec-8] o.x.s.messagetype.direct.DirectProducer  : 
路由模式生产者one发送消息:direct one direct2025-06-21 15:44:34.354  INFO 8236 --- [nio-8004-exec-8] o.x.s.messagetype.direct.DirectProducer  : 
路由模式生产者two发送消息:direct two direct2025-06-21 15:44:34.357  INFO 8236 --- [ntContainer#3-1] o.x.s.messagetype.direct.DirectConsumer  : 
路由模式消费者one-监听队列one-路由键one 收到消息:direct one directONE2025-06-21 15:44:34.357  INFO 8236 --- [ntContainer#4-1] o.x.s.messagetype.direct.DirectConsumer  : 
路由模式消费者two-监听队列two-路由键two 收到消息:direct two directTWO

5、RabbitMQ 主题(topic)

Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列,没有本质区别,只不过Topic类型交换机可以让队列在绑定Routing
key 的时候使用通配符!即不像Direct写的那么死。

Routing key:可以由一个或多个单词组成,每个单词之间用“.”来分隔,例如:topicExchange.job

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

Routing key = topicExchange.# 代表后面可以匹配多个,topicExchange.job topicExchange.lucky.day topicExchange.luck.for.you

Routing key = topicExchange.* 代表后面只能跟一个,topicExchange.job topicExchange.luck 但topicExchange.lucky.day(此处后面是两个单词所以不匹配)就不可以

1、定义消息队列Queue名称

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//队列01public static final String TOPIC_QUEUE_ONE = "topicQueueOne";//队列02public static final String TOPIC_QUEUE_TWO = "topicQueueTwo";//交换机public static final String TOPIC_EXCHANGE_NAME = "topicExchange";/*** # 路由键匹配一个或多个单词  routingKey. 开头的都可以匹配到 eg: routingKey.one routingKey.two等等*/public static final String TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE = "topicRoutingKey.#";// * 路由键匹配一个单词   topicRoutingKey. 开头的,如果多个,只匹配一个public static final String TOPIC_ROUTING_KEY_MATCHING_ONE = "topicRoutingKey.*";
}

2、配置类Configuration


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类** @author 10564*/
@Configuration
public class RabbitmqTopicConfig {@Beanpublic Queue topicQueueOne() {return new Queue(MqConstant.TOPIC_QUEUE_ONE);}@Beanpublic Queue topicQueueTwo() {return new Queue(MqConstant.TOPIC_QUEUE_TWO);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(MqConstant.TOPIC_EXCHANGE_NAME);}@Beanpublic Binding topicBindingMatchingOneOrMore() {return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE);}@Beanpublic Binding topicBindingMatchingOne() {return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE);}
}

3、生产者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class TopicQueueProducer {private static final Logger log = LoggerFactory.getLogger(TopicQueueProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送topic主体消息队列,匹配一个 或者匹配多个* @param message 消息* @param routingKey 路由key*/public void senderTopicQueueRoutingMatching(String message,String routingKey) {log.info("\n主题模式 生产者发送消息 message:{}\n",message);log.info("\n主题模式 Routing:【{},{}】,当前的消息routing:{}\n",MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE,MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE,routingKey);//参数1:交换机名称//TODO 参数2:路由key * 匹配一个 # 匹配一个或者多个【此处的key是匹配MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE或者是TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE】//参数3:消息// topic_exchange交换机  需要指定路由key  绑定到该交换机且符合路由key的队列都会收到消息rabbitTemplate.convertAndSend(MqConstant.TOPIC_EXCHANGE_NAME,routingKey,message);}
}

4、消费者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/**** @author 10564*/
@Component
public class TopicQueueConsumer {private static final Logger log = LoggerFactory.getLogger(TopicQueueConsumer.class);/*** 消费者one监听队列,并接收消息* @param message 消息内容*/@RabbitListener(queues = MqConstant.TOPIC_QUEUE_ONE)public void receiveTopicQueueMessageOne(String message) {log.info("\n主题模式消费者01监听队列01-》路由键01 收到消息:{}\n",message);}/*** 消费者two监听队列,并接收消息* @param message 消息内容*/@RabbitListener(queues = MqConstant.TOPIC_QUEUE_TWO)public void receiveTopicQueueMessageTwo(String message) {log.info("\n主题模式消费者02监听队列02-》路由键02 收到消息:{}\n",message);}
}

5、测试Test

package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.topic.TopicQueueProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate TopicQueueProducer topicQueueProducer;@GetMapping("/topic")public void topicQueueProducerTest(String message, String routingKey) {topicQueueProducer.senderTopicQueueRoutingMatching("topic one " + message, routingKey);}
}

6、测试结果


### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one## 结果
2025-06-21 15:49:03.545  INFO 8236 --- [nio-8004-exec-2] o.x.s.m.topic.TopicQueueProducer         : 
主题模式 生产者发送消息 message:topic one topic2025-06-21 15:49:03.545  INFO 8236 --- [nio-8004-exec-2] o.x.s.m.topic.TopicQueueProducer         : 
主题模式 Routing:【topicRoutingKey.*,topicRoutingKey.#】,当前的消息routing:topicRoutingKey.one2025-06-21 15:49:03.547  INFO 8236 --- [ntContainer#9-1] o.x.s.m.topic.TopicQueueConsumer         : 
主题模式消费者01监听队列01-》路由键01 收到消息:topic one topic2025-06-21 15:49:03.547  INFO 8236 --- [ntContainer#8-1] o.x.s.m.topic.TopicQueueConsumer         : 
主题模式消费者02监听队列02-》路由键02 收到消息:topic one topic### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one.two## 结果
2025-06-21 15:49:28.181  INFO 8236 --- [nio-8004-exec-6] o.x.s.m.topic.TopicQueueProducer         : 
主题模式 生产者发送消息 message:topic one topic2025-06-21 15:49:28.182  INFO 8236 --- [nio-8004-exec-6] o.x.s.m.topic.TopicQueueProducer         : 
主题模式 Routing:【topicRoutingKey.*,topicRoutingKey.#】,当前的消息routing:topicRoutingKey.one.two2025-06-21 15:49:28.184  INFO 8236 --- [ntContainer#9-1] o.x.s.m.topic.TopicQueueConsumer         : 
主题模式消费者01监听队列01-》路由键01 收到消息:topic one topic

测试工具类 request.http文件

(IDEA搜索HTTP Client插件工具可以直接使用)


### simple
GET http://localhost:8004/mq/simple?message=simple111### workQueue
GET http://localhost:8004/mq/workQueue?message=workQueue2### fanout
GET http://localhost:8004/mq/fanout?message=fanout### direct
GET http://localhost:8004/mq/direct?message=direct### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one.two
http://www.xdnf.cn/news/1063459.html

相关文章:

  • React JSX语法
  • OCCT基础类库介绍:Modeling Algorithm - Features
  • 软件工程期末试卷简答题版带答案(共21道)
  • 【DCS开源项目】—— Lua 如何调用 DLL、DLL 与 DCS World 的交互
  • Vue3 + TypeScript + xlsx 导入excel文件追踪数据流转详细记录(从原文件到目标数据)
  • 领域驱动设计(DDD)【3】之事件风暴
  • EasyExcel导出极致封装 含枚举转换 分页导出
  • GitHub Copilot快捷键
  • 缓存与加速技术实践-Kafka消息队列
  • 腾讯云IM即时通讯:开启实时通信新时代
  • Python中字符串常用的操作方法
  • Linux TCP/IP协议栈中的TCP输入处理:net/ipv4/tcp_input.c解析
  • 学习C++、QT---03(C++的输入输出、C++的基本数据类型介绍)
  • AI与SEO关键词协同进化
  • IEC61850 通信协议测试验证方法详解
  • 解锁K-近邻算法:数据挖掘的秘密武器
  • 华为云Flexus+DeepSeek征文 | 基于Flexus X实例的金融AI Agent开发:智能风控与交易决策系统
  • 【AI论文】扩散二元性
  • 面试题-定义一个函数入参数是any类型,返回值是string类型,如何写出这个函数,代码示例
  • ncu学习笔记01——合并访存
  • 系统化的Node.js服务器搭建攻略
  • 将Python的JSON字符串转换为JSON
  • UE5 游戏模板 —— FirstShootGame
  • Docker简单介绍与使用以及下载对应镜像(项目前置)
  • 【软考高级系统架构论文】论湖仓一体架构及其应用
  • RNN工作原理和架构
  • Python的6万张图像数据集CIFAR-10和CIFAR-100说明
  • Redis哨兵模式的学习(三)
  • STM32F103_LL库+寄存器学习笔记12.3 - 串口DMA高效收发实战3:支持多实例化的版本
  • 【24】二维码数据集(有v5/v8模型)/YOLO二维码检测