当前位置: 首页 > news >正文

5、SpringBoot整合RabbitMQ

5.1 工作队列模式

1、生产者

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope>
</dependency>

添加配置

spring:rabbitmq:addresses: amqp://pinkboy:123456@8.136.108.248:5672//

声明工作队列(WORK_Queue)名称

public class Constants {// 工作模式public static final String WORK_QUEUE = "work.queue";
}

声明队列

@Bean 在这里的作用是将 workQueue() 方法创建的队列对象纳入 Spring 容器进行统一管理

@Configuration
public class RabbitMQConfig {@Bean("workQueue")public Queue workQueue() {return QueueBuilder.durable(Constants.WORK_QUEUE).build();}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work() {//使用内置交换机,RoutingKey,和队列名一致for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp : work..." + i);}return "发送成功!";}
}

2、编写消费者代码

@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String message) {System.out.println("[" + Constants.DIRECT_QUEUE1 + "]" + "接收到的信息:" + message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String message) {System.out.println("[" + Constants.DIRECT_QUEUE2 + "]" + "接收到的信息:" + message);}
}

@RabbitListener是Spring框架中用于监听RabbitMq队列的注解,这通过使用这个注解,可以定义一个方以便从RabbitMQ队列中接收消息,该注解支持多种参数类型,这些参数类型代表了从RabitMQ接收到的消息和相关信息

以下是一些常用参数类型

1、String :返回消息的内容

2、Message :Spring AMQP的Message类,返回原始的消息体以及消息的属性,如消息、内容、队列信息等

3、Channel :RbbitMQ的通道对象,可以用于进行更高级的操作,如手动确认消息

3、观察运行结果

1、运行项目调用接口发送消息

监听消息并打印 

两个消费者竞争消费队列中的消息

5.2 Publish/Subscribe(发布订阅模式)

1、编写生产者代码

和简单模式的区别是: 需要创建交换机, 并且绑定队列和交换机
声明队列, 交换机, 绑定队列和交换机
  // 发布订阅模式public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";public static final String FANOUT_EXCHANGE = "fanout.exchange";
 //广播模式@Bean("fanoutQueue1")public Queue fanoutQueue1() {return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2() {return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();}@Bean("fanoutQueueBinding1")public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {return BindingBuilder.bind(queue).to(fanoutExchange);}@Bean("fanoutQueueBinding2")public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) {return BindingBuilder.bind(queue).to(fanoutExchange);}

使用接口发送消息

 @RequestMapping("/fanout")public String fanout() {//使用内置交换机,RoutingKey,和队列名一致for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello spring amqp : fanout..." + i);}return "发送成功!";}

2、编写消费者代码

定义监听类

@Component
public class FanoutListener {@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void queueListener1(String message) {System.out.println("[" + Constants.FANOUT_QUEUE1 + "]" + "接收到的信息:" + message);}@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void queueListener2(String message) {System.out.println("[" + Constants.FANOUT_QUEUE2 + "]" + "接收到的信息:" + message);}
}

3、观察运行结果

调用接口

消息以广播的形式发送给两个消费者

5.3 Routing(路由模式)

交换机类型为Direct时,会把消息交给符合指定routingkey的队列.
队列和交换机的绑定,不是任意的绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也需要指定消息的RoutingKey
Exchange也不再把消息交给每一个绑定的key,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey和消息的RoutingKey完全一致,才会接收到消息

1、编写生产者代码

声明队列, 交换机, 绑定队列和交换机
// 路由模式public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";public static final String DIRECT_EXCHANGE = "direct.exchange";
//路由模式@Bean("directQueue1")public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}@Bean("directQueueBinding1")public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("orange");}@Bean("directQueueBinding2")public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("black");}@Bean("directQueueBinding3")public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("orange");}

使用接口发送消息

@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello spring amqp:direct,my routing key is " + routingKey);return "发送成功!";}

2、编写消费者代码

定义监听类

@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String message) {System.out.println("[" + Constants.DIRECT_QUEUE1 + "]" + "接收到的信息:" + message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String message) {System.out.println("[" + Constants.DIRECT_QUEUE2 + "]" + "接收到的信息:" + message);}
}

3、运行程序,观察结果

1、 调用接口发送routingkey为orange的消息

queue1 和 queue2 分别接收到orange消息

2、调用接口发送routingkey为black的消息

5.4 Topic(通配符模式)

1、编写生产者代码

   //通配符模式public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";public static final String TOPIC_EXCHANGE = "topic.exchange";
//通配符模式@Bean("topicQueue1")public Queue topicQueue1() {return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2() {return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("topicExchange")public TopicExchange topicExchange() {return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueueBinding1")public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue) {return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");}@Bean("topicQueueBinding2")public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");}@Bean("topicQueueBinding3")public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");}

使用接口发送消息

@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello spring amqp:topic,my routing key is " + routingKey);return "发送成功!";}

2、编写生产者代码

定义监听类

@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String message) {System.out.println("[" + Constants.DIRECT_QUEUE1 + "]" + "接收到的信息:" + message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String message) {System.out.println("[" + Constants.DIRECT_QUEUE2 + "]" + "接收到的信息:" + message);}
}

3、运行程序观察结果

启动程序调用接口发送routingKey为 a.orange.b

启动程序调用接口发送routingKey为 a.orange.rabbit
启动程序调用接口发送routingKey为 lazy.a.b.c

http://www.xdnf.cn/news/227305.html

相关文章:

  • 39.RocketMQ高性能核心原理与源码架构剖析
  • iview表单提交验证时,出现空值参数被过滤掉不提交的问题解决
  • 大连理工大学选修课——机器学习笔记(2):机器学习的一般原理
  • 智能检索革命全景透视——基于《搜索引擎信息检索困境破解体系》深度拆解
  • 数据结构篇:线性表的另一表达—链表之单链表(下篇)
  • 宇树科技开启“人形机器人格斗盛宴”
  • LeetCode 2302.统计得分小于 K 的子数组数目:滑动窗口(不需要前缀和)
  • Java架构师深度技术面试:从核心基础到分布式架构全解析
  • Milvus(11):动态字段、可归零和默认值
  • 基于开源AI智能名片链动2+1模式S2B2C商城小程序的私域电商与微商融合创新研究
  • 基于Docker的Elasticsearch ARM64架构镜像构建实践
  • vue 和 html 的区别
  • 20250430在ubuntu14.04.6系统上查看系统实时网速
  • 运营岗位选择
  • 多用户远程 Debugger 服务隔离方案技术实践
  • Java使用 MyBatis-Plus 实现前端组装查询语句、后端动态执行查询的功能,
  • 使用vue开发electron
  • Git从入门到精通-第二章-工具配置
  • 软考中级-软件设计师 数据结构(手写笔记)
  • 文献分享:CovEpiAb-冠状病毒免疫表位及抗体数据库
  • HCIP-数据通信datacom认证
  • 【RustDesk 】中继1:压力测试 Python 版 RustDesk 中继服务器
  • 【安全扫描器原理】基于协议的服务扫描器
  • 欧洲分子生物学实验室EMBL介绍
  • 详解具身智能机器人开源数据集:RoboMIND
  • 数字孪生技术十大创新应用场景与工程实践
  • Vue3 Echarts 3D立方体柱状图实现教程
  • 碰一碰发视频源码||客户端开发实战:NFC+低延迟传输技术实现引言
  • 【每日八股】复习 Redis Day3:Redis 的应用
  • 电脑干货:开源免费的QQ空间说说下载工具GetQzonehistory