Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务
目录
- Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务
- 1. 什么是 CPU 密集型任务?
- 2. ProcessPoolExecutor 的基本用法
- 3. 封装成通用函数
- 4. 使用示例
- 示例一:大规模运算
- 示例二:图像处理
- 5. ThreadPool vs ProcessPool 对比
- 6. 注意事项
- 7. 总结
Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务
在上一篇文章里,我们介绍了如何优雅封装 ThreadPoolExecutor
来处理 I/O 密集型任务(比如文件读取、网络请求、日志解析)。
但是,当任务本身计算量很大(例如大规模数值运算、复杂模拟)时,线程池往往帮不上忙,甚至可能变慢。原因在于:Python 的 GIL(全局解释器锁)限制了同一进程内的多线程并行计算。
这时,就需要请出 ProcessPoolExecutor
—— 一个真正能发挥多核 CPU 性能的利器。
1. 什么是 CPU 密集型任务?
在计算机科学里,常将任务分为两类:
-
I/O 密集型 (I/O-bound)
等待外部资源的时间远多于计算时间。
典型场景:爬虫、数据库操作、文件读写。 -
CPU 密集型 (CPU-bound)
主要消耗 CPU 的算力,计算时间远大于 I/O 时间。
典型场景:- 大规模矩阵运算(线性代数、深度学习前向传播)
- 图像处理(滤波、卷积)
- CFD 数值模拟、风电场流体计算
- 加密解密、压缩解压
对于 CPU 密集型任务,线程池的多线程无法真正并行,因为受 GIL 限制。同一时间只会有一个线程运行 Python 字节码。
解决办法:使用 多进程 —— 每个进程都有自己的 Python 解释器和 GIL,可以真正利用多核 CPU 并行执行。
2. ProcessPoolExecutor 的基本用法
from concurrent.futures import ProcessPoolExecutor
import mathdef cpu_task(n):"""模拟 CPU 密集型任务:计算大量平方根"""return sum(math.sqrt(i) for i in range(n))if __name__ == "__main__":numbers = [10_00000, 20_00000, 30_00000, 40_00000]with ProcessPoolExecutor(max_workers=4) as executor:results = list(executor.map(cpu_task, numbers))print(results)
运行效果:
- 串行执行:可能需要 10+ 秒。
- 多进程执行:4 个核同时计算,总耗时大约缩短到 3 秒左右。
3. 封装成通用函数
与之前的 run_in_threads
类似,我们可以写一个 run_in_processes
:
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Iterable, Anydef run_in_processes(func: Callable[..., Any],tasks: Iterable[tuple],max_workers: int = 4,show_log: bool = True
) -> list:"""使用多进程池并行执行 CPU 密集型任务。"""results = [None] * len(tasks)if show_log:print(f"⚡ 开始多进程计算(进程数 = {max_workers},任务数 = {len(tasks)})...")with ProcessPoolExecutor(max_workers=max_workers) as executor:future_to_idx = {executor.submit(func, *args): idxfor idx, args in enumerate(tasks)}for future in as_completed(future_to_idx):idx = future_to_idx[future]try:results[idx] = future.result()except Exception as e:results[idx] = f"❌ 任务 {idx} 出错 → {e}"return results
4. 使用示例
示例一:大规模运算
import mathdef heavy_calc(n):return sum(math.sqrt(i) for i in range(n))tasks = [(50_00000,), (60_00000,), (70_00000,), (80_00000,)]
results = run_in_processes(heavy_calc, tasks, max_workers=4)
print(results)
示例二:图像处理
from PIL import Image, ImageFilterdef blur_image(path):img = Image.open(path)img = img.filter(ImageFilter.GaussianBlur(5))return f"{path} 完成"tasks = [(f"img_{i}.jpg",) for i in range(10)]
results = run_in_processes(blur_image, tasks, max_workers=4)
print(results)
5. ThreadPool vs ProcessPool 对比
特性 | ThreadPoolExecutor | ProcessPoolExecutor |
---|---|---|
适用场景 | I/O 密集型(网络请求、磁盘读写) | CPU 密集型(数值计算、图像处理) |
受 GIL 限制 | ✅ 是 | ❌ 否 |
启动开销 | 小 | 大(进程启动慢,内存消耗多) |
共享内存 | 方便(同一进程内) | 不方便(进程间需要序列化传输) |
速度优势 | 等待多的任务(I/O-bound) | 计算多的任务(CPU-bound) |
👉 简单总结:
- I/O-bound → ThreadPoolExecutor
- CPU-bound → ProcessPoolExecutor
6. 注意事项
- 必须放在
if __name__ == "__main__":
下
否则在 Windows/Mac 上会出现无限递归创建进程的问题。 - 数据传输开销大
多进程间要通过序列化传输数据,避免传递超大对象(如几 GB 的数组)。
建议用 共享内存(multiprocessing.Array / numpy.memmap) 解决。 - 进程数不宜过多
一般等于 CPU 核心数或核心数 - 1
。
7. 总结
- CPU 密集型任务:计算量大,主要消耗 CPU 时间。例子:矩阵运算、CFD 模拟、图像处理。
- 适合工具:
ProcessPoolExecutor
,真正利用多核 CPU 并行,性能大幅提升。 - 与线程池区别:线程适合等待多、计算少的场景;进程适合计算多、等待少的场景。
📌 推荐实践:
- 任务多但轻 → 用 线程池。
- 任务重且算力需求大 → 用 进程池。
- 混合任务 → 可以考虑 线程池 + 进程池混合,甚至引入
asyncio
。
下一篇 “线程池 + 进程池混合应用实战”(例如风电 CFD 模拟里:多进程算流场 + 多线程读写文件)