3D打印机管理后台与RabbitMQ集成的业务场景
3D打印机管理后台与RabbitMQ集成的业务场景
文章目录
- 3D打印机管理后台与RabbitMQ集成的业务场景
- 问题
- 分析
- 解决方案:引入RabbitMQ进行异步解耦
- 实战演示
- 核心思路:
- 项目结构:
- 1. `pom.xml` (添加依赖)
- 2. `application.yml` (配置RabbitMQ连接和消费者)
- 3. `PrinterAlarmEvent.java` (数据传输对象)
- 4. `RabbitMQConfig.java` (RabbitMQ配置类)
- 5. `AlarmEventPublisher.java` (告警事件发布者)
- 6. `AlarmController.java` (告警接收API)
- 7.三个消费者
- 8. 主启动类
- Postman进行测试
- 查看Rabbitmq控制台
- 完整执行过程
- 1. **系统启动阶段**
- 2. **告警产生与接收阶段**
- 3.消息发布到mq阶段
- 4.**RabbitMQ路由阶段**
- 5. **异步消费者处理阶段**
问题
系统管理着成千上万台分布在全球各地的工业级3D打印机。这些打印机在长时间打印过程中可能会发生各种异常,例如:
- 喷头堵塞 (Extruder Jam)
- 热床温度异常 (Heated Bed Temperature Error)
- 材料耗尽 (Filament Runout)
- 设备离线 (Device Offline)
遇到的问题:客户投诉**「告警慢」**
系统有严重延迟!有时候打印机已经停摆报警了10分钟,你们的平台后台才弹出一条通知消息。这导致了我们的产线停工,损失巨大!
分析
根本原因分析(为什么不用数据库,而用RabbitMQ?)
如果系统设计是打印机 -> HTTP API -> 直接写数据库,那么在高并发场景下(50台设备同时上报),会遇到以下瓶颈:
- 数据库写入压力:50台设备几乎同时告警,意味着50个
INSERT
请求瞬间到达数据库,极易造成数据库连接池爆满、写入缓慢。 - 同步处理阻塞:在将告警信息写入数据库后,系统还需要进行一系列后续操作(如下文所述),这些操作如果是同步的,会严重拖慢整个API的响应时间,让设备等待,从而形成瓶颈。
解决方案:引入RabbitMQ进行异步解耦
架构如下图所示:
如上图所示,整个高效、解耦的处理流程是:
1.接收告警 (API):设备通过HTTP API上报告警。API服务只做两件事:验证请求、将告警信息作为一个消息体(JSON格式)立即发布(Producer) 到RabbitMQ的一个名为 alarm_events
的Exchange中。API随即返回202 Accepted
给设备,告知其“告警已收到”,处理非常迅速。
2.消息路由 (RabbitMQ):RabbitMQ的 alarm_events
Exchange会根据设定的路由键(routing key),将消息同时分发到三个不同的Queue中:
alarm_db_queue
(用于数据持久化)alarm_notify_queue
(用于发送通知)alarm_dashboard_queue
(用于前台大屏实时展示)
3.异步处理 (Consumers):后台运行着多个消费者(Consumer) 服务,各自独立地从上述Queue中获取消息并处理:
•消费者C1 (DB Writer):从 alarm_db_queue
取消息,异步写入数据库。即使数据库偶尔缓慢,也不会影响其他操作。
•消费者C2 (Notifier):从 alarm_notify_queue
取消息,调用短信或邮件服务发送告警通知。这是通知延迟的根源,但确保了通知最终必达。
•消费者C3 (Dashboard Pusher):从 alarm_dashboard_queue
取消息,通过WebSocket实时推送到运维控制台的大屏页面。这是速度最快的一环。
在这个真实场景中,RabbitMQ的作用是:
- 异步解耦:将耗时的操作(写库、通知)与核心API分离,快速响应设备。
- 削峰填谷:瞬间的50条告警被队列缓存起来,由消费者逐步消化,避免系统被冲垮。
- 提高可靠性:即使某个消费者服务(如发短信的)暂时崩溃,告警消息也会在队列中存活,待服务恢复后继续处理,确保消息不丢失。
- 弹性扩展:如果告警数量持续巨大,您可以简单地启动更多个消费者实例来并行处理队列中的消息。
实战演示
核心思路:
- 模拟打印机设备:创建一个简单的HTTP客户端,模拟向后端API发送告警。
- 告警接收API:创建一个Spring Boot控制器,接收告警,快速验证并将告警事件发布到RabbitMQ。
- RabbitMQ配置:定义交换机(
alarm_events
)、队列(alarm_db_writer
,alarm_notifier
,alarm_dashboard
)以及它们之间的绑定关系。 - 异步消费者
- 模拟数据库写入消费者:监听
alarm_db_writer
队列,模拟耗时的数据库操作。 - 模拟通知发送消费者:监听
alarm_notifier
队列,模拟发送邮件/SMS。 - 模拟大屏推送消费者:监听
alarm_dashboard
队列,模拟通过WebSocket推送(这里简化为打印日志)。
- 模拟数据库写入消费者:监听
项目结构:
printer-alarm-system/
├── src/main/java/com/example/printer/
│ ├── PrinterAlarmSystemApplication.java (主启动类)
│ ├── config/
│ │ └── RabbitMQConfig.java (RabbitMQ配置)
│ ├── controller/
│ │ └── AlarmController.java (告警接收API)
│ ├── dto/
│ │ └── PrinterAlarmEvent.java (告警事件数据传输对象)
│ ├── producer/
│ │ └── AlarmEventPublisher.java (告警事件发布者)
│ └── consumer/
│ ├── DbWriterConsumer.java (数据库写入消费者)
│ ├── NotifierConsumer.java (通知发送消费者)
│ └── DashboardConsumer.java (大屏推送消费者)
├── src/main/resources/
│ └── application.yml (Spring Boot配置)
└── pom.xml (Maven依赖)
1. pom.xml
(添加依赖)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.grant.code</groupId><artifactId>printer-alarm-system</artifactId><version>0.0.1-SNAPSHOT</version><name>printer-alarm-system</name><description>Demo project for Spring Boot + RabbitMQ with 3D Printer Alarms</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot AMQP Starter for RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Jackson for JSON processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><dependency><groupId>javax.validation</groupId><artifactId>validation-api</artifactId><version>2.0.1.Final</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2. application.yml
(配置RabbitMQ连接和消费者)
spring:rabbitmq:host: XXX # 根据你的RabbitMQ服务器地址修改port: XXX # 根据你的RabbitMQ服务器端口修改username: XXX # 根据你的RabbitMQ用户名修改password: XXX # 根据你的RabbitMQ密码修改virtual-host: /listener:simple:# 手动确认模式,确保消息处理成功后再确认acknowledge-mode: manual# 预取消息数量,控制消费者负载prefetch: 10# 重试配置retry:enabled: truemax-attempts: 3 # 最大重试次数initial-interval: 2000ms # 初始重试间隔# 发布者确认publisher-confirm-type: correlatedpublisher-returns: true# 自定义配置
app:rabbitmq:exchange: alarm_eventsqueue:db: alarm_db_writernotify: alarm_notifierdashboard: alarm_dashboardrouting-key: alarm.eventlogging:level:com.example.printer: DEBUG # 启用DEBUG日志查看详细流程server:port: 8080 # API服务端口
3. PrinterAlarmEvent.java
(数据传输对象)
package com.grant.code.printeralarmsystem.dto;import com.fasterxml.jackson.annotation.JsonFormat;import java.time.LocalDateTime;
import java.util.Objects;public class PrinterAlarmEvent {private String printerId; // 打印机IDprivate String alarmType; // 指定告警类型,例如:喷头堵塞 (Extruder Jam)、材料耗尽 (Filament Runout)private String description; // 具体描述private String severity; // 这个指示告警的严重程度,例如:"Low", "Medium", "High"@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime timestamp;// Constructorspublic PrinterAlarmEvent() {}public PrinterAlarmEvent(String printerId, String alarmType, String description, String severity) {this.printerId = printerId;this.alarmType = alarmType;this.description = description;this.severity = severity;this.timestamp = LocalDateTime.now();}// Getters and Setterspublic String getPrinterId() { return printerId; }public void setPrinterId(String printerId) { this.printerId = printerId; }public String getAlarmType() { return alarmType; }public void setAlarmType(String alarmType) { this.alarmType = alarmType; }public String getDescription() { return description; }public void setDescription(String description) { this.description = description; }public String getSeverity() { return severity; }public void setSeverity(String severity) { this.severity = severity; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }@Overridepublic String toString() {return "PrinterAlarmEvent{" +"printerId='" + printerId + '\'' +", alarmType='" + alarmType + '\'' +", description='" + description + '\'' +", severity='" + severity + '\'' +", timestamp=" + timestamp +'}';}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;PrinterAlarmEvent that = (PrinterAlarmEvent) o;// 只比较 printerId 和 alarmType,即如果printerId和alarmType相同,则认为两个事件相同return Objects.equals(printerId, that.printerId) && Objects.equals(alarmType, that.alarmType);}@Overridepublic int hashCode() {return Objects.hash(printerId, alarmType);}
}
4. RabbitMQConfig.java
(RabbitMQ配置类)
package com.grant.code.printeralarmsystem.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ配置类*/
@Configuration
public class RabbitMQConfig {@Value("${app.rabbitmq.exchange}")private String exchangeName; // 交换机名称@Value("${app.rabbitmq.queue.db}")private String dbQueueName; // 数据库写入队列名称@Value("${app.rabbitmq.queue.notify}")private String notifyQueueName; // 通知队列名称@Value("${app.rabbitmq.queue.dashboard}")private String dashboardQueueName; // 大屏显示队列名称@Value("${app.rabbitmq.routing-key}")private String routingKey; // 路由键,用于匹配队列// 创建交换机@Beanpublic DirectExchange alarmEventExchange() {return new DirectExchange(exchangeName, true, false); // durable=true表示持久化, autoDelete=false表示不自动删除}// 创建队列@Beanpublic Queue alarmDbWriterQueue() {return QueueBuilder.durable(dbQueueName).build(); // durable=true表示持久化,这里创建的是数据库写入队列}@Beanpublic Queue alarmNotifierQueue() {return QueueBuilder.durable(notifyQueueName).build(); // durable=true表示持久化,这里创建的是通知队列}@Beanpublic Queue alarmDashboardQueue() {return QueueBuilder.durable(dashboardQueueName).build(); // durable=true表示持久化,这里创建的是大屏显示队列}// 绑定队列到交换机@Beanpublic Binding bindingDbWriter(Queue alarmDbWriterQueue, DirectExchange alarmEventExchange) {return BindingBuilder.bind(alarmDbWriterQueue).to(alarmEventExchange).with(routingKey);}@Beanpublic Binding bindingNotifier(Queue alarmNotifierQueue, DirectExchange alarmEventExchange) {return BindingBuilder.bind(alarmNotifierQueue).to(alarmEventExchange).with(routingKey);}@Beanpublic Binding bindingDashboard(Queue alarmDashboardQueue, DirectExchange alarmEventExchange) {return BindingBuilder.bind(alarmDashboardQueue).to(alarmEventExchange).with(routingKey);}// 配置消息转换器为JSON@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate使用JSON转换器和确认回调@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 创建RabbitTemplate,connectionFactory为连接工厂rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); // 设置消息转换器为JSON// 开启发布确认rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println(" [√] 消息发送成功: " + correlationData);} else {System.err.println(" [×] 消息发送失败: " + cause);}});// 开启发布返回 (针对无法路由的消息)rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(returned -> {System.err.println(" [!] 消息无法路由: " + returned);});return rabbitTemplate;}
}
5. AlarmEventPublisher.java
(告警事件发布者)
package com.grant.code.printeralarmsystem.publisher;import com.grant.code.printeralarmsystem.config.RabbitMQConfig;
import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;/*** 告警事件发布者*/
@Service
public class AlarmEventPublisher {private static final Logger logger = LoggerFactory.getLogger(AlarmEventPublisher.class);@Autowiredprivate RabbitTemplate rabbitTemplate;// 注入配置的交换机名称@org.springframework.beans.factory.annotation.Value("${app.rabbitmq.exchange}")private String exchangeName;// 注入配置的路由键@org.springframework.beans.factory.annotation.Value("${app.rabbitmq.routing-key}")private String routingKey;public void publishAlarmEvent(PrinterAlarmEvent event) {// 生成唯一ID用于确认CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info(" [x] 发布告警事件: {} (ID: {})", event, correlationData.getId());// 发布到交换机,使用配置的路由键rabbitTemplate.convertAndSend(exchangeName, routingKey, event, correlationData);}
}
6. AlarmController.java
(告警接收API)
package com.grant.code.printeralarmsystem.controller;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.grant.code.printeralarmsystem.publisher.AlarmEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import javax.validation.Valid;/*** 告警接收API*/
@RestController
@RequestMapping("/api/alarms")
public class AlarmController {private static final Logger logger = LoggerFactory.getLogger(AlarmController.class);@Autowiredprivate AlarmEventPublisher alarmEventPublisher;/*** 接收打印机告警的API端点* @param alarmEvent 告警事件对象* @return ResponseEntity 202 Accepted 表示已接收*/@PostMapping("/receive")public ResponseEntity<String> receiveAlarm(@RequestBody @Valid PrinterAlarmEvent alarmEvent) {logger.info(" [x] 接收到告警: {}", alarmEvent);// 1. 快速验证(这里简化)if (alarmEvent.getPrinterId() == null || alarmEvent.getPrinterId().isEmpty()) {logger.warn(" [!] 无效的打印机ID");return ResponseEntity.badRequest().body("Invalid printer ID");}// 2. 将告警事件发布到RabbitMQ (异步操作)alarmEventPublisher.publishAlarmEvent(alarmEvent);// 3. 立即返回202 Accepted,表示请求已被接受处理logger.info(" [x] 告警已发布至队列,返回202 Accepted");return ResponseEntity.status(HttpStatus.ACCEPTED).body("Alarm received and queued for processing.");}
}
7.三个消费者
package com.grant.code.printeralarmsystem.consumer;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;/*** 大屏消费者*/
@Service
public class DashboardConsumer {private static final Logger logger = LoggerFactory.getLogger(DashboardConsumer.class);/*** 监听 alarm_dashboard 队列* @param event 告警事件* @param channel AMQP Channel 用于手动确认* @param tag Delivery Tag 用于确认特定消息*/@RabbitListener(queues = "${app.rabbitmq.queue.dashboard}")public void handleDashboardPush(PrinterAlarmEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {logger.info(" [Dashboard Consumer] 开始处理大屏推送告警事件: {}", event);// 模拟非常快速的推送操作 (例如 0.1秒)Thread.sleep(100);// 这里应该是实际的WebSocket推送逻辑// webSocketService.sendToDashboard("/topic/alarms", event);logger.info(" [Dashboard Consumer] 已将告警推送到大屏: {}", event.getPrinterId());// 手动确认消息channel.basicAck(tag, false);logger.debug(" [Dashboard Consumer] 消息确认成功: {}", tag);} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error(" [Dashboard Consumer] 处理中断: {}", event, e);try {channel.basicNack(tag, false, true);} catch (IOException ioException) {logger.error(" [Dashboard Consumer] 无法NACK消息: {}", tag, ioException);}} catch (Exception e) {logger.error(" [Dashboard Consumer] 推送大屏时发生错误: {}", event, e);try {channel.basicNack(tag, false, false);} catch (IOException ioException) {logger.error(" [Dashboard Consumer] 无法NACK消息: {}", tag, ioException);}}}
}
package com.grant.code.printeralarmsystem.consumer;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;
/** 告警事件写入数据库消费者*/
@Service
public class DbWriterConsumer {private static final Logger logger = LoggerFactory.getLogger(DbWriterConsumer.class);/*** 监听 alarm_db_writer 队列* @param event 告警事件* @param channel AMQP Channel 用于手动确认* @param tag Delivery Tag 用于确认特定消息* header注解:用于获取消息的 Delivery Tag,Delivery Tag是消息的唯一标识符,用于确认消息处理成功或失败*/@RabbitListener(queues = "${app.rabbitmq.queue.db}") // 监听 alarm_db_writer 队列public void handleDbWrite(PrinterAlarmEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {logger.info(" [DB Consumer] 开始处理告警事件: {}", event);// 模拟耗时的数据库写入操作 (例如 2秒)Thread.sleep(2000);// 这里应该是实际的数据库插入逻辑// repository.save(alarmRecord);logger.info(" [DB Consumer] 成功将告警写入数据库: {}", event.getPrinterId());// 手动确认消息channel.basicAck(tag, false);logger.debug(" [DB Consumer] 消息确认成功: {}", tag);} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error(" [DB Consumer] 处理中断: {}", event, e);try {// 拒绝消息并重新入队channel.basicNack(tag, false, true);} catch (IOException ioException) {logger.error(" [DB Consumer] 无法NACK消息: {}", tag, ioException);}} catch (Exception e) {logger.error(" [DB Consumer] 处理告警事件时发生错误: {}", event, e);try {// 拒绝消息,不重新入队 (可以配置死信队列处理)channel.basicNack(tag, false, false);} catch (IOException ioException) {logger.error(" [DB Consumer] 无法NACK消息: {}", tag, ioException);}}}
}
package com.grant.code.printeralarmsystem.consumer;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;/*** 告警通知消费者*/
@Service
public class NotifierConsumer {private static final Logger logger = LoggerFactory.getLogger(NotifierConsumer.class);/*** 监听 alarm_notifier 队列* @param event 告警事件* @param channel AMQP Channel 用于手动确认* @param tag Delivery Tag 用于确认特定消息*/@RabbitListener(queues = "${app.rabbitmq.queue.notify}")public void handleNotification(PrinterAlarmEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {logger.info(" [Notifier Consumer] 开始处理通知告警事件: {}", event);// 模拟耗时的通知发送操作 (例如 3秒)Thread.sleep(3000);// 这里应该是实际的邮件/SMS发送逻辑// emailService.send("admin@example.com", "Printer Alarm", event.toString());logger.info(" [Notifier Consumer] 已发送告警通知给运维人员: {}", event.getPrinterId());// 手动确认消息channel.basicAck(tag, false);logger.debug(" [Notifier Consumer] 消息确认成功: {}", tag);} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error(" [Notifier Consumer] 处理中断: {}", event, e);try {channel.basicNack(tag, false, true);} catch (IOException ioException) {logger.error(" [Notifier Consumer] 无法NACK消息: {}", tag, ioException);}} catch (Exception e) {logger.error(" [Notifier Consumer] 发送通知时发生错误: {}", event, e);try {channel.basicNack(tag, false, false);} catch (IOException ioException) {logger.error(" [Notifier Consumer] 无法NACK消息: {}", tag, ioException);}}}
}
8. 主启动类
package com.grant.code.printeralarmsystem;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PrinterAlarmSystemApplication {public static void main(String[] args) {SpringApplication.run(PrinterAlarmSystemApplication.class, args);}}
启动服务后…
Postman进行测试
控制台输出
2025-08-24T17:46:08.251+08:00 INFO 20384 --- [nio-8080-exec-3] c.g.c.p.controller.AlarmController : [x] 接收到告警: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}
2025-08-24T17:46:08.251+08:00 INFO 20384 --- [nio-8080-exec-3] c.g.c.p.publisher.AlarmEventPublisher : [x] 发布告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null} (ID: 3eacb1ac-c8b0-4cd6-afcd-f119e67a4a05)
2025-08-24T17:46:08.253+08:00 INFO 20384 --- [nio-8080-exec-3] c.g.c.p.controller.AlarmController : [x] 告警已发布至队列,返回202 Accepted
2025-08-24T17:46:08.266+08:00 INFO 20384 --- [ntContainer#2-1] c.g.c.p.consumer.NotifierConsumer : [Notifier Consumer] 开始处理通知告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}
2025-08-24T17:46:08.266+08:00 INFO 20384 --- [ntContainer#0-1] c.g.c.p.consumer.DashboardConsumer : [Dashboard Consumer] 开始处理大屏推送告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}[√] 消息发送成功: CorrelationData [id=3eacb1ac-c8b0-4cd6-afcd-f119e67a4a05]
2025-08-24T17:46:08.267+08:00 INFO 20384 --- [ntContainer#1-1] c.g.c.p.consumer.DbWriterConsumer : [DB Consumer] 开始处理告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}
2025-08-24T17:46:08.376+08:00 INFO 20384 --- [ntContainer#0-1] c.g.c.p.consumer.DashboardConsumer : [Dashboard Consumer] 已将告警推送到大屏: PRINTER_001
2025-08-24T17:46:10.270+08:00 INFO 20384 --- [ntContainer#1-1] c.g.c.p.consumer.DbWriterConsumer : [DB Consumer] 成功将告警写入数据库: PRINTER_001
2025-08-24T17:46:11.269+08:00 INFO 20384 --- [ntContainer#2-1] c.g.c.p.consumer.NotifierConsumer : [Notifier Consumer] 已发送告警通知给运维人员: PRINTER_001
可以看到,API几乎立即返回了202,而耗时的数据库写入(2秒)和通知发送(3秒)是在后台异步进行的,这解决了客户抱怨的“告警慢”问题。大屏推送是最快的(0.1秒)。这种架构实现了异步解耦、削峰填谷和提高可靠性。
查看Rabbitmq控制台
可以发现,自动创建了我们要的交换机,队列,以及绑定关系和使用的routeKey
随便选择一个队列,点击get message,发现队列消息为空,是因为消息已经被消费了,所以不再存在于队列当中
完整执行过程
核心流程:打印机上报告警 -> 系统接收 -> 异步处理
1. 系统启动阶段
- 启动Spring Boot应用 (
PrinterAlarmSystemApplication.main
):- Spring Boot框架启动,加载所有配置和Bean。
- RabbitMQ连接与配置 (
RabbitMQConfig
):- 应用根据
application.yml
中的配置连接到RabbitMQ服务器。 - RabbitMQConfig类中的 @Bean方法被执行:
- 创建一个名为
alarm_events
的 Direct Exchange (交换机)。 - 创建三个 Queue (队列):
alarm_db_writer
,alarm_notifier
,alarm_dashboard
。这些队列被设置为持久化(durable
),即使RabbitMQ重启,队列也不会丢失。 - 通过 Binding (绑定) 将这三个队列都绑定到
alarm_events
交换机上,并使用相同的 Routing Key (alarm.event
)。这意味着发送到alarm_events
交换机且 Routing Key 为alarm.event
的消息,会被路由到所有这三个队列。 - 配置
RabbitTemplate
使用Jackson2JsonMessageConverter
来自动序列化/反序列化Java对象(PrinterAlarmEvent
)为JSON格式进行传输,并设置发布确认回调。
- 创建一个名为
- 应用根据
- 消费者监听启动 (
@RabbitListener
):- Spring AMQP框架扫描到
DbWriterConsumer
,NotifierConsumer
,DashboardConsumer
类中的@RabbitListener
注解。 - 它分别为每个消费者在对应的队列 (
alarm_db_writer
,alarm_notifier
,alarm_dashboard
) 上启动监听器,等待消息的到来。
- Spring AMQP框架扫描到
此时,系统基础设施(API、RabbitMQ交换机/队列/绑定、消费者监听)已准备就绪,等待打印机发送告警。
2. 告警产生与接收阶段
-
模拟打印机发送告警
用postman发出请求:localhost:8080/api/alarms/receive
注意:
Content-Type:application/json
{"printerId": "PRINTER_001","alarmType": "喷头堵塞","description": "打印头堵塞,请检查耗材。","severity": "严重" }
-
API处理 (
AlarmController.receiveAlarm
):
- Spring MVC框架接收到请求,将JSON体反序列化为
PrinterAlarmEvent
对象。 - 控制器方法
receiveAlarm
被调用。 - 进行简单的参数校验(例如检查
printerId
是否为空)。 - 关键步骤:调用
AlarmEventPublisher.publishAlarmEvent(event)
方法,将接收到的PrinterAlarmEvent
对象交给RabbitMQ处理。 - 立即响应:不等待RabbitMQ处理完成或消费者执行完毕,直接返回HTTP状态码
202 Accepted
给打印机。这实现了文档中提到的“快速响应设备”。
3.消息发布到mq阶段
发布消息 (AlarmEventPublisher.publishAlarmEvent
):
- 该方法通过注入的
RabbitTemplate
对象,调用convertAndSend
方法。 RabbitTemplate
使用之前配置的Jackson2JsonMessageConverter
将PrinterAlarmEvent
对象序列化为JSON格式的消息体。- 消息被发送到之前配置的
alarm_events
交换机 (exchangeName
),并附带路由键alarm.event
(routingKey
)。 RabbitTemplate
的ConfirmCallback
被触发(异步),确认消息是否成功发送到了RabbitMQ Broker。日志会打印[x] 消息发送成功
或[!] 消息发送失败
。
4.RabbitMQ路由阶段
交换机路由:
- RabbitMQ Broker接收到发送到
alarm_events
交换机的消息。 - 交换机根据绑定规则(Binding)检查消息的路由键
alarm.event
。 - 因为三个队列 (
alarm_db_writer
,alarm_notifier
,alarm_dashboard
) 都绑定了相同的路由键,所以交换机会将这条消息的一个副本分别放入这三个队列中。
此时,一条告警消息已经成功进入RabbitMQ系统,并被复制分发到三个专门的处理队列中。
5. 异步消费者处理阶段
三个消费者的处理是 完全独立且并发 进行的。
消费者C3 (大屏推送 DashboardConsumer
)
- 消息获取: RabbitMQ将
alarm_dashboard
队列中的消息推送给监听该队列的DashboardConsumer
实例。 - 处理逻辑 (
handleDashboardPush
):- 方法被调用,接收到
PrinterAlarmEvent
对象和RabbitMQ的Channel
及deliveryTag
。 - 执行模拟的快速推送操作(
Thread.sleep(100)
)。 - 手动确认: 调用
channel.basicAck(tag, false)
向RabbitMQ发送确认信号,告知消息已成功处理,RabbitMQ会从alarm_dashboard
队列中移除此消息。 - 日志: 打印处理开始、推送成功和确认成功的日志。
- 方法被调用,接收到
消费者C1 (数据库写入 DbWriterConsumer
)
- 消息获取: RabbitMQ将
alarm_db_writer
队列中的消息推送给监听该队列的DbWriterConsumer
实例。 - 处理逻辑 (
handleDbWrite
):- 方法被调用。
- 执行模拟的耗时数据库写入操作(
Thread.sleep(2000)
)。 - 手动确认: 处理成功后,调用
channel.basicAck
确认消息。 - 错误处理: 如果在处理过程中发生异常(如
InterruptedException
或其他Exception
),会调用channel.basicNack
来否定消息。根据配置(requeue=true/false
),消息可能会被重新放回队列或丢弃(或进入死信队列,如果配置了的话)。 - 日志: 打印处理开始、写库成功和确认成功的日志。
消费者C2 (通知发送 NotifierConsumer
)
- 消息获取: RabbitMQ将
alarm_notifier
队列中的消息推送给监听该队列的NotifierConsumer
实例。 - 处理逻辑 (
handleNotification
):- 方法被调用。
- 执行模拟的耗时通知发送操作(
Thread.sleep(3000)
)。 - 手动确认: 处理成功后,调用
channel.basicAck
确认消息。 - 错误处理: 同样有
try-catch
和basicNack
逻辑来处理失败情况。 - 日志: 打印处理开始、发送通知成功和确认成功的日志。
tMQ会从 alarm_dashboard
队列中移除此消息。
- 日志: 打印处理开始、推送成功和确认成功的日志。
消费者C1 (数据库写入 DbWriterConsumer
)
- 消息获取: RabbitMQ将
alarm_db_writer
队列中的消息推送给监听该队列的DbWriterConsumer
实例。 - 处理逻辑 (
handleDbWrite
):- 方法被调用。
- 执行模拟的耗时数据库写入操作(
Thread.sleep(2000)
)。 - 手动确认: 处理成功后,调用
channel.basicAck
确认消息。 - 错误处理: 如果在处理过程中发生异常(如
InterruptedException
或其他Exception
),会调用channel.basicNack
来否定消息。根据配置(requeue=true/false
),消息可能会被重新放回队列或丢弃(或进入死信队列,如果配置了的话)。 - 日志: 打印处理开始、写库成功和确认成功的日志。
消费者C2 (通知发送 NotifierConsumer
)
- 消息获取: RabbitMQ将
alarm_notifier
队列中的消息推送给监听该队列的NotifierConsumer
实例。 - 处理逻辑 (
handleNotification
):- 方法被调用。
- 执行模拟的耗时通知发送操作(
Thread.sleep(3000)
)。 - 手动确认: 处理成功后,调用
channel.basicAck
确认消息。 - 错误处理: 同样有
try-catch
和basicNack
逻辑来处理失败情况。 - 日志: 打印处理开始、发送通知成功和确认成功的日志。
整个过程通过RabbitMQ实现了异步解耦,API响应迅速,不同耗时的操作并行处理,有效解决了客户抱怨的“告警慢”问题。