当前位置: 首页 > backend >正文

Python多线程、锁、多进程、异步编程

多线程

import threading
import timedef task(name, delay):print(f"线程 {name} 开始执行")time.sleep(delay)print(f"线程 {name} 执行完成")# 创建线程
thread1 = threading.Thread(target=task, args=("A", 2))
thread2 = threading.Thread(target=task, args=("B", 1))# 启动线程
thread1.start()
thread2.start()# 等待线程完成
thread1.join()
thread2.join()print("所有线程执行完毕")
#线程池
from concurrent.futures import ThreadPoolExecutor
import timedef task(name):print(f"任务 {name} 开始")time.sleep(1)return f"任务 {name} 完成"with ThreadPoolExecutor(max_workers=3) as executor:# 提交多个任务futures = [executor.submit(task, i) for i in range(5)]# 获取结果for future in futures:print(future.result())

锁机制

import threadingcounter = 0
lock = threading.Lock()def increment():global counterfor _ in range(100000):with lock:  # 自动获取和释放锁counter += 1threads = []
for _ in range(5):t = threading.Thread(target=increment)threads.append(t)t.start()for t in threads:t.join()print(f"最终计数器值: {counter}")  # 应该是500000

生产消费者模型

import threading
import queue
import random
import time# 共享队列
q = queue.Queue(maxsize=5)
condition = threading.Condition()def producer():while True:with condition:if q.full():print("队列已满,生产者等待")condition.wait()item = random.randint(1, 100)q.put(item)print(f"生产者生产了: {item}")condition.notify()time.sleep(random.random())def consumer():while True:with condition:if q.empty():print("队列为空,消费者等待")condition.wait()item = q.get()print(f"消费者消费了: {item}")condition.notify()time.sleep(random.random())# 启动生产者和消费者
prod = threading.Thread(target=producer)
cons = threading.Thread(target=consumer)prod.start()
cons.start()

多进程编程

from multiprocessing import Process
import osdef task(name):print(f"子进程 {name} (PID: {os.getpid()}) 执行")time.sleep(2)if __name__ == '__main__':processes = []for i in range(3):p = Process(target=task, args=(i,))processes.append(p)p.start()for p in processes:p.join()print("所有子进程执行完毕")
#进程池
from multiprocessing import Pool
import timedef square(x):print(f"计算 {x} 的平方")time.sleep(1)return x * xif __name__ == '__main__':with Pool(processes=4) as pool:# 同步执行result = pool.map(square, range(10))print(result)# 异步执行async_result = pool.map_async(square, range(10))print(async_result.get())
#进程间通信
from multiprocessing import Process, Pipedef sender(conn):conn.send("Hello from sender")conn.close()def receiver(conn):msg = conn.recv()print(f"接收到的消息: {msg}")conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p1 = Process(target=sender, args=(child_conn,))p2 = Process(target=receiver, args=(parent_conn,))p1.start()p2.start()p1.join()p2.join()

异步

协程(Coroutine): 异步执行的任务单元

事件循环(Event Loop): 调度和执行协程的核心

Future: 表示异步操作的最终结果

Task: Future的子类,用于包装和管理协程的执行

import asyncioasync def main(): #携程函数定义print('Hello')await asyncio.sleep(1) #异步等待print('World')asyncio.run(main())

创建和运行协程

async def say_after(delay, message):await asyncio.sleep(delay)print(message)async def main():# 顺序执行await say_after(1, 'hello')await say_after(2, 'world')# 并发执行task1 = asyncio.create_task(say_after(1, 'hello'))task2 = asyncio.create_task(say_after(2, 'world'))await task1await task2asyncio.run(main())

并发运行多个协程

async def fetch_data(task_id, delay):print(f'Task {task_id} started')await asyncio.sleep(delay)print(f'Task {task_id} completed')return f'result-{task_id}'async def main():# 使用gather并发运行results = await asyncio.gather(fetch_data(1, 2),fetch_data(2, 1),fetch_data(3, 3))print(f'All results: {results}')asyncio.run(main())

异步上下文管理器

class AsyncResource:async def __aenter__(self):print('Acquiring resource')await asyncio.sleep(0.5)return selfasync def __aexit__(self, exc_type, exc, tb):print('Releasing resource')await asyncio.sleep(0.5)async def use_resource():async with AsyncResource() as resource:print('Using resource')await asyncio.sleep(1)asyncio.run(use_resource())

异步迭代器

class AsyncCounter:def __init__(self, stop):self.current = 0self.stop = stopdef __aiter__(self):return selfasync def __anext__(self):if self.current >= self.stop:raise StopAsyncIterationawait asyncio.sleep(0.5)self.current += 1return self.currentasync def main():async for number in AsyncCounter(5):print(number)asyncio.run(main())

异步Http请求

import aiohttpasync def fetch_url(session, url):async with session.get(url) as response:return await response.text()async def main():urls = ['https://www.python.org','https://www.google.com','https://www.github.com']async with aiohttp.ClientSession() as session:tasks = [fetch_url(session, url) for url in urls]results = await asyncio.gather(*tasks)for url, content in zip(urls, results):print(f"{url} -> {len(content)} bytes")asyncio.run(main())

异步数据库访问

import asyncpgasync def fetch_data():conn = await asyncpg.connect(user='user', password='pass',database='db', host='localhost')try:# 执行查询result = await conn.fetch('SELECT * FROM users WHERE id = $1', 1)print(result)finally:await conn.close()asyncio.run(fetch_data())
http://www.xdnf.cn/news/18020.html

相关文章:

  • 自动驾驶中的传感器技术34——Lidar(9)
  • Python训练营打卡Day35-复习日
  • 2025年5月架构设计师综合知识真题回顾,附参考答案、解析及所涉知识点(五)
  • Pandas 和 NumPy的区别和联系
  • 安卓开发中遇到Medium Phone API 36.0 is already running as process XXX.
  • RK3568平台开发系列讲解:PCIE trainning失败怎么办
  • 计算机网络 OSI 七层模型和 TCP 五层模型
  • day43_2025-08-17
  • git stash临时保存工作区
  • Talk2BEV论文速读
  • Next.js跟React关系(Next.js是基于React库的全栈框架)(文件系统路由、服务端渲染SSR、静态生成SSG、增量静态再生ISR、API路由)
  • 【Python】-- 机器学习项目 - 基于KNN算法的鸢尾花分类
  • 基于飞算JavaAI实现布隆过滤器防止缓存穿透:原理、实践与全流程解析
  • HTTP0.9/1.0/1.1/2.0
  • 免费照片压缩网站
  • Android原生(Kotlin)与Flutter混合开发 - 设备控制与状态同步解决方案
  • Visual Studio Code 基础设置指南
  • C++ 特殊类设计与单例模式解析
  • 云计算-K8s 实战:Pod、安全上下文、HPA 、CRD、网络策略、亲和性等功能配置实操指南
  • 天地图开发的优点
  • Leaflet赋能:WebGIS视角下的省域区县天气可视化实战攻略
  • PostgreSQL——用户管理
  • Dify 从入门到精通(第 38/100 篇):Dify 的实时协作功能
  • PIDGen!DecodeProdKey函数分析之四个断点
  • 优雅草星云物联网项目私有化定制技术解析:RS485接口与工业通讯协议-优雅草卓伊凡
  • 原码表示法、反码表示法、移码表示法、补码表示法
  • C语言基础:(十五)深入理解指针(5)
  • 牛 CDR3 单抗:抗病毒领域的 “纳米级精准导弹”
  • 类与类加载器
  • 8.16打卡 DAY43 复习日