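Processes
A process is the smallest unit to which the operating system allocates resources; it is an instance of a running program. Each process has its own memory space, so by default data is isolated between processes. Python supports multiple processes through the multiprocessing module.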
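This is a minimal sketch in which a child process reports its own name and PID back to the parent. It makes the idea of separate process instances concrete.

- It uses a Queue (covered later in this article) to return the values.
- The function name `report` and the process name `child-1` are illustrative choices.
- `parent_process()` requires Python 3.8 or later.

```python
import os
from multiprocessing import Process, Queue, current_process, parent_process


def report(q: Queue) -> None:
    # Runs in the child: send back its name, its PID and the PID of the process that started it
    q.put((current_process().name, os.getpid(), parent_process().pid))


if __name__ == "__main__":
    q = Queue()
    p = Process(target=report, args=(q,), name="child-1")
    p.start()
    name, child_pid, parent_pid = q.get()  # fetch the result before join()
    p.join()
    print(f"main: name={current_process().name}, pid={os.getpid()}")
    print(f"{name}: pid={child_pid}, started by pid={parent_pid}")
```

The two PIDs differ because the child is a separate operating-system process, even though it runs the same program.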
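Concurrency and parallelism
- Concurrency: tasks appear to run at the same time, but the CPU is actually switching between them rapidly (possible even on a single core).
- Parallelism: tasks truly execute at the same moment, which requires multiple CPU cores.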
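Concurrency does not require multiple cores. This minimal sketch assumes `time.sleep` is an acceptable stand-in for waiting on I/O. Four threads each wait 0.5 s; because their waits overlap, the whole run takes roughly 0.5 s instead of 2 s. The function name `io_task` is hypothetical.

```python
import threading
import time


def io_task(delay: float) -> None:
    # time.sleep simulates an I/O wait; a sleeping thread does not hold the GIL
    time.sleep(delay)


if __name__ == "__main__":
    start = time.perf_counter()
    threads = [threading.Thread(target=io_task, args=(0.5,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    elapsed = time.perf_counter() - start
    # The four waits overlap, so the total is close to 0.5 s rather than 2 s
    print(f"4 waits of 0.5 s took {elapsed:.2f} s")
```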
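Why use multiple processes
In CPython, the GIL (Global Interpreter Lock) allows only one thread to execute Python bytecode at a time, so multithreading cannot run CPU-bound work in parallel. With multiprocessing, each child process runs its own interpreter with its own GIL. This sidesteps the limitation and makes full use of multiple CPU cores.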
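A rough way to observe the GIL's effect is to run the same pure-Python, CPU-bound function with a thread pool and with a process pool. In this sketch, `count_down` and `timed` are illustrative helpers and the job sizes are arbitrary. Both pools compute identical results, but the measured times depend on the machine. On a multi-core system the process pool is typically faster, while the thread pool is usually no faster than running the jobs one after another.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_down(n: int) -> int:
    # Pure-Python loop: CPU-bound and holds the GIL while it runs
    total = 0
    while n > 0:
        total += n
        n -= 1
    return total


def timed(executor_cls, jobs):
    # Run every job in the given pool type and measure wall-clock time
    start = time.perf_counter()
    with executor_cls(max_workers=4) as ex:
        results = list(ex.map(count_down, jobs))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    jobs = [2_000_000] * 4
    thread_results, thread_time = timed(ThreadPoolExecutor, jobs)
    process_results, process_time = timed(ProcessPoolExecutor, jobs)
    print(f"threads:   {thread_time:.2f} s")
    print(f"processes: {process_time:.2f} s")
```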
| Scenario | Recommended approach |
|---|---|
| CPU-bound (numerical computation, compression) | multiprocessing / ProcessPoolExecutor |
| I/O-bound (network requests, file reads/writes) | threading / asyncio |
| Mixed workloads | ProcessPoolExecutor + ThreadPoolExecutor |
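For mixed workloads, one possible arrangement is to let a thread pool handle the I/O-bound step, then hand the data to a process pool for the CPU-bound step. This is a sketch under stated assumptions:

- `fetch` is a hypothetical download function that simulates network latency with `time.sleep`.
- `checksum` is an arbitrary pure-Python computation standing in for real CPU work.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def fetch(i: int) -> bytes:
    # Hypothetical I/O step: time.sleep simulates waiting on a network request
    time.sleep(0.1)
    return bytes([i % 256]) * 10_000


def checksum(data: bytes) -> int:
    # CPU-bound step: a pure-Python loop over the downloaded bytes
    total = 0
    for b in data:
        total = (total * 31 + b) % 1_000_003
    return total


if __name__ == "__main__":
    ids = range(8)
    # Threads overlap the I/O waits...
    with ThreadPoolExecutor(max_workers=8) as io_pool:
        payloads = list(io_pool.map(fetch, ids))
    # ...then processes spread the CPU work across cores
    with ProcessPoolExecutor() as cpu_pool:
        sums = list(cpu_pool.map(checksum, payloads))
    print(sums)
```

Running the two stages one after the other keeps the example simple. A real pipeline might instead submit CPU work to the process pool as each download completes.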
Creating processes
Using a function
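```python
from multiprocessing import Process
import time
import os


def worker(name: str) -> None:
    print(f"[child {os.getpid()}] {name} started")
    time.sleep(1)
    print(f"[child {os.getpid()}] {name} finished")


if __name__ == "__main__":
    p1 = Process(target=worker, args=("Task A",))
    p2 = Process(target=worker, args=("Task B",))
    p1.start()
    p2.start()
    p1.join()  # wait for the child process to finish
    p2.join()
    print(f"[main {os.getpid()}] all child processes finished")
```

On platforms that use the spawn start method (the default on Windows, and on macOS since Python 3.8), calls to Process() must be placed inside the if __name__ == "__main__": guard. Each child re-imports the main module. Without the guard, the child would try to start new processes during that import; Python detects this and raises a RuntimeError rather than creating processes endlessly. Using the guard on every platform keeps the code portable.
Subclassing Process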
```python
from multiprocessing import Process
import time


class DownloadTask(Process):
    def __init__(self, url: str):
        super().__init__()  # required so Process can set up its internal state
        self.url = url

    def run(self) -> None:
        # run() executes in the child process after start() is called
        print(f"Downloading: {self.url}")
        time.sleep(1)
        print(f"Done: {self.url}")


if __name__ == "__main__":
    tasks = [
        DownloadTask("https://example.com/file1.zip"),
        DownloadTask("https://example.com/file2.zip"),
    ]
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
```

Data isolation between processes
A child process's changes to a global variable do not affect the parent:

```python
from multiprocessing import Process

money = 100


def task() -> None:
    global money
    money = 666
    print(f"child: money = {money}")  # 666


if __name__ == "__main__":
    p = Process(target=task)
    p.start()
    p.join()
    print(f"main: money = {money}")  # still 100 (data is isolated)
```

Daemon processes
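A daemon process is terminated automatically once the main process finishes executing its code:

```python
from multiprocessing import Process
import time


def heartbeat() -> None:
    while True:
        print("♥ child heartbeat")
        time.sleep(1)


if __name__ == "__main__":
    p = Process(target=heartbeat)
    p.daemon = True  # must be set before start()
    p.start()
    time.sleep(3)
    print("main process exiting; the daemon process exits with it")
```

Mutual exclusion lock (Lock)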
When multiple processes operate on the same data, use a lock to prevent race conditions:

```python
from multiprocessing import Process, Lock
import json


def buy_ticket(name: str, lock: Lock) -> None:
    # Reading without the lock is fine here: the result is only displayed
    with open("tickets.json", encoding="utf-8") as f:
        data = json.load(f)
    print(f"{name} sees {data['count']} ticket(s) left")
    # Re-read, check and write back while holding the lock,
    # so only one process can buy at a time
    with lock:
        with open("tickets.json", encoding="utf-8") as f:
            data = json.load(f)
        if data["count"] > 0:
            data["count"] -= 1
            with open("tickets.json", "w", encoding="utf-8") as f:
                json.dump(data, f)
            print(f"{name} got a ticket!")
        else:
            print(f"{name}: sold out")


if __name__ == "__main__":
    with open("tickets.json", "w", encoding="utf-8") as f:
        json.dump({"count": 3}, f)
    lock = Lock()
    users = ["User A", "User B", "User C", "User D", "User E"]
    processes = [Process(target=buy_ticket, args=(u, lock)) for u in users]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

Inter-process communication (Queue)
multiprocessing.Queue is a process-safe message queue and the preferred way to pass data between processes (inter-process communication, IPC):

```python
from multiprocessing import Process, Queue
import time
import random


def producer(q: Queue, name: str) -> None:
    for i in range(3):
        item = f"{name}-{i}"
        q.put(item)
        print(f"produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    q.put(None)  # send the stop signal (sentinel)


def consumer(q: Queue) -> None:
    while True:
        item = q.get()
        if item is None:
            break
        print(f"consumed: {item}")
        time.sleep(0.2)


if __name__ == "__main__":
    q: Queue = Queue(maxsize=10)
    p = Process(target=producer, args=(q, "bun"))
    c = Process(target=consumer, args=(q,))
    p.start()
    c.start()
    p.join()
    c.join()
```

The producer-consumer model
With multiple producers and multiple consumers, JoinableQueue offers a clean way to handle shutdown:

```python
from multiprocessing import Process, JoinableQueue
import time
import random
import os


def producer(name: str, q: JoinableQueue) -> None:
    for i in range(5):
        item = f"{name}-{i}"
        q.put(item)
        print(f"[{os.getpid()}] produced: {item}")
        time.sleep(random.uniform(0.1, 0.3))
    q.join()  # block until every item in the queue has been processed


def consumer(q: JoinableQueue) -> None:
    while True:
        item = q.get()
        print(f"[{os.getpid()}] consumed: {item}")
        time.sleep(0.2)
        q.task_done()  # tell q.join() that this item has been processed


if __name__ == "__main__":
    q: JoinableQueue = JoinableQueue()
    producers = [
        Process(target=producer, args=("bun", q)),
        Process(target=producer, args=("bone", q)),
    ]
    consumers = [
        Process(target=consumer, args=(q,), daemon=True),
        Process(target=consumer, args=(q,), daemon=True),
    ]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()
    # Once every producer has returned, all items have been consumed;
    # the daemon consumers are terminated when the main process exits
    print("all producers finished, exiting")
```

Process pools (recommended: ProcessPoolExecutor)
concurrent.futures.ProcessPoolExecutor is the recommended process-pool interface in modern Python:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import os


def cpu_task(n: int) -> int:
    """CPU-bound task: sum of the integers 0 .. n-1."""
    result = sum(range(n))
    print(f"[{os.getpid()}] n={n} → {result}")
    return result


if __name__ == "__main__":
    tasks = [10_000, 20_000, 30_000, 40_000, 50_000]

    # map: results come back in input order; concise
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_task, tasks))
    print("map results:", results)

    # submit + as_completed: handle each result as soon as its task finishes
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(cpu_task, n): n for n in tasks}
        for future in as_completed(futures):
            n = futures[future]
            try:
                print(f"n={n} done: {future.result()}")
            except Exception as e:
                print(f"n={n} failed: {e}")
```

The older process pool (multiprocessing.Pool)
```python
from multiprocessing import Pool


def worker(x: int) -> int:
    return x * x


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # map: blocks until all results are ready
        results = pool.map(worker, range(10))
        print(results)
        # apply_async: non-blocking; returns AsyncResult objects immediately
        async_results = [pool.apply_async(worker, (i,)) for i in range(10)]
        values = [r.get(timeout=5) for r in async_results]
        print(values)
```

Sharing data between processes (Manager)
When data really must be shared, use a Manager. It carries significant overhead, so prefer a Queue whenever possible:

```python
from multiprocessing import Process, Manager, Lock


def worker(d: dict, lock: Lock) -> None:
    # d is a proxy: "-=" is a separate read and write, so the lock is still needed
    with lock:
        d["count"] -= 1


if __name__ == "__main__":
    lock = Lock()
    with Manager() as m:
        shared_dict = m.dict({"count": 100})
        processes = [Process(target=worker, args=(shared_dict, lock)) for _ in range(100)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(shared_dict)  # {'count': 0}
```

Zombie and orphan processes
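- Zombie process (POSIX): the child has exited, but the parent has not yet reaped it with wait()/join(). Its process-table entry, including its PID, stays occupied.
  - Fix: have the parent call p.join() promptly. multiprocessing also reaps finished children whenever a new process is started or multiprocessing.active_children() is called.
- Orphan process: the parent terminates unexpectedly and the child is left without a parent. The operating system hands it over to the init process (PID 1), which reaps it automatically when it exits.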
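```python
from multiprocessing import Process
import time


def child() -> None:
    time.sleep(2)
    print("child finished")


if __name__ == "__main__":
    p = Process(target=child)
    p.start()
    # Without join(), the child would briefly become a zombie after it exits;
    # the operating system reclaims its resources once the main process ends
    p.join()  # correct approach: wait for the child to finish
```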
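This small sketch shows the reaping behaviour described above. multiprocessing.active_children() returns the children that are still running and, as a side effect, joins any that have already finished. `short_job` is a hypothetical placeholder. Polling in a loop is only for illustration; in real code, calling join() on each process is simpler.

```python
import time
from multiprocessing import Process, active_children


def short_job() -> None:
    # Hypothetical placeholder: exits almost immediately
    pass


if __name__ == "__main__":
    procs = [Process(target=short_job) for _ in range(3)]
    for p in procs:
        p.start()
    # Until reaped, a finished child is a zombie on POSIX systems.
    # Each active_children() call joins (reaps) the children that have exited.
    deadline = time.monotonic() + 10
    while active_children() and time.monotonic() < deadline:
        time.sleep(0.05)
    exit_codes = [p.exitcode for p in procs]
    print("exit codes:", exit_codes)
```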