当前位置: 首页 > web >正文

Spring Boot整合Kafka实战指南:从环境搭建到消息处理全解析

一、环境准备

  1. 安装 Kafka

    • 下载 Kafka:从 Apache Kafka 官网下载对应版本的 Kafka。

    • 解压并启动 Kafka:

      # 启动 Zookeeper(Kafka 依赖 Zookeeper)
      bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka
      bin/kafka-server-start.sh config/server.properties
  2. 创建主题

    bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  3. 添加依赖 在 Spring Boot 项目的 pom.xml 文件中添加 Kafka 依赖:

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId>
    </dependency>

二、基础配置(YML)

application.yml 文件中配置 Kafka 的基本参数:

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: test-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
  • bootstrap-servers:Kafka 服务的地址。

  • group-id:消费者组 ID,同一组的消费者消费不同的分区。

  • auto-offset-reset:当 Kafka 中没有初始偏移量或偏移量超出范围时的处理方式,可选值为 earliest(从最早记录开始读取)、latest(从最新记录开始读取)或 none(抛出异常)。

三、生产者与消费者代码实现

1. 生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}
  • KafkaTemplate:Spring 提供的 Kafka 操作类,用于发送消息。

2. 消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {@KafkaListener(topics = "test-topic")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}
  • @KafkaListener:指定监听的 Kafka 主题。

四、高级用法

1. 消息分区策略

自定义分区器
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic void configure(Map<String, ?> configs) {// 配置初始化}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区逻辑,例如根据 key 的哈希值取模return key.hashCode() % cluster.partitionCountForTopic(topic);}@Overridepublic void close() {// 关闭资源}
}

application.yml 中配置自定义分区器:

spring:kafka:producer:partitioner-class: com.example.CustomPartitioner

2. 消息序列化与反序列化

自定义序列化器
import org.apache.kafka.common.serialization.Serializer;public class CustomSerializer implements Serializer<MyObject> {@Overridepublic byte[] serialize(String topic, MyObject data) {// 自定义序列化逻辑,例如使用 JSON 序列化return data.toJson().getBytes();}
}

application.yml 中配置自定义序列化器:

spring:kafka:producer:value-serializer: com.example.CustomSerializer

3. 消费者并发处理

application.yml 中配置消费者并发数:

spring:kafka:listener:concurrency: 3
  • concurrency:每个消费者线程的并发数,可以根据需求调整。

4. 消费者手动提交偏移量

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {@KafkaListener(topics = "test-topic")public void receiveMessage(String message, Acknowledgment acknowledgment) {System.out.println("Received message: " + message);// 处理完消息后手动提交偏移量acknowledgment.acknowledge();}
}
  • Acknowledgment:手动提交偏移量的接口。

5. 消费者拦截器

自定义拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.util.HashMap;
import java.util.Map;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 消费前的逻辑,例如过滤消息return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {// 提交偏移量前的逻辑}@Overridepublic void close() {// 关闭资源}@Overridepublic void configure(Map<String, ?> configs) {// 配置初始化}
}

application.yml 中配置自定义拦截器:

spring:kafka:consumer:interceptor-classes: com.example.CustomConsumerInterceptor

6. 生产者拦截器

自定义拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 发送前的逻辑,例如修改消息内容return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 消息发送后的逻辑}@Overridepublic void close() {// 关闭资源}@Overridepublic void configure(Map<String, ?> configs) {// 配置初始化}
}

application.yml 中配置自定义拦截器:

spring:kafka:producer:interceptor-classes: com.example.CustomProducerInterceptor

7. 消费者重试机制

application.yml 中配置消费者重试机制:

spring:kafka:listener:retry:initial-interval: 1000msmax-attempts: 3back-off-multiplier: 2.0max-interval: 10000ms
  • initial-interval:初始重试间隔。

  • max-attempts:最大重试次数。

  • back-off-multiplier:重试间隔的倍数。

  • max-interval:最大重试间隔。

8. 消费者死信队列

在 Kafka 中创建死信队列主题:

bin/kafka-topics.sh --create --topic dlq-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

application.yml 中配置死信队列:

spring:kafka:listener:dead-letter:topic: dlq-topic

9. 消息过滤

application.yml 中配置消息过滤:

spring:kafka:listener:filter:expression: headers['type'] == 'important'
  • expression:SpEL 表达式,用于过滤消息。

10. 消费者负载均衡

Spring Boot 默认支持 Kafka 的消费者负载均衡,无需额外配置。只需在同一个消费者组中启动多个

http://www.xdnf.cn/news/5638.html

相关文章:

  • 【MCP】魔搭社区MCP服务(高德地图、everything文件搜索)
  • Ai网站流式渲染总结
  • c语言第一个小游戏:贪吃蛇小游戏03
  • #在 CentOS 7 中手动编译安装软件操作及原理
  • 03.Golang 切片(slice)源码分析(二、append实现)
  • 视频监控汇聚平台EasyCVR安防视频监控小知识:视频监控系统与监视器安装
  • 【Redis实战篇】分布式锁-Redisson
  • 最新AI产品库哪个平台好?最新AI工具网站平台推荐
  • C++中的std::allocator
  • 神经生物学+图论双buff,揭示大脑语言系统的拓扑结构
  • Android学习总结之线程池篇
  • 脑机接口重点产品发展路径分析:以四川省脑机接口及人机交互产业攻坚突破行动计划(2025-2030年)为例
  • Matlab 短时交通流预测AR模型
  • 【C#】ToArray的使用
  • 将本地文件上传到云服务器上
  • Matlab 模糊控制节水洗衣机模型
  • Next.js 知识框架总结
  • 212. 单词搜索 II【 力扣(LeetCode) 】
  • windows下docker 运行 ros2humble arm64
  • day 23
  • VIC-2D 7.0 为平面样件机械试验提供全视野位移及应变数据软件
  • MySQL是如何加行级锁的
  • Java大师成长计划之第19天:性能调优与GC原理
  • C# 中 static的使用
  • 计算机网络核心技术解析:从基础架构到应用实践
  • 2025年阿里云大数据ACP高级工程师认证模拟试题(附答案解析)
  • 基于Vue3.0的高德地图api教程004:自定义绘制点的颜色/修改绘制点/删除绘制点
  • RCE联系
  • 什么是ERP?ERP有哪些功能?小微企业ERP系统源码,SpringBoot+Vue+ElementUI+UniAPP
  • 基于LVS和Keepalived实现高可用负载均衡架构