Kafka 4.0 五大 API 选型指南、依赖坐标、上手示例与最佳实践
一、怎么选:五大 API 的“场景分工”
- Producer API:把事件写进某个 Topic。——“我产生日志、订单、埋点、传感器数据,要写进 Kafka。”
- Consumer API:从 Topic 拉取事件并消费处理。——“我做风控、告警、入库、实时特征,订阅并处理。”
- Streams API:把输入 Topic 的数据变换为输出 Topic(过滤、聚合、窗口、Join、状态)。——“我做实时计算/微服务,既读又写,还要有状态。”
- Connect API:以连接器持续同步外部系统 ↔ Kafka。——“我想把 MySQL/对象存储/ES 等批量接入/导出,少写代码甚至零代码。”
- Admin API:管理与巡检主题、Broker、ACL 等对象。——“我需要创建/修改 Topic、查状态、运维脚本化。”
Kafka 的功能通过与语言无关的协议提供,多语言客户端很多;但仅 Java 客户端由主项目维护,其余为独立开源实现。
二、依赖坐标(Maven)
统一用 4.0.0。Gradle 可自行换写法。
Producer / Consumer / Admin(同一坐标):
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency>
Streams(Java/Scala 通用核心库):
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>4.0.0</version>
</dependency>
Streams(Scala 2.13 可选 DSL 封装):
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams-scala_2.13</artifactId><version>4.0.0</version>
</dependency>
三、Producer API:最小可用写入
import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class DemoProducer {public static void main(String[] args) {Properties p = new Properties();p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 生产建议p.put(ProducerConfig.ACKS_CONFIG, "all"); // 强一致写入p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 幂等// Kafka 4.0 默认 linger.ms=5,批量更高效try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {ProducerRecord<String,String> rec = new ProducerRecord<>("quickstart-events", "key1", "hello kafka");producer.send(rec, (md, ex) -> {if (ex != null) ex.printStackTrace();else System.out.printf("OK topic=%s partition=%d offset=%d%n", md.topic(), md.partition(), md.offset());});producer.flush();}}
}
要点
acks=all
+ 幂等(enable.idempotence=true
)是可靠写的默认组合。- 4.0 默认
linger.ms=5
,通常更高吞吐且延迟不劣。
四、Consumer API:订阅与拉取
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;public class DemoConsumer {public static void main(String[] args) {Properties p = new Properties();p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());p.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 新组从头开始try (KafkaConsumer<String, String> c = new KafkaConsumer<>(p)) {c.subscribe(List.of("quickstart-events"));while (true) {// 4.0 用 poll(Duration),不再使用 poll(long)ConsumerRecords<String,String> recs = c.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> r : recs) {System.out.printf("key=%s value=%s offset=%d%n", r.key(), r.value(), r.offset());}// 手动提交示例c.commitAsync();}}}
}
要点
- 4.0 不再提供
poll(long)
,请使用poll(Duration)
。 - 消费组并行度 = 分区数;按需扩展分区或消费者实例。
五、Streams API:把 Topic 当作“输入表/输出表”
一个最小单词计数拓扑(输入 input-topic
→ 输出 output-topic
):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;public class WordCountApp {public static void main(String[] args) {Properties p = new Properties();p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KStream<String,String> text = builder.stream("input-topic");KTable<String,Long> counts = text.flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+"))).groupBy((k, word) -> word).count();counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));KafkaStreams streams = new KafkaStreams(builder.build(), p);streams.start();Runtime.getRuntime().addShutdownHook(new Thread(streams::close));}
}
要点
- Streams 把 Kafka 作为“日志 + 状态快照”的存储,支持窗口、聚合、Join、事务(恰好一次) 等。
- 4.0 清理了过时 API;常用 DSL 基本不受影响。
六、Connect API:零/少代码联通外部系统
多数场景无需编写自定义连接器:直接使用预构建 Source/Sink 即可(如 JDBC、对象存储、ES、BigQuery…)。
典型启动(Standalone 示例):
bin/connect-standalone.sh \config/connect-standalone.properties \config/connect-file-source.properties \config/connect-file-sink.properties
建议
- 生产使用 分布式模式(多 worker、弹性容错)。
- 设计好 DLT(死信主题) 与重试策略,避免“毒数据”卡死。
- 有强 Schema 需求:结合 Schema Registry 管控演进。
七、Admin API:脚本化管理运维
与 Producer/Consumer 使用同一
kafka-clients
依赖。
创建 Topic(示例)
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;public class CreateTopicDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties p = new Properties();p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");try (Admin admin = Admin.create(p)) {NewTopic t = new NewTopic("demo-admin-topic", 3, (short)1);admin.createTopics(List.of(t)).all().get();System.out.println("Created.");}}
}
修改配置(4.0 用 incrementalAlterConfigs)
ConfigResource res = new ConfigResource(ConfigResource.Type.TOPIC, "demo-admin-topic");
AlterConfigOp op = new AlterConfigOp(new ConfigEntry("min.insync.replicas", "2"),AlterConfigOp.OpType.SET);
admin.incrementalAlterConfigs(Map.of(res, List.of(op))).all().get();
八、从“单点调用”到“端到端应用”的组合拳
-
数据进入:业务服务 → Producer API →
orders.created
-
实时处理:Streams API 读取
orders.created
,聚合出用户维度统计 →user.order.stats
-
下游分发:
- Consumer API:风控服务订阅
orders.created
实时判定 - Connect:把
user.order.stats
Sink 到 OLAP/湖仓
- Consumer API:风控服务订阅
-
运维管理:Admin API 创建/巡检 Topic、配额、ACL
九、Kafka 4.0 相关注意事项(API 侧)
- 仅 Java 客户端为官方维护;其他语言(Go、Python、.NET、C/C++…)社区实现良多,但请留意版本兼容与协议支持。
- Consumer:使用
poll(Duration)
;committed(TopicPartition)
等旧签名在 4.0 已移除对应变体(请用集合参数形式)。 - Admin:使用
incrementalAlterConfigs
,不要再用已移除的alterConfigs
。 - Producer:默认
linger.ms=5
;幂等与事务结合使用时注意max.in.flight.requests.per.connection
的设置。
十、落地清单(Checklist)
- 统一依赖版本到 4.0.0(
kafka-clients
/kafka-streams
/kafka-streams-scala_2.13
) - Producer:
acks=all
、幂等、压测确认批量参数(linger.ms
、batch.size
) - Consumer:
poll(Duration)
改造、消费组并行度与重试/DLT - Streams:状态存储大小与容错、窗口语义(事件时间/水位线)
- Connect:选对 Source/Sink、分布式部署、DLT/重试、监控指标
- Admin:脚本化创建/变更 Topic 与配额、滚动发布流程与回滚方案