rocketmq优先级控制 + 并发度控制
背景
最近在做大模型的项目,算法部门提供的文档解析接口, 并发度为1, 业务这边需要在ai问答和上传文档时进行解析和向量化,文档解析只能单线程跑,问答的文档解析需要高优先级处理。
采用 rocketmq 做文档上传和解析的解耦(项目背景在,无法用其它 mq 替换)。
由于rocketmq不支持优先级队列,需要自己实现优先级队列的效果。
基本思路
创建两个 topic 和两个 group, 分别对应高优先级任务(HighPriorityTopic)和低优先级任务(LowPriorityTopic)。
采用 pull 模式,手动拉取消息,如果 HighPriorityTopic 拉取为空,再去 LowPriorityTopic 拉取消息。
结果
在两个消息队列都为空的时候,上传文档,解析任务会延迟大概5min才消费。
原因是 dev环境文档解析大概需要2min, 而为了避免消息重复消费,我把 mq 的 invisibleDuration 改到了 5min。
当消息错过这个时间窗口的时候,只能等到下个时间窗口才能消费。
解决办法
高优先级队列 push 模式, 低优先级队列 pull 模式。采用 AtomicInteger 做状态判断。
测试结果mq 消息消费几乎没延迟。
代码
private void consumeMessage(ClientConfiguration clientConfiguration) throws ClientException, InterruptedException {// 创建并初始化消费者FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);// 高优先级消费 push 模式clientServiceProvider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(docParseHighPriorityGroup).setSubscriptionExpressions(Map.of(docParseHighPriorityTopic, filterExpression)).setMessageListener(messageView -> {highPriorityTaskCount.incrementAndGet();try {log.info("[文档解析] 消费高优先级解析任务:topic = {}, messageId = {}", messageView.getTopic(), messageView.getMessageId());this.dealMessage(messageView);return ConsumeResult.SUCCESS;} finally {highPriorityTaskCount.decrementAndGet();}}).build();// 低优先级消费 pull 模式SimpleConsumer lowPriorityConsumer = clientServiceProvider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(docParseGroup).setAwaitDuration(Duration.ofSeconds(10)).setSubscriptionExpressions(Map.of(docParseTaskTopic, filterExpression)).build();int maxMessageNum = 1;Duration invisibleDuration = Duration.ofMinutes(5);while (true) {if (highPriorityTaskCount.get() == 0) { // 高优先级没消息了才开始消费List<MessageView> receive = lowPriorityConsumer.receive(maxMessageNum, invisibleDuration);log.info("[文档解析] 消费低优先级解析任务,收到消息数量:{}", receive.size());this.handleMessages(lowPriorityConsumer, receive);} else {Thread.sleep(1_000);}}}