当前位置: 首页 > backend >正文

pyspark中的kafka的读和写案例操作

下面将详细讲解 PySpark 中操作 Kafka 进行数据读写的案例,包括必要的配置、代码实现和关键参数说明。

PySpark 与 Kafka 集成基础

PySpark 通过 Spark Streaming 或 Structured Streaming 与 Kafka 集成,需要引入特定的依赖包。通常使用spark-sql-kafka-0-10_2.12包,版本需要与 Spark 版本匹配。

读取 Kafka 数据(消费消息)

从 Kafka 读取数据可以分为批处理和流处理两种方式:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType# 初始化SparkSession
spark = SparkSession.builder \.appName("KafkaReaderExample") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.getOrCreate()# 1. 流处理方式读取Kafka数据
def stream_read_kafka():# 配置Kafka连接参数kafka_df = spark.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "test_topic")  # 订阅的主题,可以是多个用逗号分隔.option("startingOffsets", "earliest")  # 从最早的偏移量开始消费.load()# Kafka返回的数据包含多个字段,我们主要关注value字段(实际消息内容)# 将二进制的value转换为字符串kafka_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 如果消息是JSON格式,可以进一步解析schema = StructType() \.add("id", IntegerType()) \.add("name", StringType()) \.add("age", IntegerType())parsed_df = kafka_df.select(from_json(col("value"), schema).alias("data")).select("data.*")# 输出到控制台(调试用)query = parsed_df.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()# 2. 批处理方式读取Kafka数据
def batch_read_kafka():kafka_df = spark.read \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "test_topic") \.option("startingOffsets", """{"test_topic":{"0":0}}""")  # 指定分区和偏移量.option("endingOffsets", """{"test_topic":{"0":100}}""") \.load()# 转换为字符串并展示result_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")result_df.show(truncate=False)if __name__ == "__main__":# 选择运行流处理或批处理# stream_read_kafka()batch_read_kafka()

写入 Kafka 数据(生产消息)

同样,写入 Kafka 也支持流处理和批处理两种方式:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json, struct# 初始化SparkSession
spark = SparkSession.builder \.appName("KafkaWriterExample") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.getOrCreate()# 1. 流处理方式写入Kafka
def stream_write_kafka():# 创建测试数据data = [("1", "Alice", 25), ("2", "Bob", 30), ("3", "Charlie", 35)]df = spark.createDataFrame(data, ["id", "name", "age"])# 转换为Kafka所需的格式(必须包含key和value字段)kafka_df = df.select(col("id").alias("key"),  # key字段to_json(struct("id", "name", "age")).alias("value")  # value字段转为JSON)# 写入Kafkaquery = kafka_df.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "test_topic") \.option("checkpointLocation", "/tmp/kafka_checkpoint")  # 流处理必须设置检查点.start()query.awaitTermination()# 2. 批处理方式写入Kafka
def batch_write_kafka():# 创建测试数据data = [("4", "David", 40), ("5", "Eve", 45)]df = spark.createDataFrame(data, ["id", "name", "age"])# 转换为Kafka所需格式kafka_df = df.select(col("id").cast("string").alias("key"),to_json(struct("id", "name", "age")).alias("value"))# 写入Kafkakafka_df.write \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "test_topic") \.save()if __name__ == "__main__":# 选择运行流处理或批处理# stream_write_kafka()batch_write_kafka()

关键参数说明

  1. 连接参数

    • kafka.bootstrap.servers:Kafka 集群的地址列表,格式为host:port
    • subscribe:要订阅的主题,多个主题用逗号分隔
    • topic:写入时指定的目标主题
  2. 偏移量设置

    • startingOffsets:读取的起始偏移量,earliest(最早)或latest(最新)
    • endingOffsets:批处理时的结束偏移量
  3. 数据格式

    • Kafka 中的数据以二进制形式存储,需要转换为字符串:CAST(key AS STRING)CAST(value AS STRING)
    • 写入时需要将数据转换为包含keyvalue字段的 DataFrame
  4. 流处理特殊参数

    • checkpointLocation:必须设置,用于保存流处理的状态信息
    • outputMode:输出模式,常用append(追加)

运行注意事项

  1. 确保 Kafka 服务已启动并正常运行
  2. 主题需要提前创建:kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  3. 依赖包版本需要与 Spark 版本匹配,例如 Spark 3.3.0 对应spark-sql-kafka-0-10_2.12:3.3.0
  4. 流处理程序需要手动停止,可通过query.stop()或 Ctrl+C 终止

通过以上示例,你可以实现 PySpark 与 Kafka 之间的数据交互,根据实际需求选择批处理或流处理方式。

http://www.xdnf.cn/news/16989.html

相关文章:

  • 面向对象编程基础:类的实例化与对象内存模型详解
  • Oracle 在线重定义
  • 【unitrix】 7.2 二进制位减法(bit_sub.rs)
  • MySQL偏门但基础的面试题集锦
  • MySql的两种安装方式
  • MySQL Router
  • VUE2 学习笔记17 路由
  • 华为OD机考2025C卷 - 最小矩阵宽度(Java Python JS C++ C )
  • 架构师面试(三十九):微服务重构单体应用
  • 【C++】语法基础篇
  • Javascript面试题及详细答案150道(046-060)
  • Flask全栈入门:打造区块链艺术品交易所
  • 风光储并网协同运行simulink仿真模型实现
  • transformer与神经网络
  • Azure DevOps — Kubernetes 上的自托管代理 — 第 5 部分
  • 进程间通信:管道与共享内存
  • Antlr学习笔记 02、使用antlr4实现简易版计算器
  • 【无标题】标准 I/O 中的一些函数,按功能分类说明其用法和特点
  • 【LeetCode刷题集】--排序(一)
  • clocking_cb驱动之坑
  • BackgroundTasks 如何巧妙驾驭多任务并发?
  • 测试-概念篇(3)
  • <PhotoShop><JavaScript><脚本>基于JavaScript,利用脚本实现PS软件批量替换图片,并转换为智能对象?
  • Linux 逻辑卷管理
  • 深入理解Spring中的循环依赖及解决方案
  • ssh连接VirtualBox中的Ubuntu24.04(win11、putty、NAT 模式)
  • 模型蒸馏(Distillation):原理、算法、应用
  • 每日任务day0804:小小勇者成长记之药剂师的小咪
  • 深入剖析Java Stream API性能优化实践指南
  • AgxOrin平台JetPack5.x版本fix multi-cam race condition 补丁