如何监控Kafka的Lag(消费延迟)?
监控Kafka消费延迟(Lag)主要有以下几种方法:
- 使用Kafka自带命令行工具
# 查看消费者组lag
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group your_group --describe
输出结果中的LAG
列显示各分区的延迟消息数,CURRENT-OFFSET
是当前消费位移,LOG-END-OFFSET
是生产端最新位移
- 通过JMX指标监控
Kafka Broker暴露的JMX指标:
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)
下的records-lag-max
kafka.consumer:type=consumer-fetch-manager-metrics,partition=([-.\w]+),topic=([-.\w]+),client-id=([-.\w]+)
下的records-lag
- 使用Prometheus + Grafana监控
安装Kafka Exporter:
wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.6.1/kafka_exporter-1.6.1.windows-amd64.tar.gz
tar -xzf kafka_exporter-1.6.1.windows-amd64.tar.gz
cd kafka_exporter-1.6.1.windows-amd64
kafka_exporter.exe --kafka.server=localhost:9092
在Grafana中导入Kafka仪表板(如ID 7589)
- 编程方式获取(Java示例)
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");try (AdminClient admin = AdminClient.create(props)) {ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets("your_group");Map<TopicPartition, OffsetAndMetadata> offsets = result.partitionsToOffsetAndMetadata().get();// 获取各分区最新offsetList<TopicPartition> partitions = offsets.keySet().stream().collect(Collectors.toList());Map<TopicPartition, Long> endOffsets = admin.listOffsets(partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))).all().get().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));// 计算lagoffsets.forEach((tp, offsetMeta) -> {long lag = endOffsets.get(tp) - offsetMeta.offset();System.out.printf("分区 %s Lag: %d%n", tp, lag);});
}
- 使用Confluent Control Center(需企业版)
提供可视化监控界面,可以直接查看消费者组的实时Lag指标和历史趋势
建议同时监控:
- 单个分区的最大Lag值
- 消费者组的平均Lag
- Lag随时间的变化趋势
- 设置阈值告警(如Lag持续超过1000条消息时触发告警)