当前位置: 首页 > backend >正文

SpringBoot集成ActiveMQ

一 、什么是Active MQ
Active MQ是消息队列技术的具体实现之一,支持点对点(Queue)与订阅/发布(Topic)两种信息传递模式。其常用于建立服务之间的连接机制,与Feign相比,更适用于异步操作,像数据同步这种与业务无强关联又耗时的操作,就很适合使用。
Active MQ基于JMS消息代理实现(JMS是Java的消息服务规范,类似于JDBC,是一种实现功能的标准)。与之相应的还有另一种规范AMQP,较JMS而言更加先进,同时兼容JMS,其最著名的具体实现是Rabbit MQ。

二 Active MQ的两种角色
在Active MQ中有两种角色。
发布者(publisher):用于发布消息。
消费者(consumer):用于消费(接收)消息。

三 、Active MQ的两种信息传递模式
1、点对点(Queue)模式
点对点模式:一个消息只能有一个消费者(情场菜鸟,一套说辞撩一个妹纸)。
在这里插入图片描述
2、订阅/发布(Topic)模式
订阅/发布模式:一个消息可以有多个消费者(情场老手,一套说辞撩多个妹纸)。
在这里插入图片描述
四、Spring Boot工程整合Active MQ准备工作
1、添加依赖

<!--  Spring Boot工程Active MQ起步依赖  -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--  Apache组织Active MQ池依赖  -->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId>
</dependency>

2、配置文件(application.yml)

spring:activemq:// 注意,默认的端口号是61616。broker-url: tcp://127.0.0.1:61616user: adminpassword: adminpool:enabled: truemax-connections: 10jms:// 是否开启发布/订阅模式(false:Queue模式,true:Topic模式,默认false)。pub-sub-domain: true

五、SpringBoot工程整合ActiveMQ(Queue)
注:Queue模式必须将配置文件中的spring.jms.pub-sub-domain参数设置为false。(默认false)
1、发布者

 	@Autowiredprivate JmsTemplate jmsTemplate;/*** 1:下发Queue消息*/@PostMapping(value = "send/queue", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public void sendQueue() {// 设置点对点模式队列名。String name = "active.queue";// 实例化点对点模式队列实例。Queue queue = new ActiveMQQueue(name);// 循环发送消息。String message;for (int i = 1; i <= 10; i++) {message = "点对点模式:第" + i + "次消息。";System.out.println("发送 " + message);// 下发消息,参数1表示消息所在的队列,消息二表示消息。jmsTemplate.convertAndSend(queue, message);}}

2、消费者 //多个消费者时,轮询处理消息
设置消息队列监听器,一个监听器代表一个消费者,注解@JmsListener的destination属性用于填写队列名。被@JmsListener注解的方法为监听器。

注:监听器所在的类必须配置为Bean。

@Service
public class ActiveListener {/*** 1:处理*/@JmsListener(destination = "active.queue")public void handlerOne(String message) {System.out.println("消费者1接收 " + message);}/*** 2:处理*/@JmsListener(destination = "active.queue")public void handlerTwo(String message) {System.out.println("消费者2接收 " + message);}
}

六、Spring Boot工程整合Active MQ(Topic)
注:Topic模式必须将配置文件中的spring.jms.pub-sub-domain参数设置为true。
1、发布者

@Autowiredprivate JmsTemplate jmsTemplate;/*** 2:下发Topic消息*/@PostMapping(value = "send/topic", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public void sendTopic() {// 设置发布/订阅模式队列名。String name = "active.topic";// 实例化发布/订阅模式队列实例。Topic topic = new ActiveMQTopic(name);// 循环发送消息。String message;for (int i = 1; i <= 10; i++) {message = "发布/订阅模式:第" + i + "次消息。";System.out.println("发送 " + message);jmsTemplate.convertAndSend(topic, message);}}

2、消费者 //一条消息允许被多个消费者消费
修改监听器监听的队列。

@Service
public class ActiveListener {/*** 1:处理*/@JmsListener(destination = "active.topic")public void handlerOne(String message) {System.out.println("消费者1接收 " + message);}/*** 2:处理*/@JmsListener(destination = "active.topic")public void handlerTwo(String message) {System.out.println("消费者2接收 " + message);}
}

七、使用ActiveMQ传输对象
Active MQ是无法直接传输对象的,可以先将对象转换成JSON字符串后发送,再在消费者端转换成对象的模式。
1、添加依赖

<!--  阿里巴巴Json依赖  --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.25</version></dependency>

2、发布者

 /*** 2:下发Topic消息*/@PostMapping(value = "send/topic", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public void sendTopic() {// 设置发布/订阅模式队列名。String name = "active.topic";// 实例化发布/订阅模式队列实例。Topic topic = new ActiveMQTopic(name);// 循环发送消息。UserInformation userInformation;for (int i = 1; i <= 10; i++) {userInformation = new UserInformation();userInformation.setId(i);// 将对象转换成JSON字符串。jmsTemplate.convertAndSend(topic, JSONObject.toJSONString(userInformation));}}

3、消费者

/*** 1:处理*/@JmsListener(destination = "active.topic")public void handlerOne(String message) {UserInformation userInformation = JSONObject.parseObject(message, UserInformation.class);System.out.println("消费者1接收 " + userInformation);}

八、同时使用Queue和Topic
想在工程中同时使用Queue和Topic,关键是能为消费者自主配置不通模式的监听工厂,因此我们需要先分别为Queue和Topic实例化一个监听工厂。
1、配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;import javax.jms.ConnectionFactory;@Configuration
public class ActiveConfig {// 监听器工厂名public static final String QUEUE_LISTENER_FACTORY = "queueJmsListenerContainerFactory";public static final String TOPIC_LISTENER_FACTORY = "topicJmsListenerContainerFactory";/*** 配置Queue监听器工厂Bean** @param connectionFactory 连接工厂对象* @return 监听器工厂对象*/@Bean(name = QUEUE_LISTENER_FACTORY)public JmsListenerContainerFactory<?> queueJmsListenerContainerFactoryBean(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();defaultJmsListenerContainerFactory.setPubSubDomain(false);defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);return defaultJmsListenerContainerFactory;}/*** 配置Topic监听器工厂Bean** @param connectionFactory 连接工厂对象* @return 监听器工厂对象*/@Bean(name = TOPIC_LISTENER_FACTORY)public JmsListenerContainerFactory<?> topicJmsListenerContainerFactoryBean(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();defaultJmsListenerContainerFactory.setPubSubDomain(true);defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);return defaultJmsListenerContainerFactory;}
}
spring:activemq:// 注意,默认的端口号是61616。broker-url: tcp://127.0.0.1:61616user: adminpassword: adminpool:enabled: truemax-connections: 10// 因为自主配置模式,因此jms配置可以删除。// jms:// 是否开启发布/订阅模式(false:Queue模式,true:Topic模式,默认false)。// pub-sub-domain: true

2、Queue模式

    @JmsListener(destination = "active.queue", containerFactory = ActiveConfig.QUEUE_LISTENER_FACTORY)public void handlerOne(String message) {UserInformation userInformation = JSONObject.parseObject(message, UserInformation.class);System.out.println("消费者1接收 " + userInformation);}

3、Topic模式

@JmsListener(destination = "active.topic", containerFactory = ActiveConfig.TOPIC_LISTENER_FACTORY)public void handlerTwo(String message) {UserInformation userInformation = JSONObject.parseObject(message, UserInformation.class);System.out.println("消费者2接收 " + userInformation);}

参考博客:https://blog.csdn.net/qq_39288456/article/details/84137561

http://www.xdnf.cn/news/13787.html

相关文章:

  • 3D 展示崛起:科技赋能的新变革
  • 【力扣 简单 C】83. 删除排序链表中的重复元素
  • 英一真题阅读单词笔记 10年
  • c语言接口设计模式之抽象算法,以冒泡排序为例
  • @Validation 的使用 Spring
  • Matlab图像清晰度评价指标
  • 如何在网页里填写 PDF下拉框
  • STM32 开发 - 中断案例(中断概述、STM32 的中断、NVIC 嵌套向量中断控制器、外部中断配置寄存器组、EXTI 外部中断控制器、实例实操)
  • Spring Boot 项目中Http 请求如何对响应体进行压缩
  • [C++][设计模式] : 单例模式(饿汉和懒汉)
  • php列表头部增加批量操作按钮,多选订单数据批量微信退款(含微信支付SDK)
  • 洛谷-P3375 【模板】KMP
  • 前端导出PDF(适配ios Safari浏览器)
  • 常见的网络协议有哪些
  • 图像匹配算法 笔记2025
  • 【从零学习JVM|第七篇】快速了解直接内存
  • Qt QTcpSocket的write无法发送数据【已解决】
  • 打卡day52
  • UE5制作与云渲染配置不足?3090/4090显卡云端解放创作力
  • 基于sample_aiisp例子,创建3路编码流,记录
  • 奥威BI:用AI重新定义数据分析,中小企业数字化转型的智能引擎
  • 力扣HOT100之技巧:31. 下一个排列
  • CMS软件以及常见分类
  • excel中自定义公式
  • 基于 Nginx 服务器的泛域名 SSL 证书申请与部署
  • 腾讯云:6月30日起,自动禁用,及时排查
  • keil5怎么关闭工程
  • JavaScript中的迭代器模式:优雅遍历数据的“设计之道”
  • React---Hooks深入
  • vue3 全局过滤器