当前位置: 首页 > backend >正文

rocketmq优先级控制 + 并发度控制

背景

最近在做大模型的项目,算法部门提供的文档解析接口, 并发度为1, 业务这边需要在ai问答和上传文档时进行解析和向量化,文档解析只能单线程跑,问答的文档解析需要高优先级处理。

采用 rocketmq 做文档上传和解析的解耦(项目背景在,无法用其它 mq 替换)。

由于rocketmq不支持优先级队列,需要自己实现优先级队列的效果。

基本思路

创建两个 topic 和两个 group, 分别对应高优先级任务(HighPriorityTopic)和低优先级任务(LowPriorityTopic)。
采用 pull 模式,手动拉取消息,如果 HighPriorityTopic 拉取为空,再去 LowPriorityTopic 拉取消息。

结果

在两个消息队列都为空的时候,上传文档,解析任务会延迟大概5min才消费。
原因是 dev环境文档解析大概需要2min, 而为了避免消息重复消费,我把 mq 的 invisibleDuration 改到了 5min。
当消息错过这个时间窗口的时候,只能等到下个时间窗口才能消费。

解决办法

高优先级队列 push 模式, 低优先级队列 pull 模式。采用 AtomicInteger 做状态判断。
测试结果mq 消息消费几乎没延迟。

代码

private void consumeMessage(ClientConfiguration clientConfiguration) throws ClientException, InterruptedException {// 创建并初始化消费者FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);// 高优先级消费 push 模式clientServiceProvider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(docParseHighPriorityGroup).setSubscriptionExpressions(Map.of(docParseHighPriorityTopic, filterExpression)).setMessageListener(messageView -> {highPriorityTaskCount.incrementAndGet();try {log.info("[文档解析] 消费高优先级解析任务:topic = {}, messageId = {}", messageView.getTopic(), messageView.getMessageId());this.dealMessage(messageView);return ConsumeResult.SUCCESS;} finally {highPriorityTaskCount.decrementAndGet();}}).build();// 低优先级消费 pull 模式SimpleConsumer lowPriorityConsumer = clientServiceProvider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(docParseGroup).setAwaitDuration(Duration.ofSeconds(10)).setSubscriptionExpressions(Map.of(docParseTaskTopic, filterExpression)).build();int maxMessageNum = 1;Duration invisibleDuration = Duration.ofMinutes(5);while (true) {if (highPriorityTaskCount.get() == 0) { // 高优先级没消息了才开始消费List<MessageView> receive = lowPriorityConsumer.receive(maxMessageNum, invisibleDuration);log.info("[文档解析] 消费低优先级解析任务,收到消息数量:{}", receive.size());this.handleMessages(lowPriorityConsumer, receive);} else {Thread.sleep(1_000);}}}
http://www.xdnf.cn/news/7862.html

相关文章:

  • 85本适合AI入门的人工智能书籍合集免费资源
  • 游戏引擎学习第301天:使用精灵边界进行排序
  • 数据湖和数据仓库的区别
  • 线程、线程池、异步
  • 人脸识别,使用 deepface + api + flask, 改写 + 调试
  • 【沉浸式求职学习day46】【华为5.7暑期机试题目讲解】
  • 广东省省考备考(第十六天5.21)—言语:语句排序题(听课后强化)
  • Mcu_Bsdiff_Upgrade
  • 数据结构与算法——堆
  • ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的使用场景
  • (vue)前端实现下载后端提供的URL文件
  • 设计模式1 ——单例模式
  • 前后端的双精度浮点数精度不一致问题解决方案,自定义Spring的消息转换器处理JSON转换
  • LeetCode117_填充每个结点的下一个右侧结点指针Ⅱ
  • WPS深度适配鸿蒙电脑折叠形态,国产替代下的未来何在?
  • L53.【LeetCode题解】二分法习题集2
  • 关于收集 Android Telephony 网络信息的设计思考2
  • WinForms 应用中集成 OpenCvSharp 实现基础图像处理
  • 基于AI大语言模型的历史文献分析在气候与灾害重建中的技术-以海南岛千年台风序列重建为例
  • C++初阶-vector的模拟实现2
  • 前端(小程序)学习笔记(CLASS 1):组件
  • 强化学习入门:RL开发框架Gym简介
  • App 出海:全渠道营销如何通过性能监控与精准归因实现增长
  • 【209. 长度最小的子数组】
  • shell脚本之函数详细解释及运用
  • 【深度估计 Depth Estimation】数据集介绍
  • [Java实战]Spring Boot整合Seata:分布式事务一致性解决方案(三十一)
  • 云祺容灾备份系统公有云备份与恢复实操-华为云
  • 【机器学习】支持向量机(SVM)
  • Suricata 3规则介绍、以及使用