当前位置: 首页 > backend >正文

使用pyspark对上百亿行的hive表生成稀疏向量

背景:一张上百亿行的hive表,只有id和app两列,其中app的去重量是8w多个(原app有上百万枚举值,此处已经用id数量进行过筛选,只留下有一定规模的app),id的去重量大概有八九亿,最终希望生成pid和对应app的稀疏向量。

我们使用pyspark来实现:

# 处理app特征,生成id,app和app对应的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT  # 新增导入
from pyspark.ml.functions import vector_to_array  # 新增关键导入
import sys
import os# 配置环境变量,否则报错python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()# 1. 从Hive读取数据
df = spark.sql("SELECT id,app FROM database.table")# 2. & 3. 定义特征转换管道(Pipeline)
# 步骤1:将app字符串转换为数值索引
indexer = StringIndexer(inputCol="app", outputCol="app_index")
# 步骤2:将索引进行One-Hot编码,输出为稀疏向量
encoder = OneHotEncoder(inputCol="app_index", outputCol="app_ohe_vector")# 将两个步骤组合成一个管道
pipeline = Pipeline(stages=[indexer, encoder])# 拟合数据并转换
model = pipeline.fit(df)
result = model.transform(df)# 查看结果(可选)
# result.select("id", "app", "app_ohe_vector").show(truncate=False)# 4. 将结果保存回HDFS(例如Parquet格式)
# result.select("id", "app_ohe_vector").write \
#     .mode("overwrite") \
#     .parquet("/path/to/your/output/onehot_result.parquet")# 需要跑6小时,表非常大 338亿数据
result.createOrReplaceTempView("temp_view")
spark.sql("CREATE TABLE database.app_vec AS SELECT * FROM temp_view")# 停止SparkSession
spark.stop()

此方案的优点:

高效:​​ Spark是专为大规模数据处理设计的,性能远超Hive UDF。

节省空间:​​ 输出是稀疏向量,8万个类别中每个用户只有少量app,向量中大部分是0,稀疏表示非常紧凑。

标准化:​​ 这是ML领域处理类别特征的标准流程,与后续的Spark MLlib机器学习库无缝集成。

此方案生成的结果数据示例如下:

id

app

app_index

app_ohe_vector

1001

微信

0

(0,80000, [0], [1.0])

1001

王者荣耀

79999

(0,80000, [79999], [1.0])

1002

淘宝

1

(0,80000, [1], [1.0])

在hive表中,app_ohe_vector的格式为row("type" tinyint, "size" integer, "indices" array(integer), "values" array(double))。

app_ohe_vector的结构是Spark ML的标准格式:

  • 0: 向量类型(0=稀疏向量,1=密集向量)
  • 80000: 向量总长度(即app总数)
  • [0, 1, 2, 3, ...]: 非零元素的索引位置
  • [1.0, 1.0, 1.0, ...]: 对应索引位置的值

接下来我们对id进行聚合,同样使用pyspark来实现:

# 处理app特征,按id聚合app对应的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import expr, col, collect_list, udf, first, size,struct
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StructType, StructField
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors,VectorUDT  # 新增导入
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array  # 新增关键导入
from pyspark import StorageLevel
import json
import sys
import os# 配置环境变量,否则报错python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.sql.shuffle.partitions", "5000") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()for i in ['0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f']:# 1. 从Hive读取数据print(i)sql_idapp = '''select id,app_ohe_vector from database.app_vec_par where flag = '{fflag}\''''.format(fflag=i)df = spark.sql(sql_idapp)# 打印数据概览total_count = df.count()print(f"数据总量: {total_count:,}")    # 高效UDAF聚合函数(针对单元素向量优化)def merge_sparse_vectors(vectors):"""高效合并稀疏向量,针对单元素向量优化"""if not vectors:return {"type": 0, "size": 0, "indices": [], "values": []}# 获取向量尺寸(假设所有向量尺寸相同)size_val = vectors[0]["size"]# 使用字典高效聚合value_dict = {}for vec in vectors:# 直接访问第一个(也是唯一一个)索引和值idx = vec["indices"][0]val = vec["values"][0]# 使用get方法避免两次字典查找value_dict[idx] = value_dict.get(idx, 0.0) + val# 提取并排序索引sorted_indices = sorted(value_dict.keys())sorted_values = [value_dict[i] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}# 注册UDAFmerge_sparse_vectors_udf = udf(merge_sparse_vectors,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 数据预处理:过滤无效记录并重新分区print("开始数据预处理...")cleaned_df = df.filter((col("app_ohe_vector").isNotNull()) & (size(col("app_ohe_vector.indices")) > 0)).repartition(5000, "id")  # 增加分区数处理数据倾斜# 释放原始DF内存df.unpersist()# 两阶段聚合策略(处理数据倾斜)print("开始第一阶段聚合(按id和索引分组)...")# 步骤1: 提取每个向量的索引和值expanded_df = cleaned_df.select("id",col("app_ohe_vector.indices")[0].alias("index"),col("app_ohe_vector.values")[0].alias("value"),col("app_ohe_vector.size").alias("size"))# 步骤2: 按(id, index)分组求和intermediate_df = expanded_df.groupBy("id", "index").agg(expr("sum(value)").alias("sum_value"),first("size").alias("size"))# 步骤3: 按id分组,收集所有(index, sum_value)对print("开始第二阶段聚合(按id分组)...")grouped_df = intermediate_df.groupBy("id").agg(collect_list(struct("index", "sum_value")).alias("index_value_pairs"),first("size").alias("size"))# 步骤4: 转换为稀疏向量格式def pairs_to_sparse_vector(pairs, size_val):"""将(index, value)对列表转换为稀疏向量"""if not pairs:return {"type": 0, "size": size_val, "indices": [], "values": []}# 提取索引和值indices = [p["index"] for p in pairs]values = [p["sum_value"] for p in pairs]# 排序(如果需要)sorted_indices = sorted(indices)sorted_values = [values[indices.index(i)] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}pairs_to_sparse_vector_udf = udf(pairs_to_sparse_vector,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 生成最终结果result = grouped_df.withColumn("merged_vector",pairs_to_sparse_vector_udf("index_value_pairs", "size")).select("id", "merged_vector")print("开始第三阶段数据插入...")# 创建临时视图result.createOrReplaceTempView("sparse_matrix_result")res_sql='''INSERT into TABLE database.app_vecagg_res PARTITION(flag='{fflag}')SELECT id,merged_vector from sparse_matrix_result'''.format(fflag=i)spark.sql(res_sql)print("数据插入完成")# 停止SparkSession
spark.stop()

此处因为原表有300亿+数据,集群性能有限无法一次性处理,所以我将id进行了分区,然后循环分区进行的聚合。

聚合后的结果数据示例如下:

id

merged_vector

1001

(0,80000, [0,79999], [1.0,1.0])

merged_vector的结构是Spark ML的标准格式:

  • 0: 向量类型(0=稀疏向量,1=密集向量)
  • 80000: 向量总长度(即app总数)
  • [0, 1, 2, 3, ...]: 非零元素的索引位置
  • [1.0, 1.0, 1.0, ...]: 对应索引位置的值

Spark ML的算法设计时就已经考虑了这种向量格式,所有内置算法都能正确处理这种结构:

  1. 算法兼容性​:Spark ML的所有分类、回归、聚类算法都接受这种格式的向量
  2. 性能优化​:稀疏向量格式在内存使用和计算效率上都有优化
  3. 内置支持​:Spark ML的VectorAssembler、特征变换器等都能处理这种格式

至此我们就可以将此向量作为特征用于后续的建模操作了。

http://www.xdnf.cn/news/19762.html

相关文章:

  • 2025年COR IOTJ SCI2区,灾后通信无人机基站位置优化和移动充电无人机路径规划,深度解析+性能实测
  • Android aoap开发常见问题之package_allowed_list.txt导致的编译报错
  • 深度学习------模型的保存和使用
  • 深度学习篇---Adam优化器
  • Docker Pull 代理配置方法
  • 【正则表达式】 正则表达式有哪些语法?
  • Low-Light Image Enhancement via Structure Modeling and Guidance 论文阅读
  • AP5414:高效灵活的LED驱动解决方案,点亮创意生活
  • go大厂真实的面试经历与总结
  • 心路历程-初识Linux用户
  • EasyExcel 基础用法
  • 如何在FastAPI中巧妙隔离依赖项,让单元测试不再头疼?
  • 一文吃透 `protoc` 安装与落地
  • 【Spring Cloud微服务】10.王子、巨龙与Spring Cloud:用注解重塑微服务王国
  • 普通人也能走的自由之路
  • 科技赋能田园:数字化解决方案开启智慧农业新篇章
  • 告别 Hadoop,拥抱 StarRocks!政采云数据平台升级之路
  • 【Maniskill】StackCube-v1 官方命令训练结果不稳定的研究报告
  • Android Looper源码阅读
  • 大数据毕业设计选题推荐-基于大数据的电商物流数据分析与可视化系统-Spark-Hadoop-Bigdata
  • SkyWalking 支持的告警通知方式(Alarm Hooks)类型
  • MySQL常见报错分析及解决方案总结(9)---出现interactive_timeout/wait_timeout
  • 51单片机----LED与数码管模块
  • 计算机网络:(十七)应用层(上)应用层基本概念
  • 如何创建交换空间
  • Elasticsearch(高性能分布式搜索引擎)01
  • Day20_【机器学习—逻辑回归 (2)—分类评估方法】
  • 硬件基础与c51基础
  • 深入剖析Spring Boot中Spring MVC的请求处理流程
  • Linux(2)|入门的开始:Linux基本指令(2)