协程与异步
Tornado 的核心优势在于基于事件循环的异步 I/O 模型。理解这一模型,需要从 Python 的迭代器、生成器出发,逐步建立协程与 async/await 的心智模型。本文从概念到实践,涵盖异步 HTTP 请求、并发调用等常见场景。
同步与异步
同步模型中,每个 I/O 操作(网络请求、数据库查询、文件读写)都阻塞当前线程,等待结果返回才能继续执行。在高并发场景下,大量线程处于等待状态,内存和切换开销极高。
异步模型中,遇到 I/O 操作时不阻塞,将当前任务挂起,事件循环转而处理其他就绪的任务;当 I/O 完成时再恢复执行。单线程即可处理大量并发连接,开销极低。
同步(每个请求独占线程):
请求1 → [等待I/O] → 处理 → 响应
请求2 → 等待线程空闲 → [等待I/O] → 处理 → 响应
异步(单线程事件循环):
请求1 → [发起I/O] → 挂起
请求2 → [发起I/O] → 挂起
[I/O完成通知] → 恢复请求1 → 处理 → 响应
[I/O完成通知] → 恢复请求2 → 处理 → 响应迭代器与生成器(基础铺垫)
理解协程前,需先掌握迭代器和生成器。
迭代器
实现了 __iter__() 和 __next__() 方法的对象即迭代器:
class Counter:
def __init__(self, n):
self.n = n
self.i = 0
def __iter__(self):
return self
def __next__(self):
if self.i >= self.n:
raise StopIteration
self.i += 1
return self.i
for x in Counter(3):
print(x) # 1 2 3生成器
函数中包含 yield 语句时,调用该函数返回的是一个生成器对象,函数体在每次 next() 调用时运行到下一个 yield 处暂停:
def countdown(n):
while n > 0:
yield n
n -= 1
gen = countdown(3)
print(next(gen)) # 3
print(next(gen)) # 2
print(next(gen)) # 1yield 也可以通过 send() 传入值,实现双向通信:
def accumulator():
total = 0
while True:
value = yield total # 暂停,接收外部发来的值
if value is None:
break
total += value
acc = accumulator()
next(acc) # 初始化(运行到第一个 yield)
acc.send(10) # → total = 10
acc.send(20) # → total = 30yield from
yield from 用于委派生成器,将子生成器的 yield 值透传给外层调用者,同时将外层 send 的值传入子生成器:
def inner():
yield 1
yield 2
def outer():
yield from inner()
yield 3
list(outer()) # [1, 2, 3]这种委派机制是早期协程的基础。
Tornado 协程:async/await
Tornado 6.x 完全基于 Python 3 原生 async/await 语法,这是推荐的写法。
基本模式
import tornado.web
import tornado.httpclient
class AsyncHandler(tornado.web.RequestHandler):
async def get(self):
# await 挂起当前协程,事件循环继续处理其他请求
client = tornado.httpclient.AsyncHTTPClient()
response = await client.fetch("https://httpbin.org/get")
self.write(response.body)async def 定义一个协程函数,await 等待一个可等待对象(协程、Future、Task)完成。与普通同步函数的区别:await 期间不阻塞事件循环,其他请求可以继续被处理。
@gen.coroutine(旧式写法)
Tornado 5.x 及更早版本使用 @gen.coroutine 装饰器 + yield 实现协程,在维护旧代码时会遇到:
from tornado import gen
class OldStyleHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
client = tornado.httpclient.AsyncHTTPClient()
response = yield client.fetch("https://httpbin.org/get")
self.write(response.body)async/await。@gen.coroutine 与 async def 可以互相 await/yield,不影响混用。异步 HTTP 请求
AsyncHTTPClient 是 Tornado 内置的异步 HTTP 客户端,非阻塞地发起外部请求:
import json
import tornado.httpclient
class WeatherHandler(tornado.web.RequestHandler):
async def get(self):
city = self.get_argument("city", "Beijing")
client = tornado.httpclient.AsyncHTTPClient()
# 构造请求(可自定义 headers、method、body、timeout 等)
request = tornado.httpclient.HTTPRequest(
url=f"https://api.example.com/weather?city={city}",
method="GET",
headers={"Authorization": "Bearer token123"},
connect_timeout=5,
request_timeout=10,
)
try:
response = await client.fetch(request)
data = json.loads(response.body)
self.write(data)
except tornado.httpclient.HTTPError as e:
self.set_status(e.code)
self.write({"error": str(e)})常用 HTTPRequest 参数:
| 参数 | 说明 |
|---|---|
url | 请求地址 |
method | HTTP 方法(默认 GET) |
headers | 请求头字典 |
body | 请求体(POST/PUT 时使用) |
connect_timeout | 连接超时(秒,默认 20) |
request_timeout | 请求总超时(秒,默认 20) |
follow_redirects | 是否跟随重定向(默认 True) |
validate_cert | 是否验证 SSL 证书(默认 True) |
并行协程
多个异步任务相互独立时,并行执行而非顺序等待可大幅缩短总耗时。
使用 asyncio.gather
import asyncio
import tornado.httpclient
class ParallelHandler(tornado.web.RequestHandler):
async def get(self):
client = tornado.httpclient.AsyncHTTPClient()
# 创建多个协程任务,并行发起请求
results = await asyncio.gather(
client.fetch("https://httpbin.org/get"),
client.fetch("https://httpbin.org/ip"),
client.fetch("https://httpbin.org/headers"),
)
self.write({
"task1": results[0].body.decode(),
"task2": results[1].body.decode(),
"task3": results[2].body.decode(),
})使用 yield 列表(gen.coroutine 风格)
旧式协程中,yield 一个列表或字典即可并行等待:
from tornado import gen
class ParallelHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
client = tornado.httpclient.AsyncHTTPClient()
# yield 列表:并行执行,返回结果列表
r1, r2 = yield [
client.fetch("https://httpbin.org/get"),
client.fetch("https://httpbin.org/ip"),
]
# yield 字典:并行执行,返回同名字典
results = yield {
"info": client.fetch("https://httpbin.org/get"),
"ip": client.fetch("https://httpbin.org/ip"),
}
self.write(results["ip"].body)在 Tornado 中运行同步阻塞代码
若必须调用同步阻塞函数(如旧版 pymysql),可用 tornado.ioloop.IOLoop.current().run_in_executor() 在线程池中执行,避免阻塞事件循环:
import asyncio
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
class DbHandler(tornado.web.RequestHandler):
async def get(self):
loop = asyncio.get_event_loop()
# 在线程池中执行阻塞函数
result = await loop.run_in_executor(executor, blocking_db_query, user_id)
self.write({"result": result})aiomysql,MongoDB 使用 motor,Redis 使用 aioredis。通过线程池运行同步 I/O 只应作为过渡方案。