当前位置: 首页 > news >正文

3D打印机管理后台与RabbitMQ集成的业务场景

3D打印机管理后台与RabbitMQ集成的业务场景

文章目录

  • 3D打印机管理后台与RabbitMQ集成的业务场景
    • 问题
    • 分析
    • 解决方案:引入RabbitMQ进行异步解耦
    • 实战演示
      • 核心思路:
      • 项目结构:
      • 1. `pom.xml` (添加依赖)
      • 2. `application.yml` (配置RabbitMQ连接和消费者)
      • 3. `PrinterAlarmEvent.java` (数据传输对象)
      • 4. `RabbitMQConfig.java` (RabbitMQ配置类)
      • 5. `AlarmEventPublisher.java` (告警事件发布者)
      • 6. `AlarmController.java` (告警接收API)
      • 7.三个消费者
      • 8. 主启动类
      • Postman进行测试
      • 查看Rabbitmq控制台
      • 完整执行过程
        • 1. **系统启动阶段**
        • 2. **告警产生与接收阶段**
        • 3.消息发布到mq阶段
        • 4.**RabbitMQ路由阶段**
        • 5. **异步消费者处理阶段**

问题

系统管理着成千上万台分布在全球各地的工业级3D打印机。这些打印机在长时间打印过程中可能会发生各种异常,例如:

  • 喷头堵塞 (Extruder Jam)
  • 热床温度异常 (Heated Bed Temperature Error)
  • 材料耗尽 (Filament Runout)
  • 设备离线 (Device Offline)

遇到的问题:客户投诉**「告警慢」**

系统有严重延迟!有时候打印机已经停摆报警了10分钟,你们的平台后台才弹出一条通知消息。这导致了我们的产线停工,损失巨大!

分析

根本原因分析(为什么不用数据库,而用RabbitMQ?)

如果系统设计是打印机 -> HTTP API -> 直接写数据库,那么在高并发场景下(50台设备同时上报),会遇到以下瓶颈:

  1. 数据库写入压力:50台设备几乎同时告警,意味着50个INSERT请求瞬间到达数据库,极易造成数据库连接池爆满、写入缓慢。
  2. 同步处理阻塞:在将告警信息写入数据库后,系统还需要进行一系列后续操作(如下文所述),这些操作如果是同步的,会严重拖慢整个API的响应时间,让设备等待,从而形成瓶颈。

解决方案:引入RabbitMQ进行异步解耦

架构如下图所示:

数据与通知
云印后端系统
打印机设备端
异步消费者集群
HTTP API
HTTP API
HTTP API
将报警事件发布至
路由至
路由至
路由至
报警记录数据库
运维人员手机/邮箱
浏览器实时监控大屏
API服务器
接收报警 生成事件
RabbitMQ
Exchange: alarm_events
消费者C1
持久化至数据库
Queue: alarm_db_writer
消费者C2
发送邮件/SMS
Queue: alarm_notifier
消费者C3
WebSocket推送至控制台
Queue: alarm_dashboard
报警接收API
打印机A
喷头堵塞
打印机B
材料耗尽
打印机C
设备离线

如上图所示,整个高效、解耦的处理流程是:

1.接收告警 (API):设备通过HTTP API上报告警。API服务只做两件事:验证请求、将告警信息作为一个消息体(JSON格式)立即发布(Producer) 到RabbitMQ的一个名为 alarm_events的Exchange中。API随即返回202 Accepted给设备,告知其“告警已收到”,处理非常迅速。

2.消息路由 (RabbitMQ):RabbitMQ的 alarm_eventsExchange会根据设定的路由键(routing key),将消息同时分发到三个不同的Queue中:

  • alarm_db_queue(用于数据持久化)
  • alarm_notify_queue(用于发送通知)
  • alarm_dashboard_queue(用于前台大屏实时展示)

3.异步处理 (Consumers):后台运行着多个消费者(Consumer) 服务,各自独立地从上述Queue中获取消息并处理:

消费者C1 (DB Writer):从 alarm_db_queue取消息,异步写入数据库。即使数据库偶尔缓慢,也不会影响其他操作。

消费者C2 (Notifier):从 alarm_notify_queue取消息,调用短信或邮件服务发送告警通知。这是通知延迟的根源,但确保了通知最终必达。

消费者C3 (Dashboard Pusher):从 alarm_dashboard_queue取消息,通过WebSocket实时推送到运维控制台的大屏页面。这是速度最快的一环。

在这个真实场景中,RabbitMQ的作用是:

  • 异步解耦:将耗时的操作(写库、通知)与核心API分离,快速响应设备。
  • 削峰填谷:瞬间的50条告警被队列缓存起来,由消费者逐步消化,避免系统被冲垮。
  • 提高可靠性:即使某个消费者服务(如发短信的)暂时崩溃,告警消息也会在队列中存活,待服务恢复后继续处理,确保消息不丢失。
  • 弹性扩展:如果告警数量持续巨大,您可以简单地启动更多个消费者实例来并行处理队列中的消息。

实战演示

核心思路:

  1. 模拟打印机设备:创建一个简单的HTTP客户端,模拟向后端API发送告警。
  2. 告警接收API:创建一个Spring Boot控制器,接收告警,快速验证并将告警事件发布到RabbitMQ。
  3. RabbitMQ配置:定义交换机(alarm_events)、队列(alarm_db_writer, alarm_notifier, alarm_dashboard)以及它们之间的绑定关系。
  4. 异步消费者
    • 模拟数据库写入消费者:监听alarm_db_writer队列,模拟耗时的数据库操作。
    • 模拟通知发送消费者:监听alarm_notifier队列,模拟发送邮件/SMS。
    • 模拟大屏推送消费者:监听alarm_dashboard队列,模拟通过WebSocket推送(这里简化为打印日志)。

项目结构:

printer-alarm-system/
├── src/main/java/com/example/printer/
│   ├── PrinterAlarmSystemApplication.java (主启动类)
│   ├── config/
│   │   └── RabbitMQConfig.java (RabbitMQ配置)
│   ├── controller/
│   │   └── AlarmController.java (告警接收API)
│   ├── dto/
│   │   └── PrinterAlarmEvent.java (告警事件数据传输对象)
│   ├── producer/
│   │   └── AlarmEventPublisher.java (告警事件发布者)
│   └── consumer/
│       ├── DbWriterConsumer.java (数据库写入消费者)
│       ├── NotifierConsumer.java (通知发送消费者)
│       └── DashboardConsumer.java (大屏推送消费者)
├── src/main/resources/
│   └── application.yml (Spring Boot配置)
└── pom.xml (Maven依赖)

1. pom.xml (添加依赖)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.grant.code</groupId><artifactId>printer-alarm-system</artifactId><version>0.0.1-SNAPSHOT</version><name>printer-alarm-system</name><description>Demo project for Spring Boot + RabbitMQ with 3D Printer Alarms</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot AMQP Starter for RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Jackson for JSON processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><dependency><groupId>javax.validation</groupId><artifactId>validation-api</artifactId><version>2.0.1.Final</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2. application.yml (配置RabbitMQ连接和消费者)

spring:rabbitmq:host: XXX # 根据你的RabbitMQ服务器地址修改port: XXX     # 根据你的RabbitMQ服务器端口修改username: XXX # 根据你的RabbitMQ用户名修改password: XXX # 根据你的RabbitMQ密码修改virtual-host: /listener:simple:# 手动确认模式,确保消息处理成功后再确认acknowledge-mode: manual# 预取消息数量,控制消费者负载prefetch: 10# 重试配置retry:enabled: truemax-attempts: 3 # 最大重试次数initial-interval: 2000ms # 初始重试间隔# 发布者确认publisher-confirm-type: correlatedpublisher-returns: true# 自定义配置
app:rabbitmq:exchange: alarm_eventsqueue:db: alarm_db_writernotify: alarm_notifierdashboard: alarm_dashboardrouting-key: alarm.eventlogging:level:com.example.printer: DEBUG # 启用DEBUG日志查看详细流程server:port: 8080 # API服务端口

3. PrinterAlarmEvent.java (数据传输对象)

package com.grant.code.printeralarmsystem.dto;import com.fasterxml.jackson.annotation.JsonFormat;import java.time.LocalDateTime;
import java.util.Objects;public class PrinterAlarmEvent {private String printerId; // 打印机IDprivate String alarmType; // 指定告警类型,例如:喷头堵塞 (Extruder Jam)、材料耗尽 (Filament Runout)private String description; // 具体描述private String severity; // 这个指示告警的严重程度,例如:"Low", "Medium", "High"@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime timestamp;// Constructorspublic PrinterAlarmEvent() {}public PrinterAlarmEvent(String printerId, String alarmType, String description, String severity) {this.printerId = printerId;this.alarmType = alarmType;this.description = description;this.severity = severity;this.timestamp = LocalDateTime.now();}// Getters and Setterspublic String getPrinterId() { return printerId; }public void setPrinterId(String printerId) { this.printerId = printerId; }public String getAlarmType() { return alarmType; }public void setAlarmType(String alarmType) { this.alarmType = alarmType; }public String getDescription() { return description; }public void setDescription(String description) { this.description = description; }public String getSeverity() { return severity; }public void setSeverity(String severity) { this.severity = severity; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }@Overridepublic String toString() {return "PrinterAlarmEvent{" +"printerId='" + printerId + '\'' +", alarmType='" + alarmType + '\'' +", description='" + description + '\'' +", severity='" + severity + '\'' +", timestamp=" + timestamp +'}';}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;PrinterAlarmEvent that = (PrinterAlarmEvent) o;// 只比较 printerId 和 alarmType,即如果printerId和alarmType相同,则认为两个事件相同return Objects.equals(printerId, that.printerId) && Objects.equals(alarmType, that.alarmType);}@Overridepublic int hashCode() {return Objects.hash(printerId, alarmType);}
}

4. RabbitMQConfig.java (RabbitMQ配置类)

package com.grant.code.printeralarmsystem.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ配置类*/
@Configuration
public class RabbitMQConfig {@Value("${app.rabbitmq.exchange}")private String exchangeName; // 交换机名称@Value("${app.rabbitmq.queue.db}")private String dbQueueName; // 数据库写入队列名称@Value("${app.rabbitmq.queue.notify}")private String notifyQueueName; // 通知队列名称@Value("${app.rabbitmq.queue.dashboard}")private String dashboardQueueName; // 大屏显示队列名称@Value("${app.rabbitmq.routing-key}")private String routingKey; // 路由键,用于匹配队列// 创建交换机@Beanpublic DirectExchange alarmEventExchange() {return new DirectExchange(exchangeName, true, false); // durable=true表示持久化, autoDelete=false表示不自动删除}// 创建队列@Beanpublic Queue alarmDbWriterQueue() {return QueueBuilder.durable(dbQueueName).build(); // durable=true表示持久化,这里创建的是数据库写入队列}@Beanpublic Queue alarmNotifierQueue() {return QueueBuilder.durable(notifyQueueName).build(); // durable=true表示持久化,这里创建的是通知队列}@Beanpublic Queue alarmDashboardQueue() {return QueueBuilder.durable(dashboardQueueName).build(); // durable=true表示持久化,这里创建的是大屏显示队列}// 绑定队列到交换机@Beanpublic Binding bindingDbWriter(Queue alarmDbWriterQueue, DirectExchange alarmEventExchange) {return BindingBuilder.bind(alarmDbWriterQueue).to(alarmEventExchange).with(routingKey);}@Beanpublic Binding bindingNotifier(Queue alarmNotifierQueue, DirectExchange alarmEventExchange) {return BindingBuilder.bind(alarmNotifierQueue).to(alarmEventExchange).with(routingKey);}@Beanpublic Binding bindingDashboard(Queue alarmDashboardQueue, DirectExchange alarmEventExchange) {return BindingBuilder.bind(alarmDashboardQueue).to(alarmEventExchange).with(routingKey);}// 配置消息转换器为JSON@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate使用JSON转换器和确认回调@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 创建RabbitTemplate,connectionFactory为连接工厂rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); // 设置消息转换器为JSON// 开启发布确认rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println(" [√] 消息发送成功: " + correlationData);} else {System.err.println(" [×] 消息发送失败: " + cause);}});// 开启发布返回 (针对无法路由的消息)rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(returned -> {System.err.println(" [!] 消息无法路由: " + returned);});return rabbitTemplate;}
}

5. AlarmEventPublisher.java (告警事件发布者)

package com.grant.code.printeralarmsystem.publisher;import com.grant.code.printeralarmsystem.config.RabbitMQConfig;
import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;/*** 告警事件发布者*/
@Service
public class AlarmEventPublisher {private static final Logger logger = LoggerFactory.getLogger(AlarmEventPublisher.class);@Autowiredprivate RabbitTemplate rabbitTemplate;// 注入配置的交换机名称@org.springframework.beans.factory.annotation.Value("${app.rabbitmq.exchange}")private String exchangeName;// 注入配置的路由键@org.springframework.beans.factory.annotation.Value("${app.rabbitmq.routing-key}")private String routingKey;public void publishAlarmEvent(PrinterAlarmEvent event) {// 生成唯一ID用于确认CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info(" [x] 发布告警事件: {} (ID: {})", event, correlationData.getId());// 发布到交换机,使用配置的路由键rabbitTemplate.convertAndSend(exchangeName, routingKey, event, correlationData);}
}

6. AlarmController.java (告警接收API)

package com.grant.code.printeralarmsystem.controller;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.grant.code.printeralarmsystem.publisher.AlarmEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import javax.validation.Valid;/*** 告警接收API*/
@RestController
@RequestMapping("/api/alarms")
public class AlarmController {private static final Logger logger = LoggerFactory.getLogger(AlarmController.class);@Autowiredprivate AlarmEventPublisher alarmEventPublisher;/*** 接收打印机告警的API端点* @param alarmEvent 告警事件对象* @return ResponseEntity 202 Accepted 表示已接收*/@PostMapping("/receive")public ResponseEntity<String> receiveAlarm(@RequestBody @Valid PrinterAlarmEvent alarmEvent) {logger.info(" [x] 接收到告警: {}", alarmEvent);// 1. 快速验证(这里简化)if (alarmEvent.getPrinterId() == null || alarmEvent.getPrinterId().isEmpty()) {logger.warn(" [!] 无效的打印机ID");return ResponseEntity.badRequest().body("Invalid printer ID");}// 2. 将告警事件发布到RabbitMQ (异步操作)alarmEventPublisher.publishAlarmEvent(alarmEvent);// 3. 立即返回202 Accepted,表示请求已被接受处理logger.info(" [x] 告警已发布至队列,返回202 Accepted");return ResponseEntity.status(HttpStatus.ACCEPTED).body("Alarm received and queued for processing.");}
}

7.三个消费者

package com.grant.code.printeralarmsystem.consumer;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;/*** 大屏消费者*/
@Service
public class DashboardConsumer {private static final Logger logger = LoggerFactory.getLogger(DashboardConsumer.class);/*** 监听 alarm_dashboard 队列* @param event 告警事件* @param channel AMQP Channel 用于手动确认* @param tag Delivery Tag 用于确认特定消息*/@RabbitListener(queues = "${app.rabbitmq.queue.dashboard}")public void handleDashboardPush(PrinterAlarmEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {logger.info(" [Dashboard Consumer] 开始处理大屏推送告警事件: {}", event);// 模拟非常快速的推送操作 (例如 0.1秒)Thread.sleep(100);// 这里应该是实际的WebSocket推送逻辑// webSocketService.sendToDashboard("/topic/alarms", event);logger.info(" [Dashboard Consumer] 已将告警推送到大屏: {}", event.getPrinterId());// 手动确认消息channel.basicAck(tag, false);logger.debug(" [Dashboard Consumer] 消息确认成功: {}", tag);} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error(" [Dashboard Consumer] 处理中断: {}", event, e);try {channel.basicNack(tag, false, true);} catch (IOException ioException) {logger.error(" [Dashboard Consumer] 无法NACK消息: {}", tag, ioException);}} catch (Exception e) {logger.error(" [Dashboard Consumer] 推送大屏时发生错误: {}", event, e);try {channel.basicNack(tag, false, false);} catch (IOException ioException) {logger.error(" [Dashboard Consumer] 无法NACK消息: {}", tag, ioException);}}}
}
package com.grant.code.printeralarmsystem.consumer;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;
/** 告警事件写入数据库消费者*/
@Service
public class DbWriterConsumer {private static final Logger logger = LoggerFactory.getLogger(DbWriterConsumer.class);/*** 监听 alarm_db_writer 队列* @param event 告警事件* @param channel AMQP Channel 用于手动确认* @param tag Delivery Tag 用于确认特定消息* header注解:用于获取消息的 Delivery Tag,Delivery Tag是消息的唯一标识符,用于确认消息处理成功或失败*/@RabbitListener(queues = "${app.rabbitmq.queue.db}") // 监听 alarm_db_writer 队列public void handleDbWrite(PrinterAlarmEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {logger.info(" [DB Consumer] 开始处理告警事件: {}", event);// 模拟耗时的数据库写入操作 (例如 2秒)Thread.sleep(2000);// 这里应该是实际的数据库插入逻辑// repository.save(alarmRecord);logger.info(" [DB Consumer] 成功将告警写入数据库: {}", event.getPrinterId());// 手动确认消息channel.basicAck(tag, false);logger.debug(" [DB Consumer] 消息确认成功: {}", tag);} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error(" [DB Consumer] 处理中断: {}", event, e);try {// 拒绝消息并重新入队channel.basicNack(tag, false, true);} catch (IOException ioException) {logger.error(" [DB Consumer] 无法NACK消息: {}", tag, ioException);}} catch (Exception e) {logger.error(" [DB Consumer] 处理告警事件时发生错误: {}", event, e);try {// 拒绝消息,不重新入队 (可以配置死信队列处理)channel.basicNack(tag, false, false);} catch (IOException ioException) {logger.error(" [DB Consumer] 无法NACK消息: {}", tag, ioException);}}}
}
package com.grant.code.printeralarmsystem.consumer;import com.grant.code.printeralarmsystem.dto.PrinterAlarmEvent;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;/*** 告警通知消费者*/
@Service
public class NotifierConsumer {private static final Logger logger = LoggerFactory.getLogger(NotifierConsumer.class);/*** 监听 alarm_notifier 队列* @param event 告警事件* @param channel AMQP Channel 用于手动确认* @param tag Delivery Tag 用于确认特定消息*/@RabbitListener(queues = "${app.rabbitmq.queue.notify}")public void handleNotification(PrinterAlarmEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {logger.info(" [Notifier Consumer] 开始处理通知告警事件: {}", event);// 模拟耗时的通知发送操作 (例如 3秒)Thread.sleep(3000);// 这里应该是实际的邮件/SMS发送逻辑// emailService.send("admin@example.com", "Printer Alarm", event.toString());logger.info(" [Notifier Consumer] 已发送告警通知给运维人员: {}", event.getPrinterId());// 手动确认消息channel.basicAck(tag, false);logger.debug(" [Notifier Consumer] 消息确认成功: {}", tag);} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error(" [Notifier Consumer] 处理中断: {}", event, e);try {channel.basicNack(tag, false, true);} catch (IOException ioException) {logger.error(" [Notifier Consumer] 无法NACK消息: {}", tag, ioException);}} catch (Exception e) {logger.error(" [Notifier Consumer] 发送通知时发生错误: {}", event, e);try {channel.basicNack(tag, false, false);} catch (IOException ioException) {logger.error(" [Notifier Consumer] 无法NACK消息: {}", tag, ioException);}}}
}

8. 主启动类

package com.grant.code.printeralarmsystem;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PrinterAlarmSystemApplication {public static void main(String[] args) {SpringApplication.run(PrinterAlarmSystemApplication.class, args);}}

启动服务后…

Postman进行测试

在这里插入图片描述

控制台输出

2025-08-24T17:46:08.251+08:00  INFO 20384 --- [nio-8080-exec-3] c.g.c.p.controller.AlarmController       :  [x] 接收到告警: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}
2025-08-24T17:46:08.251+08:00  INFO 20384 --- [nio-8080-exec-3] c.g.c.p.publisher.AlarmEventPublisher    :  [x] 发布告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null} (ID: 3eacb1ac-c8b0-4cd6-afcd-f119e67a4a05)
2025-08-24T17:46:08.253+08:00  INFO 20384 --- [nio-8080-exec-3] c.g.c.p.controller.AlarmController       :  [x] 告警已发布至队列,返回202 Accepted
2025-08-24T17:46:08.266+08:00  INFO 20384 --- [ntContainer#2-1] c.g.c.p.consumer.NotifierConsumer        :  [Notifier Consumer] 开始处理通知告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}
2025-08-24T17:46:08.266+08:00  INFO 20384 --- [ntContainer#0-1] c.g.c.p.consumer.DashboardConsumer       :  [Dashboard Consumer] 开始处理大屏推送告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}[] 消息发送成功: CorrelationData [id=3eacb1ac-c8b0-4cd6-afcd-f119e67a4a05]
2025-08-24T17:46:08.267+08:00  INFO 20384 --- [ntContainer#1-1] c.g.c.p.consumer.DbWriterConsumer        :  [DB Consumer] 开始处理告警事件: PrinterAlarmEvent{printerId='PRINTER_001', alarmType='喷头堵塞', description='打印头堵塞,请检查耗材。', severity='严重', timestamp=null}
2025-08-24T17:46:08.376+08:00  INFO 20384 --- [ntContainer#0-1] c.g.c.p.consumer.DashboardConsumer       :  [Dashboard Consumer] 已将告警推送到大屏: PRINTER_001
2025-08-24T17:46:10.270+08:00  INFO 20384 --- [ntContainer#1-1] c.g.c.p.consumer.DbWriterConsumer        :  [DB Consumer] 成功将告警写入数据库: PRINTER_001
2025-08-24T17:46:11.269+08:00  INFO 20384 --- [ntContainer#2-1] c.g.c.p.consumer.NotifierConsumer        :  [Notifier Consumer] 已发送告警通知给运维人员: PRINTER_001

在这里插入图片描述

可以看到,API几乎立即返回了202,而耗时的数据库写入(2秒)和通知发送(3秒)是在后台异步进行的,这解决了客户抱怨的“告警慢”问题。大屏推送是最快的(0.1秒)。这种架构实现了异步解耦、削峰填谷和提高可靠性。

查看Rabbitmq控制台

可以发现,自动创建了我们要的交换机,队列,以及绑定关系和使用的routeKey

在这里插入图片描述

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

随便选择一个队列,点击get message,发现队列消息为空,是因为消息已经被消费了,所以不再存在于队列当中

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

完整执行过程

核心流程:打印机上报告警 -> 系统接收 -> 异步处理

1. 系统启动阶段
  1. 启动Spring Boot应用 (PrinterAlarmSystemApplication.main):
    • Spring Boot框架启动,加载所有配置和Bean。
  2. RabbitMQ连接与配置 (RabbitMQConfig):
    • 应用根据 application.yml 中的配置连接到RabbitMQ服务器。
    • RabbitMQConfig类中的 @Bean方法被执行:
      • 创建一个名为 alarm_eventsDirect Exchange (交换机)。
      • 创建三个 Queue (队列):alarm_db_writer, alarm_notifier, alarm_dashboard。这些队列被设置为持久化(durable),即使RabbitMQ重启,队列也不会丢失。
      • 通过 Binding (绑定) 将这三个队列都绑定到 alarm_events 交换机上,并使用相同的 Routing Key (alarm.event)。这意味着发送到 alarm_events 交换机且 Routing Key 为 alarm.event 的消息,会被路由到所有这三个队列。
      • 配置 RabbitTemplate 使用 Jackson2JsonMessageConverter 来自动序列化/反序列化Java对象(PrinterAlarmEvent)为JSON格式进行传输,并设置发布确认回调。
  3. 消费者监听启动 (@RabbitListener):
    • Spring AMQP框架扫描到 DbWriterConsumer, NotifierConsumer, DashboardConsumer 类中的 @RabbitListener 注解。
    • 它分别为每个消费者在对应的队列 (alarm_db_writer, alarm_notifier, alarm_dashboard) 上启动监听器,等待消息的到来。

此时,系统基础设施(API、RabbitMQ交换机/队列/绑定、消费者监听)已准备就绪,等待打印机发送告警。

2. 告警产生与接收阶段
  1. 模拟打印机发送告警

    用postman发出请求:localhost:8080/api/alarms/receive

    注意:Content-Type:application/json

    {"printerId": "PRINTER_001","alarmType": "喷头堵塞","description": "打印头堵塞,请检查耗材。","severity": "严重"
    }
    
  2. API处理 (AlarmController.receiveAlarm):

  • Spring MVC框架接收到请求,将JSON体反序列化为 PrinterAlarmEvent 对象。
  • 控制器方法 receiveAlarm 被调用。
  • 进行简单的参数校验(例如检查 printerId 是否为空)。
  • 关键步骤:调用 AlarmEventPublisher.publishAlarmEvent(event) 方法,将接收到的 PrinterAlarmEvent 对象交给RabbitMQ处理。
  • 立即响应:不等待RabbitMQ处理完成或消费者执行完毕,直接返回HTTP状态码 202 Accepted 给打印机。这实现了文档中提到的“快速响应设备”。
3.消息发布到mq阶段

发布消息 (AlarmEventPublisher.publishAlarmEvent):

  • 该方法通过注入的 RabbitTemplate 对象,调用 convertAndSend 方法。
  • RabbitTemplate 使用之前配置的 Jackson2JsonMessageConverterPrinterAlarmEvent 对象序列化为JSON格式的消息体。
  • 消息被发送到之前配置的 alarm_events 交换机 (exchangeName),并附带路由键 alarm.event (routingKey)。
  • RabbitTemplateConfirmCallback 被触发(异步),确认消息是否成功发送到了RabbitMQ Broker。日志会打印 [x] 消息发送成功[!] 消息发送失败
4.RabbitMQ路由阶段

交换机路由:

  • RabbitMQ Broker接收到发送到 alarm_events 交换机的消息。
  • 交换机根据绑定规则(Binding)检查消息的路由键 alarm.event
  • 因为三个队列 (alarm_db_writer, alarm_notifier, alarm_dashboard) 都绑定了相同的路由键,所以交换机会将这条消息的一个副本分别放入这三个队列中。

此时,一条告警消息已经成功进入RabbitMQ系统,并被复制分发到三个专门的处理队列中。

5. 异步消费者处理阶段

三个消费者的处理是 完全独立且并发 进行的。

消费者C3 (大屏推送 DashboardConsumer)

  1. 消息获取: RabbitMQ将 alarm_dashboard 队列中的消息推送给监听该队列的 DashboardConsumer 实例。
  2. 处理逻辑 (handleDashboardPush):
    • 方法被调用,接收到 PrinterAlarmEvent 对象和RabbitMQ的 ChanneldeliveryTag
    • 执行模拟的快速推送操作(Thread.sleep(100))。
    • 手动确认: 调用 channel.basicAck(tag, false) 向RabbitMQ发送确认信号,告知消息已成功处理,RabbitMQ会从 alarm_dashboard 队列中移除此消息。
    • 日志: 打印处理开始、推送成功和确认成功的日志。

消费者C1 (数据库写入 DbWriterConsumer)

  1. 消息获取: RabbitMQ将 alarm_db_writer 队列中的消息推送给监听该队列的 DbWriterConsumer 实例。
  2. 处理逻辑 (handleDbWrite):
    • 方法被调用。
    • 执行模拟的耗时数据库写入操作(Thread.sleep(2000))。
    • 手动确认: 处理成功后,调用 channel.basicAck 确认消息。
    • 错误处理: 如果在处理过程中发生异常(如 InterruptedException 或其他 Exception),会调用 channel.basicNack 来否定消息。根据配置(requeue=true/false),消息可能会被重新放回队列或丢弃(或进入死信队列,如果配置了的话)。
    • 日志: 打印处理开始、写库成功和确认成功的日志。

消费者C2 (通知发送 NotifierConsumer)

  1. 消息获取: RabbitMQ将 alarm_notifier 队列中的消息推送给监听该队列的 NotifierConsumer 实例。
  2. 处理逻辑 (handleNotification):
    • 方法被调用。
    • 执行模拟的耗时通知发送操作(Thread.sleep(3000))。
    • 手动确认: 处理成功后,调用 channel.basicAck 确认消息。
    • 错误处理: 同样有 try-catchbasicNack 逻辑来处理失败情况。
    • 日志: 打印处理开始、发送通知成功和确认成功的日志。

tMQ会从 alarm_dashboard 队列中移除此消息。

  • 日志: 打印处理开始、推送成功和确认成功的日志。

消费者C1 (数据库写入 DbWriterConsumer)

  1. 消息获取: RabbitMQ将 alarm_db_writer 队列中的消息推送给监听该队列的 DbWriterConsumer 实例。
  2. 处理逻辑 (handleDbWrite):
    • 方法被调用。
    • 执行模拟的耗时数据库写入操作(Thread.sleep(2000))。
    • 手动确认: 处理成功后,调用 channel.basicAck 确认消息。
    • 错误处理: 如果在处理过程中发生异常(如 InterruptedException 或其他 Exception),会调用 channel.basicNack 来否定消息。根据配置(requeue=true/false),消息可能会被重新放回队列或丢弃(或进入死信队列,如果配置了的话)。
    • 日志: 打印处理开始、写库成功和确认成功的日志。

消费者C2 (通知发送 NotifierConsumer)

  1. 消息获取: RabbitMQ将 alarm_notifier 队列中的消息推送给监听该队列的 NotifierConsumer 实例。
  2. 处理逻辑 (handleNotification):
    • 方法被调用。
    • 执行模拟的耗时通知发送操作(Thread.sleep(3000))。
    • 手动确认: 处理成功后,调用 channel.basicAck 确认消息。
    • 错误处理: 同样有 try-catchbasicNack 逻辑来处理失败情况。
    • 日志: 打印处理开始、发送通知成功和确认成功的日志。

整个过程通过RabbitMQ实现了异步解耦,API响应迅速,不同耗时的操作并行处理,有效解决了客户抱怨的“告警慢”问题。

http://www.xdnf.cn/news/1357633.html

相关文章:

  • Windows Server存储副本智能同步优化方案
  • 【RAGFlow代码详解-4】数据存储层
  • 第四章:大模型(LLM)】07.Prompt工程-(12)其他prompt方法
  • 人工智能之数学基础:离散型随机变量
  • 【中文教材】13. 资本流动与外汇市场
  • Redis 高可用开发指南
  • 支持多种模型,无限AI生图工具来了
  • HTTP 接口调用工具类(OkHttp 版)
  • 华为网路设备学习-30(BGP协议 五)Community、
  • pytorch线性回归(二)
  • elasticsearch 7.x elasticsearch 使用scroll滚动查询中超时问题案例
  • MySQL官方C/C++ 接口入门
  • Ubuntu24.04 安装 Zabbix
  • ComfyUI ZLUDA AMD conda 使用遇到的问题
  • rust语言 (1.88) egui (0.32.1) 学习笔记(逐行注释)(十五)网格布局
  • 【229页PPT】某大型制药集团企业数字化转型SAP蓝图设计解决方案(附下载方式)
  • 目标检测数据集 第006期-基于yolo标注格式的汽车事故检测数据集(含免费分享)
  • 网络协议UDP、TCP
  • 管道符在渗透测试与网络安全中的全面应用指南
  • 【信息安全】英飞凌TC3xx安全调试口功能实现(调试口保护)
  • OSG库子动态库和插件等文件介绍
  • AlmaLinux 上 Python 3.6 切换到 Python 3.11
  • 从 JUnit 深入理解 Java 注解与反射机制
  • Flink元空间异常深度解析:从原理到实战调优指南
  • 数字防线:现代企业网络安全运维实战指南
  • Maven项目中settings.xml终极优化指南
  • 得物25年春招-安卓部分笔试题1
  • Flink 实时加购数据“维表补全”实战:从 Kafka 到 HBase 再到 Redis 的完整链路
  • GaussDB 数据库架构师修炼(十八) SQL引擎-分布式计划
  • vimware unbuntu18.04 安装之后,没有网络解决方案