当前位置: 首页 > ai >正文

知微集:Python中的线程(三)

欢迎来到"一起学点什么吧"的合集「NLP知微集」。在这里,我们不愿宏大叙事,只聚焦于自然语言处理领域中那些细微却关键的“齿轮”与“螺丝钉”。我相信,真正深刻的理解,源于对细节的洞察。本期,我将为您拆解的是:[Python中的线程(三)]

知微集:Python中的线程(一)

知微集:Python中的线程(二)

前两期,我们一起对Python中的线程,已经有了较为初步的认识;今天,我们开始Python线程的第三期,主要从以下来探索Python线程:何时使用线程、线程阻塞调用、线程局部数据、线程互斥锁、可重入线程锁、线程条件、线程信号、线程事件、线程计时。
在这里插入图片描述
在这里插入图片描述

何时使用线程

锁被释放的情况包括:

  • 当一个线程执行阻塞 IO 时。
  • 当一个线程执行 C 代码并显式释放锁时。

完全避免锁的方法,例如:

  • 使用第三方 Python 解释器执行 Python 代码。

使用线程进行阻塞 IO

使用线程进行 IO 密集型任务

IO 密集型任务是一种涉及从设备、文件或套接字连接中读取或写入的任务。

这些操作涉及输入和输出(IO),而这些操作的速度受限于设备、硬盘或网络连接。这就是为什么这些任务被称为 IO 相关的。

CPU 真的非常快。现代 CPU,比如 4GHz 的 CPU,每秒可以执行 40 亿条指令,而且你的系统很可能有多个 CPU 核心。

与 CPU 的速度相比,执行 IO 操作非常慢。

与设备交互、读写文件和套接字连接需要调用操作系统的指令(内核),内核将等待操作完成。如果该操作是 CPU 的主要焦点,例如在 Python 程序的主线程中执行,那么 CPU 将花费许多毫秒甚至许多秒无所事事地等待。

执行 IO 操作的线程将在整个操作期间阻塞。在阻塞期间,这会向操作系统发出信号,表明一个线程可以被挂起,而另一个线程可以执行,这被称为上下文切换。

此外,当 Python 解释器执行阻塞 IO 操作时,它会释放 GIL,从而允许 Python 进程中的其他线程执行。

因此,阻塞 IO 为在 Python 中使用线程提供了一个绝佳的用例。

阻塞 IO 操作的例子包括:

  • 从硬盘读取或写入文件。
  • 向标准输出、输入或错误(stdin、stdout、stderr)进行读取或写入。
  • 打印文档
  • 在与服务器的套接字连接上读取或写入字节。
  • 下载或上传文件。
  • 查询服务器。
  • 查询数据库。
  • 拍照或录制视频。

使用释放 GIL 的外部 C 语言线程

我们可能会调用那些会进一步调用第三方 C 库的函数。

通常,这些函数调用会释放 GIL,因为被调用的 C library 不会与 Python 解释器交互。

这为 Python 进程中的其他线程提供了运行的机会。

例如,当使用 Python 标准库中的“hash”模块时,通过 hash.update() 函数对数据进行哈希处理时会释放 GIL。

在使用 OpenSSL 提供的哈希算法时,当对大于 2047 字节的数据进行哈希更新操作,Python GIL 会被释放以允许其他线程运行。

另一个例子是用于管理数据数组的 NumPy 库,它在执行数组上的函数时会释放 GIL。

当一个线程正在等待 IO(例如等待你输入一些内容,或者等待网络中有数据传入)时,Python 会释放 GIL 以便其他线程可以运行。更重要的是,当 NumPy 正在进行数组操作时,Python 同样会释放 GIL。

使用第三方 Python 解释器

存在可以获取并用于执行您的 Python 代码的替代商业和开源 Python 解释器。

这些解释器中的一些可能会实现全局解释器锁(GIL),并且释放它的频率或多或少与 CPython 不同。其他解释器则完全移除了 GIL,允许多个 Python 并发线程并行执行。

包含无全局解释器锁(GIL)的第三方 Python 解释器的例子包括:

  • Jython:一种用 Java 编写的开源 Python 解释器。
  • IronPython:一个用.NET 编写的开源 Python 解释器。

… Jython 不受全局解释器锁(GIL)的限制。这是因为所有 Python 线程都被映射到 Java 线程,并使用标准的 Java 垃圾回收支持(CPython 中 GIL 存在的主要原因是因为引用计数垃圾回收系统)。这里的重要推论是,你可以使用线程来处理用 Python 编写的计算密集型任务。

线程阻塞调用

阻塞调用是指一个函数调用在完成之前不会返回。

所有正常函数都是阻塞调用。没什么大不了的。

在并发编程中,阻塞调用具有特殊含义。

阻塞调用是指那些会等待特定条件并通知操作系统在线程等待期间没有发生事情的函数调用。

操作系统可能会注意到一个线程正在调用阻塞函数,并决定切换到另一个线程的上下文。操作系统管理哪些线程应该运行以及何时运行它们。它通过一种多任务处理方式实现这一点,即正在运行的线程被挂起,挂起的线程被恢复并继续运行。线程的挂起和恢复称为上下文切换。

操作系统倾向于从阻塞线程中切换上下文,允许非阻塞线程运行。

这意味着如果线程执行了一个阻塞函数调用,一个等待的调用,那么它可能会发出信号表示该线程可以被挂起,从而允许其他线程运行。

类似地,许多我们可能传统上认为会阻塞的函数调用,在现代非阻塞并发 API(如 asyncio)中可能有非阻塞版本。

在并发编程中,需要考虑三种类型的阻塞函数调用,它们是:

  • 并发原语上的阻塞调用。
  • 用于 IO 的阻塞调用。
  • 阻塞调用 sleep。

并发原语上的阻塞调用

在并发编程中,存在许多阻塞函数调用的例子。

常见的例子包括:

  • 等待锁,例如调用 threading.Lock 上的 acquire()。

    通过 threading.Lock 获取互斥锁(mutex)是一个阻塞调用。

    这是通过调用 acquire() 来实现的,如果锁可用,它将立即返回(不会阻塞),否则将阻塞直到可用。lock.acquire()

    也可以通过使用上下文管理器更清晰地实现,如果锁不可用,它也可能被阻塞。with lock:

  • 等待通知,例如调用 threading.Condition 对象的 wait()方法。

    线程可以等待程序状态发生某些变化的通知。

    with condition:# block until notifiedcondition.wait()
    
  • 等待线程终止,例如调用 threading.Thread 的 join() 方法。

    一个线程在等待另一个线程终止时可能会被阻塞。

    这是通过等待线程调用运行在其他线程上的 join()函数来实现的。

  • 等待信号量,例如调用 threading.Semaphore 的 acquire() 方法。

    一个 threading.Semaphore 提供了一个线程安全的计数器,该计数器的上限是预定义的。

    一个线程必须使用 acquire() 函数来获取信号量上的位置。一旦信号量已满,额外的获取尝试必须被阻塞,直到一个位置变得可用。

  • 等待事件,例如调用 threading.Event 的 wait() 方法。

    一个 threading.Event 是一个线程安全的布尔标志。

    一个线程可以通过 wait() 函数来阻塞等待事件被设置。

  • 等待屏障,例如调用 threading.Barrier 的 wait() 方法。

    一个 threading.Barrier 是一个同步原语,它允许多个线程进行协调。

    线程将在屏障上阻塞,直到固定数量的参与者到达,然后它们将全部被释放。

在并发编程中,阻塞于并发原语是正常现象。

I/O 的阻塞调用

传统上,与 IO 交互的函数调用通常是阻塞函数调用。也就是说,它们与并发原语中的阻塞调用具有相同的意义。

等待 IO 设备响应是向操作系统发出线程可以上下文切换的另一个信号。

常见的例子包括:

  • 硬盘驱动器(Hard disk drive):读取、写入、追加、重命名、删除等文件。
  • 外设(Peripherals):鼠标、键盘、屏幕、打印机、串口、摄像头等。
  • 互联网(Internet):下载和上传文件、获取网页、查询 RSS 等。
  • 数据库(Database):选择、更新、删除等 SQL 查询。
  • Email:发送邮件、接收邮件、查询收件箱等。

还有许多其他示例,大多与套接字相关。

与 CPU 相比,使用设备进行 IO 通常非常慢。

与向文件或套接字读取或写入一些字节相比,CPU 可以执行数量级的更多指令。

设备 IO 由操作系统和设备协调。这意味着操作系统可以从设备收集或发送一些字节,然后在需要时切换回阻塞线程,允许函数调用继续进行。

阻塞调用 sleep

sleep() 函数是由底层操作系统提供的一项功能,我们可以在程序中加以利用。

这是一个阻塞函数调用,它会暂停线程,使其固定时间(以秒为单位)内阻塞。

这可以通过调用 time.sleep() 来实现。

它和并发原语以及阻塞 IO 函数调用具有相同意义上的阻塞调用。它向操作系统发出信号,表明该线程正在等待,并且是上下文切换的良好候选者。

在并发编程中,添加睡眠可以是一种通过线程模拟固定间隔计算工作量的有用方法。

在展示并发编程的工作示例时,我们经常使用睡眠,但向代码中添加睡眠也有助于单元测试和调试并发失败条件,例如通过强制动态应用程序中事件的时序错误来引发竞态条件。

什么是wait

wait 是线程被阻塞时执行的操作。

通常在使用并发原语时,函数调用本身可能具有 wait() 或 await() 的名称,这表示线程将阻塞,直到条件满足。

线程局部数据

线程局部数据存储是多线程编程中的一个机制,它允许数据以每个线程私有的方式存储和访问。

它可能被称为“线程局部存储”、“线程私有”或简单地称为“线程局部”。

通常这涉及创建一个线程局部对象实例,该实例在数据被设置和检索的对象之间共享。

在线程局部对象上存储和检索的数据可能跨线程具有相同的变量名,这意味着相同的代码可以在不同线程中使用,这是使用工作线程时的常见方法。

重要的是,对线程局部对象的读取和写入在线程级别上是独特且私有的。这意味着一个线程可以写入一个名为“address”的变量,而另一个线程可以读取或写入一个具有相同名称的变量,但它不会与第一个线程存储的变量交互。

如果执行相同的代码并使用相同的线程局部实例,那么每个线程都有自己的命名变量的私有版本及其在线程局部存储中分配的值。

当多个线程需要存储本地数据,如部分解决方案或临时变量,并且需要在执行时使用相同的代码,例如相同的对象实例时,这会很有用。

线程可以通过 threading.local 类的实例存储局部数据。

其他线程可以使用局部的相同属性名,但值将限制在每个线程中。

这就像每个线程都有一个命名空间,称为“线程局部数据”。这意味着线程无法访问或读取其他线程的局部数据。

重要的是,每个线程必须持有“局部”实例才能访问存储的数据。

使用线程局部数据的示例

# 线程局部存储示例
from time import sleep
import threading# 自定义目标函数
def task(value):# 创建局部存储local = threading.local()# 存储数据local.value = value# 阻塞一会儿sleep(value)# 检索值print(f'Stored value: {local.value}')# 创建并启动线程
threading.Thread(target=task, args=(1,)).start()
# 创建并启动另一个线程
threading.Thread(target=task, args=(2,)).start()
Stored value: 1
Stored value: 2

线程局部变量的令人兴奋之处在于,我们可以在不同线程之间共享同一个实例。

共享线程局部实例的示例

可以创建一个线程局部对象的实例,并在多个线程之间共享它。

重要的是,每个线程可以在同一个同名的线程局部存储中存储唯一数据,并且不会相互干扰。

# 使用共享实例的线程局部存储示例
from time import sleep
import threading# 自定义目标函数
def task(value, local):# 存储数据local.value = value# 阻塞一会儿sleep(value)# 检索值print(f'Stored value: {local.value}')# 创建共享的线程局部实例
local = threading.local()
# 创建并启动线程
start_time = time.time()
threading.Thread(target=task, args=(1,local)).start()
print(f'Started in {time.time() - start_time:.2f}s')
# 等待一会儿
sleep(0.5)
# 创建并启动另一个线程
threading.Thread(target=task, args=(2,local)).start()
print(f'Started in {time.time() - start_time:.2f}s')
Started in 0.00s
Started in 0.50s
Stored value: 1
Stored value: 2

首先运行示例会创建线程局部实例,并将其传递给第一个线程,该线程将其存储在名为“value”的线程局部实例中,并休眠一秒钟。

主线程随后阻塞了一小段时间。

接下来,创建并启动了第二个线程,将其值存储在与相同名称“value”相同的线程局部实例中,然后阻塞了两秒钟。

第一个线程醒来并报告其值为“1”,然后第二个线程醒来并报告其值为“2”。

这表明当线程局部实例在多个线程间共享,并且每个线程都将其私有数据存储在相同的属性中时,情况是这样的。

全局线程局部实例示例

通过将线程局部实例设为全局变量,并在每个函数中直接访问它来实现相同的结果。

通过在目标任务函数中显式定义全局变量,然后像之前一样使用它来实现

使用线程局部变量作为线程私有的存储空间是线程局部机制的一种常见使用模式。

# 使用全局实例的线程本地存储示例
import time
from time import sleep
import threading# 自定义目标函数
def task(value):global local# 存储数据local.value = value# 阻塞一会儿sleep(value)# 检索值print(f'Stored value: {local.value}')# 创建共享的线程本地实例
local = threading.local()
start_time = time.time()
# 创建并启动线程
threading.Thread(target=task, args=(1,)).start()
print(f'Started in {time.time() - start_time:.2f}s')
# 等待一会儿
sleep(0.5)
# 创建并启动另一个线程
threading.Thread(target=task, args=(2,)).start()
print(f'Started in {time.time() - start_time:.2f}s')
Started in 0.00s
Started in 0.50s
Stored value: 1
Stored value: 2

何时使用线程局部数据?

当每个线程必须存储不同的数据,且这些数据不应被其他线程访问时,线程本地数据很有用。

一个线程局部存储机制允许每个线程:

  • 执行相同的代码。
  • 使用相同的变量名。
  • 共享相同的线程局部实例。

这是在确保线程存储的数据只能被该线程访问的同时实现的,使其在线程级别上是私有的,因此得名“线程局部”。

何时不应使用线程局部数据?

线程局部数据不应用于在多个线程之间共享数据。

这是因为数据变量对每个线程都是私有的,并且无法被访问。

Thread-Local 的常见模式是什么?

  1. 全局线程局部:将线程局部作为全局变量创建,并在多个线程执行的函数中使用它。
  2. 静态线程局部:将线程局部作为类变量(例如静态)创建,并在多个方法和多个线程执行的相同对象的多个实例中使用它。

线程互斥锁

在编写并发程序时,我们可能需要在线程之间共享数据或资源,这些通常需要用互斥锁来保护。

互斥锁用于保护代码的关键部分免受并发执行。

可以通过 threading.Lock 类在 Python 中使用互斥锁(mutex)

互斥锁

互斥锁或互斥量是一种用于防止竞态条件的同步原语。

竞态条件是一种并发失败情况,当两个线程执行相同的代码并访问或更新相同的资源(例如数据变量、流等)时,会导致资源处于未知且不一致的状态。

竞态条件常常导致程序出现意外行为和/或数据损坏。

能够被多个线程并发执行且可能导致竞态条件的代码敏感部分被称为临界区。临界区可以指单个代码块,但也指多个函数从多个地方对同一数据变量或资源进行访问。

确保互斥最常用的机制是互斥锁或简称互斥锁(mutex),它是一种在底层硬件中具有支持的特殊对象。基本思想是每个临界区都由一个锁来保护。

互斥锁可用于确保同一时间只有一个线程执行代码的关键部分,而所有其他试图执行相同代码的线程必须等待当前正在执行的线程完成关键部分并释放锁。

当一个线程“拥有”锁——也就是说,它已经返回了锁函数的调用,但还没有调用解锁函数——任何其他试图执行关键部分代码的线程都会在其锁函数调用中等待。

每个线程必须在临界区的开始尝试获取锁。如果锁尚未被获取,则某个线程将获取它,而其他线程必须等待获取锁的线程释放它。

在线程执行临界区代码之前,它必须通过调用互斥锁函数来“obtain获取”互斥锁,当它完成在临界区执行代码后,应该通过调用解锁函数来“relinquish释放”互斥锁。

如果锁没有被获取,我们可能称其为处于“未锁定(unlocked)”状态。而如果锁已被获取,我们可能称其为处于“已锁定(locked)”状态。

  • UnLocked:锁未被获取,并且可以被下一个尝试获取的线程获取。
  • Locked:该锁已被一个线程获取,任何试图获取它的线程都必须等待它被释放。

锁是在未锁定状态下创建的。

如何使用互斥锁

Python 通过 threading.Lock 类提供互斥锁。

锁的实例可以被创建,然后在线程访问临界区之前获取,在临界区之后释放。

任何时候只有一个线程可以拥有锁。如果一个线程没有释放已获得的锁,它将无法再次被获取。

尝试获取锁的线程将被阻塞,直到获取到锁,例如如果另一个线程当前持有锁然后释放了它。

我们可以通过将“blocking”参数设置为 False 来尝试获取锁而不阻塞。如果无法获取锁,则返回 False 值。

lock.acquire(blocking=false)

我们也可以尝试带超时地获取锁,即在设定的秒数内等待获取锁,如果超时则放弃。如果无法获取锁,则返回值 False。

lock.acquire(timeout=10)

我们也可以通过上下文管理器协议,使用 with 语句来使用锁,使得临界区成为锁使用过程中的一个代码块。

...
# create a lock
lock = Lock()
# acquire the lock
with lock:# ...

这是首选用法,因为它清楚地表明了受保护代码的开始和结束位置,并确保锁始终被释放,即使临界区内部出现异常或错误也是如此。

我们还可以通过 locked()函数检查锁是否当前被线程获取。

...
# check if a lock is currently acquired
if lock.locked():# ...

使用互斥锁的示例

# SuperFastPython.com
# 互斥锁(mutex)示例
from time import sleep
from random import random
from threading import Thread
from threading import Lock# 工作函数
def task(lock, identifier, value):# 获取锁with lock:print(f'>thread {identifier} got the lock, sleeping for {value}')sleep(value)# 创建共享锁
lock = Lock()
# 启动几个尝试执行相同临界区的线程
for i in range(10):# 启动线程Thread(target=task, args=(lock, i, random())).start()
# 等待所有线程完成...

运行示例会启动十个线程,所有线程都执行一个自定义的目标函数。

每个线程尝试获取锁,一旦获取成功,它们会报告一条包含其 ID 以及它们在释放锁之前将睡眠多长时间的消息。

由于使用了随机数,您的具体结果可能会有所不同。

>thread 0 got the lock, sleeping for 0.3595121656333118
>thread 1 got the lock, sleeping for 0.4432990299676699
>thread 2 got the lock, sleeping for 0.20874337791325426
>thread 3 got the lock, sleeping for 0.9722774087786338
>thread 4 got the lock, sleeping for 0.7095006286856006
>thread 5 got the lock, sleeping for 0.5762733611611981
>thread 6 got the lock, sleeping for 0.27261346362585626
>thread 7 got the lock, sleeping for 0.6444981069336172
>thread 8 got the lock, sleeping for 0.43306723744296294
>thread 9 got the lock, sleeping for 0.9158837061873222

如果不释放锁会怎样?

不释放锁是一种并发错误。

如果一个线程获得了锁但没有释放它,那么这个锁就不能被再次获取,受临界区保护的代码也无法再次执行。

如果忽略锁会怎样?

忽视锁是一种并发错误。

可以编写代码,使得某些线程遵守锁,而另一些线程不遵守。这很可能导致竞争条件,从而违背了设置锁的初衷。

该锁只有在所有对临界区(数据、资源等)的访问中都强制执行时,才能保护临界区。

线程锁 threading.Lock 是否可重入?

不可以

可重入锁是一种锁,如果被一个线程持有,该线程可以再次获取该锁。

threading.Thread 锁不是可重入的,这意味着如果一个线程获取了锁,并在代码的其他部分尝试再次获取它,这将导致阻塞,从而引发死锁错误(一种并发失败条件)。

可重入互斥锁在 threading.RLock 类中可用。

可重入线程锁

标准的互斥锁不允许线程多次获取锁。这意味着在一个临界区中的代码不能调用或执行受同一锁保护的另一个临界区中的代码。相反,需要一种不同类型的互斥锁,称为可重入锁。

可重入锁是一种可以被同一个线程多次获取的锁。

可以通过 threading.RLock 类在 Python 中使用可重入锁

可重入锁

可重入互斥锁,简称“可重入互斥量”或“可重入锁”,类似于互斥锁,但它允许线程多次获取该锁。

可重入锁是一种同步原语,可以被同一个线程多次获取。 […] 在锁定状态下,某个线程拥有该锁;在未锁定状态下,没有线程拥有它。

一个线程可能因为多种原因需要多次获取同一个锁。

我们可以想象关键部分分布在一个或多个函数中,每个部分都由同一个锁保护。线程在正常执行过程中可能会调用这些函数,也可能从一个关键部分调用另一个关键部分。

一个(非可重入)互斥锁的局限性在于,如果一个线程已经获取了该锁,则它无法再次获取该锁。实际上,这种情况将导致死锁,因为它将永远等待锁被释放以便获取,但它持有该锁且不会释放。

可重入锁允许线程在已经获取该锁的情况下再次获取相同的锁。这允许线程在受相同可重入锁保护的情况下,从临界区内部执行临界区。

每次线程获取锁时,也必须释放它,这意味着拥有该线程的获取和释放操作存在递归层次。因此,这种类型的锁有时被称为“递归互斥锁”。

锁与可重入锁的区别

Python 通过 threading.Lock 类提供互斥锁,并通过 threading.RLock 类提供可重入锁。

  • threading.Lock: Python 互斥锁。
  • threading.RLock: Python 可重入互斥锁。

一个 threading.Lock 只能被获取一次,一旦被获取,同一线程或任何其他线程都不能再次获取它,直到它被释放。

一个 threading.RLock 可以被同一线程多次获取,尽管一旦被某个线程获取,其他线程不能获取它,直到它被释放。

重要的是,每次当同一个线程获取 threading.RLock 时,必须以相同次数释放,直到它可被另一个线程再次获取。这意味着 acquire()的调用次数必须与 release()的调用次数相同,RLock 才能返回到未锁定状态。

如何使用可重入锁

Python 通过 threading.RLock 类提供锁。

RLock 的实例可以在线程访问临界区之前创建并获取,在临界区之后释放。

尝试获取锁的线程将被阻塞,直到获取到锁,例如如果另一个线程当前持有锁(一次或多次)然后释放它。

我们可以通过将“blocking”参数设置为 False 来尝试获取锁而不阻塞。如果无法获取锁,则返回 False 值。

lock.acquire(blocking=false)

我们也可以尝试带超时地获取锁,即在设定的秒数内等待获取锁,如果超时则放弃。如果无法获取锁,则返回值 False。

lock.acquire(timeout=10)

我们也可以通过上下文管理器协议使用 with 语句来使用可重入锁,使得临界区成为锁使用过程中的一个代码块。

...
# create a reentrant lock
lock = RLock()
# acquire the lock
with lock:# ...

使用可重入锁的示例

鉴于目标任务函数受锁保护,且调用也受相同锁保护的报告函数,我们可以使用可重入锁,这样如果线程在 task()中获取了锁,它将能够在 report()函数中重新进入该锁。

# 可重入锁示例
from time import sleep
from random import random
from threading import Thread
from threading import RLock# 报告函数
def report(lock, identifier):# 获取锁with lock:print(f'>thread {identifier} done')# 工作函数
def task(lock, identifier, value):# 获取锁with lock:print(f'>thread {identifier} sleeping for {value}')sleep(value)# 报告report(lock, identifier)# 创建共享的可重入锁
lock = RLock()
# 启动几个尝试执行相同临界区的线程
for i in range(10):# 启动线程Thread(target=task, args=(lock, i, random())).start()
# 等待所有线程完成...

运行示例会启动十个线程来执行目标任务函数。

一次只能有一个线程获取锁,获取后,会阻塞并重新进入同一个锁以报告完成消息。

如果使用非可重入锁,例如 threading.Lock,则线程将永远阻塞等待锁变得可用,但它无法实现这一点,因为线程已经持有锁。

线程条件

条件允许线程等待和被通知。

通过 threading.Condition 类在 Python 中使用线程条件对象。

线程条件(Threading Condition)

在并发中,条件(也称为监视器)允许多个线程被通知某些结果。

结合了互斥锁(mutex)和条件变量

一个互斥锁可以用来保护一个临界区,但它不能用来通知其他线程一个条件已经改变或满足。

一个条件可以被一个线程(类似于互斥锁)获取,之后它可以等待另一个线程通知它某些事情已经改变。在等待时,该线程被阻塞,并释放锁以便其他线程获取。

然后,另一个线程可以获取条件变量,进行更改,并通知一个、全部或等待该条件变量的线程子集某些内容已更改。等待的线程随后可以被唤醒(由操作系统调度),重新获取条件变量(互斥锁),检查任何已更改的状态,并执行所需的操作。

这表明一个条件内部使用互斥锁(用于获取/释放条件),但它还提供了其他功能,例如允许线程在条件上等待,并允许线程通知其他在条件上等待的线程。

使用条件对象

Python 通过 threading.Condition 类提供了一个条件变量。

可以创建一个条件对象,默认情况下它将创建一个新的可重入互斥锁(threading.RLock 类),该锁将内部使用。

condition = threading.Condition()
...
# acquire the condition
condition.acquire()
# wait to be notified
condition.wait()
# release the condition
condition.release()

直接调用 acquire() 和 release() 函数的另一种方法是使用上下文管理器,它将自动为我们执行获取/释放操作

...
# acquire the condition
with condition:# wait to be notifiedcondition.wait()# condition.wait(timeout=10) #该参数允许线程在指定的秒数时间限制后停止阻塞。# condition.wait_for(all_data_collected) #函数可用于仅在满足某个条件时才解锁等待的线程,例如调用返回布尔值的函数。

通过 notify()函数通知一个正在等待的线程

...
# acquire the condition
with condition:# notify a waiting threadcondition.notify()

使用条件变量的等待和通知示例

# 使用条件变量的等待/通知示例
from time import sleep
from threading import Thread
from threading import Condition# 准备一些工作的目标函数
def task(condition, work_list):# 阻塞一会儿sleep(1)# 向工作列表添加数据work_list.append(33)# 通知等待的线程工作已完成print('Thread sending notification...')with condition:condition.notify()# 创建条件变量
condition = Condition()
# 准备工作列表
work_list = list()
# 等待通知数据已准备就绪
print('Main thread waiting for data...')
with condition:# 启动新线程执行一些工作worker = Thread(target=task, args=(condition, work_list))worker.start()# 等待通知condition.wait()
# 我们知道数据已准备就绪
print(f'Got data: {work_list}')

首先运行示例会创建条件和任务列表。

新线程被定义并启动。线程暂时阻塞,向列表中添加数据,然后通知等待的线程。

与此同时,主线程等待新线程的通知,一旦收到通知,它就知道数据已准备好并报告结果。

使用 wait() 和 notify_all() 的条件变量示例

from time import sleep
from random import random
from threading import Thread
from threading import Condition# 目标函数
def task(condition, number):# 等待通知print(f'Thread {number} waiting...')with condition:condition.wait()# 阻塞一会儿value = random()sleep(value)# 报告结果print(f'Thread {number} got {value}')# 创建条件变量
condition = Condition()
# 启动一堆等待通知的线程
for i in range(5):worker = Thread(target=task, args=(condition, i))worker.start()
# 阻塞一会儿
sleep(1)
# 通知所有等待的线程可以运行
with condition:# 等待通知condition.notify_all()
# 阻塞直到所有非守护线程完成...

首先运行示例会创建五个线程,这些线程立即开始运行,并且都获取了条件变量并阻塞等待通知。

主线程阻塞片刻,然后通知所有五个等待线程。等待线程被唤醒,逐一获取条件变量中的锁,执行其处理并报告其结果。

程序在所有线程完成其处理后将退出。

wait_for() 的使用示例:条件

# 使用条件变量等待的示例
from time import sleep
from random import random
from threading import Thread
from threading import Condition# 目标函数
def task(condition, work_list):# 获取条件变量with condition:# 阻塞一会儿value = random()sleep(value)# 向列表添加工作work_list.append(value)print(f'Thread added {value}')# 通知等待的线程condition.notify()# 创建条件变量
condition = Condition()
# 定义工作列表
work_list = list()
# 启动一堆线程,它们将向列表添加工作
for i in range(5):worker = Thread(target=task, args=(condition, work_list))worker.start()
# 等待所有线程将工作添加到列表中
with condition:# 等待通知condition.wait_for(lambda : len(work_list)==5)print(f'Done, got: {work_list}')

首先运行示例会启动五个线程,每个线程将获取条件变量,生成一个随机值,将其添加到共享工作列表中,并通知主线程。

主线程等待条件变量,并在每个新线程完成时收到通知,但直到 lambda 可调用对象返回 True 时才实际继续执行并打印消息,即当列表中的值数量与线程数量相匹配时。

线程信号量(Thread Semaphore)

信号量本质上是一个由互斥锁保护的计数器,用于限制可以访问资源的线程数量。

在 Python 中,可以通过 threading.Semaphore 类使用信号量。

什么是信号量

信号量是一种并发原语,它允许限制可以获取保护关键区的锁的线程数量。

它是一种互斥锁(mutex)锁的扩展,增加了可以获取锁的线程数量计数,以便在额外的线程阻塞之前。一旦满员,新线程只能在一个持有信号量的现有线程释放一个位置后才能获取信号量的位置。

在内部,信号量维护一个由互斥锁保护的计数器,每次获取信号量时计数器会递增,每次释放信号量时计数器会递减。

当创建一个信号量时,计数器的上限被设定。如果设定为 1,那么信号量将像互斥锁一样运行。

一个信号量提供了一个有用的并发工具,用于限制可以并发访问资源的线程数量。一些例子包括:

  • 限制对服务器的同时套接字连接。
  • 限制硬盘上的并发文件操作。
  • 限制并发计算。

如何使用信号量

Python 通过 threading.Semaphore 类提供信号量。

在创建 threading.Semaphore 实例时,必须对其进行配置以设置内部计数器的限制。此限制将匹配可以持有信号量的并发线程数量。

...
# create a semaphore with a limit of 100
semaphore = Semaphore(100)

在此实现中,每次获取信号量时,内部计数器都会递减。每次释放信号量时,内部计数器都会递增。如果信号量没有可用的位置,则无法获取信号量,在这种情况下,尝试获取它的线程必须阻塞,直到某个位置变得可用。

# 信号量可以通过调用 acquire()函数来获取
# 默认情况下,这是一个阻塞调用,这意味着调用线程将被阻塞,直到信号量上出现一个可用位置。
semaphore.acquire()# "blocking" 参数可以设置为 False,在这种情况下,如果信号量上没有可用位置,线程将不会阻塞,函数将立即返回,返回 False 值表示信号量未被获取,或者返回 True 值表示成功获取了位置。
semaphore.acquire(blocking=False)# “timeout”参数可以设置为调用线程在信号量不可用时愿意等待的秒数,如果无法获得则放弃。同样,acquire()函数在能够获得位置时将返回 True,否则返回 False。
semaphore.acquire(timeout=10)

一旦获取,可以通过调用 release() 函数再次释放信号量。

semaphore.release()
with semaphore:# ...

使用信号量的示例

# 使用信号量的示例
from time import sleep
from random import random
from threading import Thread
from threading import Semaphore# 目标函数
def task(semaphore, number):# 尝试获取信号量with semaphore:# 处理value = random()sleep(value)# 报告结果print(f'Thread {number} got {value}')# 创建信号量
semaphore = Semaphore(2)
# 创建一系列线程
for i in range(10):worker = Thread(target=task, args=(semaphore, i))worker.start()
# 等待所有工作线程完成...

首先运行示例会创建信号量实例,然后启动十个工作线程。

所有十个线程尝试获取信号量,但每次只有两个线程被授予位置。在信号量上的线程完成工作后,会在随机的时间间隔内释放信号量。

每个信号量的释放(通过上下文管理器)允许另一个线程获取一个位置并执行其计算,同时仅允许两个线程在任何时候被处理,尽管所有十个线程都在执行它们的 run 方法。

线程事件(Thread Event)

事件是一个线程安全的布尔标志。

使用 Python 中的 threading.Event 类来使用一个事件对象。

如何使用事件对象

Python 通过 threading.Event 类提供了一个事件对象。

事件是一种简单的并发原语,允许线程之间进行通信。

一个 threading.Event 对象封装了一个布尔变量,该变量可以是“设置”(True)或“未设置”(False)。共享该事件实例的线程可以检查事件是否已设置、设置事件、清除事件(使其未设置)或等待事件被设置。

线程中的 Event 提供了一种便捷的方式,用于在多个线程之间共享一个布尔变量,该变量可以作为触发某个动作的信号。

这是线程间通信中最简单的机制之一:一个线程发出事件信号,其他线程等待该信号。

# 创建一个事件对象,该事件将处于“未设置”状态
event = threading.Event()# 创建后,我们可以通过 is_set() 函数检查事件是否已被设置,如果事件已设置,该函数将返回 True,否则返回 False。
# check if the event is set
if event.is_set():# do something...# 该事件可以通过 set() 函数进行设置。任何等待该事件被设置的线程都将被通知。
event.set()# 通过 clear() 函数可以将事件标记为“未设置”(无论其当前是否已设置)。
event.clear()# 最后,线程可以通过 wait()函数等待事件被设置。调用此函数将阻塞,直到事件被标记为已设置(例如,另一个线程调用了 set()函数)。如果事件已经设置,wait()函数将立即返回。
event.wait()
# 在审查 threading.Event 的源代码时,只有当 set() 函数被调用时,等待的线程才会被通知,而 clear() 函数被调用时则不会。# 可以向 wait() 函数传递一个“超时”参数,该参数将限制线程等待事件被标记为已设置的时间(以秒为单位)。
# 如果等待期间事件被设置,wait() 函数将返回 True;否则,返回 False 的值表示事件未被设置且调用超时。
event.wait(timeout=10)

使用事件对象的示例

# 使用事件对象的示例
from time import sleep
from random import random
from threading import Thread
from threading import Event# 目标任务函数
def task(event, number):# 等待事件被设置event.wait()# 开始处理value = random()sleep(value)print(f'Thread {number} got {value}')# 创建共享事件对象
event = Event()
# 创建一系列线程
for i in range(5):thread = Thread(target=task, args=(event, i))thread.start()
# 阻塞一会儿
print('Main thread blocking...')
sleep(2)
# 在所有线程中开始处理
event.set()
# 等待所有线程完成...

首先运行示例会创建并启动五个线程。每个线程在开始处理之前都会等待它前面的事件。

主线程暂时阻塞,让所有线程开始运行并等待事件。然后主线程设置该事件。这会触发所有五个线程执行它们的处理并报告消息。

计时线程(Timer Threads)

一个计时器线程将在时间延迟后执行一个函数。

可以通过 threading.Timer 类在 Python 中使用计时器线程对象。

如何使用计时器线程

Python 在 threading.Timer 类中提供了一个计时器线程。

threading.Timer 是 threading.Thread 类的扩展,这意味着我们可以像使用普通线程实例一样使用它。

它提供了一种在经过一定时间间隔后执行函数的实用方法。

# 创建一个计时器实例并进行配置。这包括在执行前等待的时间(以秒为单位)、触发时执行的函数以及传递给目标函数的任何参数。
timer = Timer(10, task, args=(arg1, arg2))# 目标任务函数不会执行,直到时间流逝完毕。
# 一旦创建,必须通过调用 start() 函数来启动线程,这将开始计时器。
timer.start()# 如果我们决定在目标函数执行之前取消定时器,可以通过调用 cancel()函数来实现。
timer.cancel()# 一旦时间已过,目标函数已执行,计时器线程就不能再被复用了。

使用计时器线程的示例

# 使用线程定时器对象的示例
from threading import Timer# 目标任务函数
def task(message):# 报告自定义消息print(message)# 创建线程定时器对象
timer = Timer(3, task, args=('Hello world',))
# 启动定时器对象
timer.start()
# 等待定时器完成
print('Waiting for the timer...')

首先运行示例会创建一个 threading.Timer 对象,并指定目标任务函数 task()以及单个参数。

计时器随后启动,主线程等待计时器线程完成。

计时器线程运行,等待配置的 3 秒钟,然后执行 task()函数报告自定义消息。

取消计时器线程的示例

# 使用线程定时器对象的示例
from time import sleep
from threading import Timer# 目标任务函数
def task(message):# 报告自定义消息print(message)# 创建线程定时器对象
timer = Timer(3, task, args=('Hello world',))
# 启动定时器对象
timer.start()
# 阻塞一会儿
sleep(1)
# 取消线程
print('Canceling the timer...')
timer.cancel()
Canceling the timer...

运行示例会启动计时器线程,目标任务函数 task()将在 3 秒后触发。

主线程等待一秒钟,然后取消计时器线程,阻止task()函数执行并向报告消息。

结语

“见微知著,积跬步以至千里”,至此,关于 [Python线程] 的微探索之旅才缓缓拉开序幕,后续将一起详细去探索一下Python的线程相关问题。感谢您的耐心阅读。希望这片小小的“知微”碎片,能让你对Python线程有更清晰的认识。

点赞关注不迷路,点击合集标签「[#NLP知微集](javascript:😉」,不错过每一次细微的洞察。

下期再见,继续我们的拆解之旅!

Reference

行,计时器线程就不能再被复用了。


## 使用计时器线程的示例```python
# 使用线程定时器对象的示例
from threading import Timer# 目标任务函数
def task(message):# 报告自定义消息print(message)# 创建线程定时器对象
timer = Timer(3, task, args=('Hello world',))
# 启动定时器对象
timer.start()
# 等待定时器完成
print('Waiting for the timer...')

首先运行示例会创建一个 threading.Timer 对象,并指定目标任务函数 task()以及单个参数。

计时器随后启动,主线程等待计时器线程完成。

计时器线程运行,等待配置的 3 秒钟,然后执行 task()函数报告自定义消息。

取消计时器线程的示例

# 使用线程定时器对象的示例
from time import sleep
from threading import Timer# 目标任务函数
def task(message):# 报告自定义消息print(message)# 创建线程定时器对象
timer = Timer(3, task, args=('Hello world',))
# 启动定时器对象
timer.start()
# 阻塞一会儿
sleep(1)
# 取消线程
print('Canceling the timer...')
timer.cancel()
Canceling the timer...

运行示例会启动计时器线程,目标任务函数 task()将在 3 秒后触发。

主线程等待一秒钟,然后取消计时器线程,阻止task()函数执行并向报告消息。

结语

“见微知著,积跬步以至千里”,至此,关于 [Python线程] 的微探索之旅才缓缓拉开序幕,后续将一起详细去探索一下Python的线程相关问题。感谢您的耐心阅读。希望这片小小的“知微”碎片,能让你对Python线程有更清晰的认识。

点赞关注不迷路,点击合集标签「[#NLP知微集](javascript:😉」,不错过每一次细微的洞察。

下期再见,继续我们的拆解之旅!

Reference

  • https://superfastpython.com/threading-in-python/
http://www.xdnf.cn/news/20139.html

相关文章:

  • JavaScript 中的并发编程实践与误区:一次深入的探讨
  • 软考高级 — 系统规划与管理师考试知识点精要
  • 电脑活动追踪全解析:六款软件助企业实现数字化精细管理
  • whl编译命令作用解释
  • 【完整源码+数据集+部署教程】加工操作安全手套与手部检测系统源码和数据集:改进yolo11-cls
  • mysq集群高可用架构之组复制MGR(单主复制-多主复制)
  • 2025 年 8 个最佳网站内容管理系统(CMS)
  • 小迪安全v2023学习笔记(七十八讲)—— 数据库安全RedisCouchDBH2database未授权CVE
  • LeetCode 刷题【65. 有效数字】
  • 机器学习算法介绍二
  • postgresql 通过dblink实现 跨库查询
  • PostgreSQL收集pg_stat_activity记录的shell工具pg_collect_pgsa
  • zoho crm notes add customer fields
  • 数字人打断对话的逻辑
  • 本地 Ai 离线视频去水印字幕!支持字幕、动静态水印去除!
  • python-虚拟试衣
  • LVS、Nginx与HAProxy负载均衡技术对比介绍
  • 任意齿形的齿轮和齿条相互包络工具
  • Linux常见命令总结 合集二:基本命令、目录操作命令、文件操作命令、压缩文件操作、查找命令、权限命令、其他命令
  • Process Explorer 学习笔记(第三章3.2.5):状态栏信息详解
  • PyTorch 训练显存越跑越涨:隐式保留计算图导致 OOM
  • 机器学习周报十二
  • 基于Echarts+HTML5可视化数据大屏展示-旅游智慧中心
  • CC-Link IE FB 转 DeviceNet 实现欧姆龙 PLC 与松下机器人在 SMT 生产线锡膏印刷环节的精准定位控制
  • docker 安装kafaka常用版本
  • 错误波形曲线
  • Qt信号与槽机制全面解析
  • Redis 事务:餐厅后厨的 “批量订单处理” 流程
  • 两条平面直线之间通过三次多项式曲线进行过渡的方法介绍
  • 雅菲奥朗SRE知识墙分享(七):『可观测性的定义与实践』