当前位置: 首页 > backend >正文

Kafka 4.0 五大 API 选型指南、依赖坐标、上手示例与最佳实践

一、怎么选:五大 API 的“场景分工”

  • Producer API:把事件写进某个 Topic。——“我产生日志、订单、埋点、传感器数据,要写进 Kafka。”
  • Consumer API:从 Topic 拉取事件并消费处理。——“我做风控、告警、入库、实时特征,订阅并处理。”
  • Streams API:把输入 Topic 的数据变换输出 Topic(过滤、聚合、窗口、Join、状态)。——“我做实时计算/微服务,既读又写,还要有状态。”
  • Connect API:以连接器持续同步外部系统 ↔ Kafka。——“我想把 MySQL/对象存储/ES 等批量接入/导出,少写代码甚至零代码。”
  • Admin API:管理与巡检主题、Broker、ACL 等对象。——“我需要创建/修改 Topic、查状态、运维脚本化。”

Kafka 的功能通过与语言无关的协议提供,多语言客户端很多;但仅 Java 客户端由主项目维护,其余为独立开源实现。

二、依赖坐标(Maven)

统一用 4.0.0。Gradle 可自行换写法。

Producer / Consumer / Admin(同一坐标):

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency>

Streams(Java/Scala 通用核心库):

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>4.0.0</version>
</dependency>

Streams(Scala 2.13 可选 DSL 封装):

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams-scala_2.13</artifactId><version>4.0.0</version>
</dependency>

三、Producer API:最小可用写入

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class DemoProducer {public static void main(String[] args) {Properties p = new Properties();p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 生产建议p.put(ProducerConfig.ACKS_CONFIG, "all");               // 强一致写入p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 幂等// Kafka 4.0 默认 linger.ms=5,批量更高效try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {ProducerRecord<String,String> rec = new ProducerRecord<>("quickstart-events", "key1", "hello kafka");producer.send(rec, (md, ex) -> {if (ex != null) ex.printStackTrace();else System.out.printf("OK topic=%s partition=%d offset=%d%n", md.topic(), md.partition(), md.offset());});producer.flush();}}
}

要点

  • acks=all + 幂等(enable.idempotence=true)是可靠写的默认组合。
  • 4.0 默认 linger.ms=5,通常更高吞吐且延迟不劣

四、Consumer API:订阅与拉取

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;public class DemoConsumer {public static void main(String[] args) {Properties p = new Properties();p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());p.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 新组从头开始try (KafkaConsumer<String, String> c = new KafkaConsumer<>(p)) {c.subscribe(List.of("quickstart-events"));while (true) {// 4.0 用 poll(Duration),不再使用 poll(long)ConsumerRecords<String,String> recs = c.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> r : recs) {System.out.printf("key=%s value=%s offset=%d%n", r.key(), r.value(), r.offset());}// 手动提交示例c.commitAsync();}}}
}

要点

  • 4.0 不再提供 poll(long),请使用 poll(Duration)
  • 消费组并行度 = 分区数;按需扩展分区或消费者实例。

五、Streams API:把 Topic 当作“输入表/输出表”

一个最小单词计数拓扑(输入 input-topic → 输出 output-topic):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;public class WordCountApp {public static void main(String[] args) {Properties p = new Properties();p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KStream<String,String> text = builder.stream("input-topic");KTable<String,Long> counts = text.flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+"))).groupBy((k, word) -> word).count();counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));KafkaStreams streams = new KafkaStreams(builder.build(), p);streams.start();Runtime.getRuntime().addShutdownHook(new Thread(streams::close));}
}

要点

  • Streams 把 Kafka 作为“日志 + 状态快照”的存储,支持窗口、聚合、Join、事务(恰好一次) 等。
  • 4.0 清理了过时 API;常用 DSL 基本不受影响。

六、Connect API:零/少代码联通外部系统

多数场景无需编写自定义连接器:直接使用预构建 Source/Sink 即可(如 JDBC、对象存储、ES、BigQuery…)。

典型启动(Standalone 示例):

bin/connect-standalone.sh \config/connect-standalone.properties \config/connect-file-source.properties \config/connect-file-sink.properties

建议

  • 生产使用 分布式模式(多 worker、弹性容错)。
  • 设计好 DLT(死信主题) 与重试策略,避免“毒数据”卡死。
  • 有强 Schema 需求:结合 Schema Registry 管控演进。

七、Admin API:脚本化管理运维

与 Producer/Consumer 使用同一 kafka-clients 依赖。

创建 Topic(示例)

import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;public class CreateTopicDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties p = new Properties();p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");try (Admin admin = Admin.create(p)) {NewTopic t = new NewTopic("demo-admin-topic", 3, (short)1);admin.createTopics(List.of(t)).all().get();System.out.println("Created.");}}
}

修改配置(4.0 用 incrementalAlterConfigs)

ConfigResource res = new ConfigResource(ConfigResource.Type.TOPIC, "demo-admin-topic");
AlterConfigOp op = new AlterConfigOp(new ConfigEntry("min.insync.replicas", "2"),AlterConfigOp.OpType.SET);
admin.incrementalAlterConfigs(Map.of(res, List.of(op))).all().get();

八、从“单点调用”到“端到端应用”的组合拳

  1. 数据进入:业务服务 → Producer APIorders.created

  2. 实时处理Streams API 读取 orders.created,聚合出用户维度统计 → user.order.stats

  3. 下游分发

    • Consumer API:风控服务订阅 orders.created 实时判定
    • Connect:把 user.order.stats Sink 到 OLAP/湖仓
  4. 运维管理Admin API 创建/巡检 Topic、配额、ACL

九、Kafka 4.0 相关注意事项(API 侧)

  • 仅 Java 客户端为官方维护;其他语言(Go、Python、.NET、C/C++…)社区实现良多,但请留意版本兼容与协议支持。
  • Consumer:使用 poll(Duration)committed(TopicPartition) 等旧签名在 4.0 已移除对应变体(请用集合参数形式)。
  • Admin:使用 incrementalAlterConfigs,不要再用已移除的 alterConfigs
  • Producer:默认 linger.ms=5;幂等与事务结合使用时注意 max.in.flight.requests.per.connection 的设置。

十、落地清单(Checklist)

  • 统一依赖版本到 4.0.0kafka-clients / kafka-streams / kafka-streams-scala_2.13
  • Producer:acks=all、幂等、压测确认批量参数(linger.msbatch.size
  • Consumer:poll(Duration) 改造、消费组并行度与重试/DLT
  • Streams:状态存储大小与容错、窗口语义(事件时间/水位线)
  • Connect:选对 Source/Sink、分布式部署、DLT/重试、监控指标
  • Admin:脚本化创建/变更 Topic 与配额、滚动发布流程与回滚方案
http://www.xdnf.cn/news/18910.html

相关文章:

  • 项目实战4:TrinityCore框架学习
  • 科技守护古树魂:古树制茶行业的数字化转型之路
  • 把llamafacoty微调后的模型导出ollama模型文件
  • 【前端教程】JavaScript入门核心:使用方式、执行机制与核心语法全解析
  • Oracle 数据库权限管理的艺术:从入门到精通
  • 目标检测领域基本概念
  • 第6篇:链路追踪系统 - 分布式环境下的请求跟踪
  • JSP程序设计之JSP指令
  • 【Python】QT(PySide2、PyQt5):Qt Designer,VS Code使用designer,可能的报错
  • Java学习笔记之——通过分页查询样例感受JDBC、Mybatis以及MybatisPlus(一)
  • 上海控安:汽车API安全-风险与防护策略解析
  • Java 实现HTML转Word:从HTML文件与字符串到可编辑Word文档
  • Nginx + Certbot配置 HTTPS / SSL 证书(简化版已测试)
  • 机器视觉学习-day07-图像镜像旋转
  • 【Deepseek】Windows MFC/Win32 常用核心 API 汇总
  • 【PyTorch】基于YOLO的多目标检测项目(一)
  • 【Redis】数据分片机制和集群机制
  • 【Java SE】基于多态与接口实现图书管理系统:从设计到编码全解析
  • C/C++---前缀和(Prefix Sum)
  • 微服务的编程测评系统17-判题功能-代码沙箱
  • MQTT broker 安装与基础配置实战指南(一)
  • 题目—移除元素
  • PyTorch中的激活函数
  • AI需求优先级:数据价值密度×算法成熟度
  • HSA35NV001美光固态闪存NQ482NQ470
  • 达可替尼-
  • SpringBoot整合RabbitMQ:从消息队列基础到高可用架构实战指南
  • 浏览器网页路径扫描器(脚本)
  • 改造thinkphp6的命令行工具和分批次导出大量数据
  • MySQL 基础:DDL、DML、DQL、DCL 四大类 SQL 语句全解析