当前位置: 首页 > ai >正文

Python并行处理实战:使用ProcessPoolExecutor加速计算

Python并行处理实战:使用ProcessPoolExecutor加速计算

    • 简介
    • 完整代码示例
    • 代码解释
      • 1. 导入必要的模块
      • 2. 定义处理函数
      • 3. 主函数
      • 4. 生成数字列表
      • 5. 确定最佳工作进程数量
      • 6. 将数字分成块
      • 7. 并行处理
      • 8. 计算耗时
    • 并行处理的基本概念和优势
    • 如何运行和测试这个示例
    • 总结

简介

在现代计算中,并行处理是提高程序性能的重要手段。Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecutor是一个非常强大且易于使用的工具。本文将通过一个实际示例,展示如何使用ProcessPoolExecutor进行并行处理,并详细解释代码的工作原理。

完整代码示例

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Listdef process_numbers(chunk: List[int], factor: int) -> str:"""处理数字的函数,通过将它们乘以因子来模拟处理。这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,并返回结果字符串。"""result = sum(x * factor for x in chunk)time.sleep(0.1)  # 使用睡眠模拟工作return f"处理的块和: {result}"def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):"""演示并行处理的主函数。这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。"""import logginglogging.basicConfig(level=logging.INFO)_log = logging.getLogger(__name__)# 如果没有提供数字,则生成示例列表if numbers is None:numbers = list(range(1, 101))  # 生成1到100的数字total_numbers = len(numbers)_log.info(f"开始并行处理 {total_numbers} 个数字")cpu_count = multiprocessing.cpu_count()_log.info(f"检测到 {cpu_count} 个CPU核心")# 确定最佳工作进程数量optimal_workers = min(cpu_count, num_chunks)_log.info(f"使用 {optimal_workers} 个工作进程")# 计算块大小chunk_size = max(1, total_numbers // optimal_workers)_log.info(f"每个块包含 {chunk_size} 个数字")# 将数字分成块chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]_log.info(f"总共生成了 {len(chunks)} 个块")start_time = time.time()processed_count = 0# 使用ProcessPoolExecutor进行并行处理with ProcessPoolExecutor(max_workers=optimal_workers) as executor:_log.info("启动ProcessPoolExecutor")# 提交所有任务futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]_log.info(f"提交了 {len(futures)} 个任务")# 等待完成并收集结果for future in as_completed(futures):try:result = future.result()processed_count += 1_log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")except Exception as e:_log.error(f"处理块时出错: {str(e)}")raiseelapsed_time = time.time() - start_time_log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")if __name__ == "__main__":# 使用数字列表的示例main()

代码解释

1. 导入必要的模块

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List

这些模块提供了我们需要的并行处理功能和类型提示。

2. 定义处理函数

def process_numbers(chunk: List[int], factor: int) -> str:"""处理数字的函数,通过将它们乘以因子来模拟处理。这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,并返回结果字符串。"""result = sum(x * factor for x in chunk)time.sleep(0.1)  # 使用睡眠模拟工作return f"处理的块和: {result}"

这个函数模拟了对数字列表的处理,通过将每个数字乘以一个因子并求和。time.sleep(0.1)用于模拟实际工作。

3. 主函数

def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):"""演示并行处理的主函数。这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。"""import logginglogging.basicConfig(level=logging.INFO)_log = logging.getLogger(__name__)

主函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。

4. 生成数字列表

    # 如果没有提供数字,则生成示例列表if numbers is None:numbers = list(range(1, 101))  # 生成1到100的数字

如果没有提供数字列表,则生成1到100的数字列表。

5. 确定最佳工作进程数量

    cpu_count = multiprocessing.cpu_count()_log.info(f"检测到 {cpu_count} 个CPU核心")# 确定最佳工作进程数量optimal_workers = min(cpu_count, num_chunks)_log.info(f"使用 {optimal_workers} 个工作进程")

根据CPU核心数和用户指定的块数,确定最佳工作进程数量。

6. 将数字分成块

    # 计算块大小chunk_size = max(1, total_numbers // optimal_workers)_log.info(f"每个块包含 {chunk_size} 个数字")# 将数字分成块chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]_log.info(f"总共生成了 {len(chunks)} 个块")

将数字列表分成多个块,每个块的大小根据总数和工作进程数量计算。

7. 并行处理

    start_time = time.time()processed_count = 0# 使用ProcessPoolExecutor进行并行处理with ProcessPoolExecutor(max_workers=optimal_workers) as executor:_log.info("启动ProcessPoolExecutor")# 提交所有任务futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]_log.info(f"提交了 {len(futures)} 个任务")# 等待完成并收集结果for future in as_completed(futures):try:result = future.result()processed_count += 1_log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")except Exception as e:_log.error(f"处理块时出错: {str(e)}")raise

使用ProcessPoolExecutor进行并行处理,提交所有任务并等待完成。

8. 计算耗时

    elapsed_time = time.time() - start_time_log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")

计算并行处理的总耗时并输出。

并行处理的基本概念和优势

并行处理是指同时执行多个任务,以提高程序的执行效率。Python的concurrent.futures模块提供了一个高级接口,用于并行执行任务。ProcessPoolExecutor是其中一个重要的类,它使用多进程来并行执行任务。

并行处理的优势包括:

  1. 提高程序的执行效率
  2. 充分利用多核CPU的计算能力
  3. 简化多线程或多进程编程的复杂性

如何运行和测试这个示例

  1. 将上述代码保存为parallel_processing_example.py文件。
  2. 确保你的Python环境中安装了必要的模块(本示例不需要额外安装模块)。
  3. 在终端或命令行中运行以下命令:
python parallel_processing_example.py

你将看到程序的执行过程和并行处理的结果。

总结

通过这个示例,我们展示了如何使用Python的ProcessPoolExecutor进行并行处理。并行处理是提高程序性能的重要手段,特别是在处理大量数据或计算密集型任务时。希望这个示例能帮助你更好地理解并行处理的概念和实现。

http://www.xdnf.cn/news/10425.html

相关文章:

  • Redis分布式锁深度解析与最佳实践
  • 源码解析(二):nnUNet
  • 解释程序(Python)不需要生成机器码 逐行解析 逐行执行
  • 模型训练相关的问题
  • 个人用户进行LLMs本地部署前如何自查和筛选
  • 14.Wifi模组(ESP8266)
  • LeetCode 热题 100 208. 实现 Trie (前缀树)
  • 724.寻找数组的中心下标前缀和
  • 网页前端开发(基础进阶2)
  • 多线程( Thread)
  • Python训练打卡Day39
  • 电子电路:时钟脉冲与上升沿的详细解析
  • CppCon 2014 学习:ASYNCHRONOUS COMPUTING IN C++
  • ssm 学习笔记day03
  • OVD开放词汇检测 Detic 训练COCO数据集实践
  • 28 C 语言作用域详解:作用域特性(全局、局部、块级)、应用场景、注意事项
  • 【Java学习笔记】枚举
  • 怎么更改cursor chat中的字体大小
  • XCPC 常用技巧
  • Beta分布Dirichlet分布
  • [Python] Python中的多重继承
  • 飞牛fnNAS装机之迷你小主机的利旧
  • SolidWorks软件的安装与卸载
  • 12 Java GUI
  • Word双栏英文论文排版攻略
  • 【解决】【亲测下载obsidian可行】打不开github.com 或者 加速访问 github
  • Pull Request Integration 拉取请求集成
  • Python实现HPSO-TVAC优化算法优化支持向量机SVC分类模型项目实战
  • QT/c++航空返修数据智能分析系统
  • 重读《人件》Peopleware -(15)Ⅱ 办公环境 Ⅷ 撑伞之步:构建理想办公环境(上)