
Threads

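A thread is the smallest unit the operating system schedules, and multiple threads within the same process share that process's memory space. Python provides thread support through the threading module.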

Processes vs. threads

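  • Process: the basic unit of resource allocation, with its own independent memory space.
  • Thread: the basic unit of execution and scheduling; lighter than a process, and cheaper to create and switch between.
  • Every process has at least one thread (the main thread) and can create additional threads. All threads in a process share its memory, which makes communication easy but requires synchronization.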
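To make "threads share memory" concrete, here is a minimal sketch in which worker threads append to a module-level list that the main thread then reads directly. The names `shared` and `append_items` are illustrative only; the lock shows the kind of synchronization the last bullet refers to.

```python
import threading

shared: list[int] = []      # module-level list, visible to every thread in the process
lock = threading.Lock()     # synchronizes access to the shared list

def append_items(start: int) -> None:
    for i in range(start, start + 3):
        with lock:
            shared.append(i)

threads = [threading.Thread(target=append_items, args=(s,)) for s in (0, 10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# No copying or message passing needed: the main thread sees what the workers wrote
print(sorted(shared))                  # [0, 1, 2, 10, 11, 12]
print(threading.main_thread().name)    # MainThread
```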

Python's GIL

The CPython interpreter has a Global Interpreter Lock (GIL) that allows only one thread to execute Python bytecode at any given moment. This means:

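  • Threads cannot use multiple CPU cores for true parallel computation in pure-Python, CPU-bound code.
  • Threads still provide concurrency: while one thread waits on I/O (blocking I/O releases the GIL), other threads can run, so I/O-bound tasks benefit.
  • For CPU-bound tasks, use multiprocessing (multiple processes) or concurrent.futures.ProcessPoolExecutor instead.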
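
Python 3.13 introduced an experimental free-threaded build without the GIL (built with --disable-gil), which may eventually allow true multi-core parallelism.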
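The I/O-bound case can be checked with a quick timing sketch. It assumes `time.sleep` is a fair stand-in for blocking I/O (both release the GIL), so five threads sleeping 0.2 s each should finish in roughly 0.2 s total rather than 1.0 s. The `sys._is_gil_enabled()` call is a private helper that exists only on Python 3.13+, so it is looked up defensively.

```python
import sys
import threading
import time

def io_task() -> None:
    # time.sleep releases the GIL while waiting, just like blocking I/O
    time.sleep(0.2)

def run_threads(n: int) -> float:
    threads = [threading.Thread(target=io_task) for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

elapsed = run_threads(5)
print(f"5 sleeping threads took {elapsed:.2f} s")   # roughly 0.2 s, not 1.0 s

# Private helper, present only on Python 3.13+; reports whether the GIL is active
gil_check = getattr(sys, "_is_gil_enabled", None)
if gil_check is not None:
    print("GIL enabled:", gil_check())
```

For pure-Python CPU-bound work (a tight arithmetic loop instead of `sleep`), the same measurement on a GIL build shows little or no speedup, which is why the list above points to processes for that case.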

Creating threads

import threading
import time

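def worker(name: str, delay: float) -> None:
    print(f"[{name}] started")
    time.sleep(delay)
    print(f"[{name}] finished")

# Approach 1: pass the function as target and its arguments as args
t1 = threading.Thread(target=worker, args=("Thread A", 1))
t2 = threading.Thread(target=worker, args=("Thread B", 2))

t1.start()   # start() runs worker() in a new thread and returns immediately
t2.start()

# join() blocks until the thread has finished
t1.join()
t2.join()
print("All threads finished")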
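Thread discards whatever its target returns, so results have to be written somewhere the main thread can read after join(). A minimal sketch, assuming a shared dict keyed by input is acceptable; `square` and `results` are hypothetical names. It also shows the `kwargs` and `name` constructor parameters.

```python
import threading
import time

def square(x: int, results: dict[int, int], delay: float = 0.0) -> None:
    time.sleep(delay)
    # Thread ignores return values, so store the result in a shared container
    results[x] = x * x

results: dict[int, int] = {}
threads = [
    threading.Thread(
        target=square,
        args=(x, results),
        kwargs={"delay": 0.1},   # keyword arguments for the target
        name=f"square-{x}",      # optional; appears in logs and debuggers
    )
    for x in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(dict(sorted(results.items())))   # {0: 0, 1: 1, 2: 4, 3: 9}
```

When you need return values regularly, ThreadPoolExecutor (later in this page) handles this bookkeeping for you.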

Subclassing Thread

import threading
import time

class DownloadTask(threading.Thread):
    def __init__(self, url: str):
        super().__init__()   # required: initializes the Thread internals
        self.url = url

    def run(self) -> None:
        """Runs in the new thread once start() is called."""
        print(f"Downloading: {self.url}")
        time.sleep(1)   # simulate the download
        print(f"Done: {self.url}")

tasks = [
    DownloadTask("https://example.com/file1.zip"),
    DownloadTask("https://example.com/file2.zip"),
]

for t in tasks:
    t.start()

for t in tasks:
    t.join()
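A subclass can also keep its own state, which is a convenient place to store a result or an exception raised inside run(). A minimal sketch; `SumTask`, `result`, and `error` are hypothetical names, not part of the threading API.

```python
import threading
from typing import Optional

class SumTask(threading.Thread):
    """Hypothetical Thread subclass that keeps its result or error."""

    def __init__(self, numbers: list):
        super().__init__()
        self.numbers = numbers
        self.result: Optional[int] = None
        self.error: Optional[Exception] = None

    def run(self) -> None:
        try:
            self.result = sum(self.numbers)
        except Exception as exc:   # keep the exception so the caller can inspect it
            self.error = exc

tasks = [SumTask([1, 2, 3]), SumTask([1, "x"])]
for t in tasks:
    t.start()
for t in tasks:
    t.join()   # after join(), attributes written in run() are safe to read

for t in tasks:
    print(t.result, type(t.error).__name__ if t.error else None)
# 6 None
# None TypeError
```

Without the try/except, an exception in run() is only printed to stderr by the thread machinery and never reaches the code that called join().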

Daemon threads

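A daemon thread does not keep the program alive: once only daemon threads remain, the program exits and they are stopped abruptly. This suits background tasks such as log shipping or heartbeat checks, but because a daemon thread gets no chance to clean up, resources it holds (open files, connections) may not be released properly: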

import threading
import time

def heartbeat() -> None:
    while True:
        print("♥ heartbeat")
        time.sleep(2)

# Mark as a daemon thread: it will not keep the program running
t = threading.Thread(target=heartbeat, daemon=True)
t.start()

time.sleep(5)
print("Main thread exiting; the daemon thread is stopped with it")
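When the background task should shut down cleanly instead of being killed, one common pattern is a stop flag checked by the loop. This sketch swaps in a threading.Event (covered further down) as that flag; `stop`, `beats`, and the 0.05 s interval are illustrative choices. It also shows that the daemon status must be decided before start(): changing it afterwards raises RuntimeError.

```python
import threading
import time

stop = threading.Event()
beats: list[int] = []

def heartbeat(interval: float) -> None:
    count = 0
    # Event.wait(timeout) returns False on timeout and True once stop.set() is called
    while not stop.wait(interval):
        count += 1
        beats.append(count)   # stands in for "send a heartbeat"

t = threading.Thread(target=heartbeat, args=(0.05,), daemon=True)
t.start()

try:
    t.daemon = False   # not allowed once the thread has started
except RuntimeError as exc:
    print("RuntimeError:", exc)

time.sleep(0.3)        # the main thread does other work meanwhile
stop.set()             # ask the loop to finish
t.join()               # the thread exits on its own instead of being killed at shutdown
print(t.is_alive())    # False
```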

Thread safety: Lock

When multiple threads modify a shared variable, use a lock (Lock) to prevent race conditions:

import threading

counter = 0
lock = threading.Lock()

def increment(n: int) -> None:
    global counter
    for _ in range(n):
        with lock:   # acquire() on entry, release() on exit, even if an exception occurs
            counter += 1

threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final count: {counter}")   # 50000 (correct)

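Without the lock, counter can end up below 50000 (lost updates): counter += 1 is a read-modify-write sequence rather than an atomic operation, so a thread switch between the read and the write can discard another thread's increment.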
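For reference, here is what the `with lock:` form expands to, written out with explicit acquire() and release() in a try/finally. This is a sketch of the equivalent manual form, not a recommendation to prefer it. It also shows acquire(timeout=...), which gives up instead of waiting forever.

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n: int) -> None:
    global counter
    for _ in range(n):
        lock.acquire()        # what `with lock:` does on entry
        try:
            counter += 1
        finally:
            lock.release()    # what `with lock:` does on exit, even after an exception

threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)   # 50000

# acquire() can time out: the lock is already held here, so this waits 0.1 s and fails
lock.acquire()
got_it = lock.acquire(timeout=0.1)
print(got_it)    # False
lock.release()
```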

RLock: reentrant lock

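The same thread can acquire an RLock multiple times (and must release it the same number of times), so nested locking does not deadlock: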

import threading

rlock = threading.RLock()

def outer():
    with rlock:
        print("Outer lock acquired")
        inner()   # same thread acquires again: allowed by RLock, a plain Lock would deadlock

def inner():
    with rlock:
        print("Inner lock acquired")

t = threading.Thread(target=outer)
t.start()
t.join()
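The difference between Lock and RLock can be observed without actually hanging the program by re-acquiring with a timeout. A minimal sketch: the plain Lock times out because it does not track which thread holds it, while the RLock succeeds because it records its owner and a recursion count.

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

with lock:
    # A second blocking acquire() here would wait forever; the timeout makes it return False
    lock_again = lock.acquire(timeout=0.1)
print("Lock re-acquired:", lock_again)     # False

with rlock:
    rlock_again = rlock.acquire(timeout=0.1)
    rlock.release()   # every successful acquire() needs a matching release()
print("RLock re-acquired:", rlock_again)   # True
```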

Semaphore

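A semaphore caps how many threads can access a resource at the same time (for example a connection pool, or a limit on concurrent requests):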

import threading
import time

# At most 3 threads may hold the semaphore at once
sem = threading.Semaphore(3)

def access_resource(name: str) -> None:
    with sem:
        print(f"{name} is using the resource")
        time.sleep(1)
        print(f"{name} released the resource")

threads = [threading.Thread(target=access_resource, args=(f"Thread {i}",)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
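To check the cap rather than eyeball printed output, the sketch below records the peak number of threads inside the `with sem:` block; `active` and `max_active` are illustrative counters guarded by their own lock. It also shows BoundedSemaphore, a variant that raises ValueError when released more times than it was acquired, which catches that class of bug early.

```python
import threading
import time

sem = threading.Semaphore(3)
state_lock = threading.Lock()   # protects the two counters below
active = 0
max_active = 0

def access_resource() -> None:
    global active, max_active
    with sem:                   # at most 3 threads get past this line at once
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)        # simulate using the resource
        with state_lock:
            active -= 1

threads = [threading.Thread(target=access_resource) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", max_active)   # never more than 3

bounded = threading.BoundedSemaphore(1)
bounded.acquire()
bounded.release()
try:
    bounded.release()           # one release too many
    over_release_caught = False
except ValueError:
    over_release_caught = True
print(over_release_caught)      # True
```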

Event: signaling between threads

import threading
import time

event = threading.Event()

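def producer() -> None:
    print("Producer: preparing data...")
    time.sleep(2)
    event.set()          # set the flag, waking every waiting thread
    print("Producer: data ready")

def consumer() -> None:
    print("Consumer: waiting for data...")
    event.wait()         # block until the flag is set
    print("Consumer: processing data")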

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start()
t1.start()
t1.join()
t2.join()
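An Event is just an internal boolean flag, and wait() also accepts a timeout. This sketch walks through the flag's lifecycle: wait() returns False if the timeout expires, True once set() has been called, and clear() resets the flag so the event can be reused. threading.Timer is used here only as a convenient way to call set() from another thread.

```python
import threading

ready = threading.Event()

initially_set = ready.is_set()          # False: the flag starts cleared
timed_out = ready.wait(timeout=0.1)     # False: nobody called set() within 0.1 s

# threading.Timer calls ready.set() from another thread after 0.1 s
threading.Timer(0.1, ready.set).start()
got_signal = ready.wait(timeout=2)      # True: set() happened before the timeout

ready.clear()                           # reset the flag for reuse
after_clear = ready.is_set()            # False

print(initially_set, timed_out, got_signal, after_clear)   # False False True False
```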

Queue: thread-safe queue

queue.Queue handles locking internally and is the preferred way to pass data between threads:

import threading
import queue
import time

task_queue: queue.Queue = queue.Queue(maxsize=10)

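def producer(q: queue.Queue, n: int) -> None:
    for i in range(n):
        item = f"task{i}"
        q.put(item)   # blocks while the queue is full
        print(f"Produced: {item}")
        time.sleep(0.1)
    q.put(None)       # sentinel: tells the consumer to stop

def consumer(q: queue.Queue) -> None:
    while True:
        item = q.get()  # blocks while the queue is empty
        try:
            if item is None:
                break
            print(f"Consumed: {item}")
        finally:
            q.task_done()   # mark every item, sentinel included, as processed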

t1 = threading.Thread(target=producer, args=(task_queue, 5))
t2 = threading.Thread(target=consumer, args=(task_queue,))
t1.start()
t2.start()
t1.join()
t2.join()
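task_done() matters when a queue is paired with q.join(), which blocks until task_done() has been called once for every item put(). A minimal sketch of that pattern with a small pool of consumer threads; the three workers, the squaring task, and the `results` list are illustrative assumptions. The workers are daemon threads because they loop forever and are never sent a sentinel.

```python
import queue
import threading

q: queue.Queue = queue.Queue()
results: list[int] = []
results_lock = threading.Lock()

def worker() -> None:
    while True:
        n = q.get()                 # blocks until an item is available
        try:
            with results_lock:
                results.append(n * n)
        finally:
            q.task_done()           # q.join() counts these calls

# Daemon workers: they block on get() forever and are stopped when the program exits
for _ in range(3):
    threading.Thread(target=worker, daemon=True).start()

for n in range(10):
    q.put(n)

q.join()   # returns once every put() item has been matched by a task_done()
print(sorted(results))   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```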

ThreadPoolExecutor: thread pool (recommended)

concurrent.futures.ThreadPoolExecutor is a higher-level, easier-to-use thread-pool interface:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch(url: str) -> str:
    time.sleep(0.5)   # simulate I/O
    return f"{url} → 200 OK"

urls = [f"https://api.example.com/item/{i}" for i in range(10)]

with ThreadPoolExecutor(max_workers=4) as executor:
    # map: concise; results come back in input order
    results = list(executor.map(fetch, urls))
    print(results)

# submit + as_completed: handle each result as soon as its task finishes
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f"{url} failed: {e}")
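The two styles also differ in error handling. With map(), an exception raised by one call is re-raised when iteration reaches that call's result, which ends the loop; with submit() and as_completed(), each future is checked individually, as in the try/except above. A minimal sketch of the map() behavior; `parse` and its inputs are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def parse(text: str) -> int:
    return int(text)   # raises ValueError for non-numeric input

inputs = ["1", "2", "oops", "4"]
results: list[int] = []
error = None

with ThreadPoolExecutor(max_workers=2) as executor:
    try:
        # Results are yielded in input order; the third one re-raises its ValueError
        for value in executor.map(parse, inputs):
            results.append(value)
    except ValueError as exc:
        error = exc
        print("map stopped:", exc)

print(results)   # [1, 2]
```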

Thread-local storage (threading.local)

Each thread gets its own independent copy of the attributes, so threads do not interfere with each other:

import threading
import time

local_data = threading.local()

def worker(value: int) -> None:
    local_data.value = value   # stored separately for each thread
    time.sleep(0.1)            # other threads set their own value meanwhile
    print(f"Thread {threading.current_thread().name}: {local_data.value}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
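Subclassing threading.local is a tidy way to give every thread default values: the subclass's `__init__` runs once in each thread that touches the object. A minimal sketch of a per-thread request context; `RequestContext`, `user`, and the thread names are hypothetical.

```python
import threading

class RequestContext(threading.local):
    """Hypothetical per-thread context; __init__ runs once per thread that uses it."""

    def __init__(self) -> None:
        self.user = "anonymous"

ctx = RequestContext()
seen: dict[str, str] = {}

def handle(user: str) -> None:
    ctx.user = user   # changes only this thread's copy
    seen[threading.current_thread().name] = ctx.user

threads = [threading.Thread(target=handle, args=(u,), name=f"req-{u}") for u in ("alice", "bob")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen["req-alice"], seen["req-bob"])   # alice bob
print(ctx.user)   # anonymous: the main thread's copy was never modified
```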