当前位置: 首页 > news >正文

Kafka运维实战 15 - kafka 重设消费者组位移入门和实战【实战】

💻 Kafka运维实战 (15篇)
  • 📝Kafka运维实战 15 - kafka 重设消费者组位移入门和实战【实战】
  • 📝Kafka运维实战 14 - kafka消费者组消费进度(Lag)深入理解【实战】
  • 📝Kafka运维实战 13 - kafka 动态调整Broker, Topic的配置【实战】
  • 📝Kafka运维实战 12 - kafka主题管理详解【实战】
  • 📝Kafka运维实战 11 - kafka查看消息的具体内容【实战】
  • 📝Kafka运维实战 10 -kafka 生产和消费 性能测试工具【实战】
  • 📝Kafka运维实战 09 - kafka 生产消息和消费消息用法【实战】
  • 📝Kafka运维实战 08 - kafka 3.7脚本工具最全整理汇总【建议收藏】
  • 📝Kafka运维实战 07 - kafka 三节点集群部署(混合模式)(KRaft 版本3.7.0)
  • 📝Kafka运维实战 06 - kafka 单机部署指南(混合模式)(KRaft 版本3.7.0)
  • 📝Kafka运维实战 05 - kafka 消费者组和重平衡(Rebalance)
  • 📝Kafka运维实战 04 - Kafka 控制器(Controller)详解:架构、原理与实战
  • 📝Kafka运维实战 03 - Kafka 配置参数详解:ZooKeeper 模式与 KRaft 模式对比
  • 📝Kafka运维实战 02 - 深入理解 Kafka主题、分区与副本的协同机制
  • 📝Kafka运维实战 01 - Kafka入门和基础配置

目录

    • 什么是重设消费者组位移
    • 重设消费者位移 的场景
    • 重设位移策略
      • 1. 基于位置的策略
        • (1)重置到最早位置(Earliest)
        • (2)重置到最新位置(Latest)
        • (3)重置到指定偏移量(Specific Offset)
        • (4)相对当前位移偏移(Shift by Offset)
      • 2. 基于时间的重设策略
        • (1)重置到指定时间点(DateTime)
        • (2)相对当前时间偏移(Shift by Duration)
      • 策略选择建议
    • 实验演示过程
      • Earliest 策略 演示
        • 创建Topic service-logs
        • 生产者,发送消息
        • 消费者,接收消息
        • 消费者消费了几个消息后,查看消费者消费进度
        • 现在重设消费者位移到–to-earliest
        • 再次启动消费者进程
      • Shift-By-N 策略
        • 向前消费10条 --shift-by -10
        • 消费者再次消费

什么是重设消费者组位移

传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。

反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。

重设消费者位移 的场景

重设消费者位移的场景主要包括以下几种:

  • 修正消费逻辑错误:当发现消费者的消费逻辑存在错误,导致消息处理结果不正确时,需要重设消费者位移。例如,在数据统计场景中,由于消费逻辑错误,导致统计结果不准确,修正逻辑后,需要重设位移,重新消费历史消息以获取正确的统计结果。
  • 业务需求变更:业务需求发生变化,可能需要重新处理历史数据。比如,需要对过去一段时间内的订单数据进行重新计算,或者将数据补写到新的下游存储系统中,此时可以通过重设消费者位移来实现重新消费历史订单数据。
  • 跳过异常消息:当消费者遇到无法处理的异常消息时,如消息格式损坏(corrupted)或消费逻辑抛出异常,可以通过重设消费者位移来跳过这些无效消息,继续从正确的位置开始消费。例如,在处理一批文本消息时,其中某一条消息的格式不符合预期,导致消费程序抛出异常,此时可以重设位移,跳过该异常消息,继续处理后续消息。
  • 动态调整消费进度:根据业务需求,需要灵活调整消费起点。例如,需要消费近30分钟的数据,或者从最新位置开始消费新产生的消息,都可以通过重设消费者位移来实现。
  • 回滚消费进度:在对消费者代码进行变更后,如果发现变更后的代码存在问题,需要回滚到之前的代码版本,同时也需要将消费位移回滚到历史位置,以确保能够继续从正确的位置进行消费。

重设位移策略

不论是哪种设置方式,重设位移大致可以从两个维度来进行。

  • 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。
  • 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。

1. 基于位置的策略

(1)重置到最早位置(Earliest)
  • 操作方式:将位移设置为分区的最小偏移量(0 或分区创建后的第一条消息位置)。
  • 适用场景
    • 首次消费某个主题,需要全量消费历史数据(如初始化数据仓库、全量同步历史数据)。
    • 消费逻辑修复后,需要重新处理所有历史消息(如之前因逻辑错误导致数据处理不完整)。
  • 命令示例
    # 对所有主题重置到最早位置
    bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute
    
  • 注意:如果主题数据量大,可能导致消费者压力骤增,需评估性能影响。
(2)重置到最新位置(Latest)
  • 操作方式:将位移设置为分区的最大偏移量(即下一条待生产的消息位置,跳过所有未消费的历史消息)。
  • 适用场景
    • 只关心未来的新消息,不需要处理历史数据(如实时监控、日志告警等场景)。
    • 历史数据已过期或无需处理,希望消费者快速跟上最新进度。
  • 命令示例
    # 对特定主题重置到最新位置
    bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 --group my-group --reset-offsets --to-latest --topic my-topic --execute
    
(3)重置到指定偏移量(Specific Offset)
  • 操作方式:手动指定某个分区的具体偏移量(如 1000),适用于精确控制消费起点。
  • 适用场景
    • 已知某条消息存在问题,需要从该消息的前一个位置重新消费(如跳过异常消息后从正确位置继续)。
    • 基于业务日志定位到需要重新处理的起始点(如某时间点后的数据处理异常,需从该时间点对应的偏移量开始)。
  • 命令示例
    # 对特定主题的特定分区重置到偏移量 1000
    bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 --group my-group --reset-offsets --to-offset 1000 --topic my-topic:0 --execute
    
  • 注意:需先通过 --describe 命令查询当前分区的偏移量范围,避免指定无效值(如超过最大偏移量)。
(4)相对当前位移偏移(Shift by Offset)
  • 操作方式:在当前位移的基础上,增加或减少指定数量的偏移量(如 +100 表示向前跳过100条,-50 表示向后回溯50条)。
  • 适用场景
    • 跳过少量异常消息(如某条消息格式错误,向前调整1条跳过它)。
    • 回溯少量数据进行重新处理(如最近10条消息处理失败,向后调整10条重消费)。
  • 命令示例
    # 向后回溯50条消息(当前位移-50)
    bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 --group my-group --reset-offsets --shift-by -50 --topic my-topic --execute
    
  • 注意:调整后的值不能小于 0 或大于分区最大偏移量,否则会被自动修正为有效范围。

2. 基于时间的重设策略

通过指定时间点,将位移重置到该时间点对应的消息位置,适用于按时间范围重新消费的场景。

(1)重置到指定时间点(DateTime)
  • 操作方式:根据时间字符串(如 2024-07-01T12:00:00.000),自动计算每个分区中该时间点之后的第一条消息的偏移量,并将位移设为该值。
  • 适用场景
    • 业务需要重新处理某个时间范围内的数据(如每日凌晨3点到5点的数据处理失败,需从3点开始重消费)。
    • 数据回溯需求(如补算过去7天的统计指标)。
  • 命令示例
    # 重置到 2024-07-01 12:00:00 之后的消息
    bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 --group my-group --reset-offsets --to-datetime 2024-07-01T12:00:00.000 --topic my-topic --execute
    
  • 注意:时间需精确到毫秒,且 Kafka 会找到时间点之后的第一条消息(非严格等于该时间点的消息)。
(2)相对当前时间偏移(Shift by Duration)
  • 操作方式:基于当前位移,向前或向后调整一定时间(如 PT1H 表示1小时),计算目标位移。
  • 适用场景
    • 需要重新消费最近一段时间的数据(如过去1小时的数据因处理错误需重算)。
    • 临时调整消费窗口(如向前回溯30分钟的数据进行校验)。
  • 命令示例
    # 向前调整1小时(重新消费1小时前到现在的消息)
    bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 --group my-group --reset-offsets --shift-by-duration PT1H --topic my-topic --execute
    
  • 时间格式:采用 ISO 8601 标准(如 PT5M 表示5分钟,PT1H30M 表示1小时30分钟)。

策略选择建议

  1. 全量重消费:选 --to-earliest
  2. 只消费新消息:选 --to-latest
  3. 按时间范围重消费:选 --to-datetime--shift-by-duration
  4. 精确调整少量消息:选 --shift-by--to-offset
  5. 代码动态调整:用 seek()offsetsForTimes() 结合业务逻辑。

实验演示过程

Earliest 策略 演示

创建Topic service-logs
[root@test-10 kafka_2.13-3.7.0]# ./bin/kafka-topics.sh --create --bootstrap-server 192.168.37.10:9092 --topic  service-logs --partitions 3 --replication-factor 1
Created topic service-logs.
生产者,发送消息
./bin/kafka-console-producer.sh --bootstrap-server 192.168.37.10:9092  --topic service-logs --property "parse.key=true" --property "key.separator=:"

在这里插入图片描述

消费者,接收消息
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.37.10:9092 --topic  service-logs --group group-service-logs   --property "print.key=true"

在这里插入图片描述

消费者消费了几个消息后,查看消费者消费进度

在这里插入图片描述

现在重设消费者位移到–to-earliest
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.37.10:9092 --group group-service-logs --reset-offsets --to-earliest --all-topics --execute

注意要停止消费者进程
在这里插入图片描述
再次查询消费者组情况,发现Lag 由 0 变成了 16

再次启动消费者进程

在这里插入图片描述
可以看到,消费者将之前所有的消息都再次消费了一遍。

Shift-By-N 策略

向前消费10条 --shift-by -10

在这里插入图片描述
通过上面图,可以看到Lag 又 增加了 10条

消费者再次消费

在这里插入图片描述

http://www.xdnf.cn/news/1184509.html

相关文章:

  • 时间和空间复杂度
  • 八股文之JVM
  • DNS 服务正反向解析与 Web 集成实战:从配置到验证全流程
  • Day 21: 常见的降维算法
  • 专题:2025电商增长新势力洞察报告:区域裂变、平台垄断与银发平权|附260+报告PDF、原数据表汇总下载
  • 小米8(dipper)刷入kernelSU内核root定制rom系统教程以及安装LSPosed模块
  • Windows-WSL-Docker端口开放
  • FunASR实时多人对话语音识别、分析、端点检测
  • NLP验证自动化脚本优化
  • 从热点到刚需:SmartMediaKit为何聚焦B端视频系统建设?
  • 【lucene】AttributeSource概述
  • Ethereum:Geth + Clef 本地开发环境,如何优雅地签名并发送一笔以太坊交易?
  • Linux 内存深度剖析:栈与堆的底层机制与实战指南
  • 汽车免拆诊断案例 | 2010款奔驰E200 CGI车EPS OFF灯异常点亮
  • MCP 与传统集成方案深度对决:REST API、GraphQL、gRPC 全方位技术解析
  • Linux725 磁盘阵列RAID0 RAID1
  • Linux库——库的制作和原理(1)_回顾动静态库、制作使用库
  • docker-compose:未找到命令的检查步骤和修复
  • 从数据孤岛到融合共生:KES V9 2025 构建 AI 时代数据基础设施
  • 65.第二阶段x64游戏实战-替换游戏lua打印可接任务
  • 【论文阅读】-《GenAttack: Practical Black-box Attacks with Gradient-Free Optimization》
  • 人工智能概述
  • 智慧电视:开启养老新时代
  • Linux 设备驱动模型
  • LLM:Day3
  • 计算机算术4-整形乘法
  • UE5多人MOBA+GAS 30、技能升级机制
  • Android补全计划 DrawerLayout使用
  • Chromadb 1.0.15 索引全解析:从原理到实战的向量检索优化指南
  • 飞行控制领军者 | 边界智控携高安全级飞控系统亮相2025深圳eVTOL展