
How to use @KafkaListener to dynamically subscribe to topics configured in Nacos

1. Introduction

        For scenarios where the Kafka topics an application consumes change frequently, the following approach lets the listener pick up topic changes dynamically.

2. Usage
2.1 Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version>
</dependency>
2.2 Configuration in Nacos

# Kafka configuration
spring:
  kafka:
    bootstrap-servers: <broker-ip>:9092
    topics: topic1,topic2
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      enable-idempotence: true
      acks: all
      transactional-id: kafka-group
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: kafka-clickhouse-group
      auto-offset-reset: latest
      enable-auto-commit: false
      isolation-level: read_committed
      allow-auto-create-topics: true
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 3
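Note that `topics` is a single comma-separated string: the `@KafkaListener` shown later splits it with the SpEL expression `#{'${spring.kafka.topics}'.split(',')}`. A minimal plain-Java sketch of that parsing step (the `parseTopics` helper is illustrative, not a Spring API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TopicListParser {

    // Splits a comma-separated topic property into topic names, trimming
    // stray whitespace -- the same shape of result that the SpEL
    // split(',') expression hands to @KafkaListener.
    static List<String> parseTopics(String property) {
        return Arrays.stream(property.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseTopics("topic1, topic2"));
    }
}
```

Trimming guards against stray whitespace around the commas, which the raw SpEL `split(',')` would otherwise pass straight into the subscription.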
2.3 Configuration class

package org.aecsi.kafkadatatock.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

@Configuration
@RequiredArgsConstructor
@EnableKafka
@RefreshScope
public class KafkaConfig {

    private final KafkaProperties kafkaProperties;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(kafkaProperties.buildAdminProperties());
    }

    @Bean
    public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildProducerProperties());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-clickhouse-producer");
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configProps);
        factory.setTransactionIdPrefix("kafka-clickhouse-producer-");
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    @RefreshScope
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildConsumerProperties());
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true);
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    // Single listener container factory: manual immediate acks, Kafka
    // transactions, and @RefreshScope so it is rebuilt on a config refresh.
    @Bean
    @RefreshScope
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTransactionManager<String, String> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ApplicationRunner kafkaListenerStarter(KafkaListenerEndpointRegistry registry) {
        return args -> {
            // Start all Kafka listeners once the context is ready
            registry.start();
        };
    }
}
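One caveat worth spelling out: `@RefreshScope` lets Nacos push a new `spring.kafka.topics` value into the Spring context, but listener containers that are already running keep the subscription they started with. A refresh handler (e.g. listening for Spring Cloud's `EnvironmentChangeEvent` and stopping/starting the `KafkaListenerEndpointRegistry`) typically decides whether a restart is needed by comparing the old and new topic lists. The comparison step can be sketched in plain Java; `topicsChanged` is an illustrative helper, not part of Spring or Kafka:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TopicChangeDetector {

    // Returns true when the refreshed comma-separated topic list differs
    // from the one the listener containers were started with, ignoring
    // the order in which topics are listed.
    static boolean topicsChanged(String oldProperty, String newProperty) {
        Set<String> oldTopics = new HashSet<>(Arrays.asList(oldProperty.split(",")));
        Set<String> newTopics = new HashSet<>(Arrays.asList(newProperty.split(",")));
        return !oldTopics.equals(newTopics);
    }

    public static void main(String[] args) {
        System.out.println(topicsChanged("topic1,topic2", "topic2,topic1")); // false: order change only
        System.out.println(topicsChanged("topic1,topic2", "topic1,topic3")); // true: topic set changed
    }
}
```

Only when this returns true does the handler need to bounce the containers, avoiding needless rebalances on unrelated config refreshes.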

Message-receiving class

@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}", autoStartup = "false")
@Transactional(transactionManager = "transactionManager")
public void processMessage(ConsumerRecord<String, String> record,
                           Acknowledgment acknowledgment,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                           @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
    try {
        log.info("Kafka received a message on topic: {}", topic);
        // process the message here
        acknowledgment.acknowledge();
    } catch (Exception e) {
        log.error("Error processing message for topic {}: {}", topic, e.getMessage());
        throw e;
    }
}

Add one annotation to the main application class:

@EnableConfigurationProperties(KafkaProperties.class)
3. Summary

      There are other ways to make Kafka pick up topics dynamically; this is the only one the author has verified so far, and the others will be covered in a future update.

