当前位置: 首页 > news >正文

【kafka】基本命令

创建 Kafka Topic 的命令

以下是创建 Kafka Topic 的几种常用方法:

1. 使用 kafka-topics.sh 基础命令(Kafka 自带工具)

bin/kafka-topics.sh --create \--bootstrap-server <broker地址:端口> \--topic <topic名称> \--partitions <分区数> \--replication-factor <副本数>
bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--topic my_new_topic \--partitions 3 \--replication-factor 2

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test01_topic --partitions 2 --replication-factor 2

2. 带额外配置的创建命令

bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--topic my_config_topic \--partitions 5 \--replication-factor 1 \--config retention.ms=172800000 \--config segment.bytes=1073741824

3. 使用 ZooKeeper 的旧版命令(Kafka 2.2 之前版本)

bin/kafka-topics.sh --create \--zookeeper localhost:2181 \--topic legacy_topic \--partitions 2 \--replication-factor 1

4. 使用 Kafka API (Java) 创建 Topic

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);NewTopic newTopic = new NewTopic("my_api_topic", 3, (short) 1);
newTopic.configs(Map.of("retention.ms", "86400000"));CreateTopicsResult result = admin.createTopics(Collections.singletonList(newTopic));
result.all().get(); // 等待创建完成

5. 验证 Topic 是否创建成功

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic my_new_topic --bootstrap-server localhost:9092

常用配置参数

可以在创建时通过 --config 指定:

  • retention.ms - 消息保留时间(毫秒)

  • `segment.bytes

查看 Kafka 所有 Topic 的命令

以下是几种查看 Kafka 中所有 Topic 的常用方法:

1. 使用 kafka-topics.sh 基础命令(推荐)

bin/kafka-topics.sh --list --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

2. 查看所有 Topic 的详细信息(包括分区、副本等)

bin/kafka-topics.sh --describe --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092

3. 使用 ZooKeeper 的旧版命令(Kafka 2.2 之前版本)

bin/kafka-topics.sh --list --zookeeper <zookeeper地址:端口>
bin/kafka-topics.sh --list --zookeeper localhost:2181

4. 使用 kafkacat 工具

kafkacat -L -b <broker地址:端口>
kafkacat -L -b localhost:9092

5. 使用 Kafka API (Java)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);ListTopicsResult topics = admin.listTopics();
Set<String> topicNames = topics.names().get();topicNames.forEach(System.out::println);

6. 查看包含内部 Topic 的所有 Topic(如 __consumer_offsets)

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --exclude-internal

7. 使用正则表达式过滤 Topic

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | grep "your_pattern"

注意事项

  1. 新版本 Kafka(2.2+)推荐使用 --bootstrap-server 而不是 --zookeeper

  2. 生产环境中可能需要添加认证参数,如:

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --command-config admin.properties
  3. 对于大型集群,列出所有 Topic 可能需要一些时间

查看 Kafka Topic 分区信息的命令

以下是几种查看 Kafka Topic 分区信息的常用方法:

1. 使用 kafka-topics.sh 工具(Kafka 自带)

bin/kafka-topics.sh --describe --topic <topic名称> --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092

2. 查看所有 topic 的分区信息

bin/kafka-topics.sh --list --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --describe --bootstrap-server <broker地址:端口>

bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092
 

3. 使用 kafkacat 工具

kafkacat -L -b <broker地址:端口> -t <topic名称>

4. 使用 Kafka API (Java)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("my_topic"));
TopicDescription description = result.all().get().get("my_topic");description.partitions().forEach(partition -> {System.out.println("Partition: " + partition.partition());System.out.println("Leader: " + partition.leader());System.out.println("Replicas: " + partition.replicas());System.out.println("ISR: " + partition.isr());
});

输出信息解释

Topic: my_topic	PartitionCount: 3	ReplicationFactor: 2	Configs:Topic: my_topic	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2Topic: my_topic	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0Topic: my_topic	Partition: 2	Leader: 0	Replicas: 0,1	Isr: 0,1

其中:

  • PartitionCount: 分区总数

  • ReplicationFactor: 副本因子数

  • Leader: 负责该分区读写的主 broker

  • Replicas: 该分区的所有副本所在的 broker

  • Isr: 同步中的副本(In-Sync Replicas)

http://www.xdnf.cn/news/458101.html

相关文章:

  • Node.js 循环依赖问题详解:原理、案例与解决方案
  • 【hadoop】Kafka 安装部署
  • “傅里叶变换算法”来检测纸箱变形的简单示例
  • Android Coli 3 ImageView load two suit Bitmap thumb and formal,Kotlin(七)
  • MySQL 8.0 OCP 1Z0-908 101-110题
  • 【Conda】环境应用至JupyterLab
  • 使用java -jar命令指定VM参数-D运行jar包报错问题
  • 游戏引擎学习第281天:在房间之间为摄像机添加动画效果
  • 【虚幻引擎】UE5独立游戏开发全流程(商业级架构)
  • 什么是路由器环回接口?
  • 专项智能练习(加强题型)
  • OpenCV图像旋转原理及示例
  • IOS CSS3 right transformX 动画卡顿 回弹
  • 生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts Tasks
  • day21:零基础学嵌入式之数据结构
  • X-R1:训练医疗推理大模型
  • AD 规则的导入与导出
  • W1R3S: 1.0.1靶场
  • 10.2 LangChain v0.3全面解析:模块化架构+多代理系统如何实现效率飙升500%
  • 团队项目培训
  • 题解:P12207 [蓝桥杯 2023 国 Python B] 划分
  • 编译OpenSSL时报错,Can‘t locate IPC/Cmd.pm in @INC perl环境
  • JVM方法区核心技术解析:从方法区到执行引擎
  • 什么是 NB-IoT ?窄带IoT 应用
  • 铜墙铁壁 - 服务网格的安全之道 (Istio 实例)
  • Electron详解:原理与不足
  • 如何在多线程环境下避免快速失败异常?
  • VMware(Ubuntu系统)设置共享文件夹
  • 前端流行框架Vue3教程:16. 组件事件配合`v-model`使用
  • 阿里云ECS部署Dify