当前位置: 首页 > news >正文

Spring-rabbit使用实战六

目录

优雅实现 Spring RabbitMQ 多交换机多队列绑定

一、核心设计思路

二、完整实现代码

1. 配置定义(枚举方式)

2. 声明式配置工厂

3. 生产者服务(通用发送方法)

4. 消费者抽象基类

5. 具体消费者实现

6. 配置中心扩展(YAML 配置)

三、设计优势与扩展点

1. 架构优势

2. 扩展点设计

四、最佳实践建议

1.命名规范

2.监控增强

3.单元测试方案

4.消费者弹性配置

五、架构演进方向

1.配置中心集成

2.消息路由服务

3.流量控制中间件


优雅实现 Spring RabbitMQ 多交换机多队列绑定

在需要创建多个交换机、每个交换机绑定多个队列的场景中,通过合理设计可以显著提高代码的复用性和扩展性。以下是针对 3 个交换机各绑定 3 个队列的优雅实现方案:

一、核心设计思路

  1. 配置驱动:使用枚举或配置类定义交换机、队列和路由键

  2. 工厂模式:创建交换机、队列和绑定的工厂方法

  3. 自动绑定:通过反射或配置自动创建绑定关系

  4. 模板方法:复用消费者处理逻辑

二、完整实现代码

1. 配置定义(枚举方式)

public enum RabbitConfigEnum {// 订单业务ORDER_EXCHANGE("order.direct.exchange", ExchangeTypes.DIRECT,Arrays.asList(new QueueConfig("order.create.queue", "order.create"),new QueueConfig("order.pay.queue", "order.pay"),new QueueConfig("order.cancel.queue", "order.cancel"))),// 库存业务INVENTORY_EXCHANGE("inventory.topic.exchange", ExchangeTypes.TOPIC,Arrays.asList(new QueueConfig("inventory.deduct.queue", "inventory.deduct"),new QueueConfig("inventory.restore.queue", "inventory.restore.*"),new QueueConfig("inventory.alert.queue", "inventory.alert.#"))),// 通知业务NOTIFICATION_EXCHANGE("notification.fanout.exchange", ExchangeTypes.FANOUT,Arrays.asList(new QueueConfig("email.notification.queue", ""), // Fanout 不需要路由键new QueueConfig("sms.notification.queue", ""),new QueueConfig("push.notification.queue", "")));private final String exchangeName;private final ExchangeType exchangeType;private final List<QueueConfig> queueConfigs;RabbitConfigEnum(String exchangeName, ExchangeType exchangeType, List<QueueConfig> queueConfigs) {this.exchangeName = exchangeName;this.exchangeType = exchangeType;this.queueConfigs = queueConfigs;}// 队列配置内部类@Getter@AllArgsConstructorpublic static class QueueConfig {private final String queueName;private final String routingKey;}// 交换机类型枚举public enum ExchangeType {DIRECT, TOPIC, FANOUT}// 获取所有配置public static List<RabbitConfigEnum> getAllConfigs() {return Arrays.asList(values());}
}

2. 声明式配置工厂

@Configuration
public class RabbitMQConfigFactory {private static final Logger logger = LoggerFactory.getLogger(RabbitMQConfigFactory.class);@Beanpublic Declarables createRabbitElements() {List<Declarable> declarables = new ArrayList<>();for (RabbitConfigEnum config : RabbitConfigEnum.getAllConfigs()) {// 创建交换机AbstractExchange exchange = createExchange(config.getExchangeName(), config.getExchangeType());declarables.add(exchange);logger.info("Created exchange: {}", config.getExchangeName());// 为交换机创建队列和绑定for (RabbitConfigEnum.QueueConfig queueConfig : config.getQueueConfigs()) {Queue queue = new Queue(queueConfig.getQueueName(), true);declarables.add(queue);Binding binding = createBinding(exchange, queue, queueConfig.getRoutingKey(), config.getExchangeType());declarables.add(binding);logger.info("Created queue '{}' bound to '{}' with routing key '{}'",queueConfig.getQueueName(),config.getExchangeName(),queueConfig.getRoutingKey());}}return new Declarables(declarables);}private AbstractExchange createExchange(String name, RabbitConfigEnum.ExchangeType type) {return switch (type) {case DIRECT -> new DirectExchange(name, true, false);case TOPIC -> new TopicExchange(name, true, false);case FANOUT -> new FanoutExchange(name, true, false);};}private Binding createBinding(AbstractExchange exchange, Queue queue, String routingKey, RabbitConfigEnum.ExchangeType type) {return switch (type) {case DIRECT, TOPIC -> BindingBuilder.bind(queue).to(exchange).with(routingKey);case FANOUT -> BindingBuilder.bind(queue).to((FanoutExchange) exchange);};}// 统一消息转换器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}

3. 生产者服务(通用发送方法)

@Service
public class RabbitMQProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 通用消息发送方法* * @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message, m -> {MessageProperties props = m.getMessageProperties();props.setContentType(MessageProperties.CONTENT_TYPE_JSON);props.setTimestamp(new Date());return m;});logger.debug("Sent message to {}[{}]: {}", exchange, routingKey, message);}/*** 按业务发送消息(推荐)* * @param config 业务配置* @param routingKey 路由键* @param message 消息内容*/public void sendByConfig(RabbitConfigEnum config, String routingKey, Object message) {sendMessage(config.getExchangeName(), routingKey, message);}
}

4. 消费者抽象基类

public abstract class AbstractRabbitConsumer<T> {private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitConsumer.class);/*** 通用消息处理模板* * @param message 消息内容* @param channel RabbitMQ通道* @param tag 消息标签*/@RabbitHandlerpublic void handleMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {// 1. 业务处理processMessage(message);// 2. 手动ACK确认channel.basicAck(tag, false);logger.debug("Message processed: {}", message);} catch (BusinessException e) {// 业务异常处理handleBusinessException(e, message, channel, tag);} catch (Exception e) {// 系统异常处理handleSystemException(e, message, channel, tag);}}/*** 业务处理抽象方法(子类实现)*/protected abstract void processMessage(T message) throws BusinessException;/*** 业务异常处理(可重写)*/protected void handleBusinessException(BusinessException e, T message, Channel channel, long tag) throws IOException {logger.error("Business error processing message: {}", message, e);// 拒绝消息但不重试channel.basicReject(tag, false);}/*** 系统异常处理(可重写)*/protected void handleSystemException(Exception e, T message, Channel channel, long tag) throws IOException {logger.error("System error processing message: {}", message, e);// 拒绝消息并重新入队channel.basicReject(tag, true);}
}

5. 具体消费者实现

// 订单创建消费者
@Component
@RabbitListener(queues = "order.create.queue")
public class OrderCreateConsumer extends AbstractRabbitConsumer<Order> {@Autowiredprivate InventoryService inventoryService;@Overrideprotected void processMessage(Order order) throws BusinessException {// 减库存inventoryService.deductStock(order.getProductId(), order.getQuantity());// 记录订单orderService.saveOrder(order);// 发送创建事件eventPublisher.publishOrderCreated(order);}// 重写异常处理@Overrideprotected void handleBusinessException(BusinessException e, Order order, Channel channel, long tag) throws IOException {if (e instanceof InventoryShortageException) {// 库存不足特殊处理orderService.markAsPending(order);channel.basicAck(tag, false);} else {super.handleBusinessException(e, order, channel, tag);}}
}// 库存告警消费者
@Component
@RabbitListener(queues = "inventory.alert.queue")
public class InventoryAlertConsumer extends AbstractRabbitConsumer<InventoryAlert> {@Overrideprotected void processMessage(InventoryAlert alert) {// 发送告警通知notificationService.sendAlert(alert.getProductId(), alert.getCurrentLevel());// 记录告警日志alertService.logAlert(alert);}
}

6. 配置中心扩展(YAML 配置)

# application.yml
spring:rabbitmq:host: rabbitmq-prod.example.comport: 5672username: ${RABBIT_USER}password: ${RABBIT_PASS}virtual-host: /prodlistener:simple:acknowledge-mode: manualconcurrency: 3max-concurrency: 10prefetch: 20# 自定义交换机配置(可选扩展)
rabbit:exchanges:- name: order.direct.exchangetype: DIRECTqueues:- name: order.create.queuerouting-key: order.create- name: order.pay.queuerouting-key: order.pay- name: order.cancel.queuerouting-key: order.cancel- name: inventory.topic.exchangetype: TOPICqueues:- name: inventory.deduct.queuerouting-key: inventory.deduct- name: inventory.restore.queuerouting-key: inventory.restore.*- name: inventory.alert.queuerouting-key: inventory.alert.#

三、设计优势与扩展点

1. 架构优势

设计特点优势应用场景
配置枚举化集中管理所有配置,避免硬编码多环境部署
工厂模式统一创建逻辑,减少重复代码新增交换机/队列
抽象消费者统一异常处理和ACK机制所有消费者
通用生产者简化消息发送接口所有业务场景

2. 扩展点设计

扩展点 1:动态添加新交换机

// 添加新业务配置
RabbitConfigEnum.NEW_EXCHANGE = new RabbitConfigEnum("new.exchange",ExchangeTypes.DIRECT,Arrays.asList(new QueueConfig("new.queue1", "key1"),new QueueConfig("new.queue2", "key2"))
);

扩展点 2:自定义绑定逻辑

// 重写绑定工厂方法
private Binding createCustomBinding(AbstractExchange exchange, Queue queue, String routingKey, ExchangeType type) {if ("special.binding".equals(routingKey)) {return BindingBuilder.bind(queue).to(exchange).with(routingKey).and(createCustomArguments()); // 自定义参数}return createBinding(exchange, queue, routingKey, type);
}

扩展点 3:基于配置文件的动态配置

@Configuration
@ConfigurationProperties(prefix = "rabbit")
public class DynamicRabbitConfig {private List<ExchangeConfig> exchanges;@Beanpublic Declarables dynamicDeclarables() {// 类似工厂方法实现,从配置文件读取}@Getter @Setterpublic static class ExchangeConfig {private String name;private String type;private List<QueueBinding> queues;}@Getter @Setterpublic static class QueueBinding {private String name;private String routingKey;}
}

四、最佳实践建议

1.命名规范

// 业务.类型.功能
String exchangeName = "order.direct.exchange";
String queueName = "inventory.topic.alert.queue";
String routingKey = "order.payment.completed";

2.监控增强

// 在生产者中添加监控埋点
public void sendMessage(String exchange, String routingKey, Object message) {Timer.Sample sample = Timer.start(metricsRegistry);// ...发送逻辑sample.stop(metricsRegistry.timer("rabbit.produce.time", "exchange", exchange, "routingKey", routingKey));
}

3.单元测试方案

@SpringBootTest
public class RabbitConfigTest {@Autowiredprivate RabbitAdmin rabbitAdmin;@Testpublic void testExchangeAndQueueCreation() {// 验证所有交换机已创建for (RabbitConfigEnum config : RabbitConfigEnum.values()) {Exchange exchange = new DirectExchange(config.getExchangeName());assertTrue(rabbitAdmin.getExchangeInfo(exchange.getName()) != null);// 验证队列绑定for (QueueConfig qc : config.getQueueConfigs()) {Queue queue = new Queue(qc.getQueueName());assertTrue(rabbitAdmin.getQueueInfo(queue.getName()) != null);}}}
}

4.消费者弹性配置

# 针对不同队列配置不同消费者参数
spring:rabbitmq:listener:order:concurrency: 5max-concurrency: 20notification:concurrency: 2max-concurrency: 5

五、架构演进方向

1.配置中心集成

2.消息路由服务

@Service
public class MessageRouter {private Map<MessageType, RabbitConfigEnum> routingMap;public void routeMessage(MessageType type, Object message) {RabbitConfigEnum config = routingMap.get(type);producer.sendByConfig(config, config.getDefaultKey(), message);}
}

3.流量控制中间件

@Around("@annotation(rabbitListener)")
public Object rateLimit(ProceedingJoinPoint joinPoint) {if (!rateLimiter.tryAcquire()) {// 返回特殊响应,触发消费者暂停return new RateLimitExceededResponse();}return joinPoint.proceed();
}

这种设计通过配置驱动、工厂模式和模板方法,实现了高可复用的 RabbitMQ 集成方案,能够轻松应对业务扩展需求,同时保持代码的简洁性和可维护性。

http://www.xdnf.cn/news/1246411.html

相关文章:

  • Could not load the Qt platform plugin “xcb“ in “无法调试与显示Opencv
  • 类内部方法调用,自注入避免AOP失效
  • RK3568 Linux驱动学习——字符设备驱动开发
  • 森赛睿科技成为机器视觉产业联盟会员单位
  • C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(六)
  • Vue.js 教程
  • css3属性总结和浏览器私有属性
  • Matplotlib(六)- 坐标轴定制
  • 【视觉识别】Ubuntu 22.04 上安装和配置 TigerVNC 鲁班猫V5
  • 技术与情感交织的一生 (十一)
  • 漏洞分析:90分钟安全革命
  • 原型模式在C++中的实现与面向对象设计原则
  • vue3 计算属性
  • 前端实现Excel文件的在线预览效果
  • 10-红黑树
  • LINUX 85 SHElL if else 前瞻 实例
  • Goby 漏洞安全通告| NestJS DevTools /inspector/graph/interact 命令执行漏洞(CVE-2025-54782)
  • 国内办公安全平台新标杆:iOA一体化办公安全解决方案
  • 机械学习--决策树(实战案例)
  • Linux和mysql练习题2
  • Electron-updater + Electron-builder + IIS + NSIS + Blockmap 完整增量更新方案
  • HTML 媒体元素概述
  • LeetCode 71~90题解
  • MongoDB 从3.4.0升级到4.0.0完整指南实战-优雅草蜻蜓I即时通讯水银版成功升级-卓伊凡|bigniu
  • Redis内存耗尽时的应对策略
  • # 【Java + EasyExcel 实战】动态列 + 公式备注 Excel 模板导出全流程(附完整代码)
  • 分布式文件系统06-分布式中间件弹性扩容与rebalance冲平衡
  • PromptPilot搭配Doubao-seed-1.6:定制你需要的AI提示prompt
  • 行为模式-模板方法模式
  • 脚手架开发-准备配置-配置文件的准备项目的一些中间件