当前位置: 首页 > news >正文

java每日精进 6.11【消息队列】

1.内存级Spring_Event

1.1 控制器层:StringTextController

/*** 字符串文本管理控制器* 提供通过消息队列异步获取文本信息的接口*/
@RestController
@RequestMapping("/api/string-text")
public class StringTextController {@Resourceprivate StringTextProducer stringTextProducer;/*** 通过消息队列异步查询字符串文本信息* * 流程:* 1. 接收前端查询参数* 2. 封装为消息对象发送到消息队列* 3. 立即返回"消息已发送"响应,不等待实际处理结果* * 优点:* - 避免长耗时查询阻塞HTTP连接* - 支持水平扩展处理能力* * @param pageReqVO 分页查询参数* @return 统一响应结果(仅包含消息发送状态)*/@GetMapping("/getStringTextByMQ")@Operation(summary = "通过消息队列获取test信息")public CommonResult<String> getStringTextByMQ(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到MQ请求,线程名: {}", Thread.currentThread().getName());try {// 发送消息到队列(异步处理)stringTextProducer.sendStringTextQueryMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("消息已发送,异步处理中");} catch (Exception e) {log.error("消息发送异常", e);return CommonResult.error(500, "消息发送失败");}}
}

1.2 消息生产者:StringTextProducer

/*** 字符串文本查询消息生产者* 负责将查询请求封装为消息并发送到消息队列* * 设计模式:* - 使用Spring事件机制作为轻量级消息队列* - 可无缝切换为Kafka/RocketMQ等真正的MQ系统*/
@Slf4j
@Component
public class StringTextProducer {@Resourceprivate ApplicationContext applicationContext;/*** 发送字符串查询消息* * @param text 查询文本(用于模糊匹配)* @param pageNo 页码* @param pageSize 每页大小* * 消息传递机制:* 1. 创建StringTextQueryMessage对象* 2. 通过Spring事件发布器将消息广播到事件总线* 3. 由StringTextConsumer监听并处理该消息*/public void sendStringTextQueryMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryMessage message = new StringTextQueryMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);log.info("[sendStringTextQueryMessage][发送消息: {}]", message);// 使用Spring事件机制发布消息// 注意:此处使用同步事件,通过@Async注解在Consumer端实现异步applicationContext.publishEvent(message);}
}

1.3 消息消费者:StringTextConsumer

/*** 字符串文本查询消息消费者* 负责从消息队列接收查询请求并执行实际查询操作* * 线程模型:* - 使用@Async注解指定专用线程池"curdAsyncExecutor"* - 实现请求处理与HTTP请求线程分离*/
@Slf4j
@Component
public class StringTextConsumer {@Resourceprivate StringTextService stringTextService;/*** 处理字符串查询消息* * @param message 查询消息对象* * 执行流程:* 1. 从消息中提取查询参数* 2. 调用Service层执行实际查询* 3. 记录查询结果(可扩展为保存到结果表或通知前端)*/@EventListener@Async("curdAsyncExecutor") // 使用专用异步线程池public void onMessage(StringTextQueryMessage message) {log.info("[onMessage][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());// 1. 将消息转换为请求对象StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());// 2. 执行实际查询(可能是数据库或外部系统)PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][处理结果: {}]", pageResult);// 3. 可扩展逻辑:// - 将结果保存到临时表// - 推送WebSocket通知前端结果就绪// - 触发后续处理流程}
}

1.4 消息模型:StringTextQueryMessage

/*** 字符串文本查询消息* 用于在生产者和消费者之间传递查询参数* * 设计特点:* - 实现Serializable接口以便于消息传输* - 使用JSR-303注解进行参数校验* - 字段与StringTextPageReqVO保持语义一致*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StringTextQueryMessage implements Serializable {private static final long serialVersionUID = 20230610L;/*** 查询的文本内容(用于模糊查询)*/private String text;/*** 页码(从1开始)*/@NotNull(message = "页码不能为空")private Integer pageNo;/*** 每页大小*/@NotNull(message = "每页大小不能为空")private Integer pageSize;
}

客户端 → HTTP请求 → StringTextController → StringTextProducer → 消息队列 → StringTextConsumer → StringTextService → 数据库查询

1.5 问题提出

1.5.1 生产者如何找到消费者?

  • StringTextProducer 通过 applicationContext.publishEvent 发布 StringTextQueryMessage 事件。
  • Spring 容器根据事件类型(StringTextQueryMessage)将事件分发给 StringTextConsumer 的 @EventListener 方法。
  • 匹配基于 Java 类型系统,参数类型必须兼容事件类型。


1.5.2 假如添加多个新的接收参数为StringTextQueryMessage的消费者,还有几个是接收的参数是StringTextQueryMessage及其子类,会全部接收到还是会匹配精度最高的?

  • 事件类型匹配
    • Spring 根据事件对象的 运行时类型(StringTextQueryMessage 或其子类)查找所有 @EventListener 方法,检查其参数类型是否与事件类型兼容。
    • 兼容的规则是:监听方法的参数类型可以是事件类型的 类本身超类接口
    • 例如,如果发布的事件是 StringTextQueryMessage,所有参数类型为 StringTextQueryMessage、其超类(如 Object)或接口的 @EventListener 方法都会被调用。
  • 多消费者行为
    • Spring Event 不基于“精度最高”选择单个消费者,而是 广播 事件给所有匹配的监听器。
    • 如果有多个消费者监听 StringTextQueryMessage 或其子类,Spring 会按顺序调用所有匹配的 @EventListener 方法。
  • 执行顺序
    • 默认情况下,Spring 不保证 @EventListener 方法的调用顺序。
    • 可以通过 @Order 注解或实现 Ordered 接口指定执行顺序(较低的 order 值优先执行)。
  • 异步性
    • 如果 @EventListener 方法标注了 @Async(如您的 StringTextConsumer),每个消费者的处理会在异步线程中执行,互不阻塞。

1.5.3以上情况能不能指定一个consumer匹配?

方法 1:使用条件注解(@EventListener(condition = ...))
  • 在 @EventListener 中添加 condition 属性,通过 SpEL(Spring Expression Language)过滤事件。
  • 示例:修改 StringTextConsumer1 和 StringTextConsumer2,添加条件:
  • @EventListener(condition = "#message.text == 'specific'")

  • 如果 message.text == "specific",只有 Consumer1 处理。

方法 2:使用自定义事件路由
  • 引入一个事件路由机制,通过事件对象的额外属性指定目标消费者。
  • 修改 StringTextQueryMessage,添加 consumerId 字段:
  • @Data

    public class StringTextQueryMessage {

        private String text;

        @NotNull(message = "页码不能为空")

        private Integer pageNo;

        @NotNull(message = "每页大小不能为空")

        private Integer pageSize;

        private String consumerId; // 新增:指定目标消费者

    }

  • @Slf4j

    @Component

    public class StringTextConsumer1 {

        @Resource

        private StringTextService stringTextService;

        @EventListener

        @Async("curdAsyncExecutor")

        public void onMessage(StringTextQueryMessage message) {

            if (!"consumer1".equals(message.getConsumerId())) {

                return; // 忽略不匹配的 consumerId

            }

            log.info("[Consumer1][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());

            StringTextPageReqVO reqVO = new StringTextPageReqVO();

            reqVO.setText(message.getText());

            reqVO.setPageNo(message.getPageNo());

            reqVO.setPageSize(message.getPageSize());

            PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);

            log.info("[Consumer1][处理结果: {}]", pageResult);

        }

    }

    @Slf4j

    @Component

    public class StringTextConsumer2 {

        @EventListener

        @Async("curdAsyncExecutor")

        public void onMessage(StringTextQueryMessage message) {

            if (!"consumer2".equals(message.getConsumerId())) {

                return;

            }

            log.info("[Consumer2][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());

        }

    }

方法 3:自定义事件分发器
  • 重写 Spring 的事件分发逻辑,使用自定义 ApplicationEventMulticaster:
    • 创建自定义 ApplicationEventMulticaster:
    •     @Component("applicationEventMulticaster")

          public class CustomEventMulticaster extends SimpleApplicationEventMulticaster {

              @Override

              public void multicastEvent(ApplicationEvent event) {

                  if (event instanceof StringTextQueryMessage) {

                      StringTextQueryMessage message = (StringTextQueryMessage) event;

                      // 根据条件选择特定消费者,例如 consumerId

                      getApplicationListeners(event).stream()

                              .filter(listener -> {

                                  // 假设 listener 的方法名或类名包含 consumerId

                                  return listener.getListenerId().contains(message.getConsumerId());

                              })

                              .forEach(listener -> listener.onApplicationEvent(event));

                  } else {

                      super.multicastEvent(event);

                  }

              }

          }

执行结果如下:

2025-06-10 10:39:03.349 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor    | [preHandle][开始请求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 参数({text=username})]Controller 方法路径:cn.iocoder.moyun.module.curd.controller.admin.CurdTestController(CurdTestController.java:57)
2025-06-10 10:39:03.355 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.c.admin.CurdTestController     | 控制器接收到MQ请求,线程名: http-nio-48080-exec-7
2025-06-10 10:39:03.355 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.m.p.sms.StringTextProducer     | [sendStringTextQueryMessage][发送消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10)]
2025-06-10 10:39:03.357 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor    | [afterCompletion][完成请求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 耗时(6 ms)]
2025-06-10 10:39:03.358 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer     | [onMessage][接收消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10),线程名: curd-async-2]
2025-06-10 10:39:03.374 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.s.s.StringTextServiceImpl      | 同步查询结果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"临时用户","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"测试用户","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理员","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)
2025-06-10 10:39:03.375 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer     | [onMessage][处理结果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"临时用户","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"测试用户","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理员","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)]

2. Redis消息队列

第一部分:Redis完成MQ的详细过程梳理

Yudao项目利用Redis实现了两种消息队列机制:

  • Redis Stream:用于集群消费,消息持久化存储,支持消费者组,适合需要可靠性和消息追踪的场景。
  • Redis Pub/Sub(Channel):用于广播消费,消息不持久化,适合实时性要求高的场景,所有订阅者都会收到消息。

我将模拟一个完整的消息发送和消费过程,分别讲解Stream和Channel的实现,详细说明每个类的作用、注册方式以及它们如何协作。

2.1 Redis Stream消息队列的完整过程

场景:通过/getStringTextByStream接口发送Stream消息

假设用户通过HTTP请求调用/getStringTextByStream接口,传入StringTextPageReqVO对象(包含text、pageNo、pageSize字段)。以下是消息从发送到消费的详细流程:


步骤1:接收HTTP请求(Controller层)
  • 相关类:@GetMapping("/getStringTextByStream")(未显式定义类名,假设为StringTextController)
    • 作用:这是一个Spring MVC控制器方法,负责接收前端的HTTP GET请求,处理通过Redis Stream发送消息的逻辑。
    • 功能
      • 接收StringTextPageReqVO参数,包含分页查询的信息(如text、pageNo、pageSize)。
      • 调用StringTextRedisProducer的sendStreamMessage方法发送消息。
      • 返回CommonResult表示消息已发送,异步处理中。
    • 框架注册
      • 通过Spring的@GetMapping注解,自动注册为一个HTTP端点。
      • 依赖Spring Boot的Web模块,控制器类通常标注@RestController,由Spring容器管理。
    • 与其他类的交互
      • 依赖StringTextRedisProducer来发送Stream消息。
      • 使用StringTextPageReqVO作为请求参数的VO(Value Object)。
  • 代码分析
@GetMapping("/getStringTextByStream")
public CommonResult<String> getStringTextByStream(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到 Stream 请求,线程名: {}", Thread.currentThread().getName());try {stringTextRedisProducer.sendStreamMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("Stream 消息已发送,异步处理中");} catch (Exception e) {log.error("Stream 消息发送异常", e);return CommonResult.error(500, "Stream 消息发送失败");}
}
  • 控制器接收到请求后,提取pageReqVO中的字段,传递给StringTextRedisProducer。
  • 异常处理确保即使发送失败,也能返回错误响应。
步骤2:生产者发送Stream消息
  • 相关类:StringTextRedisProducer
    • 作用:这是一个生产者类,负责创建并发送Redis Stream消息。
    • 功能
      • 提供sendStreamMessage方法,构造StringTextQueryStreamMessage对象,设置text、pageNo、pageSize字段。
      • 使用RedisMQTemplate的send方法将消息发送到Redis Stream。
      • 记录日志,跟踪消息发送情况。
    • 框架注册
      • 标注@Component,由Spring容器管理。
      • 通过@Resource注入RedisMQTemplate依赖。
    • 与其他类的交互
      • 依赖RedisMQTemplate执行实际的Redis Stream消息发送。
      • 使用StringTextQueryStreamMessage作为消息载体。
      • 被StringTextController调用。
  • 代码分析
@Component
public class StringTextRedisProducer {@Resourceprivate RedisMQTemplate redisMQTemplate;public void sendStreamMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryStreamMessage message = new StringTextQueryStreamMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);redisMQTemplate.send(message);log.info("[sendStreamMessage][发送 Stream 消息: {}]", message);}
}
  • 创建StringTextQueryStreamMessage对象,填充请求参数。
  • 调用redisMQTemplate.send(message),将消息发送到Redis Stream。
  • 相关类:StringTextQueryStreamMessage
    • 作用:Stream消息的载体,定义消息的结构和Stream Key。
    • 功能
      • 继承AbstractRedisStreamMessage,包含text、pageNo、pageSize字段。
      • 重写getStreamKey(),指定Stream的键为"string-text-query-stream"。
    • 框架注册
      • 作为普通Java类,无需Spring注册,直接由StringTextRedisProducer实例化。
    • 与其他类的交互
      • 被StringTextRedisProducer用于封装消息内容。
      • 被StringTextStreamConsumer用于解析消息内容。
  • 代码分析
@Data
public class StringTextQueryStreamMessage extends AbstractRedisStreamMessage {private String text;@NotNull(message = "页码不能为空")private Integer pageNo;@NotNull(message = "每页大小不能为空")private Integer pageSize;@Overridepublic String getStreamKey() {return "string-text-query-stream";}
}
  • 定义消息结构,确保pageNo和pageSize不为空。
  • 指定Stream Key,用于Redis Stream的消息存储。
步骤3:RedisMQTemplate发送消息
  • 相关类:RedisMQTemplate
    • 作用:Redis MQ操作的核心模板类,封装了Redis Stream和Pub/Sub的消息发送逻辑。
    • 功能
      • 提供send方法,针对Stream消息,将消息序列化为JSON并添加到指定的Stream Key。
      • 支持拦截器机制,在消息发送前后调用RedisMessageInterceptor的钩子方法。
      • 使用RedisTemplate执行底层的Redis操作。
    • 框架注册
      • 通过YudaoRedisMQProducerAutoConfiguration类注册为Spring Bean。
      • 依赖StringRedisTemplate和RedisMessageInterceptor列表。
    • 与其他类的交互
      • 被StringTextRedisProducer调用,用于发送Stream消息。
      • 依赖RedisTemplate执行Redis操作。
      • 调用RedisMessageInterceptor处理消息发送前后的扩展逻辑。
      • 被StringTextStreamConsumer用于ACK消息。
  • 代码分析
public class RedisMQTemplate {private final RedisTemplate<String, ?> redisTemplate;private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)).withStreamKey(message.getStreamKey()));} finally {sendMessageAfter(message);}}
}
  • 调用sendMessageBefore执行拦截器前置逻辑。
  • 使用redisTemplate.opsForStream().add将消息添加到Stream,返回RecordId。
  • 调用sendMessageAfter执行拦截器后置逻辑。
  • 相关类:YudaoRedisMQProducerAutoConfiguration
    • 作用:生产者配置类,负责注册RedisMQTemplate。
    • 功能
      • 创建RedisMQTemplate实例,注入StringRedisTemplate和拦截器列表。
      • 确保在YudaoRedisAutoConfiguration之后加载(依赖Redis配置)。
    • 框架注册
      • 标注@AutoConfiguration,由Spring Boot自动加载。
      • 使用@Bean注册RedisMQTemplate。
    • 与其他类的交互
      • 为StringTextRedisProducer和StringTextStreamConsumer提供RedisMQTemplate。
      • 依赖StringRedisTemplate和RedisMessageInterceptor。
  • 代码分析
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQProducerAutoConfiguration {@Beanpublic RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,List<RedisMessageInterceptor> interceptors) {RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);interceptors.forEach(redisMQTemplate::addInterceptor);return redisMQTemplate;}
}

相关类:RedisMessageInterceptor

  • 作用:消息拦截器接口,提供发送和消费消息前后的扩展点。
  • 功能
    • 定义sendMessageBefore、sendMessageAfter、consumeMessageBefore、consumeMessageAfter方法。
    • 适用于多租户场景或其他扩展需求(如日志记录、权限检查)。
  • 框架注册
    • 实现类需标注@Component,由Spring容器管理。
    • 由YudaoRedisMQProducerAutoConfiguration注入到RedisMQTemplate。
  • 与其他类的交互
    • 被RedisMQTemplate调用,用于消息处理扩展。
步骤4:消费者监听和处理Stream消息
  • 相关类:StringTextStreamConsumer
    • 作用:Stream消息的消费者,负责处理收到的StringTextQueryStreamMessage消息。
    • 功能
      • 继承AbstractRedisStreamMessageListener,实现onMessage方法。
      • 将消息转换为StringTextPageReqVO,调用StringTextService执行分页查询。
      • 记录处理结果日志。
    • 框架注册
      • 标注@Component,由Spring容器管理。
      • 通过YudaoRedisMQConsumerAutoConfiguration注册到StreamMessageListenerContainer。
    • 与其他类的交互
      • 依赖StringTextService执行业务逻辑。
      • 依赖RedisMQTemplate进行消息ACK。
      • 接收StringTextQueryStreamMessage消息。
  • 代码分析
@Component
public class StringTextStreamConsumer extends AbstractRedisStreamMessageListener<StringTextQueryStreamMessage> {@Resourceprivate StringTextService stringTextService;@Overridepublic void onMessage(StringTextQueryStreamMessage message) {log.info("[onMessage][Stream 消息: {},线程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Stream 处理结果: {}]", pageResult);}
}
  • 相关类:AbstractRedisStreamMessageListener
    • 作用:Stream消息监听器的抽象基类,封装通用消费逻辑。
    • 功能
      • 实现StreamListener接口,处理Redis Stream消息。
      • 解析消息为指定类型(StringTextQueryStreamMessage)。
      • 调用拦截器前置/后置逻辑,执行消息ACK。
    • 框架注册
      • 作为抽象类,不直接注册,其子类(如StringTextStreamConsumer)注册。
    • 与其他类的交互
      • 被StringTextStreamConsumer继承。
      • 依赖RedisMQTemplate进行ACK。
      • 被YudaoRedisMQConsumerAutoConfiguration用于注册监听器。
  • 相关类:YudaoRedisMQConsumerAutoConfiguration
    • 作用:消费者配置类,负责注册Stream和Pub/Sub的监听容器。
    • 功能
      • 创建StreamMessageListenerContainer,配置Stream监听。
      • 注册StringTextStreamConsumer到容器,绑定Stream Key和消费者组。
      • 创建RedisPendingMessageResendJob处理未消费消息。
    • 框架注册
      • 标注@AutoConfiguration,在YudaoRedisAutoConfiguration之后加载。
      • 使用@Bean注册StreamMessageListenerContainer和RedisPendingMessageResendJob。
    • 与其他类的交互
      • 依赖RedisMQTemplate和StringTextStreamConsumer。
      • 为RedisPendingMessageResendJob提供监听器列表。
  • 代码分析
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {// 配置容器,注册监听器}
}

相关类:RedisPendingMessageResendJob

  • 作用:定时任务,重新投递Stream中未消费的超时消息。
  • 功能
    • 每分钟检查Pending消息,超时(默认5分钟)后重新投递。
    • 使用分布式锁(Redisson)确保单实例执行。
  • 框架注册
    • 由YudaoRedisMQConsumerAutoConfiguration注册为Bean。
    • 标注@Scheduled启用定时任务。
  • 与其他类的交互
    • 依赖RedisMQTemplate和StringTextStreamConsumer的监听器列表。
    • 使用RedissonClient实现分布式锁。
步骤5:业务逻辑处理
  • 相关类:StringTextService
    • 作用:业务服务层,执行分页查询逻辑。
    • 功能
      • 提供getUserPage方法,根据StringTextPageReqVO查询分页数据。
      • 返回PageResult<StringTextDO>。
    • 框架注册
      • 通常标注@Service,由Spring容器管理。
    • 与其他类的交互
      • 被StringTextStreamConsumer调用。
  • 相关类:StringTextPageReqVO
    • 作用:分页请求的VO,定义查询参数。
    • 功能
      • 包含text、pageNo、pageSize等字段。
      • 继承PageParam,支持分页参数。
    • 框架注册
      • 作为普通Java类,无需注册。
    • 与其他类的交互
      • 被StringTextController接收前端参数。
      • 被StringTextStreamConsumer用于构造查询参数。

第二部分:具体类解析

2.2 RedisMQTemplate类
/*** Redis MQ 操作模板类* 封装了基于Redis的两种消息模型的发送操作:* 1. Pub/Sub模式:发布订阅模式,适合广播消息* 2. Stream模式:消息流模式,适合需要消息顺序和可靠性的场景* 同时提供了消息拦截器机制,允许在消息发送前后执行自定义逻辑*/
@AllArgsConstructor
public class RedisMQTemplate {@Getterprivate final RedisTemplate<String, ?> redisTemplate;/*** 拦截器数组* 用于在消息发送前后执行自定义逻辑,如日志记录、指标收集、分布式追踪等*/@Getterprivate final List<RedisMessageInterceptor> interceptors = new ArrayList<>();/*** 发送 Redis 消息,基于 Redis pub/sub 实现* * @param message 继承自AbstractRedisChannelMessage的消息对象* @return void 无返回值* @功能 基于Redis的Pub/Sub模式发布消息,在发送前后分别执行拦截器的前置和后置处理*       使用JSON序列化消息内容,通过message.getChannel()获取消息发布的频道名称*/public <T extends AbstractRedisChannelMessage> void send(T message) {try {sendMessageBefore(message);// 发送消息redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));} finally {sendMessageAfter(message);}}/*** 发送 Redis 消息,基于 Redis Stream 实现* * @param message 继承自AbstractRedisStreamMessage的消息对象* @return RecordId Redis Stream生成的消息ID,格式如"1686475200000-0"* @功能 基于Redis的Stream模式发送消息,在发送前后分别执行拦截器的前置和后置处理*       使用JSON序列化消息内容,通过message.getStreamKey()获取Stream的键名*/public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);// 发送消息return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)) // 设置内容.withStreamKey(message.getStreamKey())); // 设置 stream key} finally {sendMessageAfter(message);}}/*** 添加拦截器* * @param interceptor 消息拦截器对象* @return void 无返回值* @功能 向拦截器列表中添加一个消息拦截器,拦截器将在消息发送前后执行自定义逻辑*/public void addInterceptor(RedisMessageInterceptor interceptor) {interceptors.add(interceptor);}/*** 消息发送前执行拦截器逻辑* * @param message 消息对象* @return void 无返回值* @功能 在消息发送前调用所有拦截器的前置处理方法,按拦截器添加的顺序执行(正序)*/private void sendMessageBefore(AbstractRedisMessage message) {// 正序执行拦截器的前置处理方法interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));}/*** 消息发送后执行拦截器逻辑* * @param message 消息对象* @return void 无返回值* @功能 在消息发送后调用所有拦截器的后置处理方法,按拦截器添加顺序的逆序执行(倒序)*/private void sendMessageAfter(AbstractRedisMessage message) {// 倒序执行拦截器的后置处理方法for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).sendMessageAfter(message);}}}
2.3 消息父类及其子类
AbstractRedisChannelMessageListener自定义监听类父类
/*** Redis Pub/Sub 监听器抽象类,用于实现广播消费** @param <T> 消息类型。一定要填写噢,不然会报错*/
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {/*** 消息类型*/private final Class<T> messageType;/*** Redis Channel*/private final String channel;/*** RedisMQTemplate*/@Setterprivate RedisMQTemplate redisMQTemplate;/*** messageType 在 onMessage 方法中用于将 Redis 收到的消息(JSON 字符串)反序列化为 T 类型的对象* 通过 JsonUtils.parseObject(message.getBody(), messageType),明确的消息类型确保了正确的反序列化* channel 字段用于指定监听器订阅的 Redis Pub/Sub 通道* getChannel() 方法允许每个消息类型定义自己的通道,使监听器能够灵活适应不同的消息类型和通道* messageType 确定了后续消息反序列化的目标类型,确保消息被解析为 StringTextQueryChannelMessage。* channel 确定了监听器订阅的 Redis 通道,Spring Data Redis 会使用 getChannel() 的返回值("string.text.query.channel")订阅该通道。* @SneakyThrows 隐藏了反射相关的异常(如 NoSuchMethodException),假设 StringTextQueryChannelMessage 有无参构造函数且 getChannel() 可访问。*/@SneakyThrowsprotected AbstractRedisChannelMessageListener() {//通过反射获取子类的泛型参数this.messageType = getMessageClass();/*** 具体案例:* StringTextQueryChannelMessage 的无参构造函数* 调用 newInstance() 创建一个 StringTextQueryChannelMessage 实例。* 调用实例的 getChannel() 方法,获取通道名称(如 "string.text.query.channel")*/this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();}/*** 获得 Sub 订阅的 Redis Channel 通道** @return channel*/public final String getChannel() {return channel;}/*** Message message:Redis 传递的原始消息对象,包含消息体(message.getBody())和通道信息* T messageObj = JsonUtils.parseObject(message.getBody(), messageType)* 使用 JsonUtils.parseObject 将消息体(message.getBody(),字节数组)反序列化为 T 类型的对象* messageType 是构造函数中初始化的消息类(Class<T>),确保消息被正确解析为指定类型(如 OrderMessage)* 调用 this.onMessage(messageObj),这是抽象方法,子类必须实现以定义具体的消息处理* messageType 是 StringTextQueryChannelMessage.class(由构造函数设置)。* 具体案例:* JsonUtils.parseObject(message.getBody(), messageType)* 将 JSON 字节数组解析为 StringTextQueryChannelMessage 对象:* @param message* @param bytes*/@Overridepublic final void onMessage(Message message, byte[] bytes) {T messageObj = JsonUtils.parseObject(message.getBody(), messageType);try {consumeMessageBefore(messageObj);// 消费消息this.onMessage(messageObj);} finally {consumeMessageAfter(messageObj);}}/*** 处理消息** @param message 消息*/public abstract void onMessage(T message);/*** 通过解析类上的泛型,获得消息类型** @return 消息类型*/@SuppressWarnings("unchecked")private Class<T> getMessageClass() {Type type = TypeUtil.getTypeArgument(getClass(), 0);if (type == null) {throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));}return (Class<T>) type;}private void consumeMessageBefore(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 正序interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));}private void consumeMessageAfter(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 倒序for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).consumeMessageAfter(message);}}
StringTextChannelConsumerListener自定义监听器实现类
@Slf4j
@Component
public class StringTextChannelConsumerListener extends AbstractRedisChannelMessageListener<StringTextQueryChannelMessage> {@Resourceprivate StringTextService stringTextService;/*** 在 StringTextChannelConsumerListener 实例化时(由 Spring 容器管理)* 会调用父类 AbstractRedisChannelMessageListener 的构造函数* 当 Redis 通道 "string.text.query.channel" 收到消息时* Spring Data Redis 调用 StringTextChannelConsumerListener 的 onMessage(Message, byte[]) 方法(继承自父类)* @param message 消息*/@Overridepublic void onMessage(StringTextQueryChannelMessage message) {log.info("[onMessage][Channel 消息: {},线程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Channel 处理结果: {}]", pageResult);}
}

监听器执行逻辑:

  • Spring Data Redis 的调用
    • 当通道有消息时,Spring Data Redis 调用 MessageListener 的 onMessage(Message, byte[]),即 AbstractRedisChannelMessageListener 的实现。
  • 父类到子类的调用
    • 父类的 onMessage(Message, byte[]) 处理消息反序列化和拦截器逻辑,然后通过 this.onMessage(T) 调用子类的 onMessage(StringTextQueryChannelMessage)。
  • 子类没有直接调用父类
    • 子类的 onMessage 是父类调用的结果,不是子类主动调用父类。
    • 子类的 onMessage 只处理业务逻辑,依赖父类的预处理。
  • 构造函数的作用
    • 初始化 messageType 和 channel,确保消息反序列化和通道订阅正确,直接影响子类 onMessage 的执行。

这种设计通过模板方法模式实现了通用逻辑和业务逻辑的分离,父类控制流程,子类提供具体实现。

2.4 配置类
YudaoRabbitMQAutoConfiguration
/*** RabbitMQ 消息队列配置类* 配置RabbitMQ的消息转换器,使用Jackson进行JSON序列化* 当生产者发送消息时,将Java对象转换为JSON格式的字节数组* 当消费者接收消息时,将JSON格式的消息转换回Java对象*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class YudaoRabbitMQAutoConfiguration {/*** Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息*/@Beanpublic MessageConverter createMessageConverter() {return new Jackson2JsonMessageConverter();}
}

YudaoRabbitMQAutoConfiguration:配置 RabbitMQ 消息转换器,使用 Jackson 进行 JSON 序列化。

YudaoRedisMQConsumerAutoConfiguration

/*** Redis 消息队列 Consumer 配置类* redisMessageListenerContainer(): 创建Pub/Sub消息监听容器* redisStreamMessageListenerContainer(): 创建Stream消息监听容器* redisPendingMessageResendJob(): 创建待处理消息重发任务*/
@Slf4j
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {/*** 创建 Redis Pub/Sub 广播消费的容器*/@Bean@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听public RedisMessageListenerContainer redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {// 创建 RedisMessageListenerContainer 对象RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置 RedisConnection 工厂。container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());// 添加监听器listeners.forEach(listener -> {listener.setRedisMQTemplate(redisMQTemplate);container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",listener.getChannel(), listener.getClass().getName());});return container;}/*** 创建 Redis Stream 重新消费的任务*/@Bean@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,RedisMQTemplate redisTemplate,@Value("${spring.application.name}") String groupName,RedissonClient redissonClient) {return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);}/*** 创建 Redis Stream 集群消费的容器** 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>*/@Bean(initMethod = "start", destroyMethod = "stop")@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();checkRedisVersion(redisTemplate);// 第一步,创建 StreamMessageListenerContainer 容器// 创建 options 配置StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().batchSize(10) // 一次性最多拉取多少条消息.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化.build();// 创建 container 对象StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);// 第二步,注册监听器,消费对应的 Stream 主题String consumerName = buildConsumerName();listeners.parallelStream().forEach(listener -> {log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",listener.getStreamKey(), listener.getClass().getName());// 创建 listener 对应的消费者分组try {redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());} catch (Exception ignore) {}// 设置 listener 对应的 redisTemplatelistener.setRedisMQTemplate(redisMQTemplate);// 创建 Consumer 对象Consumer consumer = Consumer.from(listener.getGroup(), consumerName);// 设置 Consumer 消费进度,以最小消费进度为准StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());// 设置 Consumer 监听StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false) // 不自动 ack.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 falsecontainer.register(builder.build(), listener);log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",listener.getStreamKey(), listener.getClass().getName());});return container;}/*** 构建消费者名字,使用本地 IP + 进程编号的方式。* 参考自 RocketMQ clientId 的实现** @return 消费者名字*/private static String buildConsumerName() {return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());}/*** 校验 Redis 版本号,是否满足最低的版本号要求!*/private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {// 获得 Redis 版本Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);String version = MapUtil.getStr(info, "redis_version");// 校验最低版本必须大于等于 5.0.0int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));if (majorVersion < 5) {throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));}}}

方法 1: redisMessageListenerContainer

  • 作用
    • 创建并配置 RedisMessageListenerContainer,用于监听 Redis Pub/Sub 通道消息。
    • 注册所有 AbstractRedisChannelMessageListener 子类的监听器(如 StringTextChannelConsumerListener)。

方法 2: redisPendingMessageResendJob

作用

  • 创建 RedisPendingMessageResendJob Bean,定时重发 Redis Stream 的待处理(pending)消息。

方法 3: redisStreamMessageListenerContainer

  • 作用
    • 创建并配置 StreamMessageListenerContainer,用于监听 Redis Stream 消息。
    • 注册所有 AbstractRedisStreamMessageListener 子类的监听器。
  • 输入参数
    • RedisMQTemplate redisMQTemplate:提供 Redis 连接和拦截器。
    • List<AbstractRedisStreamMessageListener<?>> listeners:所有 Stream 监听器。
  • 输出:StreamMessageListenerContainer<String, ObjectRecord<String, String>>,用于 Stream 消息消费。
  • 实现逻辑
    1. 检查 Redis 版本
      • checkRedisVersion(redisTemplate); 确保 Redis 版本 ≥ 5.0.0(支持 Stream)。
    2. 创建容器
      • 配置 containerOptions:
        • batchSize(10):每次拉取最多 10 条消息。
        • targetType(String.class):消息体为 String 类型(JSON 字符串),由监听器反序列化。
      • 创建 StreamMessageListenerContainer。
    3. 注册监听器
      • 使用 parallelStream 并行处理监听器,提高效率。
      • 对每个监听器:
        • 创建消费者组(createGroup)。
        • 注入 redisMQTemplate。
        • 创建 Consumer(组名 + 消费者名)。
        • 设置 StreamOffset(从最后消费位置读取)。
        • 注册监听器,配置手动 ACK 和错误处理。
      • 日志记录注册过程。
2.5 RedisPendingMessageResendJob(定时任务类)
/*** 这个任务用于处理,crash 之后的消费者未消费完的消息* 重新分发信息(清理员)*/
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {private static final String LOCK_KEY = "redis:pending:msg:lock";/*** 消息超时时间,默认 5 分钟** 1. 超时的消息才会被重新投递* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到*/private static final int EXPIRE_TIME = 5 * 60;private final List<AbstractRedisStreamMessageListener<?>> listeners;private final RedisMQTemplate redisTemplate;private final String groupName;private final RedissonClient redissonClient;/*** 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题*/@Scheduled(cron = "35 * * * * ?")public void messageResend() {RLock lock = redissonClient.getLock(LOCK_KEY);// 尝试加锁if (lock.tryLock()) {try {execute();} catch (Exception ex) {log.error("[messageResend][执行异常]", ex);} finally {lock.unlock();}}}/*** 执行清理逻辑** @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a>*/private void execute() {//redisTemplate.getRedisTemplate().opsForStream() 获取 Redis Stream 的操作接口,用于执行 pending、range、add、acknowledge 等操作。StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();//检查每条传送带listeners.forEach(listener -> {//ops.pending(listener.getStreamKey(), groupName):查询 Stream 和消费者组的 pending 消息概况。PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));// 每个消费者的 pending 队列消息数量(检查每个传送带(Stream)上有哪些卡住的包裹(pending 消息))Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);// 每个消费者的 pending消息的详情信息,getStreamKey():返回 Stream 名称(如 "string.text.query.stream"),pendingMessagesPerConsumer:Map,键是消费者名称,值是 pending 消息数量PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);if (pendingMessages.isEmpty()) {return;}pendingMessages.forEach(pendingMessage -> {// 获取消息上一次传递到 consumer 的时间,long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();if (lastDelivery < EXPIRE_TIME){return;}// 获取指定 id 的消息体(对于超时的包裹,清理员去传送带上找到它的具体内容(records),比如包裹里装了啥(JSON 数据)。如果没找到(比如包裹被删了),就跳过)List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));if (CollUtil.isEmpty(records)) {return;}// 重新投递消息redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord().ofObject(records.get(0).getValue()) // 设置内容.withStreamKey(listener.getStreamKey()));// ack 消息消费完成redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());});});});}
}
  • RedisPendingMessageResendJob 是一个定时任务类,专门用于处理 Redis Stream 消息队列中因消费者异常(如崩溃)而未确认(ACK)的待处理(pending)消息。
  • 它通过定期扫描消费者组的 pending 消息,检查消息是否超时(默认 5 分钟),并将超时的消息重新投递到 Stream,同时确认(ACK)原消息,以避免消息丢失或重复处理。

3. 总结

3.1 整体背景

想象你在一家快递公司(你的系统)工作,Redis 消息队列就像公司的两条传送带:

  • Pub/Sub 传送带(广播模式):像广播电台,包裹(消息)发出去,所有听众(消费者)都能收到,但包裹不会保存。
  • Stream 传送带(集群模式):像物流流水线,包裹有编号(消息 ID),可以保存、追踪,多个快递员(消费者)分工处理。

实现代码CurdTestController 负责把包裹放上传送带,框架类代码负责管理传送带、派送包裹、处理异常(比如包裹卡住)。下面,我会先介绍每个角色的作用(类分析),然后讲一个包裹从发出到送达的故事(全过程)。


3.2 类结构与作用分析

3.2.1 框架的 Redis MQ 相关类
  1. YudaoRedisMQProducerAutoConfiguration
    • 作用:像快递公司的“生产设备管理员”,负责初始化消息发送工具(RedisMQTemplate)。
    • 职责
      • 创建 RedisMQTemplate Bean,注入 StringRedisTemplate 和拦截器(RedisMessageInterceptor)。
      • 配置拦截器,增强消息发送的扩展性(如日志、租户处理)。
    • 关键方法
      • redisMQTemplate:构造 RedisMQTemplate,添加拦截器。
    • 使用场景:Spring 启动时,自动配置 RedisMQTemplate,供生产者(如 StringTextRedisProducer)使用。
  2. YudaoRedisMQConsumerAutoConfiguration
    • 作用:像“派送中心管理员”,负责配置 Pub/Sub 和 Stream 的消息监听容器,以及异常包裹清理任务。
    • 职责
      • 配置 RedisMessageListenerContainer:监听 Pub/Sub 通道(如 string-text-query-channel)。
      • 配置 StreamMessageListenerContainer:监听 Stream 队列(如 string-text-query-stream)。
      • 创建 RedisPendingMessageResendJob:定时清理 Stream 的待处理(pending)包裹。
      • 检查 Redis 版本,确保支持 Stream(≥5.0.0)。
      • 生成消费者名称(如 192.168.1.1@1234)。
    • 关键方法
      • redisMessageListenerContainer:注册 Pub/Sub 监听器。
      • redisStreamMessageListenerContainer:注册 Stream 监听器,设置批处理(10 条/次)、手动 ACK。
      • redisPendingMessageResendJob:创建定时任务。
      • buildConsumerName:生成唯一消费者名称。
      • checkRedisVersion:验证 Redis 版本。
    • 使用场景:Spring 启动时,自动配置消费者容器,监听消息并处理异常。
  3. RedisMQTemplate
    • 作用:像“传送带操作员”,负责把包裹放上 Pub/Sub 或 Stream 传送带,并支持拦截器扩展。
    • 职责
      • 发送 Pub/Sub 消息:convertAndSend 到指定通道。
      • 发送 Stream 消息:opsForStream().add 到指定 Stream,返回消息 ID。
      • 执行拦截器:发送前后调用 sendMessageBefore/sendMessageAfter。
      • 提供 opsForStream() 等接口,供定时任务(如 RedisPendingMessageResendJob)操作 Stream。
    • 关键方法
      • send(T extends AbstractRedisChannelMessage):发送 Pub/Sub 消息。
      • send(T extends AbstractRedisStreamMessage):发送 Stream 消息。
      • addInterceptor:添加拦截器。
    • 使用场景:被生产者(如 StringTextRedisProducer)调用发送消息,被定时任务调用操作 Stream。
  4. RedisMessageInterceptor
    • 作用:像“包裹检查员”,在消息发送或消费前后进行额外处理(如日志、租户隔离)。
    • 职责
      • 定义接口:sendMessageBefore/After、consumeMessageBefore/After。
      • 默认空实现,允许自定义扩展。
    • 使用场景:由 RedisMQTemplate 在消息发送/消费时调用,典型用于多租户场景或日志记录。
  5. RedisPendingMessageResendJob
    • 作用:像“包裹清理员”,定时(每分钟 35 秒)检查 Stream 传送带上卡住的包裹(pending 消息),重新投递超时的包裹。
    • 职责
      • 使用分布式锁(RedissonClient)确保单一实例执行。
      • 扫描每个 Stream 的 pending 消息,检查超时(5 分钟)。
      • 重新投递超时消息,确认(ACK)原消息。
    • 关键方法
      • messageResend:定时任务入口,加锁调用 execute。
      • execute:扫描 pending 消息,重新投递并 ACK。
    • 使用场景:消费者崩溃未确认消息时,定时重发确保消息不丢失。
  6. AbstractRedisMessage
    • 作用:像“包裹模板”,定义消息的基本结构。
    • 职责
      • 提供 headers(键值对),存储元数据(如租户 ID)。
      • 抽象基类,供具体消息类继承。
    • 使用场景:被 AbstractRedisChannelMessage 和 AbstractRedisStreamMessage 继承。
  7. AbstractRedisChannelMessage
    • 作用:像“Pub/Sub 包裹模板”,定义 Pub/Sub 消息的结构。
    • 职责
      • 提供 getChannel(),默认返回类名,子类可自定义通道(如 string-text-query-channel)。
      • 忽略序列化通道名(@JsonIgnore)。
    • 使用场景:被你的 StringTextQueryChannelMessage 继承。
  8. AbstractRedisStreamMessage
    • 作用:像“Stream 包裹模板”,定义 Stream 消息的结构。
    • 职责
      • 提供 getStreamKey(),默认返回类名,子类可自定义 Stream(如 string-text-query-stream)。
      • 忽略序列化 Stream 键。
    • 使用场景:被你的 StringTextQueryStreamMessage 继承。
  9. AbstractRedisChannelMessageListener<T>
    • 作用:像“Pub/Sub 快递员”,监听 Pub/Sub 通道,处理消息。
    • 职责
      • 自动获取消息类型(messageType)和通道(channel)。
      • 反序列化消息(JsonUtils.parseObject)。
      • 执行拦截器(consumeMessageBefore/After)。
      • 调用子类的 onMessage 处理消息。
    • 关键方法
      • onMessage(Message, byte[]):处理原始 Redis 消息。
      • onMessage(T):抽象方法,子类实现具体逻辑。
    • 使用场景:被你的 StringTextChannelConsumerListener 继承。
  10. AbstractRedisStreamMessageListener<T>
    • 作用:像“Stream 快递员”,监听 Stream 队列,处理消息。
    • 职责
      • 自动获取消息类型(messageType)、Stream 键(streamKey)、消费者组(group)。
      • 反序列化消息,执行拦截器,调用子类 onMessage。
      • 手动 ACK 消息(opsForStream().acknowledge)。
    • 关键方法
      • onMessage(ObjectRecord):处理 Stream 消息。
      • onMessage(T):抽象方法,子类实现。
    • 使用场景:被你的 StringTextStreamConsumerListener 继承。

3.2.2 实现定义类
  1. CurdTestController
    • 作用:像“客户服务中心”,接收用户请求,触发消息发送或异步查询。
    • 职责
      • 提供 REST 接口:
        • /getStringText:异步查询(StringTextService.getUserPageAsync)。
        • /getStringTextByMQ:触发 MQ 消息(未实现具体生产者)。
        • /getStringTextByStream:发送 Stream 消息(StringTextRedisProducer.sendStreamMessage)。
        • /getStringTextByChannel:发送 Pub/Sub 消息(StringTextRedisProducer.sendChannelMessage)。
      • 记录日志,处理异常。
    • 使用场景:用户通过 HTTP 请求触发消息队列或异步处理。
  2. StringTextRedisProducer
    • 作用:像“包裹打包员”,创建并发送 Pub/Sub 和 Stream 消息。
    • 职责
      • sendStreamMessage:发送 StringTextQueryStreamMessage 到 string-text-query-stream。
      • sendChannelMessage:发送 StringTextQueryChannelMessage 到 string-text-query-channel。
      • 使用 RedisMQTemplate 发送消息,记录日志。
    • 使用场景:被 CurdTestController 调用,发送查询请求到消息队列。
  3. StringTextChannelConsumerListener
    • 作用:像“Pub/Sub 派送员”,监听 string-text-query-channel,处理消息。
    • 职责
      • 接收 StringTextQueryChannelMessage,转换为 StringTextPageReqVO。
      • 调用 StringTextService.getUserPage 查询数据。
      • 记录处理日志。
    • 使用场景:处理 Pub/Sub 广播消息,适合实时通知场景。
  4. StringTextStreamConsumerListener
    • 作用:像“Stream 派送员”,监听 string-text-query-stream,处理消息。
    • 职责
      • 接收 StringTextQueryStreamMessage,转换为 StringTextPageReqVO。
      • 调用 StringTextService.getUserPage 查询数据。
      • 手动 ACK 消息,记录日志。
    • 使用场景:处理 Stream 集群消息,适合持久化、可靠投递场景。
  5. StringTextQueryChannelMessage
    • 作用:像“Pub/Sub 包裹”,定义 Pub/Sub 消息结构。
    • 职责
      • 包含 text、pageNo、pageSize,校验非空。
      • 指定通道 string-text-query-channel。
    • 使用场景:由 StringTextRedisProducer 发送,StringTextChannelConsumerListener 消费。
  6. StringTextQueryStreamMessage
    • 作用:像“Stream 包裹”,定义 Stream 消息结构。
    • 职责
      • 包含 text、pageNo、pageSize,校验非空。
      • 指定 Stream string-text-query-stream。
    • 使用场景:由 StringTextRedisProducer 发送,StringTextStreamConsumerListener 消费。
  7. StringTextServiceImpl
    • 作用:像“数据仓库”,提供数据查询服务。
    • 职责
      • getUserPage:同步查询分页数据。
      • getUserPageAsync:异步查询,使用线程池(curdAsyncExecutor)。
      • 调用 StringTextMapper 访问数据库。
    • 使用场景:被控制器和消费者调用,处理查询逻辑。

3.3 文字图总结

3.3.1 图示总结
YudaoRedisMQProducerAutoConfiguration
└─ redisMQTemplate(StringRedisTemplate redisTemplate, List<RedisMessageInterceptor> interceptors)│  │  // 初始化消息发送工具,注入 Redis 模板和拦截器(默认空)│  ├─ Parameters:│  ├─ redisTemplate: StringRedisTemplate // Redis 操作模板│  └─ interceptors: List<RedisMessageInterceptor> // 拦截器列表(默认空)│  └─ Returns: RedisMQTemplate│  └─ RedisMQTemplate(redisTemplate)│  └─ addInterceptor(interceptor) // 添加拦截器(默认空,支持链式调用)
YudaoRedisMQConsumerAutoConfiguration
├─ checkRedisVersion(StringRedisTemplate redisTemplate)
│  │  
│  │  // 校验 Redis 版本 ≥ 5.0.0
│  │  
│  ├─ Parameters: redisTemplate: StringRedisTemplate
│  └─ Returns: void
│
├─ redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners)
│  │  
│  │  // 配置 Pub/Sub 监听容器(基于 Redis 发布订阅模式)
│  │  
│  ├─ Parameters:
│  │  ├─ redisMQTemplate: RedisMQTemplate
│  │  └─ listeners: List<AbstractRedisChannelMessageListener<?>> 
│  │       // 包含 StringTextChannelConsumerListener 实现类
│  │  
│  └─ Returns: RedisMessageListenerContainer
│       │  
│       └─ addMessageListener(StringTextChannelConsumerListener listener, ChannelTopic("string-text-query-channel"))
│           │  
│           │  // 绑定监听器到指定通道(channel)
│           │  
│           ├─ Parameters:
│           │  ├─ listener: StringTextChannelConsumerListener
│           │  └─ topic: ChannelTopic("string-text-query-channel") 
│           │       // 通道名称固定为 string-text-query-channel
│           │  
│           └─ StringTextChannelConsumerListener.setRedisMQTemplate(redisMQTemplate)
│               // 注入消息发送模板,用于消费时的响应逻辑
│
├─ redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners)
│  │  
│  │  // 配置 Stream 监听容器(基于 Redis Stream 数据结构)
│  │  
│  ├─ Parameters:
│  │  ├─ redisMQTemplate: RedisMQTemplate
│  │  └─ listeners: List<AbstractRedisStreamMessageListener<?>> 
│  │       // 包含 StringTextStreamConsumerListener 实现类
│  │  
│  └─ Returns: StreamMessageListenerContainer
│       │  
│       └─ register(StreamReadRequest("string-text-query-stream", Consumer("my-app", "192.168.1.1@1234"), StringTextStreamConsumerListener listener))
│           │  
│           │  // 绑定监听器到指定 Stream(支持消费者组模式)
│           │  
│           ├─ Parameters:
│           │  ├─ request: StreamReadRequest 
│           │  │  // StreamKey: string-text-query-stream 
│           │  │  // Consumer: my-app@192.168.1.1@1234(消费者组名+实例标识)
│           │  └─ listener: StringTextStreamConsumerListener
│           │  
│           ├─ redisTemplate.opsForStream().createGroup("string-text-query-stream", "my-app")
│           │  // 自动创建消费者组(若不存在),组名与应用名一致
│           │  
│           └─ StringTextStreamConsumerListener.setRedisMQTemplate(redisMQTemplate)
│               // 注入消息发送模板,用于消费时的响应逻辑
│
└─ redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient)│  │  // 创建定时任务,处理 Stream 中未确认的 pending 消息│  // 防止消息堆积,保证至少一次投递语义│  ├─ Parameters:│  ├─ listeners: List<AbstractRedisStreamMessageListener<?>> │  │       // 包含 StringTextStreamConsumerListener 实现类│  ├─ redisTemplate: RedisMQTemplate│  ├─ groupName: String // 默认使用 spring.application.name("my-app")│  └─ redissonClient: RedissonClient // 分布式锁,保证任务单点执行│  └─ Returns: RedisPendingMessageResendJob// 定时任务会定期扫描 pending 消息并重新投递
通过 Stream 发送消息的流程
CurdTestController
└─ getStringTextByStream(StringTextPageReqVO pageReqVO)│  │  // 处理 Stream 消息请求│  Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│  Returns: "Stream 消息已发送,异步处理中"│  └─ StringTextRedisProducer.sendStreamMessage(text, pageNo, pageSize)│  │  // 发送 Stream 消息(参数:example, 1, 10)│  Returns: void│  └─ RedisMQTemplate.send(StringTextQueryStreamMessage message)│  │  // 发送到 Stream(消息内容包含 text/pageNo/pageSize)│  Returns: RecordId(如 "1234567890-0")│  ├─ sendMessageBefore(message)│  // 调用消息发送前的拦截器(默认无操作)│  ├─ JsonUtils.toJsonString(message)│  // 序列化消息为 JSON│  Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│  ├─ redisTemplate.opsForStream().add(StreamRecords)│  // 添加到 Stream(StreamKey: string-text-query-stream)│  Parameters: JSON 字符串 + StreamKey│  Returns: RecordId(消息唯一标识)│  └─ sendMessageAfter(message)// 调用消息发送后的拦截器(默认无操作)
StreamMessageListenerContainer
└─ StringTextStreamConsumerListener.onMessage(ObjectRecord<String, String> message)│  │  // 处理 Stream 消息(参数:包含 StreamKey 和 JSON 内容的记录)│  Returns: void│  ├─ JsonUtils.parseObject(message.getValue(), StringTextQueryStreamMessage.class)│  // 反序列化 JSON 为消息对象│  Returns: {text="example", pageNo=1, pageSize=10}│  ├─ consumeMessageBefore(messageObj)│  // 调用消息消费前的拦截器(默认无操作)│  ├─ onMessage(StringTextQueryStreamMessage message)│  // 子类自定义处理逻辑(由业务实现)│  Parameters: 反序列化后的消息对象│  Returns: void│  │  └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│      │  │      │  // 业务逻辑:查询数据│      │  Parameters: {text="example", pageNo=1, pageSize=10}│      │  Returns: 分页结果(包含查询到的数据)│      │  │      └─ StringTextMapper.selectPage(reqVO)│          // 数据库层查询│          Returns: 从数据库获取的分页结果│  ├─ redisMQTemplate.getRedisTemplate().opsForStream().acknowledge("my-app", message)│  // 关键步骤:确认消息已消费│  Parameters: 消费者组名 + 消息记录│  Returns: void│  (若不确认,消息会被视为 pending 并由定时任务重新投递)│  └─ consumeMessageAfter(messageObj)// 调用消息消费后的拦截器(默认无操作)
通过 Pub/Sub 发送消息的流程
CurdTestController
└─ getStringTextByChannel(StringTextPageReqVO pageReqVO)│  │  // 处理 Pub/Sub 消息请求│  Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│  Returns: "Channel 消息已发送,异步处理中"│  └─ StringTextRedisProducer.sendChannelMessage(text, pageNo, pageSize)│  │  // 发送 Pub/Sub 消息(参数:example, 1, 10)│  Returns: void│  └─ RedisMQTemplate.send(StringTextQueryChannelMessage message)│  │  // 发送到通道(channel: string-text-query-channel)│  Returns: void│  ├─ sendMessageBefore(message)│  // 调用消息发送前的拦截器(默认无操作)│  ├─ JsonUtils.toJsonString(message)│  // 序列化消息为 JSON│  Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│  ├─ redisTemplate.convertAndSend("string-text-query-channel", json)│  // 发布到 Redis 通道│  Parameters: 通道名 + JSON 字符串│  Returns: void(无返回值,Fire-and-Forget 模式)│  └─ sendMessageAfter(message)// 调用消息发送后的拦截器(默认无操作)
RedisMessageListenerContainer
└─ StringTextChannelConsumerListener.onMessage(Message message, byte[] bytes)│  │  // 处理 Pub/Sub 消息(参数:消息对象+通道字节数组)│  Returns: void(无返回值,Fire-and-Forget 模式)│  ├─ JsonUtils.parseObject(message.getBody(), StringTextQueryChannelMessage.class)│  // 反序列化 JSON 为消息对象│  Returns: {text="example", pageNo=1, pageSize=10}│  ├─ consumeMessageBefore(messageObj)│  // 调用消息消费前的拦截器(默认无操作)│  ├─ onMessage(StringTextQueryChannelMessage message)│  // 子类自定义处理逻辑(由业务实现)│  Parameters: 反序列化后的消息对象│  Returns: void│  │  └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│      │  │      │  // 业务逻辑:查询数据│      │  Parameters: {text="example", pageNo=1, pageSize=10}│      │  Returns: 分页结果(包含查询到的数据)│      │  │      └─ StringTextMapper.selectPage(reqVO)│          // 数据库层查询│          Returns: 从数据库获取的分页结果│  └─ consumeMessageAfter(messageObj)// 调用消息消费后的拦截器(默认无操作)

RedisPendingMessageResendJob 定时任务流程
3.3.2 类作用总结
  • 框架类
    • YudaoRedisMQProducerAutoConfiguration:初始化生产工具。
    • YudaoRedisMQConsumerAutoConfiguration:配置消费容器和定时任务。
    • RedisMQTemplate:发送消息,操作 Stream.
    • RedisMessageInterceptor:扩展点(未使用)。
    • RedisPendingMessageResendJob:重发 pending 消息。
    • AbstractRedisMessage:消息基类。
    • AbstractRedisChannelMessage:Pub/Sub 消息模板。
    • AbstractRedisStreamMessage:Stream 消息模板。
    • AbstractRedisChannelMessageListener:Pub/Sub 消费者基类。
    • AbstractRedisStreamMessageListener:Stream 消费者基类。
  • 具体业务实现类
    • CurdTestController:触发消息发送。
    • StringTextRedisProducer:发送消息。
    • StringTextChannelConsumerListener:处理 Pub/Sub 消息。
    • StringTextStreamConsumerListener:处理 Stream 消息。
    • StringTextQueryChannelMessage:Pub/Sub 消息。
    • StringTextQueryStreamMessage:Stream 消息。
    • StringTextServiceImpl:查询数据。

http://www.xdnf.cn/news/986131.html

相关文章:

  • C++11的特性上
  • Cursor 编程实践 — 开发环境部署
  • 案例8 模型量化
  • 使用MyBatis-Plus实现数据权限功能
  • 【Unity3D优化】优化多语言字体包大小
  • swagger通过配置将enum自动添加到字段说明中
  • PHP如何检查一个字符串是否是email格式
  • 【微信小程序】| 在线咖啡点餐平台设计与实现
  • 华为云Flexus+DeepSeek征文 | 基于华为云ModelArts Studio打造AingDesk AI聊天助手
  • list类型
  • SCADA|测试KingSCADA4.0信创版采集汇川PLC AC810数据
  • 开源夜莺支持MySQL数据源,更方便做业务指标监控了
  • xss分析
  • C2f模块 vs Darknet-53——YOLOv8检测效率的提升
  • 9.IP数据包分片计算
  • HNCTF2025 - Misc、Osint、Crypto WriteUp
  • 第三讲 基础运算之整数运算
  • 什么是数字化项目风险管理?如何实现项目风险管理数字化?
  • IIS 实现 HTTPS:OpenSSL证书生成与配置完整指南
  • 突然虚拟机磁盘只剩下几十K
  • [特殊字符] React 与 Vue 源码级对比:5大核心差异与实战选择指南
  • # include<heαd.h>和# include″heαd.h″的区别
  • 成都国际数字影像产业园孵化培育模式的探索与突破
  • 人机交互设计知识点总结
  • 驻波比(VSWR)详解
  • 判断字符串子序列
  • OpenAI o3-pro深度解析:87%降价背后的AI战略,AGI发展迈入新阶段!
  • 自动托盘搬运车是什么?它的工作逻辑如何实现物流自动化?
  • Python训练营打卡 Day51
  • 日本滨松R669光电倍增管Hamamatsu直径51 mm 直径端窗型扩展红多碱光阴极面光谱灵敏度特性:300 至 900 nm