当前位置: 首页 > backend >正文

java实现RabbitMQ消息发送和接收功能(包含测试)

以下是一个完整的Java类,同时包含RabbitMQ消息发送和接收功能,使用纯Java实现(非Spring Boot),包含Maven依赖:

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class RabbitMQManager {private static final Logger logger = LoggerFactory.getLogger(RabbitMQManager.class);private final ConnectionFactory factory;private Connection connection;private Channel sendChannel;private Channel receiveChannel;private volatile boolean running = true;// 配置参数private final String host;private final int port;private final String username;private final String password;public RabbitMQManager(String host, int port, String username, String password) {this.host = host;this.port = port;this.username = username;this.password = password;factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);}/*** 初始化RabbitMQ连接和通道*/public void initialize() throws IOException, TimeoutException {connection = factory.newConnection();logger.info("成功连接到RabbitMQ服务器: {}:{}", host, port);// 创建发送通道(用于feedback队列)sendChannel = connection.createChannel();// 声明发送队列(持久化)sendChannel.queueDeclare("feedback", true, false, false, null);logger.info("发送队列 'feedback' 已声明");// 创建接收通道(单独通道用于消费消息)receiveChannel = connection.createChannel();}/*** 发送消息到feedback队列* @param message 要发送的消息内容*/public void sendToFeedbackQueue(String message) throws IOException {if (sendChannel == null || !sendChannel.isOpen()) {throw new IllegalStateException("发送通道未初始化或已关闭");}sendChannel.basicPublish("", // 使用默认交换机"feedback", // 路由键为队列名MessageProperties.PERSISTENT_TEXT_PLAIN, // 设置消息持久化message.getBytes(StandardCharsets.UTF_8));logger.info("消息已发送到feedback队列: {}", message);}/*** 开始监听指定队列的消息* @param queueName 要监听的队列名称*/public void startListening(String queueName) throws IOException {if (receiveChannel == null || !receiveChannel.isOpen()) {throw new IllegalStateException("接收通道未初始化或已关闭");}// 声明队列(如果不存在则创建)receiveChannel.queueDeclare(queueName, true, false, false, null);logger.info("开始监听队列: {}", queueName);// 设置每次只接收一条消息(公平分发)receiveChannel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);try {// 处理消息processMessage(message, queueName);// 手动确认消息receiveChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {logger.error("消息处理失败", e);// 处理失败时拒绝消息(不重新入队)receiveChannel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}};// 取消消费者回调CancelCallback cancelCallback = consumerTag -> {logger.warn("消费者被取消: {}", consumerTag);};// 开始消费消息(关闭自动确认)receiveChannel.basicConsume(queueName, false, deliverCallback, cancelCallback);logger.info("监听器已启动,等待消息... (按CTRL+C停止)");// 保持程序运行try {while (running) {Thread.sleep(1000); // 防止CPU空转}} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.info("监听线程被中断");}}/*** 消息处理逻辑(可根据需要自定义)*/private void processMessage(String message, String queueName) {logger.info("处理来自队列 [{}] 的消息: {}", queueName, message);// 示例:如果是feedback队列的消息,可以发送响应if ("feedback".equals(queueName)) {logger.info("处理feedback消息: {}", message);} else {// 其他队列的处理逻辑logger.info("处理来自 {} 队列的消息", queueName);}// 这里可以添加业务逻辑,例如:// - 解析JSON消息// - 调用其他服务// - 发送响应到另一个队列}/*** 停止监听并关闭资源*/public void shutdown() {running = false;logger.info("停止监听...");closeResources();}private void closeResources() {try {if (sendChannel != null && sendChannel.isOpen()) {sendChannel.close();}if (receiveChannel != null && receiveChannel.isOpen()) {receiveChannel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMQ连接已关闭");} catch (IOException | TimeoutException e) {logger.error("关闭资源时出错", e);}}public static void main(String[] args) {// 配置RabbitMQ连接参数String host = "localhost";int port = 5672;String username = "guest";String password = "guest";String listenQueue = "my_queue"; // 监听的队列名称RabbitMQManager manager = new RabbitMQManager(host, port, username, password);try {// 初始化连接manager.initialize();// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(manager::shutdown));// 启动监听线程new Thread(() -> {try {manager.startListening(listenQueue);} catch (IOException e) {logger.error("监听失败", e);}}).start();// 主线程发送测试消息到feedback队列Thread.sleep(2000); // 等待监听器启动for (int i = 1; i <= 5; i++) {String message = "测试消息 #" + i;manager.sendToFeedbackQueue(message);Thread.sleep(1000);}// 保持程序运行while (manager.running) {Thread.sleep(1000);}} catch (IOException | TimeoutException | InterruptedException e) {logger.error("RabbitMQ操作失败", e);manager.shutdown();}}
}

Maven 依赖 (pom.xml)

<dependencies><!-- RabbitMQ Java Client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><!-- SLF4J 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.7</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency>
</dependencies>

关键设计说明:

  1. 双通道设计

    • sendChannel:专门用于发送消息到feedback队列
    • receiveChannel:专门用于从其他队列接收消息
    • 分离通道避免发送和接收操作相互阻塞
  2. 消息发送功能

    public void sendToFeedbackQueue(String message) throws IOException {sendChannel.basicPublish("", // 默认交换机"feedback", MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化消息message.getBytes(StandardCharsets.UTF_8));
    }
    
  3. 消息接收功能

    • 支持监听任意指定队列
    • 手动消息确认机制
    • 公平分发设置(basicQos(1))
  4. 资源管理

    • 统一的initialize()方法初始化连接
    • 优雅的shutdown()方法关闭资源
    • 使用Runtime shutdown hook确保程序退出时清理资源
  5. 线程模型

    • 主线程:发送消息到feedback队列
    • 单独线程:持续监听消息队列
    • 使用volatile running标志控制线程退出

使用说明:

  1. 运行程序

    mvn compile exec:java -Dexec.mainClass="RabbitMQManager"
    
  2. 测试流程

    • 程序启动后会自动初始化RabbitMQ连接
    • 启动监听线程监听my_queue队列
    • 主线程发送5条测试消息到feedback队列
    • 按CTRL+C停止程序
  3. 自定义配置

    • 修改main方法中的连接参数
    • 更改监听的队列名称
    • 在processMessage()方法中添加业务逻辑

扩展建议:

  1. 添加消息序列化

    // 发送对象
    public void sendObjectToFeedback(Object obj) throws IOException {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(obj);oos.flush();byte[] bytes = bos.toByteArray();sendChannel.basicPublish("", "feedback", MessageProperties.PERSISTENT_BASIC, bytes);
    }
    
  2. 添加JSON支持

    <!-- 添加Jackson依赖 -->
    <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version>
    </dependency>
    
    // 发送JSON
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(myObject);
    sendToFeedbackQueue(json);// 接收JSON
    MyObject obj = mapper.readValue(message, MyObject.class);
    
  3. 添加重连机制

    public void ensureConnected() throws IOException, TimeoutException {if (connection == null || !connection.isOpen()) {initialize();}
    }
    

这个实现提供了生产级别的RabbitMQ操作,包含:

  • 连接管理
  • 通道分离
  • 消息持久化
  • 手动确认
  • 错误处理
  • 优雅关闭
  • 完整的日志记录

您可以根据实际需求调整队列名称、消息处理逻辑和错误处理策略。

http://www.xdnf.cn/news/13511.html

相关文章:

  • 日语学习-日语知识点小记-进阶-JLPT-真题训练-N2阶段(1):单词部分练习
  • Unity3D SRP Batcher原理分析
  • DEM 地形分析与水文建模:基于 ArcGIS 的流域特征提取
  • Android 10.0 勿扰模式开启关闭功能实现
  • DevOps软件开发流程规范
  • 抖音授权登录-获取用户授权调用凭证
  • MySQL进阶之索引(1)索引结构分类语法和SQL性能分析
  • Guava常用工具类使用教程
  • 使用OpenCV和Python进行图像掩膜与直方图分析
  • Java基于局域网的聊天室系统设计与实现,附源码+论文
  • ACS的ExtendedSegmentArc1 方法说明
  • 代理模式:AOP 切面编程的底层实现基础
  • Hello Robot发布Stretch3机器人高保真模拟平台-Stretch MuJoCo v0.5-涵盖数百种Robocasa厨房应用测试场景
  • 零基础设计模式——行为型模式 - 中介者模式
  • 使用Jmeter做功能测试有哪些优点?
  • C++ 中的 iostream 库:cin/cout 基本用法
  • Python训练第五十天
  • milvus 总结
  • Uniapp实现多选下拉框
  • 微信小程序Echarts开发问题
  • Vue 数据代理机制对属性名的要求
  • 如何正确的用Trae 打开 Unity 3D 项目
  • 计算机视觉与深度学习 | 基于Matlab的低照度图像增强算法:全面总结与实现
  • 问题八、Articulation中的actuator(执行器)
  • PostgresSQL日常维护
  • Jenkins + Docker + Kubernetes(JKD)自动化部署全链路实践
  • Axure应用交互设计:文本输入计数、显示输入内容、AI对话
  • 适配器模式深度解析:Java设计模式实战指南与接口兼容性解决方案
  • SpringMVC(1)
  • 安全生产管理是什么?安全生产管理主要管什么?