异步编程与asyncio
asyncio 是 Python 标准库中用于编写并发 I/O 代码的核心模块,使用 async/await 语法。它适合网络请求、数据库查询、文件 I/O 等等待密集型任务,但不适合 CPU 密集型任务(后者应用 multiprocessing)。
基本概念
- 协程(Coroutine):用 async def 定义的函数,调用后返回协程对象(不会立即执行,需要被 await 或包装成任务后才会运行)。
- 任务(Task):将协程包装成可并发执行的调度单元。
- 事件循环(Event Loop):负责调度所有协程和任务。
- await:在被等待对象完成之前挂起当前协程,把控制权交还给事件循环;等待的对象必须是可等待对象(协程、Task、Future)。
快速上手
import asyncio


async def greet(name: str, delay: float) -> str:
    """Wait ``delay`` seconds without blocking the event loop, then return a greeting."""
    # asyncio.sleep suspends only this coroutine; the loop is free to run others.
    await asyncio.sleep(delay)
    greeting = f"Hello, {name}!"
    return greeting


async def main():
    """Await a single coroutine and print what it returns."""
    print(await greet("Alice", 1.0))  # Hello, Alice!
asyncio.run(main()) # 创建事件循环并运行,Python 3.7+
并发执行:asyncio.gather
gather 并发运行多个协程,所有协程完成后才返回全部结果;结果列表的顺序与传入顺序一致,而不是完成顺序:
import asyncio
import time


async def fetch(name: str, delay: float) -> str:
    """Log start/finish around a simulated ``delay``-second I/O wait and return a label."""
    print(f"{name} 开始")
    await asyncio.sleep(delay)
    print(f"{name} 完成")
    return f"{name} 结果"


async def main():
    """Run three fetches concurrently with gather and report the wall-clock time."""
    t0 = time.perf_counter()
    jobs = [("任务A", 1.0), ("任务B", 2.0), ("任务C", 1.5)]
    # The waits overlap, so the total is ~max(1, 2, 1.5) = 2 s rather than the sum.
    results = await asyncio.gather(*(fetch(label, secs) for label, secs in jobs))
    elapsed = time.perf_counter() - t0
    print(results)  # ['任务A 结果', '任务B 结果', '任务C 结果']
    print(f"总耗时: {elapsed:.2f}s")  # ~2.00s
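asyncio.run(main())
gather 中如果某个协程抛出异常(默认 return_exceptions=False),该异常会立即传播给 await gather 的调用方,但其他协程不会被取消,而是继续运行。设置 return_exceptions=True 可让异常作为结果列表中的值返回,而不是被抛出。如果需要"一个失败就取消其余"的语义,请使用下文的 TaskGroup。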
任务(asyncio.create_task)
create_task 立即将协程提交到事件循环调度,比 gather 更灵活。注意要保存返回的 Task 引用,否则任务可能在执行过程中被垃圾回收:
import asyncio


async def background_job(n: int) -> int:
    """Sleep ``n`` seconds, then return ``n`` squared."""
    await asyncio.sleep(n)
    return n * n


async def main():
    """Schedule two jobs up front with create_task, do other work, then collect results."""
    # create_task schedules each coroutine right away; nothing has been awaited yet.
    first = asyncio.create_task(background_job(2))
    second = asyncio.create_task(background_job(3))
    print("任务已提交,执行其他工作")
    await asyncio.sleep(0.1)  # yield to the event loop so the tasks get to start
    # Awaiting a Task waits for it to finish and hands back its return value.
    r1, r2 = await first, await second
    print(r1, r2)  # 4 9
asyncio.run(main())
TaskGroup — 结构化并发(Python 3.11+)
asyncio.TaskGroup 提供更安全的并发模式:组内任意任务失败时,自动取消其他任务,并在退出 async with 时把异常合并为 ExceptionGroup 抛出(可用 except* 捕获):
import asyncio


async def fetch_data(source: str) -> dict:
    """Simulate fetching from ``source``; the source named "bad" raises ValueError."""
    await asyncio.sleep(0.5)
    if source != "bad":
        return {"source": source, "data": "..."}
    raise ValueError(f"源 {source} 不可用")


async def main():
    """Fetch three sources under a TaskGroup and print each task's result."""
    sources = ("api-1", "api-2", "api-3")
    async with asyncio.TaskGroup() as tg:
        # If any task fails, the group cancels the others and raises on exit.
        tasks = [tg.create_task(fetch_data(src)) for src in sources]
    # Reaching this point means every task in the group finished successfully.
    for t in tasks:
        print(t.result())
asyncio.run(main())
超时控制
import asyncio


async def slow_op() -> str:
    """Stand-in for a slow operation: sleep 10 seconds, then return a fixed string."""
    await asyncio.sleep(10)
    return "完成"


async def main():
    """Show two ways to bound how long we wait for ``slow_op``."""
    # Option 1 - asyncio.wait_for: on timeout the inner coroutine is cancelled and
    # asyncio.TimeoutError is raised (an alias of the builtin TimeoutError since 3.11).
    try:
        outcome = await asyncio.wait_for(slow_op(), timeout=1.0)
    except asyncio.TimeoutError:
        print("操作超时!")

    # Option 2 - the asyncio.timeout() context manager, recommended on Python 3.11+.
    try:
        async with asyncio.timeout(1.0):
            outcome = await slow_op()
    except TimeoutError:
        print("操作超时!")
asyncio.run(main())
异步上下文管理器与迭代器
import asyncio


class AsyncDB:
    """Toy async resource demonstrating ``async with`` (__aenter__/__aexit__)."""

    async def __aenter__(self):
        """Open the simulated connection and hand ``self`` to the ``as`` target."""
        print("连接数据库")
        return self

    async def __aexit__(self, *args):
        """Close the simulated connection.

        Returns None (falsy), so an exception raised inside the ``async with``
        body still propagates to the caller.
        """
        print("关闭连接")

    async def fetch(self, query: str):
        """Simulate running ``query``; the text is ignored and fixed rows are returned."""
        await asyncio.sleep(0.1)
        return [{"id": 1}]


async def main():
    """Open the DB with ``async with``, run one query, and print the rows."""
    async with AsyncDB() as db:
        rows = await db.fetch("SELECT * FROM users")
        print(rows)


# Actually run the context-manager demo when the snippet is executed as a script;
# previously main() was defined but never called.
if __name__ == "__main__":
    asyncio.run(main())
# Async iterator
class AsyncStream:
    """Async iterator over ``items``, pausing briefly before producing each value."""

    def __init__(self, items: list):
        self._items = iter(items)

    def __aiter__(self):
        # The stream is its own iterator, so ``async for`` can consume it directly.
        return self

    async def __anext__(self):
        # Simulated I/O wait on every step, including the final exhausting one.
        await asyncio.sleep(0.01)
        try:
            return next(self._items)
        except StopIteration:
            # Async iteration signals exhaustion with StopAsyncIteration instead.
            raise StopAsyncIteration


async def consume():
    """Print every item produced by an AsyncStream over [1, 2, 3]."""
    stream = AsyncStream([1, 2, 3])
    async for value in stream:
        print(value)
asyncio.run(consume())
asyncio.wait — 细粒度控制
import asyncio


async def task(n: int) -> int:
    """Sleep ``n`` seconds, then return ``n``."""
    await asyncio.sleep(n)
    return n


async def main():
    """Start three tasks, report whichever finishes first, and cancel the rest."""
    tasks = {asyncio.create_task(task(i)) for i in [3, 1, 2]}
    # FIRST_COMPLETED: return as soon as at least one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in done:
        print(f"已完成: {t.result()}")  # 1
    for t in pending:
        t.cancel()  # only *requests* cancellation; it takes effect on a later loop cycle
    # Wait until the cancellations have actually been processed so no task outlives
    # main(); return_exceptions=True collects the CancelledErrors instead of raising.
    await asyncio.gather(*pending, return_exceptions=True)
asyncio.run(main())
实战:并发 HTTP 请求
import asyncio

import aiohttp  # pip install aiohttp


async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """GET ``url`` with the shared session and return the decoded JSON body.

    Raises aiohttp.ClientResponseError on a 4xx/5xx status instead of trying
    to parse an error page as JSON.
    """
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()


async def main():
    """Fetch several URLs concurrently over one session and print their query args."""
    urls = [
        "https://httpbin.org/get?n=1",
        "https://httpbin.org/get?n=2",
        "https://httpbin.org/get?n=3",
    ]
    # A single ClientSession is shared by all requests rather than one per request.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch(session, url) for url in urls]
        )
    for r in results:
        print(r["args"])
asyncio.run(main())
asyncio vs threading vs multiprocessing
| 场景 | 推荐方案 |
|---|---|
| 大量 I/O 等待(网络请求、数据库) | asyncio |
| CPU 密集型(数值计算、压缩) | multiprocessing |
| 混合场景或遗留同步代码 | threading |
| 异步中调用阻塞函数 | loop.run_in_executor() 或 asyncio.to_thread()(Python 3.9+) |
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io() -> str:
    """A synchronous call that blocks its thread (time.sleep) for one second."""
    time.sleep(1)  # blocking call - must not run directly on the event loop
    return "io done"


async def main():
    """Run ``blocking_io`` in a thread pool so the event loop stays responsive."""
    # Inside a coroutine the running loop always exists; get_running_loop() is the
    # documented, unambiguous way to obtain it (preferred over get_event_loop here).
    loop = asyncio.get_running_loop()
    # Run the blocking function in a worker thread instead of the event loop.
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
    print(result)
asyncio.run(main())
最后更新于