使用pyspark对上百亿行的hive表生成稀疏向量
背景:一张上百亿行的hive表,只有id和app两列,其中app的去重量是8w多个(原app有上百万枚举值,此处已经用id数量进行过筛选,只留下有一定规模的app),id的去重量大概有八九亿,最终希望生成pid和对应app的稀疏向量。
我们使用pyspark来实现:
# 处理app特征,生成id,app和app对应的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT # 新增导入
from pyspark.ml.functions import vector_to_array # 新增关键导入
import sys
import os# 配置环境变量,否则报错python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()# 1. 从Hive读取数据
df = spark.sql("SELECT id,app FROM database.table")# 2. & 3. 定义特征转换管道(Pipeline)
# 步骤1:将app字符串转换为数值索引
indexer = StringIndexer(inputCol="app", outputCol="app_index")
# 步骤2:将索引进行One-Hot编码,输出为稀疏向量
encoder = OneHotEncoder(inputCol="app_index", outputCol="app_ohe_vector")# 将两个步骤组合成一个管道
pipeline = Pipeline(stages=[indexer, encoder])# 拟合数据并转换
model = pipeline.fit(df)
result = model.transform(df)# 查看结果(可选)
# result.select("id", "app", "app_ohe_vector").show(truncate=False)# 4. 将结果保存回HDFS(例如Parquet格式)
# result.select("id", "app_ohe_vector").write \
# .mode("overwrite") \
# .parquet("/path/to/your/output/onehot_result.parquet")# 需要跑6小时,表非常大 338亿数据
result.createOrReplaceTempView("temp_view")
spark.sql("CREATE TABLE database.app_vec AS SELECT * FROM temp_view")# 停止SparkSession
spark.stop()
此方案的优点:
•高效: Spark是专为大规模数据处理设计的,性能远超Hive UDF。
•节省空间: 输出是稀疏向量,8万个类别中每个用户只有少量app,向量中大部分是0,稀疏表示非常紧凑。
•标准化: 这是ML领域处理类别特征的标准流程,与后续的Spark MLlib机器学习库无缝集成。
此方案生成的结果数据示例如下:
id | app | app_index | app_ohe_vector |
---|---|---|---|
1001 | 微信 | 0 |
|
1001 | 王者荣耀 | 79999 |
|
1002 | 淘宝 | 1 |
|
在hive表中,app_ohe_vector的格式为row("type" tinyint, "size" integer, "indices" array(integer), "values" array(double))。
app_ohe_vector的结构是Spark ML的标准格式:
0
: 向量类型(0=稀疏向量,1=密集向量)80000
: 向量总长度(即app总数)[0, 1, 2, 3, ...]
: 非零元素的索引位置[1.0, 1.0, 1.0, ...]
: 对应索引位置的值
接下来我们对id进行聚合,同样使用pyspark来实现:
# 处理app特征,按id聚合app对应的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import expr, col, collect_list, udf, first, size,struct
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StructType, StructField
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors,VectorUDT # 新增导入
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array # 新增关键导入
from pyspark import StorageLevel
import json
import sys
import os# 配置环境变量,否则报错python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.sql.shuffle.partitions", "5000") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()for i in ['0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f']:# 1. 从Hive读取数据print(i)sql_idapp = '''select id,app_ohe_vector from database.app_vec_par where flag = '{fflag}\''''.format(fflag=i)df = spark.sql(sql_idapp)# 打印数据概览total_count = df.count()print(f"数据总量: {total_count:,}") # 高效UDAF聚合函数(针对单元素向量优化)def merge_sparse_vectors(vectors):"""高效合并稀疏向量,针对单元素向量优化"""if not vectors:return {"type": 0, "size": 0, "indices": [], "values": []}# 获取向量尺寸(假设所有向量尺寸相同)size_val = vectors[0]["size"]# 使用字典高效聚合value_dict = {}for vec in vectors:# 直接访问第一个(也是唯一一个)索引和值idx = vec["indices"][0]val = vec["values"][0]# 使用get方法避免两次字典查找value_dict[idx] = value_dict.get(idx, 0.0) + val# 提取并排序索引sorted_indices = sorted(value_dict.keys())sorted_values = [value_dict[i] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}# 注册UDAFmerge_sparse_vectors_udf = udf(merge_sparse_vectors,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 数据预处理:过滤无效记录并重新分区print("开始数据预处理...")cleaned_df = df.filter((col("app_ohe_vector").isNotNull()) & (size(col("app_ohe_vector.indices")) > 0)).repartition(5000, "id") # 增加分区数处理数据倾斜# 释放原始DF内存df.unpersist()# 两阶段聚合策略(处理数据倾斜)print("开始第一阶段聚合(按id和索引分组)...")# 步骤1: 提取每个向量的索引和值expanded_df = cleaned_df.select("id",col("app_ohe_vector.indices")[0].alias("index"),col("app_ohe_vector.values")[0].alias("value"),col("app_ohe_vector.size").alias("size"))# 步骤2: 按(id, index)分组求和intermediate_df = expanded_df.groupBy("id", "index").agg(expr("sum(value)").alias("sum_value"),first("size").alias("size"))# 步骤3: 按id分组,收集所有(index, sum_value)对print("开始第二阶段聚合(按id分组)...")grouped_df = intermediate_df.groupBy("id").agg(collect_list(struct("index", "sum_value")).alias("index_value_pairs"),first("size").alias("size"))# 步骤4: 转换为稀疏向量格式def pairs_to_sparse_vector(pairs, size_val):"""将(index, value)对列表转换为稀疏向量"""if not pairs:return {"type": 0, "size": size_val, "indices": [], "values": []}# 提取索引和值indices = [p["index"] for p in pairs]values = [p["sum_value"] for p in pairs]# 排序(如果需要)sorted_indices = sorted(indices)sorted_values = [values[indices.index(i)] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}pairs_to_sparse_vector_udf = udf(pairs_to_sparse_vector,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 生成最终结果result = grouped_df.withColumn("merged_vector",pairs_to_sparse_vector_udf("index_value_pairs", "size")).select("id", "merged_vector")print("开始第三阶段数据插入...")# 创建临时视图result.createOrReplaceTempView("sparse_matrix_result")res_sql='''INSERT into TABLE database.app_vecagg_res PARTITION(flag='{fflag}')SELECT id,merged_vector from sparse_matrix_result'''.format(fflag=i)spark.sql(res_sql)print("数据插入完成")# 停止SparkSession
spark.stop()
此处因为原表有300亿+数据,集群性能有限无法一次性处理,所以我将id进行了分区,然后循环分区进行的聚合。
聚合后的结果数据示例如下:
id | merged_vector |
---|---|
1001 |
|
merged_vector的结构是Spark ML的标准格式:
0
: 向量类型(0=稀疏向量,1=密集向量)80000
: 向量总长度(即app总数)[0, 1, 2, 3, ...]
: 非零元素的索引位置[1.0, 1.0, 1.0, ...]
: 对应索引位置的值
Spark ML的算法设计时就已经考虑了这种向量格式,所有内置算法都能正确处理这种结构:
- 算法兼容性:Spark ML的所有分类、回归、聚类算法都接受这种格式的向量
- 性能优化:稀疏向量格式在内存使用和计算效率上都有优化
- 内置支持:Spark ML的
VectorAssembler
、特征变换器等都能处理这种格式
至此我们就可以将此向量作为特征用于后续的建模操作了。