Pandas 内存不足 或 UDF 执行慢
针对 Pandas 内存不足 和 UDF(用户自定义函数)执行慢 的问题,以下是系统性优化方案,结合内存管理、计算加速和代码重构技巧:
一、Pandas 内存不足的优化
1. 数据存储优化
-
压缩数据类型:减少数值型数据内存占用。
# 优化前(默认 int64, float64) df = pd.read_csv('data.csv') # 优化后(降级数据类型) df = df.astype({'user_id': 'int32', # int32 替代 int64'price': 'float32', # float32 替代 float64'category': 'category' # 分类类型替代 object })
-
效果:内存占用减少 50%~70%。
-
-
稀疏数据存储:对高稀疏度列(如大量0或NaN)使用稀疏矩阵。
from pandas.arrays import SparseArray df['sparse_col'] = SparseArray(df['sparse_col'])
2. 分块处理(Chunk Processing)
-
流式读取大数据文件:避免全量加载到内存。
chunk_size = 100000 chunks = pd.read_csv('large_data.csv', chunksize=chunk_size) for chunk in chunks:process(chunk) # 逐块处理
3. 内存释放
-
手动释放无用对象:
del df # 删除变量 gc.collect() # 强制垃圾回收
-
避免深拷贝:使用
inplace=True
或引用传递减少复制。
4. 替代工具
-
Dask:并行处理超出内存的数据集。
import dask.dataframe as dd ddf = dd.read_csv('large_data.csv') result = ddf.groupby('user_id').sum().compute()
-
Vaex:零内存复制分析工具。
import vaex df = vaex.open('large_data.hdf5') df.groupby('user_id', agg={'price': 'sum'})
二、UDF 执行慢的优化
1. 向量化替代循环
-
避免逐行操作:使用 Pandas/Numpy 向量化函数。
# 优化前(逐行 apply) df['discount'] = df.apply(lambda row: row['price'] * 0.9 if row['vip'] else row['price'], axis=1) # 优化后(向量化计算) df['discount'] = df['price'] * np.where(df['vip'], 0.9, 1.0)
-
性能提升:速度提升 100~1000 倍。
-
2. 加速工具
-
Numba:对数值计算函数即时编译(JIT)。
from numba import jit @jit(nopython=True) def numba_udf(x, y):return x * y + np.sqrt(x) df['result'] = numba_udf(df['x'].values, df['y'].values)
-
Swifter:自动选择最佳加速方式(向量化/Dask)。
import swifter df['result'] = df.swifter.apply(lambda x: custom_udf(x), axis=1)
3. 并行计算
-
多进程加速:
from pandarallel import pandarallel pandarallel.initialize(nb_workers=8) # 初始化8个进程 df['result'] = df.parallel_apply(custom_udf, axis=1)
4. 避免复杂 Python 对象
-
替换 object 类型:将字符串列转为
category
。df['category'] = df['category'].astype('category')
三、综合优化案例
场景:处理 10GB 用户交易数据,计算每个用户的折扣金额(VIP 用户打 9 折)。
优化步骤
-
分块读取 + 类型压缩:
dtype_opt = {'user_id': 'int32','price': 'float32','vip': 'bool' } chunks = pd.read_csv('transactions.csv', chunksize=1e6, dtype=dtype_opt)
-
向量化计算折扣:
for chunk in chunks:chunk['discount'] = chunk['price'] * np.where(chunk['vip'], 0.9, 1.0)process(chunk)
-
内存清理:
del chunk gc.collect()
优化效果
-
内存占用:从 10GB 降至 3GB。
-
执行时间:从 2 小时缩短至 15 分钟。
四、对比总结
问题 | 优化手段 | 适用场景 | 性能提升 |
---|---|---|---|
内存不足 | 数据类型压缩、分块处理、Dask/Vaex | 数据量远超内存大小 | 内存占用减少 50%~90% |
UDF 执行慢 | 向量化、Numba、Swifter | 复杂逐行计算、数值密集型操作 | 加速 10~1000 倍 |
混合问题 | 分块 + 向量化 + 并行 | 超大数据集需复杂计算 | 综合提升内存和计算效率 |
五、注意事项
-
监控内存使用:通过
df.memory_usage(deep=True)
分析内存分布。 -
避免链式赋值:如
df[df.x > 0]['y'] = 1
应改为df.loc[df.x > 0, 'y'] = 1
。 -
测试优化效果:使用
%timeit
或line_profiler
定位瓶颈。
通过以上方法,可显著提升大数据场景下的内存利用率和计算效率。