当前位置: 首页 > news >正文

Spring Boot 与 RabbitMQ 的深度集成实践(二)

集成步骤详解

配置 RabbitMQ 连接信息

在 Spring Boot 项目中,通常在application.properties或application.yml文件中配置 RabbitMQ 的连接信息。以application.yml为例,配置如下:

 

spring:

rabbitmq:

host: localhost

port: 5672

username: guest

password: guest

virtual - host: /

上述配置中:

  • host指定了 RabbitMQ 服务器的地址,这里假设 RabbitMQ 运行在本地。
  • port指定了 RabbitMQ 服务的端口,默认端口是 5672。
  • username和password是连接 RabbitMQ 服务器所需的用户名和密码,默认的用户名和密码都是guest。不过在实际生产环境中,应使用更安全的用户名和密码组合。
  • virtual - host表示虚拟主机,它用于逻辑隔离不同的应用或业务模块之间的消息队列。这里使用根虚拟主机/,在实际应用中,可以根据业务需求创建不同的虚拟主机。例如,对于不同的业务线,可以分别使用/business1、/business2等虚拟主机,每个虚拟主机有独立的队列、交换机和权限控制 。

创建消息队列和交换机

通过配置类可以方便地创建消息队列、交换机以及它们之间的绑定关系。在 Spring Boot 中,使用@Configuration注解来定义配置类,并使用@Bean注解创建所需的 Bean。

下面分别展示direct、topic、fanout类型交换机的配置示例:

Direct Exchange(直连交换机)配置示例

 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class DirectRabbitConfig {

// 定义队列

@Bean

public Queue directQueue() {

return new Queue("direct.queue", true);

}

// 定义直连交换机

@Bean

public DirectExchange directExchange() {

return new DirectExchange("direct.exchange", true, false);

}

// 绑定队列和交换机,设置路由键

@Bean

public Binding directBinding() {

return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");

}

}

在这个配置中:

  • directQueue方法创建了一个名为direct.queue的队列,true表示该队列是持久化的,即 RabbitMQ 服务器重启后队列依然存在。
  • directExchange方法创建了一个名为direct.exchange的直连交换机,同样设置为持久化。
  • directBinding方法通过BindingBuilder将队列和交换机进行绑定,并指定了路由键direct.routing.key。直连交换机的特点是根据路由键进行精确匹配,只有当消息的路由键与绑定的路由键完全一致时,消息才会被路由到对应的队列。

Topic Exchange(主题交换机)配置示例

 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class TopicRabbitConfig {

// 定义第一个队列

@Bean

public Queue topicQueue1() {

return new Queue("topic.queue1", true);

}

// 定义第二个队列

@Bean

public Queue topicQueue2() {

return new Queue("topic.queue2", true);

}

// 定义主题交换机

@Bean

public TopicExchange topicExchange() {

return new TopicExchange("topic.exchange", true, false);

}

// 绑定第一个队列和交换机,设置路由键模式为topic.#

@Bean

public Binding topicBinding1() {

return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");

}

// 绑定第二个队列和交换机,设置路由键模式为topic.man

@Bean

public Binding topicBinding2() {

return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.man");

}

}

这里:

  • topicQueue1和topicQueue2分别创建了两个持久化队列。
  • topicExchange创建了一个名为topic.exchange的主题交换机。
  • topicBinding1将topicQueue1与topicExchange绑定,并使用topic.#作为路由键模式,其中#是通配符,表示匹配零个或多个单词。这意味着只要消息的路由键以topic.开头,就会被路由到topicQueue1。
  • topicBinding2将topicQueue2与topicExchange绑定,路由键模式为topic.man,只有当消息的路由键为topic.man时,消息才会被路由到topicQueue2。主题交换机通过通配符模式实现了更灵活的消息路由,适用于需要根据不同规则进行消息分发的场景 。

Fanout Exchange(扇形交换机)配置示例

 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class FanoutRabbitConfig {

// 定义第一个队列

@Bean

public Queue fanoutQueue1() {

return new Queue("fanout.queue1", true);

}

// 定义第二个队列

@Bean

public Queue fanoutQueue2() {

return new Queue("fanout.queue2", true);

}

// 定义扇形交换机

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("fanout.exchange", true, false);

}

// 绑定第一个队列和交换机

@Bean

public Binding fanoutBinding1() {

return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());

}

// 绑定第二个队列和交换机

@Bean

public Binding fanoutBinding2() {

return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());

}

}

在这个配置类中:

  • fanoutQueue1和fanoutQueue2创建了两个持久化队列。
  • fanoutExchange创建了一个名为fanout.exchange的扇形交换机。
  • fanoutBinding1和fanoutBinding2分别将两个队列与扇形交换机进行绑定。扇形交换机的特点是将消息广播到所有与之绑定的队列,而不考虑路由键,只要有队列绑定到该交换机,消息就会被发送到这些队列,适用于需要将消息同时发送给多个消费者的场景 。

编写消息生产者

创建一个消息生产者类,通过注入RabbitTemplate来发送消息。RabbitTemplate是 Spring AMQP 提供的用于发送消息的核心类,它封装了与 RabbitMQ 交互的细节,提供了便捷的消息发送方法。

以下是一个消息生产者类的示例,包含普通消息发送和带自定义属性消息发送的示例:

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.util.HashMap;

import java.util.Map;

@Service

public class RabbitMQProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

// 发送普通消息

public void sendMessage(String exchange, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchange, routingKey, message);

System.out.println("Sent message: " + message);

}

// 发送带自定义属性的消息

public void sendMessageWithProperties(String exchange, String routingKey, String message) {

Map<String, Object> headers = new HashMap<>();

headers.put("customHeader", "customValue");

rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor -> {

messagePostProcessor.getMessageProperties().setHeaders(headers);

return messagePostProcessor;

});

System.out.println("Sent message with properties: " + message);

}

}

在这个生产者类中:

  • sendMessage方法接收交换机名称、路由键和消息内容作为参数,通过rabbitTemplate.convertAndSend方法将消息发送到指定的交换机和队列。该方法会根据路由键将消息路由到对应的队列,如果路由键匹配成功,消息就会被存储在队列中等待消费者获取。
  • sendMessageWithProperties方法展示了如何发送带有自定义属性的消息。首先创建一个Map来存储自定义属性,这里添加了一个名为customHeader,值为customValue的属性。然后使用rabbitTemplate.convertAndSend的另一个重载方法,该方法接收一个MessagePostProcessor作为参数。在MessagePostProcessor的回调函数中,将自定义属性设置到消息的属性中,从而实现发送带有自定义属性的消息 。这种方式在实际应用中非常有用,例如可以通过自定义属性来传递一些额外的元数据,如消息的优先级、创建时间等,以便消费者在处理消息时根据这些属性进行不同的处理逻辑。

编写消息消费者

创建消息消费者类,使用@RabbitListener注解来监听指定的队列,并处理接收到的消息。@RabbitListener是 Spring AMQP 提供的注解,用于声明一个方法作为消息监听器,当队列中有新消息时,该方法会被自动调用。

以下是一个消息消费者类的示例,涵盖消息处理逻辑和异常处理:

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class RabbitMQConsumer {

@RabbitListener(queues = "direct.queue")

public void receiveMessage(String message) {

try {

// 消息处理逻辑

System.out.println("Received message: " + message);

// 模拟业务处理

Thread.sleep(1000);

System.out.println("Message processed successfully.");

} catch (Exception e) {

// 异常处理

System.out.println("Error processing message: " + e.getMessage());

// 可以根据具体业务需求进行重试、记录日志或其他操作

}

}

}

在这个消费者类中:

  • @RabbitListener(queues = "direct.queue")注解表示该方法监听名为direct.queue的队列。当该队列中有消息时,receiveMessage方法会被触发。
  • 在receiveMessage方法中,首先打印接收到的消息,然后通过Thread.sleep(1000)模拟业务处理过程,这里让线程睡眠 1 秒来模拟一些耗时操作。如果在处理过程中发生异常,会捕获异常并打印错误信息。在实际应用中,可以根据业务需求进行更复杂的异常处理,例如将消息放入死信队列进行后续处理,或者进行消息重试。如果是因为网络波动等临时原因导致消息处理失败,可以设置重试机制,让消息重新被消费;如果是因为消息内容本身错误等不可恢复的原因,可以将消息放入死信队列,以便后续进行人工处理或分析 。通过合理的异常处理,可以提高系统的稳定性和可靠性,确保消息能够被正确处理。
http://www.xdnf.cn/news/508897.html

相关文章:

  • Web开发-JavaEE应用SpringBoot栈SnakeYaml反序列化链JARWAR构建打包
  • 5.18本日总结
  • LeetCode 35. 搜索插入位置:二分查找的边界条件深度解析
  • nginx概念及使用
  • 分别用 语言模型雏形N-Gram 和 文本表示BoW词袋 来实现文本情绪分类
  • 数据结构 -- 树形查找(三)红黑树
  • Flink 作业提交流程
  • 墨水屏显示模拟器程序解读
  • 《信息论与编码》课程笔记——信源编码(2)
  • vue3_flask实现mysql数据库对比功能
  • FreeSWITCH 简单图形化界面43 - 使用百度的unimrcp搞个智能话务台,用的在线的ASR和TTS
  • NAT(网络地址转换)逻辑图解+实验详解
  • 抖音视频怎么去掉抖音号水印
  • tomcat查看状态页及调优信息
  • 碎片笔记|PromptStealer复现要点(附Docker简单实用教程)
  • oracle 资源管理器的使用
  • C# String 格式说明符
  • python创建flask项目
  • 动态内存管理2+柔性数组
  • 5.18 day24
  • RabbitMq C++客户端的使用
  • QT聊天项目DAY11
  • 服务端HttpServletRequest、HttpServletResponse、HttpSession
  • 软件工具:批量图片区域识别+重命名文件的方法,发票识别和区域选择方法参考,基于阿里云实现
  • SuperYOLO:多模态遥感图像中的超分辨率辅助目标检测之论文阅读
  • 05 部署Nginx反向代理
  • Flink Table SQL
  • [SpringBoot]Spring MVC(4.0)
  • TASK03【Datawhale 组队学习】搭建向量知识库
  • 【图像处理基石】OpenCV中都有哪些图像增强的工具?