当前位置: 首页 > web >正文

消息队列RabbitMQ与AMQP协议详解

消息队列RabbitMQ与AMQP协议详解

什么是RabbitMQ

RabbitMQ是一个开源的消息队列中间件,基于AMQP(Advanced Message Queuing Protocol)协议实现。它作为一个消息代理(Message Broker),可以接收、存储和转发消息数据,用于解耦系统组件、实现异步通信、流量削峰等场景。

核心概念

消息队列基础

消息队列是一种异步通信模式,允许应用程序通过发送和接收消息进行通信:

  • Producer:消息生产者,负责发送消息到队列
  • Consumer:消息消费者,从队列接收消息并处理
  • Queue:消息队列,存储消息的缓冲区
  • Message:消息内容,包含有效载荷和属性

AMQP协议详解

什么是AMQP

AMQP(Advanced Message Queuing Protocol)是一个开放标准的应用层协议,专为面向消息的中间件设计。

AMQP核心特性

  1. 可互操作性:不同厂商实现的AMQP客户端和服务器可以互相通信
  2. 统一模型:定义了一套完整的消息传递模型
  3. 安全性:支持TLS加密和SASL认证
  4. 可靠性:通过事务或确认机制保证消息传递
  5. 二进制协议:更高效的数据传输

AMQP模型

AMQP 0-9-1模型(RabbitMQ采用的版本)包含以下核心组件:

  • Exchange:接收生产者消息并路由到队列
  • Queue:存储消息的队列
  • Binding:Exchange与Queue之间的链接规则
  • Virtual Host:虚拟隔离空间,包含独立的Exchange、Queue和Binding
  • Channel:连接内的虚拟连接,减少TCP连接开销

Exchange类型

RabbitMQ支持多种Exchange类型,决定消息如何路由:

  1. Direct Exchange:根据精确的routing key匹配
  2. Topic Exchange:根据模式匹配的routing key
  3. Fanout Exchange:广播到所有绑定的队列
  4. Headers Exchange:根据消息头属性匹配

RabbitMQ架构

核心组件

                    +----------------+
Producer ------→    |    Exchange    |    -----→ Queue -----→ Consumer+----------------+↓+----------------+|     Binding    |+----------------+

消息流程

  1. Producer发送消息到Exchange
  2. Exchange根据Binding规则将消息路由到相应的Queue
  3. Queue存储消息直到被Consumer消费
  4. Consumer接收并处理消息

消息确认机制

RabbitMQ提供了多种确认机制确保消息可靠传递:

  1. 生产者确认:Publisher Confirms和Publisher Returns
  2. 消费者确认:Consumer Acknowledgements
  3. 持久化:Exchange、Queue和Message的持久化

实战示例

连接RabbitMQ

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

简单消息发布

// 声明队列
channel.queueDeclare("simple-queue", true, false, false, null);// 发布消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", "simple-queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

消息消费

// 声明队列
channel.queueDeclare("simple-queue", true, false, false, null);// 创建消费者
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};// 开始消费消息
channel.basicConsume("simple-queue", false, deliverCallback, consumerTag -> {});

使用Exchange和Binding

// 声明交换机
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);// 创建临时队列
String queueName = channel.queueDeclare().getQueue();// 绑定队列到交换机
channel.queueBind(queueName, "logs", "");// 发布消息到交换机
String message = "Info: This is a log message";
channel.basicPublish("logs", "", null, message.getBytes());

RabbitMQ高级特性

消息优先级

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
channel.queueDeclare("priority-queue", true, false, false, args);// 发送优先级消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().priority(8).build();
channel.basicPublish("", "priority-queue", properties, message.getBytes());

消息TTL(Time-To-Live)

// 设置队列消息TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60秒
channel.queueDeclare("ttl-queue", true, false, false, args);// 设置单个消息TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000") // 10秒.build();

死信队列

// 声明死信交换机
channel.exchangeDeclare("dlx", BuiltinExchangeType.DIRECT);// 声明正常队列,并指定死信设置
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx");
args.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare("normal-queue", true, false, false, args);

AMQP协议帧结构

AMQP是一个二进制协议,由帧(frames)组成:

  1. 帧头:包含帧类型、通道号和帧大小
  2. 帧体:包含实际的命令和数据
  3. 帧结尾:标记帧结束的字节

AMQP主要帧类型:

  • Method帧:包含AMQP命令
  • Header帧:包含消息属性和大小
  • Body帧:包含消息内容
  • Heartbeat帧:用于连接保活

总结

RabbitMQ作为实现AMQP协议的成熟消息中间件,提供了丰富的功能特性满足各种场景需求。AMQP协议的开放标准特性确保了系统的互操作性和可靠性。通过理解RabbitMQ和AMQP的核心概念,可以更好地设计和实现分布式系统中的消息通信模式。

在实际应用中,合理利用RabbitMQ的Exchange类型、队列特性和消息确认机制,可以构建高可用、高可靠的消息系统,实现系统解耦、负载均衡和流量削峰等目标。

http://www.xdnf.cn/news/8351.html

相关文章:

  • oracle数据库生成awr报告,排查数据库服务器CPU100%,系统卡顿,慢sql,根据sqlid查询关键信息,如会话SID,客户端机器名
  • 从零搭建SpringBoot Web单体项目3、SpringBoot 核心组件深度解析
  • leetcode hot100:十三、解题思路大全:多维动态规划(不同路径、最小路径和、最长回文子串、 最长公共子序列、编辑距离)
  • 微信小程序用<web-view 嵌入h5网页,改了h5网页后,可能是缓存的原因,小程序上看还是原来的,怎么处理
  • 【MySQL成神之路】MySQL索引相关介绍
  • 应届本科生简历制作指南
  • MySQL数据 在 磁盘上是什么样子的
  • DiagramJS设计原理解读(二)
  • CUDA 加速的基础线性代数库cuBLAS
  • Issac Lab安装
  • SPL做量化---MFI(资金流量指标)
  • 水陆两栖车,水域救援与陆地行动的桥梁
  • 掌握正则表达式:从基础语法到工程实践
  • Redis--SpringDataRedis详解
  • KCTF-CCG CrackMe crypto 1.0
  • TDengine 高可用——三副本
  • YOLOv5:调用官方权重进行检测
  • Socket套接字概述
  • MFC 中实现动态控件启用与命令执行
  • nRF Connect SDK开发之(1)运行一个Zephyr Project例程
  • Python Web开发基础
  • 并发编程之常用原子类
  • 免费直播预告 | 从标准解读到工具落地,《GB/T 45086与ISO11451标准解读》来了!
  • 前端学习笔记——Promis.All
  • ROS_Noetic的安装
  • 机器学习实战:犯罪率预测模型
  • 深入探讨Java循环:类型、性能与优化
  • CrackMe 002
  • VMIC PMV-5565PIORC-21000超高速光纤反射内存硬件参考
  • 08 接口自动化-用例管理框架pytest之fixtrue,conftest.py,allure报告以及logo定制