当前位置: 首页 > backend >正文

Kafka 命令行操作与 Spark-Streaming 核心编程总结

一、Kafka 命令行操作详解

1.创建 Topic

命令格式:

kafka-topics.sh --create --zookeeper <zk节点列表> --topic <主题名> --partitions <分区数> --replication-factor <副本数>

参数说明:

分区数(partitions):必须指定,决定数据分片存储的并行度。

副本数(replication-factor):必须指定,不能超过 Broker 节点总数,用于数据冗余和高可用。

数据存储:创建后在 Kafka 数据目录生成以主题名-分区编号命名的文件夹(如test1-0)。

2.查看所有 Topic

命令:

kafka-topics.sh --list --zookeeper <zk节点列表>

3.查看 Topic 详细信息

命令:

bash

kafka-topics.sh --describe --zookeeper <zk节点列表> --topic <主题名>

ISR(In-Sync Replicas):与 Leader 同步的副本,可提供服务。

AR(Assigned Replicas):分区的所有副本。

4.删除 Topic

命令:

kafka-topics.sh --delete --zookeeper <zk节点列表> --topic <主题名>

5.生产数据

命令格式:

kafka-console-producer.sh --broker-list <Broker节点列表> --topic <主题名>

说明:数据以追加日志形式写入分区,每条数据仅存在于一个分区,但所有副本均存储数据。

6.消费数据

默认从最新位置消费:

kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表>

从头开始消费:

kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表> --from-beginning

指定消费组(Group ID):

kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表> --consumer-property group.id=<组名>

特性:同一 Topic 的数据只能被同一 Group ID 的 Consumer 消费一次(通过偏移量记录消费进度)。

二、Spark-Streaming 核心编程:Kafka 数据源集成

1.Receiver API 与 Direct API 对比

Receiver API:

需要专用 Executor 接收数据,可能因接收与计算速度不匹配导致内存溢出,适用于早期版本。

Direct API(推荐):

计算 Executor 主动拉取 Kafka 数据,速度可控,适用于 Kafka 0.10 + 版本。

2.Kafka 0-10 Direct 模式实现步骤

(1)打开虚拟机zookpeer与kafka集群

(2)导入依赖

(3)编写代码

    (4) 开启Kafka生产者,产生数据

    kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

    (5)运行程序,接收Kafka生产的数据并进行相应处理

    (6)查看消费进度

    http://www.xdnf.cn/news/1687.html

    相关文章:

  1. Python3 基础:变量、数据类型和基本运算
  2. 驱动开发系列53 - 一个OpenGL应用程序是如何调用到驱动厂商GL库的
  3. 济南国网数字化培训班学习笔记-第二组-5节-输电线路设计
  4. vue3--手写手机屏组件
  5. 【工具】使用 MCP Inspector 调试服务的完全指南
  6. 关于nginx,负载均衡是什么?它能给我们的业务带来什么?怎么去配置它?
  7. 服务器的演进与应用:从物理设备到云端革命
  8. 前端出现的一些新技术或者升级的技术汇总
  9. Git多人协作与企业级开发模型
  10. 两段文本比对,高亮出差异部分
  11. 【多智能体系统】特点解析与高效组织策略
  12. Milvus(6):Collection 管理分区、管理别名
  13. 深度解析 Kubernetes 配置管理:如何安全使用 ConfigMap 和 Secret
  14. 字典与集合——测试界的黑话宝典与BUG追捕术
  15. C语言编程--16.删除链表的倒数第n个节点
  16. 触觉智能RK3506核心板,工业应用之RK3506 RT-Linux实时性测试
  17. arm64适配系列文章-第九章-arm64环境上sentinel的部署
  18. 【mysql】windows mysql命令
  19. Verilog 语法 (一)
  20. springboot在eclipse里面运行 run as 是Java Application还是 Maven
  21. Java面试场景篇:分布式锁的实现与组件详解
  22. MCP‌和LangGraph‌结合2
  23. 基于Vue3 的 h5监听从左到右手滑返回上一页
  24. 开源模型应用落地-语音合成-MegaTTS3-零样本克隆与多语言生成的突破
  25. 从工作到娱乐:Codigger Desktop 让桌面环境更智能
  26. c#-命名和书写规范
  27. k8s基于角色的访问控制(RBAC)
  28. GPT-4o最新图像生成完全指南:10大应用场景与提示词模板
  29. opencv--图像变换
  30. 悟空统计:小而美的网站流量统计工具,免费好用