当前位置: 首页 > web >正文

Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务

目录

  • Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务
    • 1. 什么是 CPU 密集型任务?
    • 2. ProcessPoolExecutor 的基本用法
    • 3. 封装成通用函数
    • 4. 使用示例
      • 示例一:大规模运算
      • 示例二:图像处理
    • 5. ThreadPool vs ProcessPool 对比
    • 6. 注意事项
    • 7. 总结

Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务

在上一篇文章里,我们介绍了如何优雅封装 ThreadPoolExecutor 来处理 I/O 密集型任务(比如文件读取、网络请求、日志解析)。

但是,当任务本身计算量很大(例如大规模数值运算、复杂模拟)时,线程池往往帮不上忙,甚至可能变慢。原因在于:Python 的 GIL(全局解释器锁)限制了同一进程内的多线程并行计算

这时,就需要请出 ProcessPoolExecutor —— 一个真正能发挥多核 CPU 性能的利器。


1. 什么是 CPU 密集型任务?

在计算机科学里,常将任务分为两类:

  • I/O 密集型 (I/O-bound)
    等待外部资源的时间远多于计算时间。
    典型场景:爬虫、数据库操作、文件读写。

  • CPU 密集型 (CPU-bound)
    主要消耗 CPU 的算力,计算时间远大于 I/O 时间。
    典型场景:

    • 大规模矩阵运算(线性代数、深度学习前向传播)
    • 图像处理(滤波、卷积)
    • CFD 数值模拟、风电场流体计算
    • 加密解密、压缩解压

对于 CPU 密集型任务,线程池的多线程无法真正并行,因为受 GIL 限制。同一时间只会有一个线程运行 Python 字节码。

解决办法:使用 多进程 —— 每个进程都有自己的 Python 解释器和 GIL,可以真正利用多核 CPU 并行执行。


2. ProcessPoolExecutor 的基本用法

from concurrent.futures import ProcessPoolExecutor
import mathdef cpu_task(n):"""模拟 CPU 密集型任务:计算大量平方根"""return sum(math.sqrt(i) for i in range(n))if __name__ == "__main__":numbers = [10_00000, 20_00000, 30_00000, 40_00000]with ProcessPoolExecutor(max_workers=4) as executor:results = list(executor.map(cpu_task, numbers))print(results)

运行效果:

  • 串行执行:可能需要 10+ 秒。
  • 多进程执行:4 个核同时计算,总耗时大约缩短到 3 秒左右。

3. 封装成通用函数

与之前的 run_in_threads 类似,我们可以写一个 run_in_processes

from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Iterable, Anydef run_in_processes(func: Callable[..., Any],tasks: Iterable[tuple],max_workers: int = 4,show_log: bool = True
) -> list:"""使用多进程池并行执行 CPU 密集型任务。"""results = [None] * len(tasks)if show_log:print(f"⚡ 开始多进程计算(进程数 = {max_workers},任务数 = {len(tasks)})...")with ProcessPoolExecutor(max_workers=max_workers) as executor:future_to_idx = {executor.submit(func, *args): idxfor idx, args in enumerate(tasks)}for future in as_completed(future_to_idx):idx = future_to_idx[future]try:results[idx] = future.result()except Exception as e:results[idx] = f"❌ 任务 {idx} 出错 → {e}"return results

4. 使用示例

示例一:大规模运算

import mathdef heavy_calc(n):return sum(math.sqrt(i) for i in range(n))tasks = [(50_00000,), (60_00000,), (70_00000,), (80_00000,)]
results = run_in_processes(heavy_calc, tasks, max_workers=4)
print(results)

示例二:图像处理

from PIL import Image, ImageFilterdef blur_image(path):img = Image.open(path)img = img.filter(ImageFilter.GaussianBlur(5))return f"{path} 完成"tasks = [(f"img_{i}.jpg",) for i in range(10)]
results = run_in_processes(blur_image, tasks, max_workers=4)
print(results)

5. ThreadPool vs ProcessPool 对比

特性ThreadPoolExecutorProcessPoolExecutor
适用场景I/O 密集型(网络请求、磁盘读写)CPU 密集型(数值计算、图像处理)
受 GIL 限制✅ 是❌ 否
启动开销大(进程启动慢,内存消耗多)
共享内存方便(同一进程内)不方便(进程间需要序列化传输)
速度优势等待多的任务(I/O-bound)计算多的任务(CPU-bound)

👉 简单总结:

  • I/O-bound → ThreadPoolExecutor
  • CPU-bound → ProcessPoolExecutor

6. 注意事项

  1. 必须放在 if __name__ == "__main__":
    否则在 Windows/Mac 上会出现无限递归创建进程的问题。
  2. 数据传输开销大
    多进程间要通过序列化传输数据,避免传递超大对象(如几 GB 的数组)。
    建议用 共享内存(multiprocessing.Array / numpy.memmap) 解决。
  3. 进程数不宜过多
    一般等于 CPU 核心数或 核心数 - 1

7. 总结

  • CPU 密集型任务:计算量大,主要消耗 CPU 时间。例子:矩阵运算、CFD 模拟、图像处理。
  • 适合工具ProcessPoolExecutor,真正利用多核 CPU 并行,性能大幅提升。
  • 与线程池区别:线程适合等待多、计算少的场景;进程适合计算多、等待少的场景。

📌 推荐实践:

  • 任务多但轻 → 用 线程池
  • 任务重且算力需求大 → 用 进程池
  • 混合任务 → 可以考虑 线程池 + 进程池混合,甚至引入 asyncio

下一篇 “线程池 + 进程池混合应用实战”(例如风电 CFD 模拟里:多进程算流场 + 多线程读写文件)

http://www.xdnf.cn/news/18984.html

相关文章:

  • 从“找不到”到“秒上手”:金仓文档系统重构记
  • 《电商库存系统超卖事故的技术复盘与数据防护体系重构》
  • 推荐系统王树森(四)特征交叉+行为序列
  • java基础(十六)操作系统(上)
  • 基于单片机光照强度检测(光敏电阻)系统Proteus仿真(含全部资料)
  • 【Qt开发】常用控件(七)-> styleSheet
  • 深度学习(鱼书)day12--卷积神经网络(后四节)
  • Java项目-苍穹外卖_Day3-Day4
  • 深度解析Structured Outputs:基于JSON Schema的结构化输出实践与最佳方案
  • 8月26日
  • 开发避坑指南(37):Vue3 标签页实现攻略
  • iPhone 17 Pro 全新配色确定,首款折叠屏 iPhone 将配备 Touch ID 及四颗镜头
  • 二、JVM 入门 —— (四)堆以及 GC
  • MATLAB中函数的详细使用
  • Slice-100K:推动AI驱动的CAD与3D打印创新的多模态数据集
  • 『专利好药用力心脑血管健康』——爱上古中医(28)(健康生活是coder抒写优质代码的前提条件——《黄帝内经》伴读学习纪要)
  • Hadoop MapReduce 任务/输入数据 分片 InputSplit 解析
  • VS中创建Linux项目
  • VGVLP思路探索和讨论
  • STL库——vector(类函数学习)
  • 算法编程实例-快乐学习
  • Git:基本使用
  • 校园勤工俭学微信小程序的设计与实现:基于数字化服务生态的赋能体系构建
  • 10分钟快速搭建 SkyWalking 服务
  • 机器学习笔记
  • 【C语言】小游戏:关机程序
  • 【Linux 进程】进程程序替换
  • RAG中使用到的相关函数注释——LangChain核心函数
  • AI出题人给出的Java后端面经(二十仨)(不定更)
  • 【AI论文】FutureX:面向未来预测任务中大语言模型智能体的前沿动态基准测试