Python 使用期物处理并发(使用concurrent.futures模块下载)
使用concurrent.futures模块下载
concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和
ProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程
或进程中执行可调用的对象。这两个类在内部维护着一个工作线程或进
程池,以及要执行的任务队列。不过,这个接口抽象的层级很高,像下
载国旗这种简单的案例,无需关心任何实现细节。
示例 17-3 展示如何使用 ThreadPoolExecutor.map 方法,以最简单的
方式实现并发下载。
示例 17-3 flags_threadpool.py:使用
futures.ThreadPoolExecutor 类实现多线程下载的脚本
from concurrent import futures
from flags import save_flag, get_flag, show, main ➊
MAX_WORKERS = 20 ➋
def download_one(cc): ➌image = get_flag(cc)show(cc)save_flag(image, cc.lower() + '.gif')return cc
def download_many(cc_list):workers = min(MAX_WORKERS, len(cc_list)) ➍with futures.ThreadPoolExecutor(workers) as executor: ➎res = executor.map(download_one, sorted(cc_list)) ➏return len(list(res)) ➐
if __name__ == '__main__':main(download_many) ➑
❶ 重用 flags 模块(见示例 17-2)中的几个函数。
❷ 设定 ThreadPoolExecutor 类最多使用几个线程。
❸ 下载一个图像的函数;这是在各个线程中执行的函数。
❹ 设定工作的线程数量:使用允许的最大值(MAX_WORKERS)与要处
理的数量之间较小的那个值,以免创建多余的线程。
❺ 使用工作的线程数实例化 ThreadPoolExecutor
类;executor.__exit__
方法会调用
executor.shutdown(wait=True) 方法,它会在所有线程都执行完毕
前阻塞线程。
❻ map 方法的作用与内置的 map 函数类似,不过 download_one 函数
会在多个线程中并发调用;map 方法返回一个生成器,因此可以迭代,
获取各个函数返回的值。
❼ 返回获取的结果数量;如果有线程抛出异常,异常会在这里抛出,
这与隐式调用 next() 函数从迭代器中获取相应的返回值一样。
❽ 调用 flags 模块中的 main 函数,传入 download_many 函数的增强
版。
注意,示例 17-3 中的 download_one 函数其实是示例 17-2 中
download_many 函数的 for 循环体。编写并发代码时经常这样重构:
把依序执行的 for 循环体改成函数,以便并发调用。
我们用的库叫 concurrency.futures,可是在示例 17-3 中没有见到期
物,因此你可能想知道期物在哪里。下一节会解答这个问题。
期物在哪里
期物是 concurrent.futures 模块和 asyncio 包的重要组件,可是,
作为这两个库的用户,我们有时却见不到期物。示例 17-3 在背后用到
了期物,但是我编写的代码没有直接使用。这一节概述期物,还会举一
个例子,展示用法。
从 Python 3.4 起,标准库中有两个名为 Future 的类:concurrent.futures.Future 和 asyncio.Future。这两个类的
作用相同:两个 Future 类的实例都表示可能已经完成或者尚未完成的
延迟计算。这与 Twisted 引擎中的 Deferred 类、Tornado 框架中的
Future 类,以及多个 JavaScript 库中的 Promise 对象类似。
期物封装待完成的操作,可以放入队列,完成的状态可以查询,得到结
果(或抛出异常)后可以获取结果(或异常)。
我们要记住一件事:通常情况下自己不应该创建期物,而只能由并发框
架(concurrent.futures 或 asyncio)实例化。原因很简单:期物
表示终将发生的事情,而确定某件事会发生的唯一方式是执行的时间已
经排定。因此,只有排定把某件事交给
concurrent.futures.Executor 子类处理时,才会创建
concurrent.futures.Future 实例。例如,Executor.submit() 方
法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象
排期,并返回一个期物。
客户端代码不应该改变期物的状态,并发框架在期物表示的延迟计算结
束后会改变期物的状态,而我们无法控制计算何时结束。
这两种期物都有 .done() 方法,这个方法不阻塞,返回值是布尔值,
指明期物链接的可调用对象是否已经执行。客户端代码通常不会询问期
物是否运行结束,而是会等待通知。因此,两个 Future 类都有
.add_done_callback() 方法:这个方法只有一个参数,类型是可调
用的对象,期物运行结束后会调用指定的可调用对象。
此外,还有 .result() 方法。在期物运行结束后调用的话,这个方法
在两个 Future 类中的作用相同:返回可调用对象的结果,或者重新抛
出执行可调用的对象时抛出的异常。可是,如果期物没有运行结
束,result 方法在两个 Future 类中的行为相差很大。对
concurrency.futures.Future 实例来说,调用 f.result() 方法会
阻塞调用方所在的线程,直到有结果可返回。此时,result 方法可以
接收可选的 timeout 参数,如果在指定的时间内期物没有运行完毕,
会抛出 TimeoutError 异常。读到 18.1.1 节你会发
现,asyncio.Future.result 方法不支持设定超时时间,在那个库中
获取期物的结果最好使用 yield from 结构。不过,对
concurrency.futures.Future 实例不能这么做。
为了从实用的角度理解期物,我们可以使用
concurrent.futures.as_completed 函数
(https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed
重写示例 17-3。这个函数的参数是一个期物列表,返回值是一个迭代
器,在期物运行结束后产出期物。
为了使用 futures.as_completed 函数,只需修改 download_many 函
数,把较抽象的 executor.map 调用换成两个 for 循环:一个用于创
建并排定期物,另一个用于获取期物的结果。同时,我们会添加几个
print 调用,显示运行结束前后的期物。修改后的 download_many 函
数如示例 17-4,代码行数由 5 变成 17,不过现在我们能一窥神秘的期
物了。其他函数不变,与示例 17-3 中的一样。
示例 17-4 flags_threadpool_ac.py:把download_many 函数中的
executor.map 方法换成 executor.submit 方法和
futures.as_completed 函数
def download_many(cc_list):cc_list = cc_list[:5] ➊with futures.ThreadPoolExecutor(max_workers=3) as executor: ➋to_do = []for cc in sorted(cc_list): ➌future = executor.submit(download_one, cc) ➍to_do.append(future) ➎msg = 'Scheduled for {}: {}'print(msg.format(cc, future)) ➏results = []for future in futures.as_completed(to_do): ➐res = future.result() ➑msg = '{} result: {!r}'print(msg.format(future, res)) ➒results.append(res)return len(results)
❶ 这次演示只使用人口最多的 5 个国家。
❷ 把 max_workers 硬编码为 3,以便在输出中观察待完成的期物。
❸ 按照字母表顺序迭代国家代码,明确表明输出的顺序与输入一致。
❹ executor.submit 方法排定可调用对象的执行时间,然后返回一个
期物,表示这个待执行的操作。
❺ 存储各个期物,后面传给 as_completed 函数。
❻ 显示一个消息,包含国家代码和对应的期物。
❼ as_completed 函数在期物运行结束后产出期物。
❽ 获取该期物的结果。
❾ 显示期物及其结果。
注意,在这个示例中调用 future.result() 方法绝不会阻塞,因为
future 由 as_completed 函数产出。运行示例 17-4 得到的输出如示例
17-5 所示。
示例 17-5 flags_threadpool_ac.py 脚本的输出
$ python3 flags_threadpool_ac.py
Scheduled for BR: <Future at 0x100791518 state=running> ➊
Scheduled for CN: <Future at 0x100791710 state=running>
Scheduled for ID: <Future at 0x100791a90 state=running>
Scheduled for IN: <Future at 0x101807080 state=pending> ➋
Scheduled for US: <Future at 0x101807128 state=pending>
CN <Future at 0x100791710 state=finished returned str> result: 'CN' ➌
BR ID <Future at 0x100791518 state=finished returned str> result: 'BR' ➍
<Future at 0x100791a90 state=finished returned str> result: 'ID'
IN <Future at 0x101807080 state=finished returned str> result: 'IN'
US <Future at 0x101807128 state=finished returned str> result: 'US'
5 flags downloaded in 0.70s
❶ 排定的期物按字母表排序;期物的 repr() 方法会显示期物的状态:
前三个期物的状态是 running,因为有三个工作的线程。
❷ 后两个期物的状态是 pending,等待有线程可用。
❸ 这一行里的第一个 CN 是运行在一个工作线程中的 download_one 函
数输出的,随后的内容是 download_many 函数输出的。
❹ 这里有两个线程输出国家代码,然后主线程中的 download_many 函
数输出第一个线程的结果。
多次运行 flags_threadpool_ac.py 脚本,看到的结果有所不
同。如果把 max_workers 参数的值增大到 5,结果的顺序变化更
多。把 max_workers 参数的值设为 1,代码依序运行,结果的顺
序始终与调用 submit 方法的顺序一致。
我们分析了两个版本的使用 concurrent.futures 库实现的下载脚
本:使用 ThreadPoolExecutor.map 方法的示例 17-3 和使用
futures.as_completed 函数的示例 17-4。如果你对 flags_asyncio.py
脚本的代码好奇,可以看一眼第 18 章中的示例 18-5。
严格来说,我们目前测试的并发脚本都不能并行下载。使用
concurrent.futures 库实现的那两个示例受 GIL(Global Interpreter
Lock,全局解释器锁)的限制,而 flags_asyncio.py 脚本在单个线程中运
行。
读到这里,你可能会对前面做的非正规基准测试有下述疑问。
-
既然 Python 线程受 GIL 的限制,任何时候都只允许运行一个线程,
那么 flags_threadpool.py 脚本的下载速度怎么会比 flags.py 脚本快 5
倍? -
flags_asyncio.py 脚本和 flags.py 脚本都在单个线程中运行,前者怎
么会比后者快 5 倍?