
Integrating a Kafka Cluster with Spring Boot 3

1. Add the dependency in build.gradle

implementation 'org.springframework.kafka:spring-kafka:3.2.0'

2. Add a kafka-log.yml file

Add kafka-log.yml under resources/config to configure the topic and the consumer group:

# Kafka consumer group and topic
kafka:
  consumer:
    group:
      log-data: log-data-group
    topic:
      log-data: log-data-topic
    auto-startup: false

Load the custom yml file:

import org.springframework.beans.factory.config.PropertySourcesPlaceholderConfigurer;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class YmlConfiguration {

    // Exposes config/kafka-log.yml so its keys can be resolved in ${...} placeholders
    @Bean
    public PropertySourcesPlaceholderConfigurer properties() {
        PropertySourcesPlaceholderConfigurer configurer = new PropertySourcesPlaceholderConfigurer();
        YamlPropertiesFactoryBean yaml = new YamlPropertiesFactoryBean();
        yaml.setResources(new ClassPathResource("config/kafka-log.yml"));
        configurer.setProperties(yaml.getObject());
        return configurer;
    }
}
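With the PropertySourcesPlaceholderConfigurer in place, the keys from kafka-log.yml can be injected anywhere with standard ${...} placeholders. A minimal sketch of how they resolve (the LogTopicProperties class is hypothetical, not part of the original post):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Hypothetical helper showing the placeholders resolved from config/kafka-log.yml
@Component
public class LogTopicProperties {

    @Value("${kafka.consumer.topic.log-data}")
    private String logDataTopic;   // resolves to log-data-topic

    @Value("${kafka.consumer.group.log-data}")
    private String logDataGroup;   // resolves to log-data-group

    public String getLogDataTopic() {
        return logDataTopic;
    }

    public String getLogDataGroup() {
        return logDataGroup;
    }
}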

3. application.yml configuration

spring:
  kafka:
    bootstrap-servers: 192.168.0.81:9092,192.168.0.82:9092,192.168.0.83:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 254288
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      auto-topic-creation:
        auto-create: true
      properties:
        linger.ms: 1
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: log-data-group
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";

# Override per environment as needed, e.g. do not auto-start the consumer in development
kafka:
  consumer:
    auto-startup: false
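The configuration above relies on automatic topic creation. If the brokers have auto.create.topics.enable turned off, the topic can instead be declared as a bean and created by Spring's KafkaAdmin on startup. A minimal sketch, not from the original post (the partition and replica counts are assumptions):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the log topic; KafkaAdmin creates it at startup if it does not exist yet
    @Bean
    public NewTopic logDataTopic() {
        return TopicBuilder.name("log-data-topic")
                .partitions(3)   // assumed partition count
                .replicas(3)     // assumed; must not exceed the broker count (3 in this cluster)
                .build();
    }
}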

4. Producer implementation

import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {

    // String key/value types match the StringSerializer configuration above
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String data) {
        // In spring-kafka 3.x, send() returns a CompletableFuture
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.whenComplete((sendResult, ex) -> {
            if (ex != null) {
                log.error("Kafka send message error = {}, topic = {}, data = {}", ex.getMessage(), topic, data);
            } else {
                log.info("Message sent successfully: {}", sendResult);
            }
        });
    }
}
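A usage sketch: any service can inject KafkaProducer and push a JSON log record to the topic configured in kafka-log.yml. The LogReporter class and the hand-built JSON string below are illustrative only:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

// Hypothetical caller that forwards a JSON log record to Kafka
@Service
public class LogReporter {

    private final KafkaProducer kafkaProducer;

    @Value("${kafka.consumer.topic.log-data}")
    private String logDataTopic;

    public LogReporter(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public void report(String level, String message) {
        // Minimal JSON payload for illustration; in practice use a JSON library such as Jackson
        String json = "{\"level\":\"" + level + "\",\"message\":\"" + message + "\"}";
        kafkaProducer.sendMessage(logDataTopic, json);
    }
}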

5. Consumer implementation

import java.util.Collection;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerGroupManager {

    private final KafkaAdmin kafkaAdmin;
    private final ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

    public KafkaConsumerGroupManager(KafkaAdmin kafkaAdmin,
                                     ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
        this.kafkaAdmin = kafkaAdmin;
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }

    public void ensureConsumerGroupExists(String groupId) {
        // Build an AdminClient from the Spring-managed KafkaAdmin configuration (closed automatically)
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            // Check whether the consumer group already exists on the cluster
            ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();
            boolean groupExists = consumerGroupListings.stream()
                    .anyMatch(listing -> listing.groupId().equals(groupId));
            if (!groupExists) {
                // Assign the group id to the listener container factory; the broker creates
                // the group itself when the first consumer with this id joins
                kafkaListenerContainerFactory.getContainerProperties().setGroupId(groupId);
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to check consumer group existence", e);
        }
    }
}
import java.util.ArrayList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {

    private final ElasticsearchOperations elasticsearchOperations;

    public KafkaConsumer(ElasticsearchOperations elasticsearchOperations) {
        this.elasticsearchOperations = elasticsearchOperations;
    }

    /**
     * Consume a log record and index it into Elasticsearch.
     *
     * @param message JSON log record read from the topic
     */
    @KafkaListener(topics = {"${kafka.consumer.topic.log-data}"},
                   groupId = "${kafka.consumer.group.log-data}",
                   autoStartup = "${kafka.consumer.auto-startup}")
    public void consumer(String message) {
        this.bulkIndexJsonData(message);
    }

    public void bulkIndexJsonData(String jsonData) {
        List<IndexQuery> queries = new ArrayList<>();
        IndexQuery query = new IndexQuery();
        query.setSource(jsonData);
        query.setOpType(IndexQuery.OpType.INDEX);
        queries.add(query);
        elasticsearchOperations.bulkIndex(queries, IndexCoordinates.of("log"));
    }
}
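Because auto-startup is false in development, the listener will not poll until it is started explicitly. One way to do that at runtime is through KafkaListenerEndpointRegistry. The sketch below is an assumption on top of the code above: it requires the @KafkaListener to also declare id = "log-data-listener", which the original listener does not have.

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

// Hypothetical lifecycle helper; assumes the listener is registered with id "log-data-listener"
@Service
public class LogConsumerLifecycle {

    private final KafkaListenerEndpointRegistry registry;

    public LogConsumerLifecycle(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void startLogConsumer() {
        MessageListenerContainer container = registry.getListenerContainer("log-data-listener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }

    public void stopLogConsumer() {
        MessageListenerContainer container = registry.getListenerContainer("log-data-listener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}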

That wraps it up. Some time later, after a cluster outage, we found that logs could no longer be queried. Investigation showed that the consumer had originally been configured with auto-offset-reset: earliest, which caused it to re-consume the topic from the beginning after the outage; fortunately the Elasticsearch writes had been made idempotent (and the configuration above now uses latest).
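For reference, one common way to make the Elasticsearch write idempotent is to derive the document id from the log record itself, so re-consuming the same message overwrites the same document instead of creating a duplicate. This is only a sketch of that idea, not the post's actual implementation, and the content-hash scheme is an assumption:

import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.springframework.data.elasticsearch.core.query.IndexQuery;

public final class IdempotentIndexQueries {

    private IdempotentIndexQueries() {
    }

    // Build an IndexQuery whose id is derived from the message content, so indexing the
    // same message twice overwrites one document rather than creating duplicates.
    public static IndexQuery fromJson(String jsonData) {
        IndexQuery query = new IndexQuery();
        query.setId(UUID.nameUUIDFromBytes(jsonData.getBytes(StandardCharsets.UTF_8)).toString());
        query.setSource(jsonData);
        query.setOpType(IndexQuery.OpType.INDEX);
        return query;
    }
}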

