跳至内容

协程与异步

Tornado 的核心优势在于基于事件循环的异步 I/O 模型。理解这一模型,需要从 Python 的迭代器、生成器出发,逐步建立协程与 async/await 的心智模型。本文从概念到实践,涵盖异步 HTTP 请求、并发调用等常见场景。

同步与异步

同步模型中,每个 I/O 操作(网络请求、数据库查询、文件读写)都阻塞当前线程,等待结果返回才能继续执行。在高并发场景下,大量线程处于等待状态,内存和切换开销极高。

异步模型中,遇到 I/O 操作时不阻塞,将当前任务挂起,事件循环转而处理其他就绪的任务;当 I/O 完成时再恢复执行。单线程即可处理大量并发连接,开销极低。

同步(每个请求独占线程):
  请求1 → [等待I/O] → 处理 → 响应
  请求2 →             等待线程空闲 → [等待I/O] → 处理 → 响应

异步(单线程事件循环):
  请求1 → [发起I/O] → 挂起
  请求2 → [发起I/O] → 挂起
  [I/O完成通知] → 恢复请求1 → 处理 → 响应
  [I/O完成通知] → 恢复请求2 → 处理 → 响应

迭代器与生成器(基础铺垫)

理解协程前,需先掌握迭代器和生成器。

迭代器

实现了 __iter__()__next__() 方法的对象即迭代器:

class Counter:
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.i >= self.n:
            raise StopIteration
        self.i += 1
        return self.i

for x in Counter(3):
    print(x)   # 1 2 3

生成器

函数中包含 yield 语句时,调用该函数返回的是一个生成器对象,函数体在每次 next() 调用时运行到下一个 yield 处暂停:

def countdown(n):
    while n > 0:
        yield n
        n -= 1

gen = countdown(3)
print(next(gen))   # 3
print(next(gen))   # 2
print(next(gen))   # 1

yield 也可以通过 send() 传入值,实现双向通信:

def accumulator():
    total = 0
    while True:
        value = yield total   # 暂停,接收外部发来的值
        if value is None:
            break
        total += value

acc = accumulator()
next(acc)           # 初始化(运行到第一个 yield)
acc.send(10)        # → total = 10
acc.send(20)        # → total = 30

yield from

yield from 用于委派生成器,将子生成器的 yield 值透传给外层调用者,同时将外层 send 的值传入子生成器:

def inner():
    yield 1
    yield 2

def outer():
    yield from inner()
    yield 3

list(outer())   # [1, 2, 3]

这种委派机制是早期协程的基础。

Tornado 协程:async/await

Tornado 6.x 完全基于 Python 3 原生 async/await 语法,这是推荐的写法。

基本模式

import tornado.web
import tornado.httpclient

class AsyncHandler(tornado.web.RequestHandler):
    async def get(self):
        # await 挂起当前协程,事件循环继续处理其他请求
        client = tornado.httpclient.AsyncHTTPClient()
        response = await client.fetch("https://httpbin.org/get")
        self.write(response.body)

async def 定义一个协程函数,await 等待一个可等待对象(协程、FutureTask)完成。与普通同步函数的区别:await 期间不阻塞事件循环,其他请求可以继续被处理。

@gen.coroutine(旧式写法)

Tornado 5.x 及更早版本使用 @gen.coroutine 装饰器 + yield 实现协程,在维护旧代码时会遇到:

from tornado import gen

class OldStyleHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        client = tornado.httpclient.AsyncHTTPClient()
        response = yield client.fetch("https://httpbin.org/get")
        self.write(response.body)
新代码统一使用 async/await@gen.coroutineasync def 可以互相 await/yield,不影响混用。

异步 HTTP 请求

AsyncHTTPClient 是 Tornado 内置的异步 HTTP 客户端,非阻塞地发起外部请求:

import json
import tornado.httpclient

class WeatherHandler(tornado.web.RequestHandler):
    async def get(self):
        city = self.get_argument("city", "Beijing")
        client = tornado.httpclient.AsyncHTTPClient()

        # 构造请求(可自定义 headers、method、body、timeout 等)
        request = tornado.httpclient.HTTPRequest(
            url=f"https://api.example.com/weather?city={city}",
            method="GET",
            headers={"Authorization": "Bearer token123"},
            connect_timeout=5,
            request_timeout=10,
        )

        try:
            response = await client.fetch(request)
            data = json.loads(response.body)
            self.write(data)
        except tornado.httpclient.HTTPError as e:
            self.set_status(e.code)
            self.write({"error": str(e)})

常用 HTTPRequest 参数:

参数说明
url请求地址
methodHTTP 方法(默认 GET
headers请求头字典
body请求体(POST/PUT 时使用)
connect_timeout连接超时(秒,默认 20)
request_timeout请求总超时(秒,默认 20)
follow_redirects是否跟随重定向(默认 True
validate_cert是否验证 SSL 证书(默认 True

并行协程

多个异步任务相互独立时,并行执行而非顺序等待可大幅缩短总耗时。

使用 asyncio.gather

import asyncio
import tornado.httpclient

class ParallelHandler(tornado.web.RequestHandler):
    async def get(self):
        client = tornado.httpclient.AsyncHTTPClient()

        # 创建多个协程任务,并行发起请求
        results = await asyncio.gather(
            client.fetch("https://httpbin.org/get"),
            client.fetch("https://httpbin.org/ip"),
            client.fetch("https://httpbin.org/headers"),
        )

        self.write({
            "task1": results[0].body.decode(),
            "task2": results[1].body.decode(),
            "task3": results[2].body.decode(),
        })

使用 yield 列表(gen.coroutine 风格)

旧式协程中,yield 一个列表或字典即可并行等待:

from tornado import gen

class ParallelHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        client = tornado.httpclient.AsyncHTTPClient()

        # yield 列表:并行执行,返回结果列表
        r1, r2 = yield [
            client.fetch("https://httpbin.org/get"),
            client.fetch("https://httpbin.org/ip"),
        ]

        # yield 字典:并行执行,返回同名字典
        results = yield {
            "info": client.fetch("https://httpbin.org/get"),
            "ip":   client.fetch("https://httpbin.org/ip"),
        }
        self.write(results["ip"].body)

在 Tornado 中运行同步阻塞代码

若必须调用同步阻塞函数(如旧版 pymysql),可用 tornado.ioloop.IOLoop.current().run_in_executor() 在线程池中执行,避免阻塞事件循环:

import asyncio
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

class DbHandler(tornado.web.RequestHandler):
    async def get(self):
        loop = asyncio.get_event_loop()
        # 在线程池中执行阻塞函数
        result = await loop.run_in_executor(executor, blocking_db_query, user_id)
        self.write({"result": result})
生产环境中,数据库操作应优先选择原生异步驱动:MySQL 使用 aiomysql,MongoDB 使用 motor,Redis 使用 aioredis。通过线程池运行同步 I/O 只应作为过渡方案。
最后更新于