当前位置: 首页 > ds >正文

kafka生产端和消费端的僵尸实例以及解决办法

目录

一 生产端僵尸

1.1 原因

1.2 问题

1.3解决办法

1.4 案例

1.4.1 案例1:生产者崩溃后重启 (同一 transactional.id)

1.4.2 案例2:短暂网络分区导致的脑裂

1.4.3 案例3:正确 - 解决僵尸

1.4.4 案例4:错误 - 无法解决僵尸

1.5 结论

二  消费端僵尸

2.1 原因

2.2 问题

2.3 解决办法

一 生产端僵尸

1.1 原因

一个生产者实例(producer)在发送消息过程中发生故障(如进程崩溃、网络隔离),但可能未被外部系统及时检测到。过一段时间后,该生产的新实例可能被重启(例如,在容器化环境中被调度器重启,或者运维手动重启)

1.2 问题

问题1:如果故障前的生产者实例在崩溃前可能已经成功发送了部分消息(但未完成事务提交),而新实例并不知道,它可能会重新发送这些消息,导致重复数据

问题2:脑裂问题: 在短暂的网络分区期间,可能出现两个都认为自己是“活动”的生产者实例同时向同一个分区写入数据,造成数据混乱。

1.3解决办法

1.kafka使用transaction id和producer epoch来解决生产者僵尸问题;epoch它本质上是该transactional.id的一个单调递增的序列号

2.Epoch比较只在相同 transactional.id 内有效:只有配置了相同 transactional.id 的生产者实例之间,它们的 Epoch 值才有比较的意义。Transaction Coordinator 会为新实例分配更高的 Epoch,并通知 Broker 拒绝任何携带旧 Epoch 的写入请求(针对该 transactional.id)。

也即:新启动的实例必须使用与旧实例完全相同的 transactional.id。这是 Epoch 比较和隔离机制生效的前提条件。

3.不同 transactional.id = 完全独立: 如果两个生产者实例使用不同的 transactional.id,无论它们的 Epoch 值是多少(即使其中一个为 1,另一个为 100),它们都会被 Kafka 视为两个完全独立、互不相干的生产者。Broker 不会对它们进行Epoch比较或相互隔离。它们可以同时向同一个分区发送消息(尽管可能因 Leader 处理顺序导致消息交叉),但 Kafka 不保证它们之间的顺序或原子性。

1.4 案例

1.4.1 案例1:生产者崩溃后重启 (同一 transactional.id)

旧实例崩溃,事务可能处于中间状态(消息已发送但未提交)。

新实例启动,向 Transaction Coordinator 初始化事务。

Transaction Coordinator 分配一个更高的新 Epoch (e.g., old=1, new=2),并隔离旧Epoch=1。

Transaction Coordinator 中止由旧实例启动的未完成事务(标记为 ABORT),确保那些部分写入的消息不会被消费者读取(在 read_committed 隔离级别下)。

新实例(Epoch=2)开始新的事务。它不会重复发送旧实例可能已发送过的消息,因为应用逻辑知道事务未提交,需要重新处理业务逻辑并发送新消息。

如果旧实例“僵尸复活”并尝试发送消息,Broker 会检查其 Epoch=1 < 当前最新 Epoch=2,拒绝写入。

1.4.2 案例2:短暂网络分区导致的脑裂

两个实例(可能是由于网络分区误判)都认为自己是活跃的,使用同一个 transactional.id。

其中一个实例(假设是重启后的新实例)成功联系到 Transaction Coordinator,获得了更高的 Epoch。

Transaction Coordinator 隔离了旧 Epoch。

当网络恢复时,持有旧 Epoch 的实例的任何写入请求都会被 Broker 拒绝(InvalidProducerEpoch)。

只有持有最新 Epoch 的实例的写入有效,避免了数据冲突。

1.4.3 案例3:正确 - 解决僵尸

生产者 A (Instance 1): transactional.id = "prod-app-1", Epoch = 1 (由 TC 分配)。

生产者 A 崩溃。

生产者 A (Instance 2 - 新实例) 启动,配置 transactional.id = "prod-app-1"。

Transaction Coordinator (TC) 检测到 "prod-app-1" 的新会话。

TC 为 "prod-app-1" 分配 新的 Epoch = 2。

TC 通知相关 Broker:对于 "prod-app-1",最新 Epoch 是 2,拒绝 Epoch <= 1 的请求。

如果旧实例 (Instance 1) 的进程“僵尸复活”并尝试发送消息 (携带 Epoch=1),Broker 会检查:"prod-app-1" 的当前最新 Epoch=2 > 请求中的 Epoch=1 → 拒绝写入 (InvalidProducerEpoch)。

新实例 (Instance 2) 使用 Epoch=2 可以正常写入。

1.4.4 案例4:错误 - 无法解决僵尸

生产者 A (Instance 1): transactional.id = "prod-app-1", Epoch = 1。

生产者 A 崩溃。

生产者 A (Instance 2 - 新实例) 启动,但错误地配置了 transactional.id = "prod-app-1-backup" (一个不同的 ID)。

TC 将 "prod-app-1-backup" 视为一个全新的、独立的生产者。

TC 为 "prod-app-1-backup" 分配 新的 Epoch = 1 (或其他初始值,与 "prod-app-1" 的 Epoch 无关)。

Broker 记录:

对于 "prod-app-1": 最新 Epoch = 1 (对应旧僵尸实例)。

对于 "prod-app-1-backup": 最新 Epoch = 1 (对应新实例)。

如果旧实例 (Instance 1) 的进程“僵尸复活”并尝试发送消息 (携带 transactional.id="prod-app-1", Epoch=1),Broker 检查:"prod-app-1" 的最新 Epoch=1 == 请求中的 Epoch=1 → 允许写入。

新实例 (Instance 2) 使用 "prod-app-1-backup", Epoch=1 也可以写入。

结果:两个实例同时写入相同的分区,造成数据混乱。Fencing 机制完全失效,因为新实例没有使用相同的 transactional.id 来“声明接管”。其实本质上是两个不同的transaction id,是两个独立的事务,并不相关,至于写入相同分区的相同内容可以使用去重,幂等机制来解决。

1.5 结论

transactional.id, Producer Epoch, 和 Kafka 事务协议共同构成了 Kafka 保障生产者高可用、防止僵尸实例破坏数据一致性的基石。

DeepSeek

二  消费端僵尸

2.1 原因

已经从物理上或逻辑上失效(如进程崩溃、网络隔离、长时间 GC 停顿、负载过高无响应)但 Kafka 集群(特别是 Group Coordinator)暂时还未将其识别为失效并从消费者组中移除的消费者实例 

2.2 问题

分区分配不均衡/浪费: 新启动的健康消费者无法分配到这些僵尸实例"霸占"的分区,导致部分分区无人消费,消息堆积。

2.3 解决办法

1.心跳机制和超时:健康的消费者会按照 心跳机制(heartbeat.interval.ms )的间隔定期向消费组内的协调者( Group Coordinator) 发送心跳。如果连续两次发送的心跳的间隔超时(超过session.timeout.ms),协调者它就判定该消费者已经死亡(失效)。

2.再平衡:再平衡会将原本分配给僵尸实例的分区重新分配给组内存活的其他健康消费者,解决消息堆积问题。

3.给拉取消费消息的方法poll()处理消费卡顿问题设置规定时间( max.poll.interval.ms)

如果消费者在规定的时间内( max.poll.interval.ms) 时间内没有再次调用 poll(),Group Coordinator 会认为该消费者处理能力不足或卡住了。Group Coordinator 会主动将该消费者从组中移除,触发再平衡

4.监控告警:及时进行人工干预。

https://chat.deepseek.com/a/chat/s/6e3234e3-fd76-4000-9326-01780a2fdb48

http://www.xdnf.cn/news/16046.html

相关文章:

  • `MYSQL`、`MYSQL_RES` 和 `MYSQL_FIELD`的含义与使用案例
  • 【硬件】GalaxyTabPro10.1(SM-T520)刷机/TWRP/LineageOS14/安卓7升级全过程
  • 浅谈 Vue 的双向数据绑定
  • Java 字符集(Charset)详解:从编码基础到实战应用,彻底掌握字符处理核心机制
  • 【数据结构】双向循环链表的实现
  • 基于机器视觉的迈克耳孙干涉环自动计数系统设计与实现
  • Node.js:函数、路由、全局对象
  • Docker Compose 配置
  • 如何5分钟快速搭建智能问答系统
  • 详解如何解决Mysql主从复制延迟
  • LINUX720 SWAP扩容;新增逻辑卷;逻辑卷扩容;数据库迁移;gdisk
  • Ajax简单介绍及Axios请求方式的别名
  • 复杂度+包装类型+泛型
  • 统计与大数据分析和数字经济:专业选择指南
  • spring-cloud使用
  • ptmalloc(glibc-2.12.1)整体结构
  • Linux:线程控制
  • 基于SpringBoot+MyBatis+MySQL+VUE实现的医疗挂号管理系统(附源码+数据库+毕业论文+答辩PPT+项目部署视频教程+项目所需软件工具)
  • LeetCode 刷题【8. 字符串转换整数 (atoi), 9. 回文数】
  • 学成在线项目
  • 手推OpenGL相机的正交投影矩阵和透视投影矩阵(附源码)
  • Unity 新旧输入系统对比
  • 开发工具缓存目录
  • Redis通用常见命令(含面试题)
  • [数据库]Neo4j图数据库搭建快速入门
  • 设备健康管理实施案例:中讯烛龙预测性维护系统的实战应用
  • 基于bert-lstm对微博评论的情感分析系统设计与实现
  • 新版 Java SE 集合框架 Map 篇
  • Pycharm的Terminal打开后默认是python环境
  • Kafka 在分布式系统中的关键特性与机制深度解析