当前位置: 首页 > news >正文

RocketMq消费者动态订阅topic

一般情况就是生产者将消息发送到指定的topic,消费者订阅指定的topic下的消息进行消费,这种情况下的topic是写死的。

代码示例:

        // 初始化消费者,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");//指定RocketMq服务器地址consumer.setNamesrvAddr("localhost:9876");//每次拉取最多拉取消息10条 consumer.setConsumeMessageBatchMaxSize(10);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅topic下的说有tagconsumer.subscribe("topic_test", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(msg->{log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));})return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();

简单了解一下注册监听器:

消费消息的方法consumeMessage有两个参数

  1. List<MessageExt> msgs:消费一次性从消息队列里面拉取的消息列表,可通过setConsumeMessageBatchMaxSize()方法来配置每次最多拉取消息条数
  2. ConsumeConcurrentlyContext context: 是 RocketMQ 并发消费上下文对象,它包含了当前消息消费的一些上下文信息和控制参数。

返回ConsumeConcurrentlyStatus类型:

  1. CONSUME_SUCCESS:用于确定同一批消息全部消费成功
  2. RECONSUME_LATER:如果消息消费失败,同一批的所有消息将重新被消费,直到达到最大重试次数。如果同一批的单条消息消费失败返回了RECONSUME_LATER,那么同一批的其他消息即使被成功消费了也会被重新消费。

现在有这么一种情况:生产者的topic可以由用户动态配置,消费者具体消费哪些topic下的消息是不知道的,当消费者正在运行时可以实现动态订阅topic。

实现方式:

@Slf4j
@Component
@Getter
public class DynamicMonitoringEvent {private DefaultMQPushConsumer consumer;private final List<String> topics = new CopyOnWriteArrayList<>();@PostConstructprivate void init(){getInstance();start();}public  DefaultMQPushConsumer getInstance(){log.info("初始化消费者");if (consumer == null){DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer(group);dconsumer.setNamesrvAddr(nameServer);dconsumer.setConsumeMessageBatchMaxSize(10);dconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);dconsumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));//处理业务逻辑}return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer = dconsumer;}return consumer;}public void start(){try {consumer.start();} catch (MQClientException e) {e.printStackTrace();}}// 动态设定topic的订阅// 注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖public void setConsumer(String topic) {try {if (!StringUtils.hasText(topic) || topics.contains(topic)){return;}topics.add(topic);// 动态添加topic监听consumer.subscribe(topic, "*");//也可以重新注册监听器//  dconsumer.registerMessageListener(new MessageListenerConcurrently() {//     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,//                                                     ConsumeConcurrentlyContext context) {//         for (MessageExt msg : msgs){//                 log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());//                 log.info("--body:"+new String(msg.getBody()));//         }//         return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// }log.info("动态订阅topic:{} 成功", topic);} catch (MQClientException e) {log.error("动态订阅topic:{} 失败", topic, e);}}
}

在外部调用setConsumer方法实现动态订阅topic

@Autowired
private DynamicMonitoringEvent dynamicMonitoringEvent;public void setDynamicTopic() {Set<String> topics = //获取topic列表for (String topic : topics){dynamicMonitoringEvent.setConsumer(topic);}}

注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖,所以我们需要做一下判断,防止重复订阅同一个topic,如果注册了新的监听器,会覆盖之前的监听器,一个消费者只能又一个监听器

http://www.xdnf.cn/news/1328779.html

相关文章:

  • RK3568 Linux驱动学习——Linux设备树
  • Linux下Mysql命令,创建mysql,删除mysql
  • Win/Linux笔记本合盖不睡眠设置指南
  • 小程序插件使用
  • RWA加密金融高峰论坛星链品牌全球发布 —— 稳定币与Web3的香港新篇章
  • Vue 2 项目中快速集成 Jest 单元测试(超详细教程)
  • 哈希:两数之和
  • 从零开始的云计算生活——第四十六天,铁杵成针,kubernetes模块之Configmap资源与Secret资源对象
  • 【技术揭秘】AI Agent操作系统架构演进:从单体到分布式智能的跃迁
  • 告别手写文档!Spring Boot API 文档终极解决方案:SpringDoc OpenAPI
  • 大数据数据库 —— 初见loTDB
  • 视觉采集模块的用法
  • A股大盘数据-20250819 分析
  • 云原生俱乐部-shell知识点归纳(1)
  • 力扣57:插入区间
  • 决策树剪枝及数据处理
  • AI 药物发现:化学分子到机器学习数值特征的转化——打通“化学空间”与“模型空间”关键路径
  • 【Git 子模块与动态路由映射技术分析文档】
  • Matplotlib数据可视化实战:Matplotlib子图布局与管理入门
  • 疏老师-python训练营-Day50预训练模型+CBAM注意力
  • PCL+Spigot服务器+python进行MC编程(使用Trae进行AI编程)---可以生成彩虹
  • Hugging Face 核心组件介绍
  • 35岁对工作的一些感悟
  • Ansible 中的文件包含与导入机制
  • noetic版本/ubuntu20 通过moveit控制真实机械臂
  • 常见的对比学习的损失函数
  • DataAnalytics之Tool:Metabase的简介、安装和使用方法、案例应用之详细攻略
  • 数字ic后端设计从入门到精通14(含fusion compiler, tcl教学)半定制后端设计
  • plantsimulation知识点25.8.19 工件不在RGV中心怎么办?
  • c#联合halcon的基础教程(案例:亮度计算、角度计算和缺陷检测)(含halcon代码)