当前位置: 首页 > news >正文

实现基于数据库 flag 状态的消息消费控制

网罗开发(小红书、快手、视频号同名)

  大家好,我是 展菲,目前在上市企业从事人工智能项目研发管理工作,平时热衷于分享各种编程领域的软硬技能知识以及前沿技术,包括iOS、前端、Harmony OS、Java、Python等方向。在移动端开发、鸿蒙开发、物联网、嵌入式、云原生、开源等领域有深厚造诣。

图书作者:《ESP32-C3 物联网工程开发实战》
图书作者:《SwiftUI 入门,进阶与实战》
超级个体:COC上海社区主理人
特约讲师:大学讲师,谷歌亚马逊分享嘉宾
科技博主:华为HDE/HDG

我的博客内容涵盖广泛,主要分享技术教程、Bug解决方案、开发工具使用、前沿科技资讯、产品评测与使用体验。我特别关注云服务产品评测、AI 产品对比、开发板性能测试以及技术报告,同时也会提供产品优缺点分析、横向对比,并分享技术沙龙与行业大会的参会体验。我的目标是为读者提供有深度、有实用价值的技术洞察与分析。

展菲:您的前沿技术领航员
👋 大家好,我是展菲!
📱 全网搜索“展菲”,即可纵览我在各大平台的知识足迹。
📣 公众号“Swift社区”,每周定时推送干货满满的技术长文,从新兴框架的剖析到运维实战的复盘,助您技术进阶之路畅通无阻。
💬 微信端添加好友“fzhanfei”,与我直接交流,不管是项目瓶颈的求助,还是行业趋势的探讨,随时畅所欲言。
📅 最新动态:2025 年 3 月 17 日
快来加入技术社区,一起挖掘技术的无限潜能,携手迈向数字化新征程!


文章目录

    • 摘要
    • 引言
    • 思路分析
    • 项目结构
    • 核心代码示例
      • 1. 消费者类 FlagCheckingConsumer
      • 2. 业务判断类 FlagService
      • 3. 数据访问层 FlagRepository
      • 4. 消息模型 MessagePayload
    • 场景举例
      • 场景一:订单支付后才能发货
      • 场景二:用户实名认证后才能开通服务
      • 场景三:批量导入异步任务
    • QA 环节
    • 总结

摘要

在分布式系统中,消息队列(比如 RocketMQ、Kafka、RabbitMQ)是常见的解耦手段。但是实际业务里经常遇到这种需求:
消费者拿到消息以后,不能立刻消费,而是要根据数据库里的某个业务标志(flag)来判断是否应该消费。

举个例子:一个订单的消息到了,只有当数据库里 flag=true 才允许真正消费,否则就需要跳过或者延迟处理。

本文会结合 RocketMQ,写一个基于数据库 flag 状态的消息消费控制 Demo。

引言

为什么会有这样的需求?

  • 某些业务有“前置条件”,比如订单必须支付完成、审核通过,才能进行后续处理;
  • 消息到达时,数据库可能还没更新状态,如果直接消费会造成数据不一致;
  • 我们需要一种“柔性控制”,让消息的消费时机依赖于数据库里的 flag 状态。

在这种场景下,消息的消费逻辑就不再是单纯的“来了就消费”,而是要先查询数据库。

思路分析

实现逻辑可以拆成几步:

  1. 消息里必须带有业务 ID(如订单 ID、用户 ID);

  2. 消费者收到消息后,先查数据库,看这个 ID 对应的 flag 值;

  3. 如果 flag=true,则继续消费业务逻辑;

  4. 如果 flag=false,则跳过消息,可以选择:

    • 直接忽略;
    • 写入日志方便排查;
    • 重新投递到延迟队列,稍后再试。

这样可以保证业务的一致性。

项目结构

我们搭建一个 Spring Boot + RocketMQ 的小项目,目录大致如下:

rocketmq-flag-consumer/
├── src/
│   ├── main/
│   │   ├── java/com/example/rocketmq/
│   │   │   ├── RocketmqFlagConsumerApplication.java       # 启动类
│   │   │   ├── consumer/FlagCheckingConsumer.java         # 消费者逻辑
│   │   │   ├── service/FlagService.java                   # 业务判断
│   │   │   ├── repository/FlagRepository.java             # 数据库查询
│   │   │   ├── model/MessagePayload.java                  # 消息体模型
│   │   │   └── util/JsonUtils.java                        # JSON 工具类(可选)
│   │   └── resources/
│   │       ├── application.yml                            # 配置文件
│   │       └── mapper/FlagMapper.xml                      # MyBatis SQL 映射(可选)
├── pom.xml                                                # 依赖
└── README.md

核心代码示例

1. 消费者类 FlagCheckingConsumer

@RocketMQMessageListener(topic = "your-topic", consumerGroup = "your-consumer-group")
@Component
public class FlagCheckingConsumer implements RocketMQListener<MessagePayload> {@Autowiredprivate FlagService flagService;@Overridepublic void onMessage(MessagePayload message) {boolean shouldConsume = flagService.shouldConsume(message.getId());if (shouldConsume) {System.out.println("消费消息:" + message);// TODO: 处理业务逻辑,比如订单处理} else {System.out.println("跳过消息,flag=false:" + message);// TODO: 这里可以选择把消息投递到延迟队列,或者写一张“待处理表”}}
}

解析:

  • 消息拿到后,不是立刻处理,而是交给 FlagService 先判断;
  • 如果 flag=false,就直接跳过,避免错误消费。

2. 业务判断类 FlagService

@Service
public class FlagService {@Autowiredprivate FlagRepository flagRepository;public boolean shouldConsume(Long id) {Boolean flag = flagRepository.getFlagById(id);return Boolean.TRUE.equals(flag);}
}

解析:

  • 查询数据库,判断 flag 是否为 true
  • 防御性写法:用 Boolean.TRUE.equals(flag),避免 NPE。

3. 数据访问层 FlagRepository

@Mapper
public interface FlagRepository {@Select("SELECT flag FROM your_table WHERE id = #{id}")Boolean getFlagById(@Param("id") Long id);
}

解析:

  • 直接查数据库 flag 值;
  • 可用注解或 XML 映射都行。

4. 消息模型 MessagePayload

@Data
public class MessagePayload {private Long id;       // 业务 ID,比如订单 IDprivate String content; // 业务内容
}

解析:

  • 消息里必须带有业务 ID,否则无法去数据库查 flag;
  • 其他字段根据业务需要扩展。

场景举例

场景一:订单支付后才能发货

  • 用户下单 → 订单消息进入 MQ;
  • 消费者拿到消息 → 查数据库订单 isPaid 字段;
  • 如果 isPaid=true → 正常发货;
  • 如果 isPaid=false → 跳过,稍后重试。

场景二:用户实名认证后才能开通服务

  • 消息带有用户 ID;
  • 消费者查询数据库 isVerified
  • 未实名的消息丢到延迟队列,等用户实名后再消费。

场景三:批量导入异步任务

  • 每条消息里有任务 ID;
  • 只有当任务状态 flag=READY 时,才允许消费并执行。

QA 环节

Q: 如果消息跳过了,后续怎么保证能消费?
A: 有两种常见做法:

  1. 把消息丢到延迟队列(RocketMQ 支持延时等级);
  2. 消息写到数据库“待消费表”,由定时任务扫描重投。

Q: 如果数据库压力太大怎么办?
A: 可以考虑:

  • 用缓存(Redis)存储 flag;
  • 或者在生产端就判断 flag,避免进入 MQ。

Q: 会不会造成消息乱序?
A: 有可能,如果强依赖顺序,要结合 RocketMQ 的顺序消费功能。

总结

  • 普通消息消费是“来了就消费”;
  • 本文介绍了一个“条件消费”的思路:消费者先查数据库 flag,只有满足条件才处理;
  • 这种方式在订单、任务、用户状态控制等场景很常见;
  • 可以结合 RocketMQ 延迟队列或者“待处理表”机制,让未消费的消息在 flag 改变后再处理。

这样既能保证数据一致性,又能灵活控制消息的消费时机。

http://www.xdnf.cn/news/1373311.html

相关文章:

  • PMP项目管理知识点-⑭【①-⑬流程总结】→图片直观表示
  • 让ai写一个类github首页
  • 从文本到二进制:HTTP/2不止于性能,更是对HTTP/1核心语义的传承与革新
  • 深度学习11 Deep Reinforcement Learning
  • 深度学习12 Reinforcement Learning with Human Feedback
  • 如何在阿里云百炼中使用钉钉MCP
  • 深度学习——激活函数
  • 【stm32简单外设篇】-4×4 薄膜键盘
  • 区块链技术探索与应用:从密码学奇迹到产业变革引擎
  • 【PS实战】制作hello标志设计:从选区到色彩填充的流程(大学作业)
  • 开发electron时候Chromium 报 Not allowed to load local resource → 空白页。
  • 【分布式技术】Kafka 数据积压全面解析:原因、诊断与解决方案
  • 基于muduo库的图床云共享存储项目(一)
  • More Effective C++ 条款10:在构造函数中防止资源泄漏
  • Tomcat的VM options
  • 广告推荐模型3:域感知因子分解机(Field-aware Factorization Machine, FFM)
  • 变压器副边电流计算
  • ARP地址解析协议
  • 嵌入式C语言进阶:结构体封装函数的艺术与实践
  • Java 集合笔记
  • 宝石组合(蓝桥杯)
  • 2025最新的软件测试热点面试题(答案+解析)
  • 【Linux 34】Linux-主从复制
  • plantsimulation知识点 RGV小车前端与后端区别
  • CNN 中 3×3 卷积核等设计背后的底层逻辑
  • spring如何通过实现BeanPostProcessor接口计算并打印每一个bean的加载耗时
  • 如何下载MySQL小白指南 (以 Windows 为例)
  • 基础|Golang内存分配
  • 学习游戏制作记录(保存装备物品技能树和删除存档文件)8.26
  • 数据结构的线性表 之 链表