Kafka Connect + Streams 用到极致从 CDC 到流处理的一套落地方案
关键目标:
- 零丢失:端到端 Exactly Once(Source 端事务 + Streams exactly_once_v2 + Sink DLQ)。
- 低延迟:Producer 端批量压缩 + Streams 缓存 + 合理 poll/commit 间隔。
- 可恢复:Connect/Streams 的 rebootstrap、backoff、standby、副本与快照。
二、把“数据源”和“数据去向”都交给它
2.1 Worker(Connect 集群)怎么配?
-
集群标识与元数据存储
group.id
:多个 worker 同属一个 Connect 集群(比如connect-realtime
)。config.storage.topic
/offset.storage.topic
/status.storage.topic
:存配置、偏移、状态的 Topic。生产建议 RF=3、分区数默认即可。
-
连接 Kafka
bootstrap.servers
:至少写 2~3 台,提升可达性。metadata.recovery.strategy=rebootstrap
:断联后自动重引导,配合reconnect.backoff.*
。
-
端到端一致性
exactly.once.source.support
:新集群直接设enabled
;老集群先preparing
,滚动升级后再enabled
。
-
REST & 插件
listeners=http://:8083
(或加admin.listeners
做隔离)plugin.path
:放 Debezium / 目标 Sink 的插件目录。plugin.discovery=hybrid_warn
:启动时能发现异常但不至于直接失败(生产逐步转service_load
)。
这些就是必须动的,其余如 CORS、metrics、SSL/SASL、backoff、Ciphers 等保持默认即可,文末完整清单可对照。
一份 Worker 示例(最小可用):
group.id=connect-realtime
bootstrap.servers=broker-a:9092,broker-b:9092,broker-c:9092config.storage.topic=_connect_configs
offset.storage.topic=_connect_offsets
status.storage.topic=_connect_status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3listeners=http://:8083
plugin.path=/opt/connectors,/usr/local/share/kafka/plugins
exactly.once.source.support=enabledmetadata.recovery.strategy=rebootstrap
reconnect.backoff.ms=100
reconnect.backoff.max.ms=1000
request.timeout.ms=40000
2.2 把订单/库存 CDC推入 Kafka
必填三件套:
name
:debezium-orders-src
connector.class
:io.debezium.connector.mysql.MySqlConnector
tasks.max
:按库/表分片、数据库负载与 MySQL binlog 速率评估(如2~4
)
转换/容错:
transforms
/predicates
:常用 SMT(去字段、展平、补全业务时间)。errors.*
:生产建议errors.tolerance=all + DLQ
(Source 没有 DLQ?那就坚持 fail-fast或在 SMT 封装补偿逻辑)。- EOS 检查:
exactly.once.support=required
(若 Connector 未宣称支持则可requested
,但要读清文档)。
Topic 创建:
topic.creation.groups
:用 Connect 的 AdminClient 在禁 Broker 自动建 Topic时创建orders.cdc
/inventory.cdc
等,指定 RF/分区/清理策略。
其余如
config.action.reload
(外部密钥轮转自动重启)、header.converter
、offsets.storage.topic
(单独 offsets 主题)等,视规范决定。所有字段详见文末 Source 完整表。
2.3 把处理结果发去 ES/CK/缓存,并兜底 DLQ
必填:
name
/connector.class
/tasks.max
topics
或topics.regex
(二选一)。我们常用.regex
订阅一类结果主题(如^orders\.result\..+
)。
强烈建议打开 DLQ:
errors.deadletterqueue.topic.name=_dlq.orders.sink
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
(带上上下文,更好排障)
重试与容忍:
errors.retry.timeout=-1
(无限重试,或业务接受的时间窗)errors.retry.delay.max.ms=60000
errors.tolerance=all
(不让单条脏数据拖垮任务)
三、把“订单 + 行为 + 库存”流在一起
目标作业:
- 以
orders.cdc
、inventory.cdc
、user.behavior
为输入; - 滚动聚合 5 分钟销量窗口、Join 出“超卖风险”,并实时写出
orders.result.picklist
、orders.result.alert
。
3.1 可靠性与并发
processing.guarantee=exactly_once_v2
:端到端事务(要求 Broker ≥ 2.5,生产建议三节点以上)。replication.factor=3
:内部 changelog / repartition topic 的 RF。num.stream.threads=2~4
:按分区数与主机核数定。num.standby.replicas=1
+max.warmup.replicas=2
+probing.rebalance.interval.ms=600000
:热备与平滑迁移,缩短 failover 暂停时间。
3.2 性能与乱序
- 缓存:
cache.max.bytes.buffering
(如 128MB)+statestore.cache.max.bytes
(默认 10MB,可上调)。 - 乱序控制:
max.task.idle.ms=0
(默认不等生产者)。如果你跨多流 Join 严格按时间输出,可以给一点 idle(>0)等待其它分区追上;极限低延迟就设-1
不等。 - 提交节奏:
commit.interval.ms=100
(在 exactly_once_v2 下默认就是 100ms)。
3.3 状态存储与可观测
state.dir=/var/lib/kafka-streams/orders-etl
(每实例唯一)。default.dsl.store=rocksDB
(默认)+ 可自定义rocksdb.config.setter
做压缩/并发。- 指标:
enable.metrics.push=true
+metric.reporters
(接入你的监控栈)。 - 运维:
metadata.recovery.strategy=rebootstrap
,配合reconnect.backoff.*
;log.summary.interval.ms=120000
。
一份 Streams 示例:
application.id=orders-etl
bootstrap.servers=broker-a:9092,broker-b:9092,broker-c:9092processing.guarantee=exactly_once_v2
replication.factor=3
num.stream.threads=3
num.standby.replicas=1
max.warmup.replicas=2
probing.rebalance.interval.ms=600000cache.max.bytes.buffering=134217728
statestore.cache.max.bytes=33554432
state.dir=/var/lib/kafka-streams/orders-etlcommit.interval.ms=100
topology.optimization=all
max.task.idle.ms=0metadata.recovery.strategy=rebootstrap
reconnect.backoff.ms=50
reconnect.backoff.max.ms=1000
request.timeout.ms=40000
四、把配置联到“可交付”的工单
-
SLA:端到端 P95 ≤ 2s;可用性 99.9%;恢复时间 ≤ 1 分钟。
-
Topic 规划:
orders.cdc
(compact+delete,7 天),inventory.cdc
(同上),user.behavior
(delete,3 天)。- Repartition/Changelog 由 Streams 代管,RF=3。
-
Connect 工单:
- Worker:上面示例即可,插件路径、EOS 开启;
- Source(Debezium):库账号只读、binlog 参数校验、分库/表白名单;
- Sink:目标端地址、批量/并发、DLQ 主题与保留策略;
-
Streams 工单:
- 副本与 standby、EXACTLY_ONCE_V2、状态目录与清理、监控看板与告警(commit 延时 / 处理速率 / record-lag-max)。
五、完整参数速查表
下面是逐项列名 + 含义/类型/默认值/取值范围的压缩清单,与上文场景选择一一对应;你可以直接用于代码审查或 CMDB 入库。
为避免刷屏,每个条目只保留“最重要的语义”,字段一个不落:
5.1 Kafka Connect — Worker 级(3.5)
High:
config.storage.topic
(存 connector 配置的 Topic)|group.id
(Connect 集群 ID)|key.converter
(键格式转换)|offset.storage.topic
(Source 偏移存储)|status.storage.topic
(状态存储)|value.converter
(值格式转换)|bootstrap.servers
(引导)|exactly.once.source.support
(Source 端 EOS:disabled/preparing/enabled)|heartbeat.interval.ms
|rebalance.timeout.ms
|session.timeout.ms
|一组 ssl.*(key/trust/keystore/password/location/type/provider/…)
Medium:
client.dns.lookup
|connections.max.idle.ms
|connector.client.config.override.policy(All/None/Principal)
|receive.buffer.bytes
|request.timeout.ms
|sasl.client.callback.handler.class
|sasl.jaas.config
|sasl.kerberos.service.name
|sasl.login.callback.handler.class
|sasl.login.class
|sasl.mechanism
|sasl.oauthbearer.jwks.endpoint.url
|sasl.oauthbearer.token.endpoint.url
|security.protocol
|send.buffer.bytes
|ssl.enabled.protocols
|ssl.keystore.type(JKS/PKCS12/PEM)
|ssl.protocol(TLSv1.3)
|ssl.provider
|ssl.truststore.type
|worker.sync.timeout.ms
|worker.unsync.backoff.ms
Low:
access.control.allow.methods
|access.control.allow.origin
|admin.listeners
|client.id
|config.providers
|config.storage.replication.factor
|connect.protocol(eager/compatible/sessioned)
|header.converter(SimpleHeaderConverter)
|inter.worker.key.generation.algorithm(HmacSHA256)
|inter.worker.key.size
|inter.worker.key.ttl.ms
|inter.worker.signature.algorithm(HmacSHA256)
|inter.worker.verification.algorithms
|listeners
|metadata.max.age.ms
|metadata.recovery.rebootstrap.trigger.ms
|metadata.recovery.strategy(rebootstrap/none)
|metric.reporters(JmxReporter)
|metrics.num.samples
|metrics.recording.level(INFO/DEBUG)
|metrics.sample.window.ms
|offset.flush.interval.ms
|offset.flush.timeout.ms
|offset.storage.partitions(25)
|offset.storage.replication.factor(3)
|plugin.discovery(only_scan/hybrid_warn/hybrid_fail/service_load)
|plugin.path
|reconnect.backoff.max.ms
|reconnect.backoff.ms
|response.http.headers.config
|rest.advertised.host.name
|rest.advertised.listener
|rest.advertised.port
|rest.extension.classes
|retry.backoff.max.ms
|retry.backoff.ms
|sasl.kerberos.kinit.cmd
|sasl.kerberos.min.time.before.relogin
|sasl.kerberos.ticket.renew.jitter
|sasl.kerberos.ticket.renew.window.factor
|sasl.login.connect.timeout.ms
|sasl.login.read.timeout.ms
|sasl.login.refresh.buffer.seconds
|sasl.login.refresh.min.period.seconds
|sasl.login.refresh.window.factor
|sasl.login.refresh.window.jitter
|sasl.login.retry.backoff.max.ms
|sasl.login.retry.backoff.ms
|sasl.oauthbearer.clock.skew.seconds
|sasl.oauthbearer.expected.audience
|sasl.oauthbearer.expected.issuer
|sasl.oauthbearer.header.urlencode
|sasl.oauthbearer.jwks.endpoint.refresh.ms
|sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms
|sasl.oauthbearer.jwks.endpoint.retry.backoff.ms
|sasl.oauthbearer.scope.claim.name
|sasl.oauthbearer.sub.claim.name
|scheduled.rebalance.max.delay.ms
|socket.connection.setup.timeout.max.ms
|socket.connection.setup.timeout.ms
|ssl.cipher.suites
|ssl.client.auth(required/requested/none)
|ssl.endpoint.identification.algorithm(https)
|ssl.engine.factory.class
|ssl.keymanager.algorithm
|ssl.secure.random.implementation
|ssl.trustmanager.algorithm
|status.storage.partitions(5)
|status.storage.replication.factor(3)
|task.shutdown.graceful.timeout.ms
|topic.creation.enable
|topic.tracking.allow.reset
|topic.tracking.enable
5.2 Kafka Connect — Source Connector
High:
name
|connector.class
|tasks.max
Low/Medium(全部列出):
tasks.max.enforce(deprecated)
|key.converter
|value.converter
|header.converter
|config.action.reload(none/restart)
|transforms
|predicates
|errors.retry.timeout
|errors.retry.delay.max.ms
|errors.tolerance(none/all)
|errors.log.enable
|errors.log.include.messages
|topic.creation.groups
|exactly.once.support(requested/required)
|transaction.boundary(poll/interval/connector)
|transaction.boundary.interval.ms
|offsets.storage.topic
5.3 Kafka Connect — Sink Connector
High:
name
|connector.class
|tasks.max
|topics
|topics.regex
Medium/Low(全部列出):
tasks.max.enforce(deprecated)
|key.converter
|value.converter
|header.converter
|config.action.reload(none/restart)
|transforms
|predicates
|errors.retry.timeout
|errors.retry.delay.max.ms
|errors.tolerance
|errors.log.enable
|errors.log.include.messages
|errors.deadletterqueue.topic.name
|errors.deadletterqueue.topic.replication.factor
|errors.deadletterqueue.context.headers.enable
5.4 Kafka Streams
High:
application.id
|bootstrap.servers
|num.standby.replicas
|state.dir
Medium(全部列出):
acceptable.recovery.lag
|cache.max.bytes.buffering
|client.id
|default.deserialization.exception.handler
|default.key.serde
|default.list.key.serde.inner
|default.list.key.serde.type
|default.list.value.serde.inner
|default.list.value.serde.type
|default.production.exception.handler
|default.timestamp.extractor
|default.value.serde
|deserialization.exception.handler
|max.task.idle.ms
|max.warmup.replicas
|num.stream.threads
|processing.exception.handler
|processing.guarantee(at_least_once/exactly_once_v2)
|production.exception.handler
|replication.factor
|security.protocol
|statestore.cache.max.bytes
|task.assignor.class
|task.timeout.ms
|topology.optimization(all/none/reuse.ktable.source.topics/merge.repartition.topics/single.store.self.join)
Low(全部列出):
application.server
|buffered.records.per.partition
|built.in.metrics.version(latest)
|commit.interval.ms
|connections.max.idle.ms
|default.client.supplier
|default.dsl.store(rocksDB/in_memory)
|dsl.store.suppliers.class
|enable.metrics.push
|log.summary.interval.ms
|metadata.max.age.ms
|metadata.recovery.rebootstrap.trigger.ms
|metadata.recovery.strategy(rebootstrap/none)
|metric.reporters
|metrics.num.samples
|metrics.recording.level(INFO/DEBUG/TRACE)
|metrics.sample.window.ms
|poll.ms
|probing.rebalance.interval.ms
|processor.wrapper.class
|rack.aware.assignment.non_overlap_cost
|rack.aware.assignment.strategy(none/min_traffic/balance_subtopology)
|rack.aware.assignment.tags
|rack.aware.assignment.traffic_cost
|receive.buffer.bytes
|reconnect.backoff.max.ms
|reconnect.backoff.ms
|repartition.purge.interval.ms
|request.timeout.ms
|retry.backoff.ms
|rocksdb.config.setter
|send.buffer.bytes
|state.cleanup.delay.ms
|upgrade.from(列出所有允许的历史版本或 null)
|window.size.ms
|windowed.inner.class.serde(仅供普通Consumer)
|windowstore.changelog.additional.retention.ms
六、落地建议(把“可读配置”固化到模板)
- 把上面 Worker/Source/Sink/Streams 的“示例 + 速查表”做成你们仓库的标准模板(
*.properties
+ 注释)。 - CMDB 建模:把所有键值录入,强校验 RF/分区/安全参数与 “二选一/必填/默认值” 规则。
- 一次压测:把 Streams 的
cache.max.bytes.buffering
、num.stream.threads
、max.task.idle.ms
、commit.interval.ms
作为四个旋钮做 222*2 组合测试,确定延迟/吞吐拐点。 - 观测面:给 Connect & Streams 拉一套固定看板(rebalance 次数、lag、commit latency、DLQ TPS、task error rate)。