
Kafka records deletion policy

Time-based retention

Records in a partition are retained for a fixed period of time and then deleted. This is the default policy: it is enabled with cleanup.policy=delete, and the retention window is set by retention.ms, which defaults to 604800000 ms (7 days).
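
As a minimal sketch, these settings can also be applied per topic with the Java AdminClient at creation time; the topic name events, partition count, and replication factor below are placeholders:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateTopicWithRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // cleanup.policy=delete is the default; retention.ms here overrides
            // the broker-wide default for this topic only.
            NewTopic topic = new NewTopic("events", 3, (short) 1)
                .configs(Map.of(
                    "cleanup.policy", "delete",
                    "retention.ms", "604800000"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}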

Retention is measured against each record's timestamp, and which timestamp that is depends on message.timestamp.type: with the default CreateTime, Kafka uses the timestamp supplied by the producer; with LogAppendTime, Kafka uses the broker's append time instead.

CreateTime: the timestamp set by the producer when it creates the message, embedded in the record when it is sent to Kafka.

LogAppendTime: the timestamp set by the Kafka broker at the moment the message is written to the log.
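
As a sketch of the CreateTime path, a producer can stamp the record timestamp explicitly; the topic name events and the key/value below are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CreateTimeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Under CreateTime, this explicit timestamp is what retention is
            // measured against; if omitted, the producer stamps send time.
            long createTime = System.currentTimeMillis();
            producer.send(new ProducerRecord<>("events", null, createTime, "key-1", "value-1"));
        }
    }
}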

Size-based retention

Records are retained until the partition exceeds a size limit, configured via retention.bytes (default -1, i.e. no size limit).
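
A sketch of adding a size cap to an existing topic via the AdminClient; the topic name events and the 1 GiB limit are placeholder choices:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SizeRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "events");
            // Cap each partition at roughly 1 GiB; combined with retention.ms,
            // whichever limit is reached first makes old segments deletable.
            List<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("retention.bytes",
                    String.valueOf(1024L * 1024 * 1024)), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}

Note that the cap applies per partition, so a topic with 3 partitions can still occupy about 3 GiB.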

Log compaction

Compaction is a key-based retention mechanism. The goal of compaction is to keep the most recent value for a given key. However, historical data will be lost, so it may not always be the best choice.

Compaction also provides a way to completely remove a key, by appending an event with that key and a null value. If this null value, also known as a tombstone, is the most recent value for that key, then it will be marked for deletion along with any older occurrences of that key. This could be necessary for things like GDPR compliance.
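
As a minimal sketch of writing a tombstone, the producer sends the key to purge with a null value; the topic user-profiles and key user-42 are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TombstoneProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value is a tombstone: once it is the latest record for
            // the key, compaction removes the key's earlier values, and the
            // tombstone itself is eventually deleted as well.
            producer.send(new ProducerRecord<>("user-profiles", "user-42", null));
        }
    }
}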

Enabling this feature requires setting cleanup.policy=compact, plus a few other parameters that control when compaction is triggered.
A topic partition becomes eligible for compaction when either of the following conditions holds:

1. dirty/total > min.cleanable.dirty.ratio, and message timestamp < current time - min.compaction.lag.ms
2. message timestamp < current time - max.compaction.lag.ms

Normally, compaction will be triggered when the ratio of dirty data to total data reaches the threshold set by the min.cleanable.dirty.ratio configuration, which defaults to 50 percent. However, there are a couple of other configurations that can affect this decision.

To prevent overly aggressive cleaning, we can set min.compaction.lag.ms. With this set, a record cannot be compacted until at least that much time has passed since it was written, even if the dirty/total ratio has been exceeded. This guarantees applications a minimum window in which to see an event before it can be removed.

On the other end of the spectrum, we can set max.compaction.lag.ms, which will trigger a compaction once the time limit has been reached, regardless of the amount of dirty data.
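
Putting these knobs together, here is a sketch of creating a compacted topic; the topic name user-profiles and the specific thresholds are placeholder choices:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("user-profiles", 3, (short) 1)
                .configs(Map.of(
                    "cleanup.policy", "compact",
                    // compact once at least 30% of the log is dirty...
                    "min.cleanable.dirty.ratio", "0.3",
                    // ...but never compact records younger than 1 minute...
                    "min.compaction.lag.ms", "60000",
                    // ...and force a pass at least once a day regardless.
                    "max.compaction.lag.ms", "86400000"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}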

Understanding compaction requires knowing the three segment roles inside a Kafka partition: the dirty segment, the cleaned segment, and the active segment. These concepts are key to understanding Kafka's log management and compaction machinery.

The three segment roles in Kafka

Each Kafka partition consists of multiple log segment files, rolled over by time or size. Kafka assigns segments different roles:


✅ 1. Active Segment
  • The segment file currently being written to.
  • Always the newest segment.
  • Never compacted or deleted (until it is rolled).
  • Example file name: 00000000000000000000.log

📌 Characteristics:

  • Written to, never compacted.
  • As writes accumulate and segment.ms or segment.bytes is exceeded, it is closed and becomes a non-active segment.

✅ 2. Dirty Segment
  • A segment that has not been cleaned since the last compaction.
  • It may contain duplicate keys and messages for keys that have since been deleted.
  • Kafka treats these segments as candidates for compaction.

📌 How it is identified:

  • The segment lies between the lastCleanOffset and the logEndOffset.
  • Any segment whose data has not yet gone through log compaction is dirty.

✅ 3. Cleaned Segment
  • A segment already processed by the LogCleaner thread.
  • Only the latest value for each key is retained (or a tombstone, if its deletion has not yet expired).
  • After cleaning, Kafka generates new segments that replace the old dirty ones.

📌 Characteristics:

  • Considered “clean”: no need to compact it again.
  • Compaction output goes to new .cleaned files (internal temporary files) that ultimately replace the original segments.

🧠 How the three relate (simplified)
[segment-0]  ← Cleaned Segment
[segment-1]  ← Cleaned Segment
[segment-2]  ← Dirty Segment
[segment-3]  ← Dirty Segment
[segment-4]  ← Active Segment
  • segment-0/1: already compacted; Kafka keeps only the latest value per key.
  • segment-2/3: awaiting compaction; the LogCleaner processes them based on the dirty ratio.
  • segment-4: still being written, not yet rolled, and never compacted.
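
To make the dirty-ratio trigger concrete, here is a purely illustrative calculation (not broker-internal code); the byte counts are made up:

public class DirtyRatioCheck {
    // Illustrative only: the broker tracks these byte counts internally.
    static boolean shouldCompact(long cleanBytes, long dirtyBytes, double minCleanableDirtyRatio) {
        double dirtyRatio = (double) dirtyBytes / (cleanBytes + dirtyBytes);
        return dirtyRatio > minCleanableDirtyRatio;
    }

    public static void main(String[] args) {
        // e.g. segments 0/1 hold 400 MB of cleaned data and segments 2/3
        // hold 600 MB of dirty data:
        System.out.println(shouldCompact(400, 600, 0.5)); // true, since 0.6 > 0.5
    }
}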

🔍 How do you check the current segment states?

Kafka has no command that prints these states directly, but you can infer them from the log files on disk and from runtime metrics:

  • Inspect the broker data directory (e.g. /kafka-logs/your-topic-0/); files are named after their base offset (see the sketch after this list).

  • Relevant monitoring metrics include:

    • the running state of the log-cleaner-cleanerManager-cleaner-0 thread
    • kafka.log:type=LogCleaner,name=MaxBufferUtilization
    • kafka.log:type=Log,name=LogEndOffset
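
A minimal sketch of the directory inspection, assuming a broker data directory of /kafka-logs and a partition directory your-topic-0 (both placeholders):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ListSegments {
    public static void main(String[] args) throws IOException {
        // Hypothetical partition directory; adjust to your broker's log.dirs.
        Path partitionDir = Paths.get("/kafka-logs/your-topic-0");
        try (DirectoryStream<Path> segments = Files.newDirectoryStream(partitionDir, "*.log")) {
            // Each .log file is one segment, named after the offset of its
            // first record; the highest-offset segment is the active one.
            for (Path segment : segments) {
                System.out.printf("%s  %d bytes%n", segment.getFileName(), Files.size(segment));
            }
        }
    }
}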

🧠 Summary table

Name            | Written to? | Compacted? | Notes
Active Segment  | ✅ yes      | ❌ no      | the segment currently being written
Dirty Segment   | ❌ no       | ✅ yes     | older segments eligible for compaction
Cleaned Segment | ❌ no       | ❌ no      | segments that have already been compacted

Kafka records deletion: points to note

1. Data Is Deleted Even If Not Consumed

  • Kafka does not track consumer state when deleting messages.
  • If a consumer is slow or offline, it may miss messages if they are deleted before being read.

Solution:

  • Ensure consumers keep up.
  • Use longer retention.ms if data must remain available longer.
  • Consider log compaction if you need retention per key.

2. Retention Applies at the Partition Level

  • retention.ms and retention.bytes apply per partition, not per topic.
  • This can lead to imbalanced data storage if partition sizes differ.

Solution:

  • Use good partitioning logic to distribute data evenly.

3. Segments Are Only Deleted After Being Fully Aged

  • Kafka deletes entire log segments, not individual records.
  • If a segment contains even one unexpired message, the whole segment stays.

Solution:

  • Tune segment.ms or segment.bytes for faster cleanup granularity.
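
As a sketch, rolling segments more often makes whole-segment deletion track retention.ms more closely; the topic name events and the one-hour value are placeholders:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SegmentTuning {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "events");
            // Roll a new segment every hour: smaller segments age out sooner,
            // at the cost of more files and more frequent rolls.
            List<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("segment.ms", "3600000"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}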

4. Timestamps Must Be Handled Properly

Kafka determines “message age” using:

  • Create time (the default), or
  • Log append time (if explicitly configured).

Misconfigured timestamp settings can lead to:

  • Messages being deleted too early or not at all.

Solution:

log.message.timestamp.type=CreateTime

Use CreateTime only if producers set timestamps reliably; otherwise switch to LogAppendTime so the broker's clock governs retention.
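
A sketch of verifying the effective topic-level setting with the AdminClient; the topic name events is a placeholder:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TimestampTypeCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "events");
            Map<ConfigResource, Config> configs =
                admin.describeConfigs(List.of(topic)).all().get();
            // message.timestamp.type is the topic-level counterpart of the
            // broker-wide log.message.timestamp.type.
            System.out.println(configs.get(topic).get("message.timestamp.type").value());
        }
    }
}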


5. Disk Space Pressure Is Still Possible

  • If producers publish too much data within the retention window, Kafka may run out of disk space before cleanup triggers.

Solution:

  • Set retention.bytes in addition to retention.ms for dual control.
  • Monitor broker disk usage carefully.
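
On the monitoring side, a sketch that reads per-partition on-disk sizes via the AdminClient; broker id 0 is a placeholder, and describeLogDirs with allDescriptions() assumes a reasonably recent client (2.7+):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.LogDirDescription;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class DiskUsageCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Per-partition on-disk size for broker 0; compare against
            // retention.bytes to spot partitions at risk of filling the disk.
            Map<Integer, Map<String, LogDirDescription>> dirs =
                admin.describeLogDirs(List.of(0)).allDescriptions().get();
            dirs.forEach((broker, byDir) -> byDir.forEach((dir, desc) ->
                desc.replicaInfos().forEach((tp, info) ->
                    System.out.printf("%s: %d bytes%n", tp, info.size()))));
        }
    }
}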

6. No Per-Consumer Retention

  • Unlike traditional queues, Kafka cannot retain data per consumer.

Solution:
Use tools like Kafka Connect, Kafka Streams, or external state stores if per-consumer retention is needed.


7. Retention Delay

  • Deletion is performed by a periodic background task on the broker, scheduled every log.retention.check.interval.ms (5 minutes by default).
  • As a result, there may be a delay after retention.ms is reached before data is actually deleted.

Solution:
Expect a slight lag in cleanup; don’t rely on exact timing.
