当前位置: 首页 > java >正文

Python 多进程编程全面学习指南

文章目录

  • Python 多进程编程全面学习指南
    • 一、多进程基础概念
      • 1.1 进程与线程的区别
      • 1.2 多进程优势
      • 1.3 Python多进程模块
    • 二、进程创建与管理
      • 2.1 创建进程的两种方式
        • 方式1:函数式创建
        • 方式2:类继承式创建
      • 2.2 进程常用方法与属性
    • 三、进程间通信(IPC)
      • 3.1 队列(Queue)
      • 3.2 管道(Pipe)
      • 3.3 共享内存
        • 共享值(Value)
        • 共享数组(Array)
      • 3.4 管理器(Manager)
    • 四、进程同步机制
      • 4.1 锁(Lock)
      • 4.2 信号量(Semaphore)
      • 4.3 事件(Event)
    • 五、进程池高级用法
      • 5.1 基本进程池
      • 5.2 使用ProcessPoolExecutor
      • 5.3 回调函数与错误处理
    • 六、高级多进程模式
      • 6.1 生产者-消费者模式
      • 6.2 MapReduce模式
      • 6.3 分布式进程
    • 七、多进程最佳实践
      • 7.1 性能优化技巧
      • 7.2 常见问题解决方案
      • 7.3 调试与监控
    • 八、应用场景与选择指南
      • 8.1 适合多进程的场景
      • 8.2 多进程 vs 多线程选择矩阵
      • 8.3 学习资源推荐

Python 多进程编程全面学习指南

一、多进程基础概念

1.1 进程与线程的区别

特性进程线程
资源分配独立内存空间共享进程内存
创建开销
通信方式IPC(管道、队列等)共享变量
上下文切换开销大开销小
安全性高(相互隔离)低(需要同步)
GIL影响
适用场景CPU密集型任务I/O密集型任务

1.2 多进程优势

  • 绕过GIL限制:充分利用多核CPU
  • 内存隔离:避免数据竞争
  • 高容错性:单个进程崩溃不影响其他
  • 资源控制:可分配不同CPU核心

1.3 Python多进程模块

import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor

multiprocessingconcurrent.futures 都是 Python 中用于实现并发的标准库模块,它们之间的关系可以概括为:

  1. 基础与抽象

    • multiprocessing 提供底层的多进程编程接口
    • concurrent.futures 是建立在 multiprocessing 之上的高级抽象层
  2. 设计哲学

    • multiprocessing:提供全面的进程控制能力
    • concurrent.futures:提供统一的异步执行接口
  3. 关键联系
    concurrent.futures.ProcessPoolExecutor 内部使用 multiprocessing 模块实现

特性multiprocessingconcurrent.futures
核心组件Process, Pool, Queue 等Executor, Future 对象
编程范式面向过程/面向对象基于 Future 的异步编程
使用复杂度中等(需要手动管理)简单(自动管理资源)
执行模型直接控制进程通过执行器(Executor)管理
错误处理需要手动捕获异常Future 对象封装异常
结果获取通过返回值或共享对象通过 Future.result()
进度跟踪需自定义实现内置 as_completed() 方法
跨线程/进程统一接口有(ThreadPoolExecutor/ProcessPoolExecutor)

ProcessPoolExecutor 内部实现机制

# 伪代码展示 ProcessPoolExecutor 如何基于 multiprocessing 实现
class ProcessPoolExecutor:def __init__(self, max_workers=None):# 使用 multiprocessing 的 Queue 进行进程间通信self._call_queue = multiprocessing.Queue()self._result_queue = multiprocessing.Queue()# 使用 multiprocessing.Process 创建工作者进程self._processes = set()for _ in range(max_workers):p = multiprocessing.Process(target=_worker,args=(self._call_queue, self._result_queue))p.start()self._processes.add(p)def submit(self, fn, *args, **kwargs):# 将任务放入队列self._call_queue.put((fn, args, kwargs))return Future()def shutdown(self):# 清理 multiprocessing 进程for p in self._processes:p.terminate()

建议:

1、优先选择 concurrent.futures

  • 需要简单清晰的并行代码
  • 需要统一的线程/进程接口
  • 需要高级功能(回调、完成通知等)
  • 任务相对独立,无需复杂IPC

2、使用 multiprocessing

  • 需要精细控制进程
  • 需要共享内存或复杂IPC
  • 需要自定义进程同步
  • 使用复杂进程拓扑结构

二、进程创建与管理

2.1 创建进程的两种方式

方式1:函数式创建
def worker(name, delay):print(f"Process {name} (PID: {os.getpid()}) started")time.sleep(delay)print(f"Process {name} completed")return f"{name}-result"if __name__ == "__main__":# 创建进程p1 = multiprocessing.Process(target=worker, args=("Alpha", 2))p2 = multiprocessing.Process(target=worker, args=("Beta", 1))# 启动进程p1.start()p2.start()# 等待进程结束p1.join()p2.join()print("All processes completed")
方式2:类继承式创建
class MyProcess(multiprocessing.Process):def __init__(self, name, delay):super().__init__()self.name = nameself.delay = delaydef run(self):print(f"Process {self.name} (PID: {os.getpid()}) started")time.sleep(self.delay)print(f"Process {self.name} completed")if __name__ == "__main__":processes = [MyProcess("Alpha", 2),MyProcess("Beta", 1),MyProcess("Gamma", 3)]for p in processes:p.start()for p in processes:p.join()print("All custom processes finished")

2.2 进程常用方法与属性

方法/属性描述示例
start()启动进程p.start()
run()进程执行的主体方法(可重写)自定义进程类时覆盖
join(timeout)等待进程终止p.join()
is_alive()检查进程是否正在运行if p.is_alive(): ...
name获取/设置进程名称p.name = "Worker-1"
pid进程标识符(整数)print(p.pid)
daemon守护进程标志(主进程结束时自动终止)p.daemon = True
exitcode进程退出代码(None表示仍在运行)print(p.exitcode)
terminate()强制终止进程p.terminate()
kill()类似terminate但使用SIGKILL信号p.kill()
close()关闭进程对象释放资源p.close()

三、进程间通信(IPC)

3.1 队列(Queue)

def producer(q, items):for item in items:print(f"Producing: {item}")q.put(item)time.sleep(0.1)q.put(None)  # 结束信号def consumer(q):while True:item = q.get()if item is None:breakprint(f"Consuming: {item}")time.sleep(0.2)if __name__ == "__main__":queue = multiprocessing.Queue()producer_process = multiprocessing.Process(target=producer, args=(queue, ["A", "B", "C", "D", "E"]))consumer_process = multiprocessing.Process(target=consumer, args=(queue,))producer_process.start()consumer_process.start()producer_process.join()consumer_process.join()

3.2 管道(Pipe)

def sender(conn, messages):for message in messages:print(f"Sending: {message}")conn.send(message)time.sleep(0.1)conn.send("END")conn.close()def receiver(conn):while True:message = conn.recv()if message == "END":breakprint(f"Received: {message}")time.sleep(0.15)if __name__ == "__main__":parent_conn, child_conn = multiprocessing.Pipe()p1 = multiprocessing.Process(target=sender, args=(parent_conn, ["Msg1", "Msg2", "Msg3"]))p2 = multiprocessing.Process(target=receiver, args=(child_conn,))p1.start()p2.start()p1.join()p2.join()

3.3 共享内存

共享值(Value)
def increment(counter):for _ in range(10000):with counter.get_lock():counter.value += 1if __name__ == "__main__":counter = multiprocessing.Value('i', 0)processes = []for _ in range(5):p = multiprocessing.Process(target=increment, args=(counter,))processes.append(p)p.start()for p in processes:p.join()print(f"Final counter value: {counter.value}")
共享数组(Array)
def square(arr, index):arr[index] = arr[index] ** 2if __name__ == "__main__":# 创建共享数组shared_array = multiprocessing.Array('i', [1, 2, 3, 4, 5])print("Original array:", list(shared_array))processes = []for i in range(len(shared_array)):p = multiprocessing.Process(target=square, args=(shared_array, i))processes.append(p)p.start()for p in processes:p.join()print("Squared array:", list(shared_array))

3.4 管理器(Manager)

def worker(d, l, index):d[index] = f"Value-{index}"l.append(index ** 2)if __name__ == "__main__":with multiprocessing.Manager() as manager:# 创建共享字典和列表shared_dict = manager.dict()shared_list = manager.list()processes = []for i in range(5):p = multiprocessing.Process(target=worker, args=(shared_dict, shared_list, i))processes.append(p)p.start()for p in processes:p.join()print("Shared Dictionary:", dict(shared_dict))print("Shared List:", list(shared_list))

四、进程同步机制

4.1 锁(Lock)

def printer(lock, msg):with lock:print(f"[{os.getpid()}] {msg}")time.sleep(0.1)if __name__ == "__main__":lock = multiprocessing.Lock()messages = ["Hello", "from", "multiple", "processes"]processes = []for msg in messages:p = multiprocessing.Process(target=printer, args=(lock, msg))processes.append(p)p.start()for p in processes:p.join()

4.2 信号量(Semaphore)

def access_resource(sem, worker_id):with sem:print(f"Worker {worker_id} acquired resource")time.sleep(1)print(f"Worker {worker_id} releasing resource")if __name__ == "__main__":sem = multiprocessing.Semaphore(2)  # 允许2个进程同时访问workers = 5processes = []for i in range(workers):p = multiprocessing.Process(target=access_resource, args=(sem, i))processes.append(p)p.start()for p in processes:p.join()

4.3 事件(Event)

def waiter(event, worker_id):print(f"Worker {worker_id} waiting for event")event.wait()print(f"Worker {worker_id} processing after event")def setter(event):print("Setter sleeping before setting event")time.sleep(2)print("Setter setting event")event.set()if __name__ == "__main__":event = multiprocessing.Event()# 创建等待进程waiters = [multiprocessing.Process(target=waiter, args=(event, i))for i in range(3)]setter_proc = multiprocessing.Process(target=setter, args=(event,))for p in waiters:p.start()setter_proc.start()for p in waiters:p.join()setter_proc.join()

五、进程池高级用法

5.1 基本进程池

def cpu_intensive_task(n):print(f"Processing {n} in PID {os.getpid()}")return sum(i * i for i in range(n))if __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:# 同步执行result = pool.apply(cpu_intensive_task, (1000000,))print(f"Apply result: {result}")# 异步执行async_result = pool.apply_async(cpu_intensive_task, (2000000,))print(f"Async result: {async_result.get()}")# Map操作results = pool.map(cpu_intensive_task, [100000, 200000, 300000])print(f"Map results: {results}")# Map异步操作async_results = pool.map_async(cpu_intensive_task, [400000, 500000])print(f"Async map results: {async_results.get()}")

5.2 使用ProcessPoolExecutor

def task(n):print(f"Processing {n} in PID {os.getpid()}")return n * nif __name__ == "__main__":with ProcessPoolExecutor(max_workers=4) as executor:# 提交单个任务future1 = executor.submit(task, 5)print(f"Task result: {future1.result()}")# 批量提交futures = [executor.submit(task, i) for i in range(10)]# 按完成顺序处理结果for future in as_completed(futures):print(f"Completed: {future.result()}")# 使用map获取有序结果results = executor.map(task, range(5, 15))print("Ordered results:", list(results))

5.3 回调函数与错误处理

def success_callback(result):print(f"Task succeeded with result: {result}")def error_callback(exc):print(f"Task failed with exception: {exc}")def risky_task(n):if n % 3 == 0:raise ValueError(f"Bad number {n}")return n ** 0.5if __name__ == "__main__":with ProcessPoolExecutor(max_workers=3) as executor:futures = []for i in range(10):future = executor.submit(risky_task, i)future.add_done_callback(lambda f: success_callback(f.result()) if not f.exception() else error_callback(f.exception()))futures.append(future)# 等待所有任务完成for future in futures:try:future.result()except Exception as e:pass  # 错误已在回调中处理

六、高级多进程模式

6.1 生产者-消费者模式

def producer(queue, count):for i in range(count):item = f"Item-{i}"queue.put(item)print(f"Produced {item}")time.sleep(0.1)queue.put(None)  # 结束信号def consumer(queue, worker_id):while True:item = queue.get()if item is None:queue.put(None)  # 传递结束信号breakprint(f"Worker {worker_id} consumed {item}")time.sleep(0.2)if __name__ == "__main__":queue = multiprocessing.Queue(maxsize=5)# 创建生产者producer_proc = multiprocessing.Process(target=producer, args=(queue, 10))# 创建消费者consumers = [multiprocessing.Process(target=consumer, args=(queue, i))for i in range(3)]producer_proc.start()for c in consumers:c.start()producer_proc.join()for c in consumers:c.join()

6.2 MapReduce模式

def mapper(item):"""映射函数:返回(单词, 1)元组"""time.sleep(0.01)  # 模拟处理时间return [(word.lower(), 1) for word in item.split()]def reducer(key, values):"""归约函数:计算单词出现次数"""return (key, sum(values))if __name__ == "__main__":# 示例文本数据texts = ["Python is an interpreted high-level programming language","Python is created by Guido van Rossum","Python is widely used in data science","Python supports multiple programming paradigms"]with ProcessPoolExecutor() as executor:# Map阶段:并行处理所有文本map_results = executor.map(mapper, texts)# 收集所有映射结果all_items = []for result in map_results:all_items.extend(result)# 按单词分组grouped = {}for key, value in all_items:if key not in grouped:grouped[key] = []grouped[key].append(value)# Reduce阶段:并行处理每个单词reduce_futures = []for key, values in grouped.items():future = executor.submit(reducer, key, values)reduce_futures.append(future)# 获取最终结果word_counts = {}for future in reduce_futures:key, count = future.result()word_counts[key] = count# 打印词频统计print("Word Count Results:")for word, count in sorted(word_counts.items(), key=lambda x: x[1], reverse=True):print(f"{word}: {count}")

6.3 分布式进程

# manager_server.py
import multiprocessing.managersclass SharedManager(multiprocessing.managers.BaseManager):passdef run_server():shared_dict = {}shared_list = []SharedManager.register('get_dict', callable=lambda: shared_dict)SharedManager.register('get_list', callable=lambda: shared_list)manager = SharedManager(address=('', 50000), authkey=b'secret')server = manager.get_server()print("Server started...")server.serve_forever()if __name__ == "__main__":run_server()# client_worker.py
import multiprocessing.managersclass SharedManager(multiprocessing.managers.BaseManager):passdef worker():SharedManager.register('get_dict')SharedManager.register('get_list')manager = SharedManager(address=('localhost', 50000), authkey=b'secret')manager.connect()shared_dict = manager.get_dict()shared_list = manager.get_list()shared_dict['worker'] = os.getpid()shared_list.append(os.getpid())print(f"Worker {os.getpid()} updated shared data")if __name__ == "__main__":processes = [multiprocessing.Process(target=worker) for _ in range(3)]for p in processes:p.start()for p in processes:p.join()print("All workers completed")

七、多进程最佳实践

7.1 性能优化技巧

  1. 避免过度进程创建:使用进程池复用进程

  2. 减少IPC开销

    • 批量传输数据
    • 使用共享内存代替队列
    • 避免传递大型对象
  3. 负载均衡:根据任务类型分配进程

  4. CPU亲和性:绑定进程到特定核心

    import psutildef set_cpu_affinity(pid, cores):process = psutil.Process(pid)process.cpu_affinity(cores)
    

7.2 常见问题解决方案

  1. 死锁预防
    • 避免嵌套获取多个资源
    • 设置超时时间 lock.acquire(timeout=5)
  2. 僵尸进程处理
    • 使用join()等待子进程结束
    • 设置daemon=True自动清理
  3. 资源泄漏
    • 使用上下文管理器(with语句)
    • finally块中释放资源

7.3 调试与监控

def monitor_processes():while True:print("\n=== Active Processes ===")for proc in multiprocessing.active_children():print(f"{proc.name} (PID: {proc.pid}, Alive: {proc.is_alive()}")time.sleep(2)if __name__ == "__main__":# 启动工作进程processes = [multiprocessing.Process(target=time.sleep, args=(i,), name=f"Sleep-{i}")for i in range(3, 0, -1)]for p in processes:p.start()# 启动监控进程monitor = multiprocessing.Process(target=monitor_processes, daemon=True)monitor.start()# 等待工作进程完成for p in processes:p.join()print("All processes completed")

八、应用场景与选择指南

8.1 适合多进程的场景

  1. CPU密集型任务

    • 数学计算(矩阵运算、数值模拟)
    • 图像/视频处理
    • 数据压缩/加密
  2. 独立数据处理

    • 批量文件转换
    • 并行数据转换
    • 大规模数据处理
  3. 高可靠性需求

    • 关键任务处理
    • 服务隔离
    • 容错系统

8.2 多进程 vs 多线程选择矩阵

场景推荐方案理由
CPU密集型 + 数据独立多进程充分利用多核
I/O密集型 + 资源共享多线程切换开销小,共享内存方便
CPU密集型 + 数据共享多进程+IPC绕过GIL,隔离内存
混合型任务混合进程处理计算,线程处理I/O

8.3 学习资源推荐

  1. 官方文档

    • multiprocessing — Process-based parallelism
    • concurrent.futures — Launching parallel tasks
  2. 进阶书籍

    • 《Python并行编程手册》
    • 《Effective Python》第7章
  3. 实践项目

    • 多进程图像处理器
    • 并行数据管道
    • 分布式计算节点
  4. 调试工具

    • multiprocessing.log_to_stderr()
    • psutil库监控进程
    • py-spy性能分析工具
http://www.xdnf.cn/news/11908.html

相关文章:

  • Unity 大型手游碰撞性能优化指南
  • Axure高保真LayUI框架 V2.6.8元件库
  • [蓝桥杯]卡片换位
  • Modbus转EtherNET IP网关开启节能改造新范式
  • 细说C语言将格式化输出到字符串的函数sprintf、_sprintf_l、swprintf、_swprintf_l、__swprintf_l
  • IEC 61347-1:2015 灯控制装置安全标准详解
  • [Java 基础]创建人类这个类小练习
  • Python应用函数的定义与调用(一)
  • AI制药专利战:生命权VS专利权,谁在定价你的生命?
  • React Native开发鸿蒙运动健康类应用的项目实践记录
  • C++--vector的使用及其模拟实现
  • PaddleOCR v3.0.0 编译FAQ
  • itop-3568开发板机器视觉opencv开发手册-图像绘制-画线
  • UE接口通信
  • 代码随想录|动态规划|50编辑距离
  • Linux:理解库制作与原理
  • 《IDEA 高效开发:自定义类/方法注释模板详解》
  • 机器学习14-迁移学习
  • 【Linux】Linux权限
  • 在 Windows 系统下配置 VSCode + CMake + Ninja 进行 C++ 或 Qt 开发
  • docker常见命令行用法
  • WebFuture:启动数据库提示: error while loading shared libraries: libaio.so.1问题处理
  • PaddleOCR(2):PaddleOCR环境搭建
  • 跨域请求解决方案全解析
  • NFT 市场开发:基于 Ethereum 和 IPFS 构建去中心化平台
  • Open SSL 3.0相关知识以及源码流程分析
  • 【定时器】定时器存在的内存泄露问题
  • [蓝桥杯]最大比例
  • springboot ErrorController getErrorPath() 版本变迁
  • Java设计模式:责任链模式