SpringBoot集成ActiveMQ
一 、什么是Active MQ
Active MQ是消息队列技术的具体实现之一,支持点对点(Queue)与订阅/发布(Topic)两种信息传递模式。其常用于建立服务之间的连接机制,与Feign相比,更适用于异步操作,像数据同步这种与业务无强关联又耗时的操作,就很适合使用。
Active MQ基于JMS消息代理实现(JMS是Java的消息服务规范,类似于JDBC,是一种实现功能的标准)。与之相应的还有另一种规范AMQP,较JMS而言更加先进,同时兼容JMS,其最著名的具体实现是Rabbit MQ。
二 Active MQ的两种角色
在Active MQ中有两种角色。
发布者(publisher):用于发布消息。
消费者(consumer):用于消费(接收)消息。
三 、Active MQ的两种信息传递模式
1、点对点(Queue)模式
点对点模式:一个消息只能有一个消费者(情场菜鸟,一套说辞撩一个妹纸)。
2、订阅/发布(Topic)模式
订阅/发布模式:一个消息可以有多个消费者(情场老手,一套说辞撩多个妹纸)。
四、Spring Boot工程整合Active MQ准备工作
1、添加依赖
<!-- Spring Boot工程Active MQ起步依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Apache组织Active MQ池依赖 -->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId>
</dependency>
2、配置文件(application.yml)
spring:activemq:// 注意,默认的端口号是61616。broker-url: tcp://127.0.0.1:61616user: adminpassword: adminpool:enabled: truemax-connections: 10jms:// 是否开启发布/订阅模式(false:Queue模式,true:Topic模式,默认false)。pub-sub-domain: true
五、SpringBoot工程整合ActiveMQ(Queue)
注:Queue模式必须将配置文件中的spring.jms.pub-sub-domain参数设置为false。(默认false)
1、发布者
@Autowiredprivate JmsTemplate jmsTemplate;/*** 1:下发Queue消息*/@PostMapping(value = "send/queue", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public void sendQueue() {// 设置点对点模式队列名。String name = "active.queue";// 实例化点对点模式队列实例。Queue queue = new ActiveMQQueue(name);// 循环发送消息。String message;for (int i = 1; i <= 10; i++) {message = "点对点模式:第" + i + "次消息。";System.out.println("发送 " + message);// 下发消息,参数1表示消息所在的队列,消息二表示消息。jmsTemplate.convertAndSend(queue, message);}}
2、消费者 //多个消费者时,轮询处理消息
设置消息队列监听器,一个监听器代表一个消费者,注解@JmsListener的destination属性用于填写队列名。被@JmsListener注解的方法为监听器。
注:监听器所在的类必须配置为Bean。
@Service
public class ActiveListener {/*** 1:处理*/@JmsListener(destination = "active.queue")public void handlerOne(String message) {System.out.println("消费者1接收 " + message);}/*** 2:处理*/@JmsListener(destination = "active.queue")public void handlerTwo(String message) {System.out.println("消费者2接收 " + message);}
}
六、Spring Boot工程整合Active MQ(Topic)
注:Topic模式必须将配置文件中的spring.jms.pub-sub-domain参数设置为true。
1、发布者
@Autowiredprivate JmsTemplate jmsTemplate;/*** 2:下发Topic消息*/@PostMapping(value = "send/topic", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public void sendTopic() {// 设置发布/订阅模式队列名。String name = "active.topic";// 实例化发布/订阅模式队列实例。Topic topic = new ActiveMQTopic(name);// 循环发送消息。String message;for (int i = 1; i <= 10; i++) {message = "发布/订阅模式:第" + i + "次消息。";System.out.println("发送 " + message);jmsTemplate.convertAndSend(topic, message);}}
2、消费者 //一条消息允许被多个消费者消费
修改监听器监听的队列。
@Service
public class ActiveListener {/*** 1:处理*/@JmsListener(destination = "active.topic")public void handlerOne(String message) {System.out.println("消费者1接收 " + message);}/*** 2:处理*/@JmsListener(destination = "active.topic")public void handlerTwo(String message) {System.out.println("消费者2接收 " + message);}
}
七、使用ActiveMQ传输对象
Active MQ是无法直接传输对象的,可以先将对象转换成JSON字符串后发送,再在消费者端转换成对象的模式。
1、添加依赖
<!-- 阿里巴巴Json依赖 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.25</version></dependency>
2、发布者
/*** 2:下发Topic消息*/@PostMapping(value = "send/topic", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)public void sendTopic() {// 设置发布/订阅模式队列名。String name = "active.topic";// 实例化发布/订阅模式队列实例。Topic topic = new ActiveMQTopic(name);// 循环发送消息。UserInformation userInformation;for (int i = 1; i <= 10; i++) {userInformation = new UserInformation();userInformation.setId(i);// 将对象转换成JSON字符串。jmsTemplate.convertAndSend(topic, JSONObject.toJSONString(userInformation));}}
3、消费者
/*** 1:处理*/@JmsListener(destination = "active.topic")public void handlerOne(String message) {UserInformation userInformation = JSONObject.parseObject(message, UserInformation.class);System.out.println("消费者1接收 " + userInformation);}
八、同时使用Queue和Topic
想在工程中同时使用Queue和Topic,关键是能为消费者自主配置不通模式的监听工厂,因此我们需要先分别为Queue和Topic实例化一个监听工厂。
1、配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;import javax.jms.ConnectionFactory;@Configuration
public class ActiveConfig {// 监听器工厂名public static final String QUEUE_LISTENER_FACTORY = "queueJmsListenerContainerFactory";public static final String TOPIC_LISTENER_FACTORY = "topicJmsListenerContainerFactory";/*** 配置Queue监听器工厂Bean** @param connectionFactory 连接工厂对象* @return 监听器工厂对象*/@Bean(name = QUEUE_LISTENER_FACTORY)public JmsListenerContainerFactory<?> queueJmsListenerContainerFactoryBean(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();defaultJmsListenerContainerFactory.setPubSubDomain(false);defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);return defaultJmsListenerContainerFactory;}/*** 配置Topic监听器工厂Bean** @param connectionFactory 连接工厂对象* @return 监听器工厂对象*/@Bean(name = TOPIC_LISTENER_FACTORY)public JmsListenerContainerFactory<?> topicJmsListenerContainerFactoryBean(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();defaultJmsListenerContainerFactory.setPubSubDomain(true);defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);return defaultJmsListenerContainerFactory;}
}
spring:activemq:// 注意,默认的端口号是61616。broker-url: tcp://127.0.0.1:61616user: adminpassword: adminpool:enabled: truemax-connections: 10// 因为自主配置模式,因此jms配置可以删除。// jms:// 是否开启发布/订阅模式(false:Queue模式,true:Topic模式,默认false)。// pub-sub-domain: true
2、Queue模式
@JmsListener(destination = "active.queue", containerFactory = ActiveConfig.QUEUE_LISTENER_FACTORY)public void handlerOne(String message) {UserInformation userInformation = JSONObject.parseObject(message, UserInformation.class);System.out.println("消费者1接收 " + userInformation);}
3、Topic模式
@JmsListener(destination = "active.topic", containerFactory = ActiveConfig.TOPIC_LISTENER_FACTORY)public void handlerTwo(String message) {UserInformation userInformation = JSONObject.parseObject(message, UserInformation.class);System.out.println("消费者2接收 " + userInformation);}
参考博客:https://blog.csdn.net/qq_39288456/article/details/84137561