当前位置: 首页 > news >正文

Spring AMQP如何通过配置文件避免硬编码实现解耦

        在使用Spring AMQP基于注解声明监听者时,可通过抽取常量来避免硬编码:

@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(MQConstant.USER_EXCHANGE),value = @Queue(MQConstant.USER_QUEUE),key = MQConstant.USER_REDIS_BINDING))public void deleteUserInfoRedisByExchange(String message) {log.info("监听到消息message:{}", message);}
rabbitTemplate.convertAndSend(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, message);

        这种方式方便快捷,在以后修改时可通过修改常量类即可。下面介绍一种基于编程式声明监听者并通过配置文件(yml)进行修改的方式避免硬编码。

配置yml文件(或properties文件),自定义交换机、队列:

hl:amqp:# RabbitMQ交换机名称exchanges:userExchange:name: css.user.exchangetype: directdurable: true# 队列名称queues: userQueue:name: css.user.redis.loginUser.queuedurable: trueexclusive: falseautoDelete: false# 绑定关系    bindings:userRedisBinding:exchange: ${hl.amqp.exchanges.userExchange.name}queue: ${hl.amqp.queues.userQueue.name}routingKey: user.redis.loginUser.del

创建properties读取yml文件:

import java.util.Map;import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import lombok.Data;/***  RabbitMQ配置属性类*/
@Component
@ConfigurationProperties(prefix = "hl.amqp")
@Data
public class AmqpConfigProperties {private Map<String, ExchangeConfig> exchanges;private Map<String, QueueConfig> queues;private Map<String, BindingConfig> bindings;@Datapublic static class ExchangeConfig {private String name;private String type;private boolean durable = true;private boolean autoDelete = false;private boolean internal = false;}@Datapublic static class QueueConfig {private String name;private boolean durable = true;private boolean exclusive = false;private boolean autoDelete = false;}@Datapublic static class BindingConfig {private String exchange;private String queue;private String routingKey;}
}

创建动态加载配置类,进行交换机,队列和绑定关系的注册:

import com.hl.campusservicesys.properties.AmqpConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class DynamicAmqpConfig {private final AmqpAdmin amqpAdmin;private final AmqpConfigProperties amqpConfigProperties;// 改为构造器注入或方法注入public DynamicAmqpConfig(AmqpAdmin amqpAdmin, AmqpConfigProperties amqpConfigProperties) {this.amqpAdmin = amqpAdmin;this.amqpConfigProperties = amqpConfigProperties;}// 创建交换机@Beanpublic Map<String, Exchange> amqpExchanges() {Map<String, Exchange> exchanges = new HashMap<>();amqpConfigProperties.getExchanges().forEach((key, config) -> {Exchange exchange = switch (config.getType()) {case ExchangeTypes.DIRECT -> new DirectExchange(config.getName(), config.isDurable(), config.isAutoDelete());case ExchangeTypes.TOPIC -> new TopicExchange(config.getName(), config.isDurable(), config.isAutoDelete());case ExchangeTypes.FANOUT -> new FanoutExchange(config.getName(), config.isDurable(), config.isAutoDelete());default -> throw new IllegalArgumentException("Unsupported exchange type: " + config.getType());};exchanges.put(key, exchange);// 直接通过amqpAdmin声明交换机到RabbitMQamqpAdmin.declareExchange(exchange);});log.info("RabbitMQ交换机初始化完成!");return exchanges;}// 创建队列@Beanpublic Map<String, Queue> amqpQueues() {Map<String, Queue> queues = new HashMap<>();amqpConfigProperties.getQueues().forEach((key, config) -> {Queue queue = new Queue(config.getName(),config.isDurable(),config.isExclusive(),config.isAutoDelete());queues.put(key, queue);// 直接通过amqpAdmin声明队列到RabbitMQamqpAdmin.declareQueue(queue);});log.info("RabbitMQ队列初始化完成!");return queues;}// 创建绑定关系@Beanpublic Map<String, Binding> amqpBindings(Map<String, Exchange> amqpExchanges, Map<String, Queue> amqpQueues) {Map<String, Binding> bindings = new HashMap<>();amqpConfigProperties.getBindings().forEach((key, config) -> {// 查找对应的交换机和队列Exchange exchange = null;for (Exchange ex : amqpExchanges.values()) {if (ex.getName().equals(config.getExchange())) {exchange = ex;break;}}Queue queue = null;for (Queue q : amqpQueues.values()) {if (q.getName().equals(config.getQueue())) {queue = q;break;}}if (exchange != null && queue != null) {Binding binding = BindingBuilder.bind(queue).to(exchange).with(config.getRoutingKey()).noargs();bindings.put(key, binding);// 直接通过amqpAdmin声明绑定关系到RabbitMQamqpAdmin.declareBinding(binding);}});log.info("RabbitMQ绑定关系初始化完成!");return bindings;}@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置消息转换器@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}// 配置 RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}}

配置消息监听器:

/*** 用户服务监听器* */
@Slf4j
@Component
public class UserListener {@Resourceprivate RedisCache redisCache;@RabbitListener(queues = "${hl.amqp.queues.userQueue.name}")public void deleteUserInfoRedis(Long userId) {log.info("监听到消息message:{}", userId);redisCache.deleteObject(RedisConstant.USER_INFO_KEY + userId);}}

创建消息发送者:

/*** RabbitMQ工具类* */
@Component
public class AmqpMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate AmqpConfigProperties amqpConfigProperties;/*** 发送消息* @param exchange 交换机配置键名* @param routingKey 路由键* @param message 消息内容*/public void sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(amqpConfigProperties.getExchanges().get(exchange).getName(),amqpConfigProperties.getBindings().get(routingKey).getRoutingKey(), message);rabbitTemplate.convertAndSend(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, message);}}

创建常量类:

/*** MQ常量* */
public class MQConstant {public static final String USER_EXCHANGE = "userExchange";public static final String USER_QUEUE = "userQueue";public static final String USER_REDIS_BINDING = "userRedisBinding";
}

 使用:

// 发送消息更新缓存
amqpMessageSender.sendMessage(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, userDTO.getUserId());

示例:

启动时完成初始化:

发送消息,监听者接收并处理:

        在启动时,spring会自动加载注册配置的交换机、队列并完成绑定,对比下来肯定是没直接使用常量类方便来着,感兴趣可以玩玩。

http://www.xdnf.cn/news/1310347.html

相关文章:

  • Linux -- 文件【下】
  • 深度解析和鲸社区热门项目:电商双 11 美妆数据分析的细节与价值
  • 41 C++ STL模板库10-容器3-list
  • 正点原子【第四期】Linux之驱动开发篇学习笔记-1.1 Linux驱动开发与裸机开发的区别
  • docker-compose-mysql-定时备份数据库到其他服务器脚本
  • 【机器学习深度学习】OpenCompass:支持的开源评估数据集及使用差异
  • RemoteCtrl-初步的网络编程框架搭建
  • 安全审计-firewall防火墙
  • 算法训练营day52 图论③ 101.孤岛的总面积、102.沉没孤岛、103.水流问题、104.建造最大岛屿
  • 基于Uni-app+vue3实现微信小程序地图固定中心点范围内拖拽选择位置功能(分步骤详解)
  • MySQL 配置性能优化赛技术文章
  • 基于Python3.10.6与jieba库的中文分词模型接口在Windows Server 2022上的实现与部署教程
  • Flutter开发 网络请求
  • ESP32-S3_ES8311音频输出使用
  • 【嵌入式C语言】六
  • 【读论文】医疗AI大模型:百川开源Baichuan-M2
  • 第二十五天:构造函数/析构函数/拷贝构造
  • 开发一款多商户电商APP要多久?功能拆解与源码技术落地方案
  • 迭代器模式及优化
  • 模式匹配自动机全面理论分析
  • 【Web后端】Django、flask及其场景——以构建系统原型为例
  • AI 搜索时代:引领变革,重塑您的 SEO 战略
  • 基于uni-app+vue3实现的微信小程序地图范围限制与单点标记功能实现指南
  • Matplotlib直线绘制:从基础到三维空间的高级可视化
  • 数组名本质与指针运算揭秘
  • List容器:特性与操作使用指南
  • 零基础学习人工智能的完整路线规划
  • 民法学学习笔记(个人向) Part.5
  • 学习游戏制作记录(制作系统与物品掉落系统)8.16
  • MySQL查询性能慢时索引失效的排查与优化实践