线程
线程是操作系统调度的最小单位,同一进程内的多个线程共享该进程的内存空间。Python 通过 threading 模块提供线程支持。
进程与线程的关系
- 进程:资源分配的基本单位,拥有独立的内存空间。
- 线程:执行/调度的基本单位,比进程更轻量,创建和切换开销更小。
- 一个进程至少有一个主线程,可以创建多个子线程;同一进程内的所有线程共享内存,通信方便但需要同步控制。
Python 的 GIL
CPython 解释器有一把全局解释器锁(GIL,Global Interpreter Lock),同一时刻只允许一个线程执行 Python 字节码。这意味着:
- 多线程无法利用多核 CPU 实现真正的并行计算(CPU 密集型任务)。
- 但多线程可以实现并发:一个线程 I/O 等待时,其他线程可以运行(I/O 密集型任务)。
- CPU 密集型任务应改用 multiprocessing(多进程)或 concurrent.futures.ProcessPoolExecutor。
Python 3.13 引入了实验性的自由线程(free-threaded,无 GIL)构建:使用 --disable-gil 选项编译 CPython 即可关闭 GIL(PEP 703)。Python 3.14 起该构建获得官方支持(但仍不是默认构建),未来有望实现真正的多核并行。
创建线程
import threading
import time
def worker(name: str, delay: float) -> None:
    """Print a start message, sleep for *delay* seconds, then print a finish message.

    Args:
        name: Label shown in the printed messages.
        delay: Seconds to sleep between the two messages (simulated work).
    """
    print(f"[{name}] 开始")
    time.sleep(delay)
    print(f"[{name}] 完成")
# 方式一:Thread 对象
t1 = threading.Thread(target=worker, args=("线程A", 1))
t2 = threading.Thread(target=worker, args=("线程B", 2))
t1.start()
t2.start()
# join() 等待线程结束
t1.join()
t2.join()
print("所有线程完成")继承 Thread 类
import threading
class DownloadTask(threading.Thread):
    """Thread subclass that simulates downloading a single URL.

    Overriding run() is the alternative to passing target= to Thread.
    """
    def __init__(self, url: str):
        # Thread.__init__ must run so the object is a valid, startable thread.
        super().__init__()
        self.url = url
    def run(self) -> None:
        """Body executed in the new thread once start() is called."""
        import time
        print(f"下载: {self.url}")
        time.sleep(1)  # stand-in for the actual network transfer
        print(f"完成: {self.url}")
tasks = [
DownloadTask("https://example.com/file1.zip"),
DownloadTask("https://example.com/file2.zip"),
]
for t in tasks:
t.start()
for t in tasks:
t.join()daemon 守护线程
守护线程不会阻止程序退出:当只剩下守护线程在运行时,解释器会直接退出,并强制终止这些线程。因此守护线程适合后台任务(日志上报、心跳检测等)。但要注意,它们被突然终止时,打开的文件、数据库事务等资源可能无法正常释放:
import threading
import time
def heartbeat() -> None:
    """Print a heartbeat line every 2 seconds, forever.

    Never returns, so it is only useful when run as a daemon thread.
    """
    while True:
        print("♥ 心跳")
        time.sleep(2)
# daemon=True: this thread will not keep the process alive once the main thread finishes
t = threading.Thread(target=heartbeat, daemon=True)
t.start()
time.sleep(5)
print("主线程结束,守护线程随之退出")
线程安全:Lock
多个线程修改共享变量时,需要使用锁(Lock)来防止竞态条件:
import threading
counter = 0
lock = threading.Lock()
def increment(n: int) -> None:
    """Add 1 to the module-level ``counter`` *n* times, holding ``lock`` for each update."""
    global counter
    for _ in range(n):
        # `with lock:` acquires on entry and always releases on exit, i.e.
        # lock.acquire(); try: ... finally: lock.release()
        with lock:
            counter += 1
threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终计数: {counter}") # 50000(正确结果)不加锁时,counter 结果可能小于 50000(丢失更新)。
RLock — 可重入锁
同一线程可以多次获取同一个 RLock(每次 acquire 都必须对应一次 release),从而避免同一线程重复加锁造成的死锁:
import threading
rlock = threading.RLock()
def outer():
    """Acquire ``rlock`` and, while still holding it, call inner()."""
    with rlock:
        print("外层锁定")
        # The same thread acquires rlock a second time: allowed for an RLock,
        # whereas a plain Lock would block here forever (self-deadlock).
        inner()
def inner():
    """Acquire ``rlock`` (re-entrantly when called from outer()) and print a message."""
    with rlock:
        print("内层锁定")
t = threading.Thread(target=outer)
t.start()
t.join()Semaphore — 信号量
控制同时访问某资源的最大线程数(如连接池、并发请求限制):
import threading
import time
# At most 3 threads may be inside the `with sem:` block at the same time
sem = threading.Semaphore(3)
def access_resource(name: str, hold_seconds: float = 1.0) -> None:
    """Use the shared resource while holding one of the ``sem`` slots.

    Args:
        name: Label printed in the progress messages.
        hold_seconds: How long to simulate using the resource (default 1 second).
    """
    with sem:  # blocks while all 3 slots are taken
        print(f"{name} 正在使用资源")
        time.sleep(hold_seconds)
        print(f"{name} 释放资源")
threads = [threading.Thread(target=access_resource, args=(f"线程{i}",)) for i in range(8)]
for t in threads:
t.start()
for t in threads:
t.join()Event — 线程间信号
import threading
import time
event = threading.Event()
def producer() -> None:
    """Simulate 2 seconds of preparation, then set ``event`` to release waiting threads."""
    print("生产者: 准备数据...")
    time.sleep(2)
    event.set()  # send the signal: every thread blocked in event.wait() resumes
    print("生产者: 数据就绪")
def consumer() -> None:
    """Block until ``event`` is set, then continue processing."""
    print("消费者: 等待数据...")
    event.wait()  # blocks until some thread calls event.set()
    print("消费者: 开始处理数据")
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start()
t1.start()
t1.join()
t2.join()Queue — 线程安全队列
queue.Queue 内置锁,是线程间安全传递数据的首选方式:
import threading
import queue
import time
task_queue: queue.Queue = queue.Queue(maxsize=10)
def producer(q: queue.Queue, n: int) -> None:
    """Put *n* string tasks on *q*, then a ``None`` sentinel telling the consumer to stop.

    Args:
        q: Queue shared with the consumer thread.
        n: Number of tasks to produce.
    """
    for i in range(n):
        item = f"任务{i}"
        q.put(item)  # blocks while a bounded queue is full
        print(f"生产: {item}")
        time.sleep(0.1)
    q.put(None)  # sentinel: no more items will follow
def consumer(q: queue.Queue) -> None:
    """Take items from *q* and print them until the ``None`` sentinel arrives.

    Every item obtained with get() -- including the sentinel -- is matched by a
    task_done() call, so a q.join() elsewhere will not wait forever.
    """
    while True:
        item = q.get()  # blocks while the queue is empty
        try:
            if item is None:
                break
            print(f"消费: {item}")
        finally:
            # Runs on the `break` path too, so the sentinel is also marked done.
            q.task_done()
t1 = threading.Thread(target=producer, args=(task_queue, 5))
t2 = threading.Thread(target=consumer, args=(task_queue,))
t1.start()
t2.start()
t1.join()
t2.join()ThreadPoolExecutor — 线程池(推荐)
concurrent.futures.ThreadPoolExecutor 是更高层、更易用的线程池接口:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch(url: str) -> str:
    """Pretend to request *url*: wait 0.5 s, then return a fake status line."""
    time.sleep(0.5)  # stand-in for network I/O
    return f"{url} → 200 OK"
urls = [f"https://api.example.com/item/{i}" for i in range(10)]
with ThreadPoolExecutor(max_workers=4) as executor:
    # map(): results come back in the same order as urls; concise
    results = list(executor.map(fetch, urls))
print(results)
# submit() + as_completed(): handle each future as soon as it finishes
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            # result() re-raises any exception raised inside fetch()
            result = future.result()
        except Exception as e:
            print(f"{url} 失败: {e}")
        else:
            print(result)
线程本地存储(threading.local)
每个线程有自己独立的副本,互不干扰:
import threading
local_data = threading.local()
def worker(value: int) -> None:
    """Store *value* in this thread's own ``local_data.value`` and print it back.

    Attributes of a threading.local() object are separate for each thread, so
    concurrent workers never see each other's value.
    """
    import time
    local_data.value = value  # visible only to the current thread
    time.sleep(0.1)  # let other threads write their own value in the meantime
    print(f"线程 {threading.current_thread().name}: {local_data.value}")
# Each of the 3 threads prints only the value it stored itself
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
最后更新于