跳至内容

进程

进程是操作系统进行资源分配的最小单位,也是正在运行的程序实例。每个进程有独立的内存空间,进程之间数据默认相互隔离。Python 通过 multiprocessing 模块提供多进程支持。

并发与并行

  • 并发:宏观上看起来同时运行,但实际上是 CPU 快速切换(单核也能实现)。
  • 并行:真正意义上的同时执行,需要多个 CPU 核心。

为什么用多进程

CPython 的 GIL(全局解释器锁)使多线程无法真正并行执行 CPU 密集型任务。多进程中每个子进程都有独立的解释器和各自的 GIL,从而绕过了这一限制,可以充分利用多核 CPU。

| 场景 | 推荐方案 |
| --- | --- |
| CPU 密集(数值计算、压缩) | multiprocessing / ProcessPoolExecutor |
| I/O 密集(网络请求、文件读写) | threading / asyncio |
| 混合场景 | ProcessPoolExecutor + ThreadPoolExecutor |

创建进程

函数方式

from multiprocessing import Process
import time
import os

def worker(name: str) -> None:
    """Child-process body: announce start, sleep one second, announce end.

    Args:
        name: Task label included in both printed lines.
    """
    pid = os.getpid()  # both prints run in the same process, so look it up once
    print(f"[子进程 {pid}] {name} 开始")
    time.sleep(1)
    print(f"[子进程 {pid}] {name} 结束")

if __name__ == "__main__":
    # One child per task label; all are started before any is waited on.
    children = [Process(target=worker, args=(label,)) for label in ("任务A", "任务B")]

    for child in children:
        child.start()

    for child in children:
        child.join()   # block until this child process has finished

    print(f"[主进程 {os.getpid()}] 所有子进程完成")
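在使用 spawn 启动方式的平台上(Windows,以及 Python 3.8 起的 macOS),必须将创建和启动 Process 的代码放在 if __name__ == "__main__": 保护块内。spawn 会在子进程中重新导入主模块,若没有保护块,子进程会在导入阶段再次尝试创建进程,Python 随即抛出 RuntimeError。为保证可移植性,建议在所有平台上都这样写。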

继承 Process 类

from multiprocessing import Process
import time

class DownloadTask(Process):
    """A ``Process`` subclass whose ``run()`` simulates downloading one URL.

    Calling ``start()`` on an instance runs ``run()`` in a new child process.
    """

    def __init__(self, url: str):
        # The base Process initializer must run before any use of the object.
        super().__init__()
        self.url = url  # read by run() inside the child

    def run(self) -> None:
        """Print a start line, wait one second, then print a finish line."""
        target = self.url
        print(f"下载: {target}")
        time.sleep(1)
        print(f"完成: {target}")

if __name__ == "__main__":
    urls = [
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
    ]
    tasks = [DownloadTask(u) for u in urls]

    # Launch every download first so they run concurrently ...
    for t in tasks:
        t.start()
    # ... and only then wait for each of them to finish.
    for t in tasks:
        t.join()

进程间数据隔离

子进程对全局变量的修改不会影响父进程(每个进程持有各自的一份数据副本):

from multiprocessing import Process

money = 100  # module-level value; the child below rebinds only its own copy

def task() -> None:
    """Rebind the module-level ``money`` to 666 and print the new value.

    When this runs in a child process, only the child's copy changes; the
    parent still sees 100 afterwards.
    """
    global money
    money = 666
    print("子进程: money = " + str(money))   # prints 666

if __name__ == "__main__":
    child = Process(target=task)
    child.start()
    child.join()  # the child has finished its assignment by now
    # The parent's own module-level value was never touched by the child.
    print(f"主进程: money = {money}")   # still 100 (data isolation)

守护进程

守护进程随主进程代码执行完毕而终止:

from multiprocessing import Process
import time

def heartbeat() -> None:
    """Print a heartbeat line once per second, forever.

    Never returns on its own; the example below runs it as a daemon child so
    that it is stopped when the main process finishes.
    """
    interval = 1
    while True:
        print("♥ 子进程心跳")
        time.sleep(interval)

if __name__ == "__main__":
    # The daemon flag must be set before start(); passing it to the
    # constructor guarantees that.
    beat = Process(target=heartbeat, daemon=True)
    beat.start()

    time.sleep(3)
    # When the main process's code finishes, the daemon child is terminated.
    print("主进程结束,守护进程随之退出")

互斥锁(Lock)

多个进程操作同一份数据时,需要加锁防止竞态:

from multiprocessing import Process, Lock
import json
import time

def buy_ticket(name: str, lock: Lock) -> None:
    """Simulate one user querying the remaining tickets and then trying to buy one.

    The query step runs without the lock, so many buyers can read at once.
    The purchase step (re-read, check, decrement, write back) runs entirely
    under ``lock`` so that two processes can never sell the same ticket.

    Args:
        name: Label of the buyer, used only in the printed messages.
        lock: Lock shared by all buyer processes, used as a context manager.
    """
    # Unlocked read. Another buyer may be inside the locked section below,
    # having truncated the file with mode "w" but not yet written the JSON,
    # so this read can see an empty or partial file. Report that instead of
    # crashing the whole buyer process.
    try:
        with open("tickets.json", encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        print(f"{name} 查询余票失败(文件正在被写入)")
    else:
        print(f"{name} 查询到余票: {data['count']}")

    with lock:
        # Re-read under the lock: the count seen above may already be stale.
        with open("tickets.json", encoding="utf-8") as f:
            data = json.load(f)
        if data["count"] > 0:
            data["count"] -= 1
            with open("tickets.json", "w", encoding="utf-8") as f:
                json.dump(data, f)
            print(f"{name} 抢票成功!")
        else:
            print(f"{name} 票已售完")

if __name__ == "__main__":
    # Seed the shared ticket file: 3 tickets for 5 buyers. json is already
    # imported at the top of the script, and the encoding matches the reads
    # in buy_ticket().
    with open("tickets.json", "w", encoding="utf-8") as f:
        json.dump({"count": 3}, f)

    lock = Lock()  # a single lock object handed to every buyer process
    users = ["用户A", "用户B", "用户C", "用户D", "用户E"]
    processes = [Process(target=buy_ticket, args=(u, lock)) for u in users]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

进程间通信(Queue)

multiprocessing.Queue 是进程安全的消息队列,是进程间通信(IPC)的首选方式:

from multiprocessing import Process, Queue
import time
import random

def producer(q: Queue, name: str) -> None:
    """Put three items ``name-0`` .. ``name-2`` on ``q``, then a ``None`` sentinel.

    A random 0.1-0.5 s pause follows each item to simulate uneven production.
    The trailing ``None`` tells the consumer to stop.
    """
    for idx in range(3):
        label = f"{name}-{idx}"
        q.put(label)
        print(f"生产: {label}")
        pause = random.uniform(0.1, 0.5)
        time.sleep(pause)
    q.put(None)  # end-of-stream sentinel

def consumer(q: Queue) -> None:
    """Take items from ``q`` and print them until a ``None`` sentinel arrives.

    Each printed item is followed by a 0.2 s pause. The sentinel is consumed
    but not printed; anything queued after it stays in ``q``.
    """
    item = q.get()
    while item is not None:
        print(f"消费: {item}")
        time.sleep(0.2)
        item = q.get()

if __name__ == "__main__":
    q: Queue = Queue(maxsize=10)  # bounded: put() blocks while 10 items are pending
    # Producer first, consumer second, both for start() and for join().
    workers = [
        Process(target=producer, args=(q, "包子")),
        Process(target=consumer, args=(q,)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

生产者-消费者模型

多生产者多消费者场景,用 JoinableQueue 优雅处理结束信号:

from multiprocessing import Process, JoinableQueue
import time
import random
import os

def producer(name: str, q: JoinableQueue) -> None:
    """Put five items ``name-0`` .. ``name-4`` on ``q``, then wait on ``q.join()``.

    Each item is followed by a random 0.1-0.3 s pause. ``q.join()`` returns
    only after every item placed on the queue has been acknowledged with
    ``task_done()``.
    """
    pid = os.getpid()
    for idx in range(5):
        label = f"{name}-{idx}"
        q.put(label)
        print(f"[{pid}] 生产: {label}")
        time.sleep(random.uniform(0.1, 0.3))
    # Block until every queued item has been marked done by a consumer.
    q.join()

def consumer(q: JoinableQueue) -> None:
    """Process items from ``q`` forever, acknowledging each with ``task_done()``.

    Never returns on its own; the example below runs it as a daemon process
    that ends together with the main process.

    ``task_done()`` is called in a ``finally`` clause so that an item is
    acknowledged even if handling it raises. Otherwise the unacknowledged
    item would leave every producer blocked in ``q.join()`` forever.
    """
    while True:
        item = q.get()
        try:
            print(f"[{os.getpid()}] 消费: {item}")
            time.sleep(0.2)
        finally:
            q.task_done()  # lets q.join() count this item as processed

if __name__ == "__main__":
    q: JoinableQueue = JoinableQueue()

    producers = [Process(target=producer, args=(food, q)) for food in ("包子", "骨头")]
    # Consumers loop forever, so they are daemons and die with the main process.
    consumers = [Process(target=consumer, args=(q,), daemon=True) for _ in range(2)]

    for proc in [*producers, *consumers]:
        proc.start()

    # Each producer returns only after q.join(), i.e. once every item is consumed.
    for proc in producers:
        proc.join()

    # All producers are done; the daemon consumers exit along with the main process.
    print("所有生产者完成,程序退出")

进程池(推荐:ProcessPoolExecutor)

concurrent.futures.ProcessPoolExecutor 是现代 Python 推荐的进程池接口:

from concurrent.futures import ProcessPoolExecutor, as_completed
import os

def cpu_task(n: int) -> int:
    """Return the sum of the integers 0 .. n-1, a stand-in CPU-bound task.

    Also prints the worker's PID, ``n`` and the result, so the output shows
    which pool process handled each input.

    Args:
        n: Exclusive upper bound of the range to sum; ``n <= 0`` yields 0.

    Returns:
        ``sum(range(n))``.
    """
    result = sum(range(n))
    # Separator restored: previously "n={n}{result}" ran the two numbers together.
    print(f"[{os.getpid()}] n={n} → {result}")
    return result

if __name__ == "__main__":
    tasks = [10_000, 20_000, 30_000, 40_000, 50_000]

    # Approach 1, map(): results come back in input order.
    with ProcessPoolExecutor(max_workers=4) as pool:
        ordered = list(pool.map(cpu_task, tasks))
    print("map 结果:", ordered)

    # Approach 2, submit() + as_completed(): handle each result as soon as it is ready.
    with ProcessPoolExecutor(max_workers=4) as pool:
        pending = {}
        for n in tasks:
            pending[pool.submit(cpu_task, n)] = n
        for done in as_completed(pending):
            n = pending[done]
            try:
                outcome = done.result()
                print(f"n={n} 完成: {outcome}")
            except Exception as e:
                print(f"n={n} 失败: {e}")

旧式进程池(multiprocessing.Pool)

from multiprocessing import Pool
import os

def worker(x: int) -> int:
    """Return ``x`` squared; the unit of work mapped over the pool."""
    squared = x * x
    return squared

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # map(): blocks until every result is ready; results are in input order.
        print(pool.map(worker, range(10)))

        # apply_async(): submits without blocking; each .get() waits up to 5 s.
        handles = [pool.apply_async(worker, (i,)) for i in range(10)]
        print([h.get(timeout=5) for h in handles])

进程间共享数据(Manager)

当必须共享数据时,使用 Manager(性能开销较大,首选 Queue):

from multiprocessing import Process, Manager, Lock

def worker(d: dict, lock: Lock) -> None:
    """Decrement ``d["count"]`` by one while holding ``lock``.

    The lock makes the read-modify-write atomic with respect to other
    workers that use the same lock.
    """
    lock.acquire()
    try:
        d["count"] = d["count"] - 1
    finally:
        lock.release()

if __name__ == "__main__":
    lock = Lock()  # guards the read-modify-write inside worker()
    with Manager() as manager:
        shared_dict = manager.dict({"count": 100})
        workers = [Process(target=worker, args=(shared_dict, lock)) for _ in range(100)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print(shared_dict)   # {'count': 0}

僵尸进程与孤儿进程

  • 僵尸进程:子进程已结束,但父进程尚未调用 wait()/join() 回收其退出状态,导致其进程表项(包括进程号)仍被占用。
    • 解决:父进程及时调用 p.join();调用 multiprocessing.active_children() 也会顺带回收已经结束的子进程。注意守护模式本身并不能避免僵尸进程。
  • 孤儿进程:父进程先于子进程退出,子进程失去父进程。在 Unix 系统上,操作系统会将其交给 init 进程(PID=1)接管,并在其结束后自动回收。
from multiprocessing import Process
import time

def child() -> None:
    """Sleep for two seconds, then report that the child is finishing."""
    delay = 2
    time.sleep(delay)
    print("子进程结束")

if __name__ == "__main__":
    proc = Process(target=child)
    proc.start()
    # If join() were not called, the child would briefly become a zombie
    # after it exits; its resources are reclaimed once the main process ends.
    proc.join()   # correct approach: wait for the child to finish
最后更新于