A Comprehensive Guide to Multithreaded Programming in Python
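1. Multithreading Fundamentals
1.1 Threads vs. Processes
Feature | Process | Thread |
---|---|---|
Memory | Separate address space | Shares the process's memory |
Creation overhead | High | Low |
Communication | Pipes, sockets, etc. | Shared variables |
Context-switch cost | High | Low |
Safety | High (processes are isolated) | Low (requires synchronization) |
GIL in CPython | Each process has its own GIL, so no GIL contention between processes | All threads of a process share one GIL |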
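To make the "shares the process's memory" row concrete, here is a minimal sketch: a worker thread appends to a list created by the main thread, and the main thread sees the change right after join(), with no pipe or socket involved. The names `shared_list` and `append_items` are illustrative only.

```python
import threading

shared_list = []  # lives in the process's memory, visible to every thread

def append_items(n):
    # Runs in a separate thread but mutates the very same list object
    for i in range(n):
        shared_list.append(i)

t = threading.Thread(target=append_items, args=(3,))
t.start()
t.join()

print(shared_list)  # [0, 1, 2]: the main thread sees the worker's changes
```

A child process started with `multiprocessing` works on its own copy of the data instead, so the same change would not be visible to the parent without a pipe, queue, or shared-memory object.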
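1.2 The Global Interpreter Lock (GIL)
- A design feature of the CPython interpreter (the reference implementation)
- Only one thread can execute Python bytecode at any given moment
- Small impact on I/O-bound tasks (blocking I/O calls release the GIL); large impact on CPU-bound tasks, whose threads cannot run Python code in parallel
- Workarounds: use multiple processes, or C extensions that release the GIL during heavy computation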
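A quick way to see why the GIL matters little for I/O-bound work is to time several threads that only wait. This sketch uses `time.sleep()` as a stand-in for real I/O (an assumption: real network or disk latency varies), and `fake_io` is an illustrative name.

```python
import threading
import time

def fake_io(delay):
    # time.sleep() releases the GIL, just like blocking file or socket I/O
    time.sleep(delay)

start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The five 0.2 s waits overlap, so the total is close to 0.2 s, not 1.0 s
print(f"5 simulated I/O waits took {elapsed:.2f} s")
```

Replacing the sleep with a pure-Python CPU loop would, on a standard CPython build, take roughly as long as running the loops one after another, because the threads take turns holding the GIL.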
2. Creating and Managing Threads
2.1 Two Ways to Create a Thread
Method 1: pass a function as the target

```python
import threading
import time

def print_numbers():
    for i in range(5):
        time.sleep(0.5)
        print(f"Number: {i}")

def print_letters():
    for letter in 'ABCDE':
        time.sleep(0.7)
        print(f"Letter: {letter}")

# Create the threads
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)

# Start the threads
t1.start()
t2.start()

# Wait for both threads to finish
t1.join()
t2.join()

print("All threads completed!")
```
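`Thread` does not hand back the target's return value, so a common pattern with function-style threads is to pass arguments through `args`/`kwargs` and write results into a pre-sized list, one slot per thread. A minimal sketch; `square_into` and its `offset` keyword are illustrative names.

```python
import threading

def square_into(results, index, value, *, offset=0):
    # Each thread writes only to its own slot, so the threads never
    # touch the same list element
    results[index] = value * value + offset

values = [1, 2, 3, 4]
results = [None] * len(values)

threads = [
    threading.Thread(target=square_into, args=(results, i, v), kwargs={"offset": 1})
    for i, v in enumerate(values)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # [2, 5, 10, 17]
```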
Method 2: subclass threading.Thread

```python
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__(name=name)  # let Thread manage the name attribute
        self.delay = delay

    def run(self):
        # start() calls run() in the new thread
        print(f"Thread {self.name} starting")
        for i in range(5):
            time.sleep(self.delay)
            print(f"{self.name}: {i}")
        print(f"Thread {self.name} completed")

# Create and start the threads
threads = [
    MyThread("Alpha", 0.3),
    MyThread("Beta", 0.4),
    MyThread("Gamma", 0.5),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("All custom threads finished")
```
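Subclassing also makes it easy to keep a thread's return value or exception for the caller. The sketch below is a hypothetical `ResultThread` helper (not part of the standard library) that stores whatever the wrapped function returns or raises, and re-raises the error in the caller from a `get()` method.

```python
import threading

class ResultThread(threading.Thread):
    """Hypothetical helper: runs func(*args) and keeps its result or exception."""

    def __init__(self, func, *args):
        super().__init__()
        self.func = func
        self.func_args = args  # avoid clashing with Thread's private _args
        self.result = None
        self.error = None

    def run(self):
        try:
            self.result = self.func(*self.func_args)
        except Exception as exc:  # keep the exception for the caller
            self.error = exc

    def get(self):
        # Wait for the thread, then return its result or re-raise its error
        self.join()
        if self.error is not None:
            raise self.error
        return self.result

ok = ResultThread(sum, [1, 2, 3])
bad = ResultThread(int, "not a number")
ok.start()
bad.start()

print(ok.get())  # 6
try:
    bad.get()
except ValueError as exc:
    print(f"Caught from thread: {exc}")
```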
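2.2 Common Thread Methods and Attributes
Method/Attribute | Description | Example |
---|---|---|
start() | Starts the thread; may be called at most once per thread object | t.start() |
run() | The thread's body; override it when subclassing | Override in a custom thread class |
join(timeout=None) | Blocks until the thread terminates or the optional timeout (in seconds) expires | t.join() |
is_alive() | Returns True from just before run() starts until just after run() returns | if t.is_alive(): ... |
name | Gets/sets the thread's name (used only for identification) | t.name = "Worker-1" |
ident | Thread identifier (an integer), or None if the thread has not started | print(t.ident) |
daemon | Daemon flag; the program exits when only daemon threads remain, and they are stopped abruptly. Must be set before start() | t.daemon = True |
isDaemon() | Checks the daemon flag; deprecated since Python 3.10, read t.daemon instead | t.isDaemon() |
setDaemon(bool) | Sets the daemon flag; deprecated since Python 3.10, assign t.daemon instead | t.setDaemon(True) |
native_id | Native (OS kernel) thread ID (Python 3.8+) | print(t.native_id) |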
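The sketch below exercises several of these attributes on one short-lived thread; `nap` is just an illustrative name. Note that `join(timeout)` does not report whether the thread finished, so check `is_alive()` afterwards.

```python
import threading
import time

def nap():
    time.sleep(0.2)

t = threading.Thread(target=nap, name="Worker-1", daemon=True)
ident_before = t.ident                 # None: the thread has not started yet
print(t.name, t.daemon, ident_before)  # Worker-1 True None

t.start()
t.join(timeout=0.01)  # returns after at most 0.01 s, finished or not
print(t.is_alive())   # most likely True, since nap() sleeps for 0.2 s

t.join()              # no timeout: wait until nap() returns
alive_after = t.is_alive()
print(alive_after)    # False
print(t.ident, t.native_id)  # integers assigned when the thread started
```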
3. Thread Synchronization
3.1 Lock

```python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # acquires the lock on entry and releases it on exit
            counter += 1

threads = []
for i in range(5):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final counter value: {counter} (expected: 500000)")
```
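`with lock:` is shorthand for `acquire()` followed by `release()` in a `finally` block. The explicit form is useful when you want a timeout, as in this sketch, where a worker gives up after 0.1 s because the main thread still holds the lock. The `try_work` name is illustrative.

```python
import threading

lock = threading.Lock()
outcomes = []

def try_work():
    # acquire() returns False if the lock is not obtained within the timeout
    if lock.acquire(timeout=0.1):
        try:
            outcomes.append("got lock")
        finally:
            lock.release()
    else:
        outcomes.append("timed out")

lock.acquire()  # the main thread holds the lock
t = threading.Thread(target=try_work)
t.start()
t.join()        # try_work() times out while we hold the lock
lock.release()

t2 = threading.Thread(target=try_work)
t2.start()
t2.join()       # now the lock is free

print(outcomes)  # ['timed out', 'got lock']
```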
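3.2 Reentrant Lock (RLock)

```python
import threading

rlock = threading.RLock()

def recursive_func(n):
    # The same thread re-acquires rlock on every recursive call;
    # a plain Lock would deadlock here
    with rlock:
        if n > 0:
            print(f"Level {n}")
            recursive_func(n - 1)

t1 = threading.Thread(target=recursive_func, args=(3,))
t2 = threading.Thread(target=recursive_func, args=(3,))
t1.start()
t2.start()
t1.join()
t2.join()
```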
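The difference from a plain Lock shows up as soon as one thread acquires twice. This minimal sketch uses a timeout so the Lock case fails fast instead of hanging forever.

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

lock.acquire()
# A plain Lock cannot be re-acquired by the thread that holds it;
# without the timeout this call would block forever (a self-deadlock)
lock_again = lock.acquire(timeout=0.1)
lock.release()

rlock.acquire()
rlock_again = rlock.acquire(timeout=0.1)  # the owning thread may re-enter
rlock.release()
rlock.release()  # an RLock must be released once per successful acquire

print(lock_again, rlock_again)  # False True
```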
3.3 Semaphore

```python
import threading
import time

# Limit how many threads can use a resource at the same time
semaphore = threading.Semaphore(3)  # at most 3 threads at once

def access_resource(thread_id):
    with semaphore:
        print(f"Thread {thread_id} accessing resource")
        time.sleep(2)
        print(f"Thread {thread_id} releasing resource")

threads = []
for i in range(10):
    t = threading.Thread(target=access_resource, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```
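To confirm that the semaphore really caps concurrency, a sketch can count how many threads are inside the guarded section at once. The counters `active` and `max_active` are illustrative and need their own lock, since several threads update them.

```python
import threading
import time

semaphore = threading.Semaphore(3)
state_lock = threading.Lock()
active = 0
max_active = 0

def limited_task():
    global active, max_active
    with semaphore:
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)  # simulate work while holding one of the 3 slots
        with state_lock:
            active -= 1

threads = [threading.Thread(target=limited_task) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Peak concurrency: {max_active}")  # never more than 3
```

`threading.BoundedSemaphore` behaves the same way but raises `ValueError` if `release()` is called more often than `acquire()`, which helps catch bookkeeping bugs.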
3.4 Event

```python
import threading
import time

# A simple signaling mechanism between threads
event = threading.Event()

def waiter():
    print("Waiter: waiting for event...")
    event.wait()  # blocks until the event is set
    print("Waiter: event received!")

def setter():
    time.sleep(2)
    print("Setter: setting event")
    event.set()  # wakes up every waiting thread

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
```
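`Event.wait()` also accepts a timeout and returns whether the flag was set, which lets a waiter give up instead of blocking forever. A minimal sketch:

```python
import threading

event = threading.Event()

# wait() returns False if the timeout expires before the flag is set
first = event.wait(timeout=0.1)

setter = threading.Thread(target=event.set)  # set the flag from another thread
setter.start()
second = event.wait(timeout=2)  # returns True once the flag is set
setter.join()

print(first, second, event.is_set())  # False True True

event.clear()  # reset the flag so later wait() calls block again
print(event.is_set())  # False
```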
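3.5 Condition Variable (Condition)

```python
import threading
import time

# Producer-consumer pattern with a bounded buffer
condition = threading.Condition()
buffer = []
BUFFER_SIZE = 5
NUM_PRODUCERS = 2
NUM_CONSUMERS = 3
# Totals must match (2 x 15 == 3 x 10), otherwise some threads wait forever
ITEMS_PER_PRODUCER = 15
ITEMS_PER_CONSUMER = 10

def producer(pid):
    for i in range(ITEMS_PER_PRODUCER):
        with condition:
            # Wait while the buffer is full
            while len(buffer) >= BUFFER_SIZE:
                print("Buffer full, producer waiting")
                condition.wait()
            item = f"P{pid}-Item-{i}"
            buffer.append(item)
            print(f"Produced: {item}")
            # Notify the consumers
            condition.notify_all()
        time.sleep(0.1)

def consumer():
    for _ in range(ITEMS_PER_CONSUMER):
        with condition:
            # Wait while the buffer is empty
            while len(buffer) == 0:
                print("Buffer empty, consumer waiting")
                condition.wait()
            item = buffer.pop(0)
            print(f"Consumed: {item}")
            # Notify the producers
            condition.notify_all()
        time.sleep(0.2)

producers = [threading.Thread(target=producer, args=(p,)) for p in range(NUM_PRODUCERS)]
consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in producers + consumers:
    t.start()
for t in producers + consumers:
    t.join()
```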
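`Condition.wait_for(predicate)` wraps the `while ...: condition.wait()` loop: it re-checks the predicate after every wake-up and returns once it is true. A minimal single-producer, single-consumer sketch:

```python
import threading

condition = threading.Condition()
items = []
consumed = []

def consumer(count):
    for _ in range(count):
        with condition:
            # Equivalent to: while not items: condition.wait()
            condition.wait_for(lambda: len(items) > 0)
            consumed.append(items.pop(0))

def producer(count):
    for i in range(count):
        with condition:
            items.append(i)
            condition.notify()  # a single consumer only needs notify()

c = threading.Thread(target=consumer, args=(5,))
p = threading.Thread(target=producer, args=(5,))
c.start()
p.start()
p.join()
c.join()

print(consumed)  # [0, 1, 2, 3, 4]
```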
3.6 Barrier

```python
import random
import threading
import time

# Make several threads meet at a common point before continuing
barrier = threading.Barrier(3)

def worker(name):
    print(f"{name} working phase 1")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} reached barrier")
    barrier.wait()  # wait until all 3 threads have arrived
    print(f"{name} working phase 2")
    time.sleep(random.uniform(0.5, 1.5))
    print(f"{name} completed")

threads = [
    threading.Thread(target=worker, args=("Alice",)),
    threading.Thread(target=worker, args=("Bob",)),
    threading.Thread(target=worker, args=("Charlie",)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
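A Barrier can also run a one-off `action` callback when the last thread arrives, and `wait()` returns a distinct index per thread. A minimal sketch:

```python
import threading

action_calls = []
indices = []
indices_lock = threading.Lock()

# The action runs once, in one of the threads, each time all parties arrive
barrier = threading.Barrier(3, action=lambda: action_calls.append("all arrived"))

def worker():
    index = barrier.wait()  # a distinct integer in range(parties)
    with indices_lock:
        indices.append(index)

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(indices), action_calls)  # [0, 1, 2] ['all arrived']
```

A common idiom is `if barrier.wait() == 0:` to let exactly one thread perform a follow-up step such as cleanup.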
4. Inter-thread Communication
4.1 Using a Queue

```python
import random
import threading
import time
from queue import Queue

# Thread-safe queues
task_queue = Queue()
result_queue = Queue()
NUM_CONSUMERS = 3

def producer():
    for i in range(10):
        task = f"Task-{i}"
        task_queue.put(task)
        print(f"Produced: {task}")
        time.sleep(random.uniform(0.1, 0.3))
    # Send one end-of-work sentinel per consumer
    for _ in range(NUM_CONSUMERS):
        task_queue.put(None)

def consumer():
    while True:
        task = task_queue.get()
        try:
            if task is None:  # sentinel received: stop this consumer
                break
            # Process the task
            time.sleep(random.uniform(0.2, 0.5))
            result = f"Result of {task}"
            result_queue.put(result)
            print(f"Consumed: {task} -> {result}")
        finally:
            task_queue.task_done()  # every get(), sentinels included, is marked done

# Create the producer and consumer threads
prod_thread = threading.Thread(target=producer)
cons_threads = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]

# Start all threads
prod_thread.start()
for t in cons_threads:
    t.start()

# Wait for the producer, then for every queued item to be processed
prod_thread.join()
task_queue.join()
for t in cons_threads:
    t.join()

# Print the results
print("\nResults:")
while not result_queue.empty():
    print(result_queue.get())
```
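Giving a queue a `maxsize` adds back-pressure: a fast producer blocks instead of growing memory without limit. The timeout variants of `put()` and `get()` raise `queue.Full` and `queue.Empty`, as this small single-threaded sketch shows.

```python
import queue

events = []
q = queue.Queue(maxsize=2)  # bounded: put() blocks once 2 items are waiting
q.put("a")
q.put("b")

try:
    q.put("c", timeout=0.1)  # gives up because the queue stays full
except queue.Full:
    events.append("full")

first, second = q.get(), q.get()  # FIFO order

try:
    q.get(timeout=0.1)  # nothing left to take
except queue.Empty:
    events.append("empty")

print(events, first, second)  # ['full', 'empty'] a b
```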
4.2 Thread-Local Data

```python
import threading

# Each thread sees its own independent copy of the attributes
thread_local = threading.local()

def show_data():
    try:
        value = thread_local.value
    except AttributeError:
        print("No value set for this thread")
    else:
        print(f"Thread value: {value}")

def set_data(value):
    thread_local.value = value
    show_data()

# Create the threads
threads = []
for i in range(3):
    t = threading.Thread(target=set_data, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```
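A typical use of `threading.local` is caching one non-thread-safe resource per thread, such as a database connection. This sketch uses a plain `object()` as a stand-in for that resource; `get_resource` is a hypothetical helper.

```python
import threading

local = threading.local()
created = []
created_lock = threading.Lock()

def get_resource():
    # Create the resource the first time each thread asks, then reuse it
    if not hasattr(local, "resource"):
        local.resource = object()  # stand-in for e.g. a DB connection
        with created_lock:
            created.append(local.resource)
    return local.resource

def task():
    first = get_resource()
    second = get_resource()
    assert first is second  # the same object within one thread

threads = [threading.Thread(target=task) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(created))  # 3: one resource per thread
```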
5. Thread Pools and Advanced Usage
5.1 Using ThreadPoolExecutor

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests  # third-party package: pip install requests

def download_url(url):
    print(f"Downloading {url}")
    response = requests.get(url, timeout=5)
    return {
        'url': url,
        'status': response.status_code,
        'length': len(response.text),
        'content': response.text[:100],  # first 100 characters
    }

urls = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.wikipedia.org',
    'https://www.stackoverflow.com',
]

# Let a thread pool manage the worker threads
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit the tasks and remember which URL each future belongs to
    future_to_url = {executor.submit(download_url, url): url for url in urls}
    # Handle the results in completion order
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} downloaded: status={data['status']}, length={data['length']}")
            # print(f"Preview: {data['content']}")
        except Exception as e:
            print(f"{url} generated an exception: {e}")
```
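When you do not need each result as soon as it finishes, `Executor.map()` is simpler: it yields results in input order and re-raises a call's exception when that result is retrieved. This sketch uses a local function instead of network calls so it runs offline; `slow_square` is an illustrative name.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    time.sleep(0.05)  # stand-in for I/O latency
    return x * x

with ThreadPoolExecutor(max_workers=3) as executor:
    squares = list(executor.map(slow_square, range(5)))

print(squares)  # [0, 1, 4, 9, 16], in input order

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(int, "oops")
    try:
        future.result()  # the worker's exception is re-raised here
    except ValueError as exc:
        error_message = str(exc)

print(error_message)
```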
5.2 Timer Threads

```python
import threading

def delayed_action(message):
    print(f"Delayed message: {message}")

# Run delayed_action after 5 seconds
timer = threading.Timer(5.0, delayed_action, args=("Hello after 5 seconds!",))
timer.start()
print("Timer started, waiting...")
```
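`Timer` is a `Thread` subclass, so it can be joined, and `cancel()` stops it if it has not fired yet. A minimal sketch with short intervals:

```python
import threading

fired = []

# A timer that fires quickly...
quick = threading.Timer(0.05, fired.append, args=("quick",))
quick.start()
quick.join()  # Timer is a Thread subclass, so join() waits for it

# ...and one that is cancelled before its interval elapses
slow = threading.Timer(5.0, fired.append, args=("slow",))
slow.start()
slow.cancel()  # stops the timer while it is still waiting
slow.join()    # returns promptly once cancelled

print(fired)  # ['quick']
```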
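5.3 Priority Queues for Threads

```python
import queue
import threading
import time

# Create a priority queue (lower number = higher priority)
prio_queue = queue.PriorityQueue()

def worker():
    while True:
        priority, task = prio_queue.get()
        if task is None:  # stop signal
            break
        print(f"Processing task: {task} (priority: {priority})")
        time.sleep(0.5)
        prio_queue.task_done()

# Add tasks as (priority, task) tuples
prio_queue.put((3, "Low priority task"))
prio_queue.put((1, "High priority task"))
prio_queue.put((2, "Medium priority task"))
prio_queue.put((1, "Another high priority task"))

# Start the worker after queuing, so the priority order is visible
worker_thread = threading.Thread(target=worker)
worker_thread.start()

# Wait until every task has been processed
prio_queue.join()

# Send the stop signal, then wait for the worker to exit
prio_queue.put((0, None))
worker_thread.join()
```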
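When two entries share a priority, `PriorityQueue` compares the next tuple element, which fails for payloads that cannot be ordered (dicts, for example). A common workaround, sketched here, adds an increasing counter from `itertools.count()` as a tie-breaker, which also keeps equal-priority items in insertion order. `put_task` is a hypothetical helper.

```python
import itertools
import queue

pq = queue.PriorityQueue()
counter = itertools.count()  # tie-breaker: increases with every put

def put_task(priority, payload):
    # (priority, sequence, payload): the payload itself is never compared
    pq.put((priority, next(counter), payload))

put_task(2, {"name": "resize"})
put_task(1, {"name": "upload"})
put_task(1, {"name": "notify"})

order = []
while not pq.empty():
    priority, _, payload = pq.get()
    order.append(payload["name"])

print(order)  # ['upload', 'notify', 'resize']
```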
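6. Multithreading Best Practices
6.1 Avoiding Common Pitfalls
- Race conditions: always protect shared mutable state with a synchronization primitive
- Deadlocks:
  - Avoid nested locks where possible
  - Acquire locks in a fixed order
  - Use timeouts when acquiring locks (lock.acquire(timeout=...))
- Thread starvation: the threading module has no thread-priority API, so keep critical sections short and avoid holding a lock for long periods
- Resource leaks: make sure every resource (files, network connections, etc.) is released, e.g. with `with` statements or try/finally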
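One way to apply the fixed-order rule is to sort locks by a stable key before acquiring them. In this sketch the `Account` class and `transfer` function are hypothetical: two threads transfer money in opposite directions (a classic deadlock setup), but both always lock the account with the lower id first.

```python
import threading

class Account:
    def __init__(self, account_id, balance):
        self.id = account_id
        self.balance = balance
        self.lock = threading.Lock()

def transfer(src, dst, amount):
    # Always lock the account with the smaller id first, whatever the direction
    first, second = sorted((src, dst), key=lambda acct: acct.id)
    with first.lock:
        with second.lock:
            src.balance -= amount
            dst.balance += amount

a = Account(1, 1000)
b = Account(2, 1000)

def many_transfers(src, dst):
    for _ in range(1000):
        transfer(src, dst, 1)

t1 = threading.Thread(target=many_transfers, args=(a, b))
t2 = threading.Thread(target=many_transfers, args=(b, a))
t1.start()
t2.start()
t1.join()
t2.join()

print(a.balance, b.balance)  # 1000 1000: equal transfers in each direction
```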
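6.2 Performance Tips
- Thread pools: reuse threads to avoid repeated creation overhead
- Batching: reduce how often locks are acquired and released
- Thread-safe data structures: e.g. queue.Queue, which handles locking internally
- Thread-local storage: reduce shared state
- Asynchronous I/O: use asyncio to improve performance of I/O-bound workloads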
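As an illustration of batching, this sketch revisits the counter from section 3.1: each thread sums into a local variable and takes the lock once at the end instead of 100,000 times. `increment_batched` is an illustrative name.

```python
import threading

counter = 0
lock = threading.Lock()

def increment_batched(n):
    global counter
    local_total = 0
    for _ in range(n):
        local_total += 1  # no shared state touched inside the loop
    with lock:            # one lock acquisition per thread instead of n
        counter += local_total

threads = [threading.Thread(target=increment_batched, args=(100000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 500000
```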
6.3 Debugging and Monitoring

```python
import threading
import time

def worker():
    print(f"{threading.current_thread().name} starting")
    time.sleep(2)
    print(f"{threading.current_thread().name} exiting")

# Periodically list all active threads
def monitor_threads():
    while True:
        print("\n=== Active Threads ===")
        for thread in threading.enumerate():
            print(f"{thread.name} (ID: {thread.ident}, Alive: {thread.is_alive()})")
        time.sleep(1)

# Start the worker threads
threads = [threading.Thread(target=worker, name=f"Worker-{i}") for i in range(3)]
for t in threads:
    t.start()

# Start the monitor as a daemon so it does not keep the program alive
monitor = threading.Thread(target=monitor_threads, daemon=True)
monitor.start()

# Wait for the worker threads to finish
for t in threads:
    t.join()

print("All worker threads completed")
```
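7. Use Cases for Multithreading
7.1 Good Fits for Multithreading
- I/O-bound tasks:
  - Network requests (API calls, web scraping)
  - File reading and writing (especially on SSDs)
  - Database operations
- Responsive user interfaces:
  - Keeping the UI thread responsive
  - Running tasks in the background
- Parallel processing of independent tasks (effective when each task is dominated by I/O or by C code that releases the GIL):
  - Batch image processing
  - Data preprocessing
  - Log processing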
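7.2 Poor Fits for Multithreading
- CPU-bound pure-Python tasks: use multiple processes instead
- Tasks that need precise timing control (the OS decides when each thread runs)
- Scenarios with extremely strict data-consistency requirements, where the risk of subtle synchronization bugs is too high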
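8. Recommended Learning Resources
- Official documentation:
  - threading: Thread-based parallelism
  - queue: A synchronized queue class
- Classic books:
  - Python Parallel Programming Cookbook
  - Fluent Python, Chapter 17
- Practice projects:
  - A multithreaded web crawler
  - A parallel file processor
  - A real-time data dashboard
- Debugging tools:
  - threading.enumerate(): lists the active threads
  - The logging module: thread-safe logging
  - Visual debuggers (e.g. the thread view in PyCharm)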