跳至内容

异步编程与asyncio

asyncio 是 Python 标准库中用于编写并发 I/O 代码的核心模块,使用 async/await 语法。它适合网络请求、数据库查询等等待密集型任务,但不适合 CPU 密集型任务(后者应使用 multiprocessing)。注意:asyncio 本身不提供异步文件 I/O,普通的文件读写会阻塞事件循环,需要通过 run_in_executor / asyncio.to_thread 放到线程中执行,或借助第三方库。

基本概念

  • 协程(Coroutine):用 async def 定义的函数,调用后返回协程对象(不会立即执行)。
  • 任务(Task):将协程包装成并发执行的调度单元。
  • 事件循环(Event Loop):负责调度所有协程和任务。
  • await:等待一个可等待对象(协程、Task、Future)完成;若它尚未就绪,则挂起当前协程并把控制权交还给事件循环,就绪后再从此处继续执行。

快速上手

import asyncio

async def greet(name: str, delay: float) -> str:
    """Wait ``delay`` seconds without blocking the event loop, then build a greeting.

    Args:
        name: Who to greet.
        delay: Simulated I/O latency in seconds.

    Returns:
        The string ``"Hello, <name>!"``.
    """
    await asyncio.sleep(delay)  # yields to the event loop instead of blocking it
    greeting = f"Hello, {name}!"
    return greeting

async def main():
    """Await a single greeting and print it."""
    message = await greet("Alice", 1.0)
    print(message)  # Hello, Alice!


# asyncio.run creates an event loop, runs main() to completion, then closes it (Python 3.7+).
asyncio.run(main())

并发执行:asyncio.gather

gather 并发运行多个协程,待所有协程完成后一次性返回全部结果;结果列表的顺序与传入参数的顺序一致(与完成先后无关):

import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    """Simulate a request called ``name`` that takes ``delay`` seconds.

    Prints a start marker and a finish marker so the interleaving of concurrent
    calls is visible, then returns ``"<name> 结果"``.
    """
    print(f"{name} 开始")
    await asyncio.sleep(delay)  # other coroutines run while this one waits
    print(f"{name} 完成")
    outcome = f"{name} 结果"
    return outcome

async def main():
    """Run three fetches concurrently with gather and report the wall-clock time."""
    t0 = time.perf_counter()

    # All three run concurrently, so the total is about max(1, 2, 1.5) = 2 s
    # rather than the 4.5 s a sequential version would take.
    jobs = (
        fetch("任务A", 1.0),
        fetch("任务B", 2.0),
        fetch("任务C", 1.5),
    )
    results = await asyncio.gather(*jobs)

    elapsed = time.perf_counter() - t0
    print(results)      # ['任务A 结果', '任务B 结果', '任务C 结果'] (argument order, not finish order)
    print(f"总耗时: {elapsed:.2f}s")  # ~2.00s

asyncio.run(main())

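gather 中如果某个协程抛出异常,默认(return_exceptions=False)会把第一个异常立即传播给等待 gather 的调用方,但不会取消其他协程——它们会继续在后台运行。设置 return_exceptions=True 可让异常作为结果列表中的元素返回,而不是被抛出。如果需要"一个失败、其余全部取消"的语义,请使用下文的 TaskGroup。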

任务(asyncio.create_task)

create_task 立即将协程包装成 Task 并交给事件循环调度(在当前协程下一次 await 让出控制权时开始运行),比 gather 更灵活。注意要保存返回的 Task 引用:事件循环只对任务保持弱引用,未被引用的任务可能在执行中途被垃圾回收:

import asyncio

async def background_job(n: int) -> int:
    """Sleep ``n`` seconds, then return ``n`` squared."""
    await asyncio.sleep(n)
    squared = n * n
    return squared

async def main():
    """Schedule two background jobs, do other work, then collect both results."""
    # create_task schedules each coroutine right away; nothing here waits yet.
    first = asyncio.create_task(background_job(2))
    second = asyncio.create_task(background_job(3))

    print("任务已提交,执行其他工作")
    await asyncio.sleep(0.1)   # yield so the scheduled tasks get a chance to start

    # Awaiting a Task waits for it to finish and returns its result.
    r1, r2 = await first, await second
    print(r1, r2)   # 4 9

asyncio.run(main())

TaskGroup — 结构化并发(Python 3.11+)

asyncio.TaskGroup 提供更安全的结构化并发模式:组内任意任务抛出异常(CancelledError 除外)时,会自动取消其余任务,并在退出 async with 块时把这些异常合并为 ExceptionGroup 抛出(可用 except* 捕获):

import asyncio

async def fetch_data(source: str) -> dict:
    """Pretend to load data from ``source`` after a 0.5 s delay.

    Returns:
        ``{"source": source, "data": "..."}`` on success.

    Raises:
        ValueError: if ``source`` is ``"bad"`` (used to demonstrate failure handling).
    """
    await asyncio.sleep(0.5)
    if source != "bad":
        return {"source": source, "data": "..."}
    raise ValueError(f"源 {source} 不可用")

async def main():
    """Fetch from three sources inside a TaskGroup and print each result.

    If any task raises, the group cancels the remaining tasks and the
    ``async with`` block re-raises the failure(s) as an ExceptionGroup.
    """
    async with asyncio.TaskGroup() as group:
        tasks = [group.create_task(fetch_data(name)) for name in ("api-1", "api-2", "api-3")]

    # Reaching this point means every task succeeded, so result() cannot raise.
    for t in tasks:
        print(t.result())

asyncio.run(main())

超时控制

import asyncio

async def slow_op() -> str:
    """Simulate a 10-second operation, long enough to exceed a short timeout."""
    await asyncio.sleep(10)
    outcome = "完成"
    return outcome

async def main():
    """Show two ways to put a 1-second deadline on ``slow_op()``.

    Both attempts time out here because slow_op() needs 10 s. The ``else``
    branches show where the result is used when the call finishes in time.
    """
    # asyncio.wait_for: on timeout, cancels the awaited task and raises TimeoutError.
    try:
        result = await asyncio.wait_for(slow_op(), timeout=1.0)
    except asyncio.TimeoutError:  # alias of the built-in TimeoutError since Python 3.11
        print("操作超时!")
    else:
        print(result)  # only reached if slow_op() finishes before the deadline

    # Python 3.11+: the asyncio.timeout() context manager is the recommended form.
    try:
        async with asyncio.timeout(1.0):
            result = await slow_op()
    except TimeoutError:
        print("操作超时!")
    else:
        print(result)

asyncio.run(main())

异步上下文管理器与迭代器

import asyncio

class AsyncDB:
    """Toy async database handle that demonstrates the async context-manager protocol."""

    async def __aenter__(self):
        # Runs on `async with` entry; the returned object is bound by `as`.
        print("连接数据库")
        return self

    async def __aexit__(self, *exc_info):
        # Runs on exit, including when the body raised. Returning None (falsy)
        # lets any exception from the body propagate.
        print("关闭连接")

    async def fetch(self, query: str):
        """Simulate running ``query``; the text is ignored and a fixed row list is returned."""
        await asyncio.sleep(0.1)
        rows = [{"id": 1}]
        return rows

async def main():
    """Open an AsyncDB with `async with`, run one query and print the returned rows."""
    async with AsyncDB() as conn:
        result_rows = await conn.fetch("SELECT * FROM users")
        print(result_rows)

# Async iterator
class AsyncStream:
    """Yield the items of a list one at a time, pausing briefly before each one."""

    def __init__(self, items: list):
        self._items = iter(items)  # plain iterator consumed by __anext__

    def __aiter__(self):
        # The object is its own async iterator.
        return self

    async def __anext__(self):
        await asyncio.sleep(0.01)  # simulate per-item latency
        try:
            return next(self._items)
        except StopIteration:
            # Async iteration signals exhaustion with StopAsyncIteration instead.
            raise StopAsyncIteration

async def consume():
    """Iterate an AsyncStream with `async for` and print each item."""
    async for item in AsyncStream([1, 2, 3]):
        print(item)


# Run both demos of this section. main() holds the AsyncDB context-manager demo;
# without this call that example would be defined but never executed.
asyncio.run(main())
asyncio.run(consume())

asyncio.wait — 细粒度控制

import asyncio

async def task(n: int) -> int:
    """Sleep ``n`` seconds, then hand ``n`` back unchanged."""
    await asyncio.sleep(n)
    value = n
    return value

async def main():
    """Start three timed tasks, report the first one to finish and cancel the rest."""
    tasks = {asyncio.create_task(task(i)) for i in [3, 1, 2]}

    # FIRST_COMPLETED: return as soon as any task finishes (here the 1-second one).
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for t in done:
        print(f"已完成: {t.result()}")   # 1

    for t in pending:
        t.cancel()   # request cancellation of the remaining tasks
    # cancel() only *requests* cancellation. Wait for the tasks to finish unwinding
    # so none are left half-cancelled; return_exceptions=True turns the expected
    # CancelledError of each task into a result instead of raising it here.
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())

实战:并发 HTTP 请求

import asyncio
import aiohttp   # pip install aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """GET ``url`` using the shared ``session`` and return the decoded JSON body.

    Raises:
        aiohttp.ClientResponseError: if the server answers with an error status (>= 400).
    """
    async with session.get(url) as resp:
        # Fail loudly on HTTP errors instead of trying to parse an error page as JSON.
        resp.raise_for_status()
        return await resp.json()

async def main():
    """Fetch several URLs concurrently over one shared session and print their query args."""
    urls = [f"https://httpbin.org/get?n={i}" for i in (1, 2, 3)]

    # One ClientSession for all requests so its connection pool is shared.
    async with aiohttp.ClientSession() as session:
        coros = (fetch(session, url) for url in urls)
        results = await asyncio.gather(*coros)

    for payload in results:
        print(payload["args"])

asyncio.run(main())

asyncio vs threading vs multiprocessing

| 场景 | 推荐方案 |
|---|---|
| 大量 I/O 等待(网络请求、数据库) | asyncio |
| CPU 密集型(数值计算、压缩) | multiprocessing |
| 混合场景或遗留同步代码 | threading |
| 异步中调用阻塞函数 | loop.run_in_executor() / asyncio.to_thread()(3.9+) |
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io(seconds: float = 1.0) -> str:
    """Simulate a blocking (synchronous) I/O call.

    Args:
        seconds: How long to block the calling thread. Defaults to 1 s, the
            previously hard-coded value, so existing callers are unaffected.

    Returns:
        The string ``"io done"``.
    """
    import time  # local import keeps this snippet self-contained
    time.sleep(seconds)      # blocking call; never run this directly on the event loop
    return "io done"

async def main():
    """Run blocking_io() in a thread pool so the event loop stays responsive."""
    # Inside a coroutine, get_running_loop() is the recommended way to obtain the
    # current loop; get_event_loop() has policy-dependent behavior and is discouraged here.
    loop = asyncio.get_running_loop()
    # Run the blocking function in a worker thread; the loop keeps serving other tasks.
    # (Since Python 3.9, `await asyncio.to_thread(blocking_io)` is a shorter
    # alternative that uses the loop's default executor.)
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
    print(result)

asyncio.run(main())
最后更新于