Spring AMQP如何通过配置文件避免硬编码实现解耦
在使用Spring AMQP基于注解声明监听者时,可通过抽取常量来避免硬编码:
/**
 * Annotation-driven listener: the queue, the exchange and the routing key of the
 * binding are all taken from {@code MQConstant}, so no name is hard-coded here.
 *
 * @param message the received message body; it is only logged
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(MQConstant.USER_QUEUE),
        exchange = @Exchange(MQConstant.USER_EXCHANGE),
        key = MQConstant.USER_REDIS_BINDING))
public void deleteUserInfoRedisByExchange(String message) {
    log.info("监听到消息message:{}", message);
}
// Publisher side: send to the exchange / routing key named by the same constants the listener binding above uses.
rabbitTemplate.convertAndSend(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, message);
这种方式方便快捷,以后需要修改时只需修改常量类即可。下面介绍另一种避免硬编码的方式:以编程方式声明交换机、队列和绑定关系,相关名称统一通过配置文件(yml)进行配置和修改。
配置yml文件(或properties文件),自定义交换机、队列:
hl:
  amqp:
    # RabbitMQ exchanges, keyed by a logical name
    exchanges:
      userExchange:
        name: css.user.exchange
        type: direct
        durable: true
    # Queues, keyed by a logical name
    queues:
      userQueue:
        name: css.user.redis.loginUser.queue
        durable: true
        exclusive: false
        autoDelete: false
    # Bindings between the exchanges and queues declared above
    bindings:
      userRedisBinding:
        exchange: ${hl.amqp.exchanges.userExchange.name}
        queue: ${hl.amqp.queues.userQueue.name}
        routingKey: user.redis.loginUser.del
创建properties读取yml文件:
import java.util.Map;import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import lombok.Data;/*** RabbitMQ配置属性类*/
/**
 * Binds everything under the {@code hl.amqp} configuration prefix: exchanges,
 * queues and bindings, each stored in a map keyed by the logical name used in
 * the configuration file.
 */
@Component
@ConfigurationProperties(prefix = "hl.amqp")
@Data
public class AmqpConfigProperties {

    /** Exchange definitions ({@code hl.amqp.exchanges.*}). */
    private Map<String, ExchangeConfig> exchanges;

    /** Queue definitions ({@code hl.amqp.queues.*}). */
    private Map<String, QueueConfig> queues;

    /** Binding definitions ({@code hl.amqp.bindings.*}). */
    private Map<String, BindingConfig> bindings;

    /** Settings of a single exchange. */
    @Data
    public static class ExchangeConfig {

        /** Exchange name. */
        private String name;

        /** Exchange type as written in the configuration, e.g. {@code direct}. */
        private String type;

        /** Durable flag; {@code true} unless configured otherwise. */
        private boolean durable = true;

        /** Auto-delete flag; {@code false} unless configured otherwise. */
        private boolean autoDelete;

        /** Internal flag; {@code false} unless configured otherwise. */
        private boolean internal;
    }

    /** Settings of a single queue. */
    @Data
    public static class QueueConfig {

        /** Queue name. */
        private String name;

        /** Durable flag; {@code true} unless configured otherwise. */
        private boolean durable = true;

        /** Exclusive flag; {@code false} unless configured otherwise. */
        private boolean exclusive;

        /** Auto-delete flag; {@code false} unless configured otherwise. */
        private boolean autoDelete;
    }

    /** Settings of a single queue-to-exchange binding. */
    @Data
    public static class BindingConfig {

        /** Name of the exchange to bind to. */
        private String exchange;

        /** Name of the queue being bound. */
        private String queue;

        /** Routing key of the binding. */
        private String routingKey;
    }
}
创建动态加载配置类,进行交换机,队列和绑定关系的注册:
import com.hl.campusservicesys.properties.AmqpConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class DynamicAmqpConfig {private final AmqpAdmin amqpAdmin;private final AmqpConfigProperties amqpConfigProperties;// 改为构造器注入或方法注入public DynamicAmqpConfig(AmqpAdmin amqpAdmin, AmqpConfigProperties amqpConfigProperties) {this.amqpAdmin = amqpAdmin;this.amqpConfigProperties = amqpConfigProperties;}// 创建交换机@Beanpublic Map<String, Exchange> amqpExchanges() {Map<String, Exchange> exchanges = new HashMap<>();amqpConfigProperties.getExchanges().forEach((key, config) -> {Exchange exchange = switch (config.getType()) {case ExchangeTypes.DIRECT -> new DirectExchange(config.getName(), config.isDurable(), config.isAutoDelete());case ExchangeTypes.TOPIC -> new TopicExchange(config.getName(), config.isDurable(), config.isAutoDelete());case ExchangeTypes.FANOUT -> new FanoutExchange(config.getName(), config.isDurable(), config.isAutoDelete());default -> throw new IllegalArgumentException("Unsupported exchange type: " + config.getType());};exchanges.put(key, exchange);// 直接通过amqpAdmin声明交换机到RabbitMQamqpAdmin.declareExchange(exchange);});log.info("RabbitMQ交换机初始化完成!");return exchanges;}// 创建队列@Beanpublic Map<String, Queue> amqpQueues() {Map<String, Queue> queues = new HashMap<>();amqpConfigProperties.getQueues().forEach((key, config) -> {Queue queue = new Queue(config.getName(),config.isDurable(),config.isExclusive(),config.isAutoDelete());queues.put(key, queue);// 直接通过amqpAdmin声明队列到RabbitMQamqpAdmin.declareQueue(queue);});log.info("RabbitMQ队列初始化完成!");return queues;}// 创建绑定关系@Beanpublic Map<String, Binding> amqpBindings(Map<String, Exchange> amqpExchanges, Map<String, Queue> amqpQueues) {Map<String, Binding> bindings = new HashMap<>();amqpConfigProperties.getBindings().forEach((key, config) -> {// 查找对应的交换机和队列Exchange exchange = null;for (Exchange ex : amqpExchanges.values()) {if (ex.getName().equals(config.getExchange())) {exchange = ex;break;}}Queue queue = null;for (Queue q : amqpQueues.values()) {if (q.getName().equals(config.getQueue())) {queue = q;break;}}if (exchange != 
null && queue != null) {Binding binding = BindingBuilder.bind(queue).to(exchange).with(config.getRoutingKey()).noargs();bindings.put(key, binding);// 直接通过amqpAdmin声明绑定关系到RabbitMQamqpAdmin.declareBinding(binding);}});log.info("RabbitMQ绑定关系初始化完成!");return bindings;}@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置消息转换器@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}// 配置 RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}}
配置消息监听器:
/**
 * User-service listener: consumes the queue configured under
 * {@code hl.amqp.queues.userQueue.name} and removes the matching user entry
 * from the Redis cache.
 */
@Slf4j
@Component
public class UserListener {

    @Resource
    private RedisCache redisCache;

    /**
     * Deletes the cached object stored under
     * {@code RedisConstant.USER_INFO_KEY + userId}.
     *
     * @param userId id carried by the received message
     */
    @RabbitListener(queues = "${hl.amqp.queues.userQueue.name}")
    public void deleteUserInfoRedis(Long userId) {
        log.info("监听到消息message:{}", userId);
        final String cacheKey = RedisConstant.USER_INFO_KEY + userId;
        redisCache.deleteObject(cacheKey);
    }
}
创建消息发送者:
/*** RabbitMQ工具类* */
@Component
public class AmqpMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate AmqpConfigProperties amqpConfigProperties;/*** 发送消息* @param exchange 交换机配置键名* @param routingKey 路由键* @param message 消息内容*/public void sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(amqpConfigProperties.getExchanges().get(exchange).getName(),amqpConfigProperties.getBindings().get(routingKey).getRoutingKey(), message);rabbitTemplate.convertAndSend(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, message);}}
创建常量类:
/**
 * MQ constants: the logical keys under which the exchange, queue and binding
 * are declared in the {@code hl.amqp} configuration.
 */
public final class MQConstant {

    /** Configuration key of the user exchange. */
    public static final String USER_EXCHANGE = "userExchange";

    /** Configuration key of the user queue. */
    public static final String USER_QUEUE = "userQueue";

    /** Configuration key of the binding used for user cache invalidation. */
    public static final String USER_REDIS_BINDING = "userRedisBinding";

    private MQConstant() {
        // Constants holder; not meant to be instantiated.
    }
}
使用:
// Send a message so that the cache gets updated
amqpMessageSender.sendMessage(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, userDTO.getUserId());
示例:
启动时完成初始化:
发送消息,监听者接收并处理:
在启动时,Spring会自动加载并注册配置文件中定义的交换机、队列,并完成绑定。相比之下,这种方式确实不如直接使用常量类方便,感兴趣的话可以自己动手试一试。