当前位置: 首页 > backend >正文

kafka的shell操作

Kafka 提供了丰富的 shell 命令工具,位于 Kafka 安装目录的 bin/ 目录下(Windows 系统为 bin/windows/)。这些命令用于管理主题、生产者、消费者、分区等核心组件。以下是常用的 Kafka shell 操作大全:

一、主题(Topic)操作

1. 创建主题
# 创建一个名为 test-topic 的主题,3个分区,2个副本
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--create \--topic test-topic \--partitions 3 \--replication-factor 2

  • --bootstrap-server:指定 Kafka 集群地址(broker 列表)
  • --partitions:分区数量(提高并行度)
  • --replication-factor:副本数量(提高可用性,不能超过 broker 数量)
2. 查看所有主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--list
3. 查看主题详情
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--describe \--topic test-topic

输出包含分区数、副本分布、ISR(同步副本)等信息。

4. 修改主题(仅支持分区数增加)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--alter \--topic test-topic \--partitions 5  # 只能增加,不能减少
5. 删除主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--delete \--topic test-topic
  • 需确保 server.properties 中 delete.topic.enable=true(默认开启)

二、生产者(Producer)操作

1. 启动控制台生产者
# 向 test-topic 发送消息(输入内容后按回车发送)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \--topic test-topic
2. 带键值对的生产者
# 发送格式为 "key:value" 的消息(需指定分隔符)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \--topic test-topic \--property "parse.key=true" \--property "key.separator=:"

输入示例:user1:hello(key 为 user1,value 为 hello)

三、消费者(Consumer)操作

1. 启动控制台消费者(从最新消息开始消费)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic
2. 从最早消息开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic \--from-beginning
3. 消费指定分区的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic \--partition 0  # 消费第0个分区
4. 带消费组的消费者
# 加入名为 test-group 的消费组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic \--group test-group

四、消费组(Consumer Group)操作

1. 查看所有消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--list
2. 查看消费组详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe \--group test-group

输出包含:

  • 消费组 ID
  • 主题名称
  • 分区分配情况
  • 当前消费偏移量(CURRENT-OFFSET)
  • 日志末尾偏移量(LOG-END-OFFSET)
  • 未消费消息数(LAG)
3. 重置消费组偏移量
# 将 test-group 对 test-topic 的偏移量重置为最早
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group test-group \--topic test-topic \--reset-offsets \--to-earliest \--execute

其他重置选项:

  • --to-latest:重置到最新
  • --to-offset <offset>:重置到指定偏移量
  • --shift-by <number>:相对当前偏移量移动(正数向前,负数向后)

五、分区(Partition)操作

1. 查看分区 Leader 分布
# 结合主题详情查看
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--describe \--topic test-topic | grep "Leader:"

六、其他常用操作

1. 查看 broker 元数据
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
2. 查看主题消息数量(近似值)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \--bootstrap-server localhost:9092 \--topic test-topic \--time -1  # -1 表示最新偏移量,-2 表示最早偏移量

计算总消息数:各分区最新偏移量之和。

七、Windows 系统注意事项

  • 所有 .sh 命令替换为 .bat(如 kafka-topics.bat
  • 路径分隔符使用 \ 而非 /

以上命令覆盖了 Kafka 日常运维的核心场景,实际使用时需根据集群地址(--bootstrap-server)和具体需求调整参数。

http://www.xdnf.cn/news/16177.html

相关文章:

  • 多源信息融合智能投资【“图神经网络+强化学习“的融合架构】【低配显卡正常运行】
  • MapStruct类型转换接口未自动注入到spring容器中
  • 快速将前端得依赖打为tar包(yarn.lock版本)并且推送至nexus私有依赖仓库(笔记)
  • 《C++》面向对象编程--类(下)
  • LLM中的位置嵌入矩阵(Position Embedding Matrix)是什么
  • matrix-breakout-2-morpheus靶机通关教程
  • DBA常用数据库查询语句
  • Python爬虫案例:Scrapy+XPath解析当当网网页结构
  • Lua(模块与包)
  • 【docker | 部署 】Jetson Orin与AMD平台容器化部署概述
  • Java 实现 B/S 架构详解:从基础到实战,彻底掌握浏览器/服务器编程
  • 前端性能新纪元:Rust + WebAssembly 如何在浏览器中实现10倍性能提升(以视频处理为例)
  • 【RAG优化】RAG应用中图文表格混合内容的终极检索与生成策略
  • VUE的学习
  • iOS WebView 加载失败与缓存刷新问题排查实战指南
  • 医疗行业新变革:AR 培训系统助力手术培训精准高效​
  • Oracle国产化替代:一线DBA的技术决策突围战
  • 华为OpenStack架构学习9篇 连载—— 01 OpenStack架构介绍【附全文阅读】
  • 【C++】使用箱线图算法剔除数据样本中的异常值
  • Vue 项目中的组件引用如何实现,依赖组件间的数据功能交互及示例演示
  • CIRL:因果启发的表征学习框架——从域泛化到奖励分解的因果革命
  • Spring MVC中常用注解_笔记
  • 【Linux】linux基础开发工具(一) 软件包管理器yum、编辑器vim使用与相关命令
  • MCU(微控制器)中的高电平与低电平?
  • 实战演练11:生成式对话机器人(Bloom)
  • 输电线路微气象在线监测装置:保障电网安全的科技屏障
  • [网安工具] 自动化威胁检测工具 —— D 盾 · 使用手册
  • 多模态LLM/Diffusion推理加速
  • 11.2 yolov8用自己的数据集训练语义分割模型
  • Android Camera createCaptureSession