当前位置: 首页 > news >正文

深入理解Python协程:asyncio、异步并发、事件循环

概念解析

异步编程是一种允许程序在等待 I/O 操作如网络请求、文件读写)时不被阻塞,转而执行其他任务的编程范式。

在 Python 中,其核心实现概念如下:

协程(Coroutines)

  • 定义:可暂停执行并在合适时机恢复的函数,通过async def关键字声明。
  • 特点:非抢占式调度,协程主动通过await让出执行权,适用于处理高并发 I/O 场景。

事件循环(Event Loop)

  • 定义:异步编程的 “调度器”,负责管理协程的执行顺序、监控 I/O 事件状态,并在事件就绪时恢复对应协程。
  • 核心逻辑:循环检查可执行的协程任务,按事件触发顺序调度执行。

任务(Tasks)

  • 定义:对协程的封装,代表一个独立的异步操作单元,通过asyncio.create_task()创建。
  • 功能:支持取消任务、等待任务完成(await task)或获取任务状态。

Future 对象

  • 定义:表示一个尚未完成的异步操作结果,本质是协程间传递状态的载体。
  • 作用:当协程需要等待某个异步操作的结果时,可通过await Future暂停执行,待操作完成后获取结果。

await 表达式

  • 定义:协程中用于暂停执行的关键字,用于等待另一个协程、Task 或 Future 对象的完成。
  • 示例:result = await async_function(),表示暂停当前协程,直到async_function执行完毕并返回结果。

应用场景

异步编程尤其适用于I/O 密集型任务如 HTTP 请求、数据库查询、文件操作等)。
通过减少线程切换开销和 CPU 闲置时间,显著提升程序的并发处理能力。
相比多线程编程,异步编程在高并发场景下更轻量、更高效。


asyncio库详解

Python 3.4 引入了 asyncio 库,作为异步编程的核心组件,事件循环是 asyncio 的核心。
Windows 上使用 ProactorEventLoop,在 Unix 上使用 SelectorEventLoop

Unix系统(SelectorEventLoop)

  • 基于selectors 模块对底层 I/O 多路复用机制(如select、poll、epoll、kqueue)的抽象。
  • 采用 “就绪通知” 机制:监视文件描述符(如套接字)状态,当 I/O 操作就绪时(如可读 / 可写)通知应用程序。
  • 优势场景1:擅长处理大量并发连接(如 Linuxepoll机制支持高效事件驱动)。
  • 优势场景2:适用于网络服务器、高并发 I/O 场景。

Windows系统(ProactorEventLoop)

  • 基于 Windows 专有 I/O 完成端口(IOCP)机制,属于系统级异步 I/O 框架。
  • 采用 “完成通知” 机制:异步启动 I/O 操作,操作完成后由系统主动通知程序。
  • 优势场景1:深度优化 Windows 平台特性,支持全类型异步 I/O 操作(含文件 I/O)。
  • 优势场景1:在 Windows 环境下性能通常优于 SelectorEventLoop

两种事件循环的关键差异

维度SelectorEventLoop(Unix)ProactorEventLoop(Windows)
通知机制等待 I/O 就绪后执行操作(“询问式”:Can I read/write?异步启动 I/O 操作,完成后被动接收通知(“回调式”:Notify when done
API 覆盖范围部分平台可能不支持全类型文件 I/O 操作原生支持 Windows 所有异步 I/O 操作(如管道、套接字、文件)
性能特点Unix 系统下,依赖epoll/kqueue等机制实现高效并发Windows 下利用 IOCP 机制实现低延迟、高吞吐量

事件循环管理

import asyncio# 获取事件循环
loop = asyncio.get_event_loop()# 运行协程直到完成
loop.run_until_complete(my_coroutine())# 运行事件循环直到stop()被调用
loop.run_forever()# 关闭事件循环
loop.close()

协程定义与执行

async def fetch_data(url):print(f"开始获取数据: {url}")await asyncio.sleep(2)  # 模拟I/O操作print(f"数据获取完成: {url}")return f"来自 {url} 的数据"# Python 3.7+ 推荐方式
async def main():result = await fetch_data("example.com")print(result)asyncio.run(main())  # Python 3.7+引入,简化了事件循环管理

任务创建与管理

async def main():# 创建任务task1 = asyncio.create_task(fetch_data("site1.com"))task2 = asyncio.create_task(fetch_data("site2.com"))# 等待所有任务完成results = await asyncio.gather(task1, task2)print(results)# 并发运行多个协程results = await asyncio.gather(fetch_data("site3.com"),fetch_data("site4.com"))

超时管理

async def main():try:# 设置超时result = await asyncio.wait_for(fetch_data("example.com"), timeout=1.0)except asyncio.TimeoutError:print("操作超时")

同步与异步代码结合

import concurrent.futuresdef cpu_bound_task(x):# 计算密集型任务return x * xasync def main():# 使用线程池执行阻塞I/Oloop = asyncio.get_running_loop()with concurrent.futures.ThreadPoolExecutor() as pool:result = await loop.run_in_executor(pool, cpu_bound_task, 10)print(result)

高并发场景实战案例

案例1: 并发网络请求

import asyncio
import aiohttp
import timeasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def fetch_all(urls):async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]results = await asyncio.gather(*tasks)return results# 测试URLs
urls = ["https://www.google.com","https://www.github.com","https://www.python.org",# 可添加更多URL
] * 5  # 重复请求以增加数量async def main():start = time.time()results = await fetch_all(urls)end = time.time()print(f"获取了 {len(results)} 个页面,耗时: {end - start:.2f} 秒")# 运行
asyncio.run(main())

案例2: 异步数据库操作

使用asyncpg进行PostgreSQL异步操作

import asyncio
import asyncpgasync def create_tables(conn):await conn.execute('''CREATE TABLE IF NOT EXISTS users(id SERIAL PRIMARY KEY,name TEXT,email TEXT)''')async def insert_users(conn, users):# 批量插入await conn.executemany('INSERT INTO users(name, email) VALUES($1, $2)',users)async def fetch_users(conn):return await conn.fetch('SELECT * FROM users')async def main():# 连接数据库conn = await asyncpg.connect(user='postgres',password='password',database='testdb',host='127.0.0.1')# 创建表await create_tables(conn)# 生成测试数据test_users = [('User1', 'user1@example.com'),('User2', 'user2@example.com'),('User3', 'user3@example.com'),]# 插入数据await insert_users(conn, test_users)# 查询数据users = await fetch_users(conn)for user in users:print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")# 关闭连接await conn.close()# 运行
asyncio.run(main())

案例3: 异步Web爬虫

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import timeasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def parse(html):# 使用BeautifulSoup解析HTMLsoup = BeautifulSoup(html, 'html.parser')# 获取所有链接links = [a.get('href') for a in soup.find_all('a') if a.get('href')]return linksasync def crawl(url, max_depth=2):visited = set()async def _crawl(current_url, depth):if depth > max_depth or current_url in visited:returnvisited.add(current_url)print(f"正在爬取: {current_url}")try:async with aiohttp.ClientSession() as session:html = await fetch(session, current_url)links = await parse(html)# 过滤出同域名链接base_url = '/'.join(current_url.split('/')[:3])same_domain_links = [link if link.startswith('http') else f"{base_url}{link}"for link in links if link and (link.startswith('http') or link.startswith('/'))]# 创建子任务继续爬取tasks = [_crawl(link, depth + 1) for link in same_domain_links[:5]  # 限制每页最多爬5个链接]await asyncio.gather(*tasks)except Exception as e:print(f"爬取 {current_url} 出错: {e}")await _crawl(url, 0)return visitedasync def main():start = time.time()visited = await crawl("https://python.org", max_depth=1)end = time.time()print(f"爬取了 {len(visited)} 个页面,耗时: {end - start:.2f} 秒")# 运行
asyncio.run(main())

案例4: 异步API服务器处理大量并发请求

使用FastAPI构建高并发API服务

from fastapi import FastAPI, BackgroundTasks
import asyncio
import uvicorn
import time
import randomapp = FastAPI()# 模拟数据库
db = {}# 模拟异步数据库操作
async def db_operation(key, delay=None):if delay is None:delay = random.uniform(0.1, 0.5)  # 随机延迟模拟真实场景await asyncio.sleep(delay)return db.get(key)# 模拟耗时任务
async def process_item(item_id):print(f"开始处理项目 {item_id}")await asyncio.sleep(5)  # 模拟耗时操作print(f"项目 {item_id} 处理完成")return {"item_id": item_id, "status": "processed"}# 常规端点
@app.get("/items/{item_id}")
async def read_item(item_id: str):result = await db_operation(item_id)return {"item_id": item_id, "value": result}# 批量操作端点
@app.get("/batch")
async def batch_operation(items: str):item_ids = items.split(",")tasks = [db_operation(item_id) for item_id in item_ids]results = await asyncio.gather(*tasks)return dict(zip(item_ids, results))# 后台任务
@app.post("/items/{item_id}/process")
async def process(item_id: str, background_tasks: BackgroundTasks):background_tasks.add_task(process_item, item_id)return {"message": f"Processing item {item_id} in the background"}# 负载测试端点
@app.get("/load-test/{count}")
async def load_test(count: int):start = time.time()# 创建多个并发任务tasks = []for i in range(count):# 随机延迟delay = random.uniform(0.1, 0.5)tasks.append(asyncio.sleep(delay))# 并发执行所有任务await asyncio.gather(*tasks)end = time.time()return {"tasks_completed": count,"time_taken": f"{end - start:.2f} 秒","tasks_per_second": f"{count / (end - start):.2f}"}# 初始化一些测试数据
@app.on_event("startup")
async def startup_event():for i in range(1000):db[str(i)] = f"value-{i}"if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)
http://www.xdnf.cn/news/1017217.html

相关文章:

  • 开疆智能ModbusTCP转Devicenet网关连接三菱PLC与ABB机器人配置案例
  • NAS 年中成果汇报:从入门到高阶的影视/音乐/小说/资源下载 等好玩Docker 全集合
  • Python让自动驾驶“看见未来”:环境建模那些事儿
  • AWS知识点和技术面试模拟题
  • 基于python大数据的nba球员可视化分析系统
  • 大模型驱动数据分析革新:美林数据智能问数解决方案破局传统 BI 痛点
  • CSS基础学习1
  • Python 数据分析10
  • 【Three.js】初识 Three.js
  • 【论文阅读33】滑坡易发性 PINN ( EG2025 )
  • 基于 SpaCy DependencyMatcher 编写复杂依存关系规则实战指南
  • java 将多张图片合成gif动态图
  • 国产数据库StarRocks在数栈轻量化数据开发的全流程实践
  • 普通人怎样用好Deepseek?
  • MySQL 8.0 OCP 英文题库解析(十九)
  • 26-数据结构-线性表2
  • linux alignment fault对齐造成设备挂死问题定位梳理
  • Leetcode 2604. 吃掉所有谷子的最短时间
  • 线性回归原理推导与应用(九):逻辑回归多分类问题的原理与推导
  • 用户通知服务,轻松实现应用与用户的多场景交互
  • 嵌套滚动交互处理总结
  • FastChat 架构拆解:打造类 ChatGPT 私有化部署解决方案的基石
  • python实现鸟类识别系统实现方案
  • Java实现Pdf转Word
  • 打破语言壁垒!DHTMLX Gantt 与 Scheduler 文档正式上线中文等多语言版本!
  • 使用 PolarProxy+Proxifier 解密 TLS 流量
  • 北京大学肖臻老师《区块链技术与应用》公开课:08-BTC-比特币挖矿
  • MySQL索引原理
  • KDJ指标的运用
  • 商家如何利用Shopify插件进行AB测试和优化