当前位置: 首页 > backend >正文

PySpark 常用算子详解

PySpark 提供了丰富的算子用于分布式数据处理,主要分为转换算子(Transformations)和行动算子(Actions)。转换算子用于创建新的 RDD 或 DataFrame,而行动算子触发实际的计算并返回结果。

一、RDD 常用算子

1. 转换算子

转换算子是惰性的,不会立即执行,而是构建计算图。

1.1 基本转换
  • map(func)
    对 RDD 中的每个元素应用函数func

    rdd = sc.parallelize([1, 2, 3])
    squared = rdd.map(lambda x: x**2)  # [1, 4, 9]
    
  • filter(func)
    过滤出满足函数func条件的元素。

    even_nums = rdd.filter(lambda x: x % 2 == 0)  # [2]
    
  • flatMap(func)
    先应用func,再将结果展平。

    words = sc.parallelize(["Hello world", "Spark is fast"])
    flat_words = words.flatMap(lambda x: x.split())  # ["Hello", "world", "Spark", "is", "fast"]
    
1.2 集合操作
  • union(other)
    合并两个 RDD。

    rdd1 = sc.parallelize([1, 2])
    rdd2 = sc.parallelize([3, 4])
    union_rdd = rdd1.union(rdd2)  # [1, 2, 3, 4]
    
  • intersection(other)
    返回两个 RDD 的交集。

    rdd1 = sc.parallelize([1, 2, 3])
    rdd2 = sc.parallelize([3, 4, 5])
    intersect_rdd = rdd1.intersection(rdd2)  # [3]
    
  • distinct()
    去重。

    rdd = sc.parallelize([1, 1, 2, 2, 3])
    distinct_rdd = rdd.distinct()  # [1, 2, 3]
    
1.3 键值对操作
  • groupByKey()
    按键分组。

    pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
    grouped = pairs.groupByKey()  # [("a", [1, 2]), ("b", [3])]
    
  • reduceByKey(func)
    按键聚合值。

    summed = pairs.reduceByKey(lambda x, y: x + y)  # [("a", 3), ("b", 3)]
    
  • join(other)
    键值对 RDD 的内连接。

    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("a", 3), ("a", 4)])
    joined = rdd1.join(rdd2)  # [("a", (1, 3)), ("a", (1, 4))]
    
2. 行动算子

行动算子触发计算并返回结果或写入外部存储。

2.1 基本行动
  • collect()
    将 RDD 的所有元素收集到驱动程序。

    rdd = sc.parallelize([1, 2, 3])
    result = rdd.collect()  # [1, 2, 3]
    
  • count()
    返回 RDD 的元素个数。

    count = rdd.count()  # 3
    
  • take(n)
    返回 RDD 的前 n 个元素。

    first_two = rdd.take(2)  # [1, 2]
    
  • reduce(func)
    使用函数func聚合 RDD 元素。

    total = rdd.reduce(lambda x, y: x + y)  # 6
    
2.2 保存操作
  • saveAsTextFile(path)
    将 RDD 保存为文本文件。
    rdd.saveAsTextFile("hdfs://path/to/output")
    

二、DataFrame 常用算子

1. 转换算子

DataFrame 的转换算子基于关系代数,支持 SQL 风格操作。

1.1 选择与过滤
  • select(cols)
    选择列。

    df.select("name", "age").show()
    
  • filter(condition)
    过滤行。

    df.filter(df["age"] > 20).show()
    
  • where(condition)
    等价于filter

    df.where("age > 20").show()
    
1.2 聚合操作
  • groupBy(cols)
    按列分组。

    df.groupBy("department").avg("salary").show()
    
  • agg(expressions)
    自定义聚合。

    from pyspark.sql.functions import sum, avg
    df.agg(sum("sales"), avg("age")).show()
    
1.3 连接操作
  • join(other, on, how)
    连接两个 DataFrame。
    df1.join(df2, on="id", how="inner").show()
    
1.4 排序与去重
  • sort(cols)
    排序。

    df.sort("age", ascending=False).show()
    
  • dropDuplicates(subset)
    去重。

    df.dropDuplicates(["name", "age"]).show()
    
2. 行动算子
  • show(n)
    显示前 n 行。

    df.show(5)
    
  • count()
    统计行数。

    rows = df.count()
    
  • collect()
    收集所有行到驱动程序。

    data = df.collect()
    
  • toPandas()
    转换为 Pandas DataFrame。

    pandas_df = df.toPandas()
    

三、SQL 函数

PySpark 提供了丰富的 SQL 函数,用于复杂的数据处理。

1. 数学函数
  • sum()avg()max()min()count()
  • round()sqrt()log()exp()
2. 字符串函数
  • concat()substring()lower()upper()trim()
  • split()regexp_replace()
3. 日期时间函数
  • current_date()current_timestamp()
  • date_format()year()month()dayofmonth()
4. 条件函数
  • when()otherwise()
  • ifnull()coalesce()

示例:

from pyspark.sql.functions import when, coldf.withColumn("age_group", when(col("age") < 18, "minor").when(col("age") < 60, "adult").otherwise("senior"))

四、窗口函数

窗口函数允许在特定行组上执行计算,无需分组。

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, row_numberwindow_spec = Window.partitionBy("department").orderBy("salary")# 添加排名列
df.withColumn("rank", rank().over(window_spec)).show()

五、算子执行顺序

PySpark 采用惰性计算

  1. 转换算子构建 DAG(有向无环图)
  2. 行动算子触发 DAG 的执行
  3. 中间结果可能被缓存(使用cache()persist()

示例:

rdd = sc.parallelize([1, 2, 3, 4])
rdd = rdd.map(lambda x: x**2).filter(lambda x: x > 5)  # 转换
result = rdd.collect()  # 行动,触发计算

六、性能优化建议

  1. 避免使用collect():仅用于小数据结果,大数据会导致驱动程序内存溢出
  2. 使用广播变量:将小表广播到所有 Executor,减少 Shuffle
  3. 合理分区:通过repartition()coalesce()调整分区数
  4. 缓存重用数据:对需要多次使用的 RDD 或 DataFrame 使用cache()
  5. 优先使用 DataFrame:比 RDD 更高效(基于 Catalyst 优化器)

通过掌握这些算子,你可以高效地处理和分析大规模数据集。在实际应用中,建议结合具体业务场景选择合适的算子,并注意性能调优。

http://www.xdnf.cn/news/15492.html

相关文章:

  • kotlin的自学笔记1
  • King’s LIMS:实验室数字化转型的智能高效之选
  • 19.如何将 Python 字符串转换为 Slug
  • 极致cms多语言建站|设置主站默认语言与设置后台固定语言为中文
  • 手机当路由,连接机器人和电脑
  • Postman + Newman + Jenkins 接口自动化测试
  • 说下对mysql MVCC的理解
  • DNS的含义以及例子
  • 传输协议和消息队列
  • Claude 背后金主亚马逊亲自下场,重磅发布 AI 编程工具 Kiro 现已开启免费试用
  • 面向医疗AI场景的H20显卡算力组网方案
  • 正则表达式使用示例
  • C++20 协程参考手册详解 - 源自 cppreference.com
  • 暑假Python基础整理 --异常处理及程序调试
  • 从 0 到 1 掌握 自研企业级分布式 ID 发号器
  • 《C++模板高阶机制解析:非类型参数、特化设计与分离编译实践》
  • 【GEOS-Chem模拟教程第一期上】气溶胶专用/碳气体/全化学模拟
  • x86版的ubuntu上使用qemu运行arm版ubuntu
  • 学习软件测试的第十六天
  • HOOPS Communicator 2025.5.0版本更新速览:性能、测量与UI全面优化
  • 将 Vue 3 + Vite + TS 项目打包为 .exe 文件
  • Kubernetes 架构原理与集群环境部署
  • Mybatis05-动态sql
  • Java实现word、pdf转html保留格式
  • HTTP性能优化实战技术
  • 【电脑】显卡(GPU)的基础知识
  • 暑期算法训练.1
  • 【解决】联想电脑亮度调节
  • 行为模式-状态模式
  • 前端打包自动压缩为zip--archiver