当前位置: 首页 > backend >正文

Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统

Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统

一、Spring AMQP 是什么?

Spring AMQP(Application Messaging Protocol)是 Spring 官方提供的对 AMQP 协议的封装,其核心模块有两个:

  • spring-amqp: 提供 AMQP 抽象封装
  • spring-rabbit: RabbitMQ 的具体实现

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

常见的场景包括:

  • 微服务之间的异步通信
  • 秒杀系统削峰
  • 用户注册发送邮件/短信通知
  • 分布式事务的最终一致性方案

二、Spring Boot 集成 RabbitMQ

2.1. 引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

它会自动引入 spring-rabbit 和 spring-amqp 模块。

2.2. 配置 RabbitMQ

spring:rabbitmq:host: 192.168.184.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

三、快速构建消息系统

  • 一个消息队列
  • 一个消息发送者
  • 一个消息监听者(消费者)

构建示例项目:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者
    在这里插入图片描述
    引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

3.1.消息发送

publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

3.2.消息接收

consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

四、WorkQueues模型

4.1. 介绍

Work Queues(工作队列)又叫 任务队列(Task Queues),主要用于将一个任务分发给多个消费者(工作线程)处理,每个任务只会被一个消费者处理。

核心思想是:生产者只管发送任务,多个消费者竞争获取任务并处理,达到并发消费、分担压力的目的
在这里插入图片描述

  • Producer(生产者):发送任务消息。
  • Queue(队列):缓存任务。
  • Consumer(消费者):从队列中获取任务并处理。

每个任务只会被一个消费者处理,多个消费者之间互不干扰。

4.2. 消息发送

publisher服务中的SpringAmqpTest类中添加一个测试方法实现循环发送:

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

4.3. 消息接收

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

多个消费者监听同一个队列,消息将被平均分配(默认轮询方式)。

4.4. 公平分发 vs 轮询分发

🔁 默认行为:轮询分发
RabbitMQ 默认采用 Round-Robin(轮询) 分发方式,消费者不论是否处理完当前消息,下一条消息仍然会发给它。

这可能导致:处理慢的消费者积压任务,处理快的消费者反而闲着

✅ 公平分发(prefetch)
设置每个消费者的最大未确认消息数,让 RabbitMQ 只向空闲的消费者发送消息。

spring:rabbitmq:listener:simple:prefetch: 1  # 每个消费者同一时间只能处理1条消息
http://www.xdnf.cn/news/17258.html

相关文章:

  • 【接口自动化测试】---requests模块
  • SpringBoot的profile加载
  • 可编辑51页PPT | 某鞋服品牌集团数字化转型项目建议书
  • 微服务如何保证系统高可用?
  • iOS 签名证书全流程详解,申请、管理与上架实战
  • 腾讯iOA:数据安全的港湾
  • 0_外设学习_ESP8266+云流转(no 0基础)
  • 最新的GPT5效果如何,我试了一下(附加GPT5大模型免费使用方法)
  • 力扣-189.轮转数组
  • 秋招笔记-8.8
  • 《Leetcode》-面试题-hot100-链表
  • django uwsgi启动报错failed to get the Python codec of the filesystem encoding
  • Android 系统的安全 和 三星安全的区别
  • C++信息学奥赛一本通-第一部分-基础一-第3章-第1节
  • RAG初步实战:从 PDF 到问答:我的第一个轻量级 RAG 系统(附详细项目代码内容与说明)
  • 2025年渗透测试面试题总结-07(题目+回答)
  • 系统网络端口安全扫描脚本及详解
  • Chrome与Firefox浏览器安全运维配置命令大全:从攻防到优化的专业实践
  • 分治-快排-215.数组中的第k个最大元素-力扣(LeetCode)
  • 【Linux】从零开始:RPM 打包全流程实战万字指南(含目录结构、spec 编写、分步调试)
  • Android 的CameraX的使用(配置,预览,拍照,图像分析,录视频)
  • 使用Python提取PDF大纲(书签)完整指南
  • 持中文的 TXT 合并 PDF 工具 —— GUI + ReportLab 实战
  • Emacs 折腾日记(二十九)—— 打造C++ IDE
  • 使用 Grunt 替换 XML 文件中的属性值
  • 亚马逊跨类目铺货广告运营:从粗放投放to智能提效的省力法则
  • iOS混淆工具有哪些?跨平台 App 混淆与保护的实用方案
  • 零基础深度学习规划路线:从数学公式到AI大模型的系统进阶指南
  • 基于linux环境在centos7上部署gitlab
  • Claude Code 实战场景解析:从代码生成到系统重构的典型应用案例