Mq队列的了解与深入
说明:
消息队列(MQ)是分布式系统中常用的中间件,用于解耦生产者和消费者,提高系统的可扩展性和可靠性。
各队列的介绍
MQ | 协议 | 吞吐量 | 延迟 | 可靠性 | 适用场景 |
---|---|---|---|---|---|
RabbitMQ | AMQP | 中 | 低 | 高(ACK/持久化) | 复杂路由、中小规模系统 |
Kafka | 自定义协议 | 高 | 中 | 极高(副本机制) | 大数据流处理、日志聚合 |
RocketMQ | 自定义协议 | 高 | 低 | 极高(金融级) | 高可靠交易系统 |
ActiveMQ | JMS/AMQP | 低 | 中 | 中 | 传统企业应用集成 |
Redis Stream | Redis 协议 | 中 | 极低 | 中 | 简单消息队列、实时通知 |
队列使用性能可能有
本地上使用rabbitMq
安装:下载对应的安装包
Installing on Windows | RabbitMQ
安装rabbitserver 前需要安装 otp 默认下载rabbitMQ最新的包,双击后,会自动让你下载对应的OTP包,也可以手动下载 官网地址如下:
OTP Versions Tree
先安装OTP 默认安装,在安装rabbitmq server
基础配置
通过CMD启动管理员的权限,启用管理插件
rabbitmq-plugins enable rabbitmq_management
创建管理员用户
rabbitmqctl add_user admin yourpassword
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
重启服务
net stop RabbitMQ && net start RabbitMQ
打开访问管理界面进行访问
http://localhost:15672
常用命令
# 查看状态
rabbitmqctl status# 列出用户
rabbitmqctl list_users# 列出虚拟主机
rabbitmqctl list_vhosts# 停止服务
net stop RabbitMQ# 启动服务
net start RabbitMQ#查看用户
rabbitmqctl list_users
配置上可能erlang.cookie 系统和用户的两个文件会不一致,所以需要将系统的配置覆盖过去。
C:\Windows\System32\config\systemprofile\.erlang.cookie 覆盖掉 C:\Users\admin\.erlang.cookie
消息队列的简单应用
生产者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello_queue";public static void main(String[] args) throws Exception {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("guest"); // 默认用户名factory.setPassword("guest"); // 默认密码// 2. 建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明队列 (如果不存在则创建)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4. 准备消息String message = "Hello RabbitMQ!";// 5. 发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
消费者实现
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "hello_queue";public static void main(String[] args) throws Exception {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1"); // 修改主机factory.setPort(5672); // 端口(默认 5672)factory.setVirtualHost("/"); // vhost(默认 /)factory.setUsername("admin"); // 默认用户名factory.setPassword("admin123"); // 默认密码// 2. 建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明队列 (必须与生产者一致) 否则会导致一方无法使用 需要删除队列才能使用channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. Press CTRL+C to exit.");// 4. 创建回调处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});// 保持程序运行直到手动终止Thread.currentThread().join();}}
}
开发时注意
统一参数
报错时删除已创建的队列,适合开发-测试环境
消费者或者生产者一方只负责使用
参数建议
-
检查双方代码:确认生产者和消费者的队列声明参数完全一致
-
查看现有队列:通过管理界面检查队列的实际参数
-
使用自动删除:开发阶段可考虑设置
autoDelete=true
让队列自动清除
消息队列消息确认机制
生产者确认消息推送
private static void base3() throws IOException, TimeoutException, InterruptedException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("admin123");// 2. 建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明队列 (与消费者一致)// 参数说明:队列名, 持久化, 排他, 自动删除, 额外参数channel.queueDeclare(QUEUE_NAME3, false, false, false, null);// 4. 准备消息String message = "Hello RabbitMQ!";// 5. 发送消息(使用事务或确认模式,二选一)// 方案A:使用事务模式(性能较低但更可靠)try {channel.txSelect(); // 开启事务channel.basicPublish("", QUEUE_NAME3, null, message.getBytes());channel.txCommit(); // 提交事务System.out.println(" [x] Sent '" + message + "'");} catch (Exception ex) {channel.txRollback(); // 回滚事务System.err.println(" [x] Message send failed: " + ex.getMessage());throw ex; // 可以选择重新抛出或处理}/*// 方案B:使用确认模式(性能更好)channel.confirmSelect(); // 开启确认模式channel.basicPublish("", QUEUE_NAME3, null, message.getBytes());// 方案B:使用确认模式(性能更好)channel.confirmSelect(); // 开启确认模式channel.basicPublish("", QUEUE_NAME3, null, message.getBytes());
// B1 同步确认if (channel.waitForConfirms(5000)) { // 等待5秒确认System.out.println(" [x] Sent '" + message + "'");} else {System.err.println(" [x] Message not confirmed");}//B2 异步确认
// channel.addConfirmListener(new ConfirmListener() {
// @Override
// public void handleAck(long l, boolean b) throws IOException {
// System.out.println("消息到达队列");
// }
//
// @Override
// public void handleNack(long l, boolean b) throws IOException {
// System.out.println("消息未到达队列");
//
// }
// });*/}
}
消费者确认消息消费
//自动确认 不推荐
boolean autoAck = true; // 自动确认
channel.basicConsume(queueName, autoAck, consumer);
/*** 长链接 连续消费** @throws IOException* @throws TimeoutException*/private static void consumer1() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1"); // 修改主机factory.setPort(5672); // 端口(默认 5672)factory.setVirtualHost("/"); // vhost(默认 /)factory.setUsername("admin"); // 默认用户名factory.setPassword("admin123"); // 默认密码// 2. 建立连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3. 声明队列 (必须与生产者一致)channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 4. 创建回调处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 手动确认消息 确认单条消息 当autoFlag = false 时有用
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// // 拒绝消息并重新入队 当autoFlag = false 时有用
// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
// // 拒绝单条消息,不重新入队 当autoFlag = false 时有用
// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
// // 拒绝单条消息(旧方法) 当autoFlag = false 时有用channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true); // true表示重新入队};boolean autoFlag = false;// 5. 监听队列 // 关闭自动确认 (false) channel.basicConsume(QUEUE_NAME, autoFlag , deliverCallback, consumerTag -> {});}
实践参考示例:
package com.zbq.mq;import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class RabbitMQAckExample {private final static String QUEUE_NAME = "test_queue";private final static String EXCHANGE_NAME = "test_exchange";private final static String ROUTING_KEY = "test_routing_key";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("admin123");// 启用自动恢复factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(5000); // 5秒重试间隔try (Connection connection = factory.newConnection()) {// 为生产者和消费者使用不同的通道Channel producerChannel = connection.createChannel();Channel consumerChannel = connection.createChannel();// 声明交换机和队列declareResources(producerChannel);// 生产者确认模式producerWithConfirm(producerChannel);// 消费者手动确认consumerWithManualAck(consumerChannel);// 保持主线程运行 否则消费者无法执行完成Thread.sleep(Long.MAX_VALUE);}}private static void declareResources(Channel channel) throws IOException {channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);}private static void producerWithConfirm(Channel channel) throws Exception {// 开启确认模式channel.confirmSelect();// 添加确认监听器channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) {System.out.println("[Producer] Message confirmed, deliveryTag: " + deliveryTag);}@Overridepublic void handleNack(long deliveryTag, boolean multiple) {System.err.println("[Producer] Message nack-ed, deliveryTag: " + deliveryTag);// 这里可以添加消息重发逻辑}});// 发布消息for (int i = 0; i < 5; i++) {String message = "Message " + i;try {channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println("[Producer] Sent: " + message);} catch (AlreadyClosedException e) {System.err.println("[Producer] Channel closed while sending: " + message);// 处理通道关闭情况}}}private static void consumerWithManualAck(Channel channel) throws IOException {// 设置每次只预取一条消息channel.basicQos(1);// 关闭自动确认boolean autoAck = false;DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("[Consumer] Received: " + message);try {// 模拟业务处理Thread.sleep(1000);// 处理成功,确认消息if (channel.isOpen()) {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("[Consumer] Processed: " + message);}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} catch (Exception e) {System.err.println("[Consumer] Error processing message: " + e.getMessage());// 处理失败,拒绝消息并重新入队if (channel.isOpen()) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);System.out.println("[Consumer] Requeued: " + message);}}};CancelCallback cancelCallback = consumerTag -> {System.out.println("[Consumer] Cancelled: " + consumerTag);};channel.basicConsume(QUEUE_NAME, autoAck, "myConsumer", deliverCallback, cancelCallback);}
}
Spring boot 结合rabbitMQ实例
pom文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置信息:
spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: admin123virtual-host: /# 生产者配置publisher-returns: true # 开启发布返回template:mandatory: true # 消息不可达时将消息返回# 消费者配置listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 每次预取1条消息
配置注入
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义队列@Beanpublic Queue demoQueue() {return new Queue("demo.queue", true); // 持久化队列}// 定义直连交换机@Beanpublic DirectExchange demoExchange() {return new DirectExchange("demo.exchange");}// 绑定队列到交换机@Beanpublic Binding bindingDemoQueue(Queue demoQueue, DirectExchange demoExchange) {return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo.routing.key");}}
生产者代码示例
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate DirectExchange demoExchange;/*** 发送消息* @param message 消息内容*/public void sendMessage(String message) {// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功");} else {System.out.println("消息发送失败: " + cause);}});// 设置返回回调rabbitTemplate.setReturnCallback((message1, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息无法路由: " + message1 + ", 返回码: " + replyCode + ", 原因: " + replyText);});// 发送消息rabbitTemplate.convertAndSend(demoExchange.getName(), "demo.routing.key",message, new CorrelationData(UUID.randomUUID().toString()));}
}
消费者代码示例
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class MessageConsumer {@RabbitListener(queues = "demo.queue")public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("收到消息: " + message);// 模拟业务处理processMessage(message);// 手动确认消息channel.basicAck(deliveryTag, false);} catch (Exception e) {System.err.println("消息处理失败: " + e.getMessage());// 处理失败,拒绝消息并重新入队channel.basicNack(deliveryTag, false, true);}}private void processMessage(String message) {// 这里实现业务逻辑System.out.println("处理消息: " + message);}
}
其他处理方式 等待更新