Skip to content

23 multiprocessing多进程

Python有GIL(全局解释器锁),threading虽然能并发,但同一时刻只有一个线程在执行Python代码。要实现真正的多核并行,得用multiprocessing——它开多个进程,每个进程有独立的GIL,互不影响。

multiprocessing适合CPU密集型任务(大量计算、数据处理),threading适合I/O密集型任务(网络请求、文件读写)。

一、创建进程

1.1 基本用法

python
import os
from multiprocessing import Process


def worker(num):
    """Print the current process ID together with the argument received."""
    pid = os.getpid()
    print(f"进程 {pid}, 参数: {num}")


if __name__ == '__main__':
    # Build the child process, launch it, then block until it exits.
    child = Process(target=worker, args=(1,))
    child.start()
    child.join()

    print(f"主进程 {os.getpid()} 继续")

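注意:在Windows和macOS上(Python 3.8起macOS默认也用spawn方式启动子进程)必须用if __name__ == '__main__':保护。spawn方式下子进程会重新导入主模块,没有这层保护就会在导入时再次创建进程,并报RuntimeError。其他平台也建议始终加上,以保证代码可移植。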

1.2 多个进程

python
import os
import time
from multiprocessing import Process


def worker(num, delay=2):
    """Report this process's PID, simulate work by sleeping, then report completion.

    Args:
        num: Task number, echoed in the output.
        delay: Seconds to sleep to simulate work (defaults to the original 2).
    """
    print(f"进程 {os.getpid()}, 参数: {num}")
    time.sleep(delay)
    print(f"进程 {num} 完成")


if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = Process(target=worker, args=(i,))
        processes.append(p)
        # Start each process right away so all five run concurrently.
        p.start()

    # Join only after every process has been started.
    for p in processes:
        p.join()

    print("所有进程完成")

1.3 继承Process类

python
import os
from multiprocessing import Process


class MyProcess(Process):
    """A Process subclass whose run() prints its PID and the stored argument."""

    def __init__(self, num):
        # The base Process initializer must run so start()/join() work.
        super().__init__()
        self.num = num

    def run(self):
        """Body executed in the child process once start() is called."""
        message = f"进程 {os.getpid()}, 参数: {self.num}"
        print(message)


if __name__ == '__main__':
    proc = MyProcess(1)
    proc.start()
    proc.join()

二、进程池

2.1 Pool

进程池可以复用进程,避免频繁创建销毁。

python
from multiprocessing import Pool


def square(x):
    """Return x multiplied by itself."""
    product = x * x
    return product


if __name__ == '__main__':
    inputs = [1, 2, 3, 4, 5]
    # Four worker processes; map() returns results in input order.
    with Pool(processes=4) as pool:
        results = pool.map(square, inputs)
    print(results)  # [1, 4, 9, 16, 25]

2.2 apply和apply_async

python
import time
from multiprocessing import Pool


def task(x):
    """Sleep one second to simulate work, then return x squared."""
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    with Pool(4) as pool:
        # apply: blocks until this single call has finished
        blocking = pool.apply(task, (5,))
        print(f"apply结果: {blocking}")

        # apply_async: returns immediately; get() waits for the value
        pending = pool.apply_async(task, (10,))
        print(f"异步结果: {pending.get()}")

        # map: apply task to every element in parallel
        mapped = pool.map(task, [1, 2, 3, 4, 5])
        print(f"map结果: {mapped}")

        # map_async: non-blocking map; get() waits for the whole list
        pending_map = pool.map_async(task, [1, 2, 3])
        print(f"异步map结果: {pending_map.get()}")

2.3 starmap

当函数有多个参数时使用。

python
from multiprocessing import Pool


def add(x, y):
    """Return the sum of the two arguments."""
    total = x + y
    return total


if __name__ == '__main__':
    pairs = [(1, 2), (3, 4), (5, 6)]
    with Pool(4) as pool:
        # starmap unpacks each tuple into positional arguments: add(*pair)
        results = pool.starmap(add, pairs)
        print(results)  # [3, 7, 11]

三、进程间通信

3.1 Queue:队列

python
from multiprocessing import Process, Queue


def producer(q):
    """Put the integers 0-4 onto the queue, logging each one."""
    for value in range(5):
        q.put(value)
        print(f"生产: {value}")


def consumer(q):
    """Take items off the queue and log them until the None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:  # end-of-stream sentinel sent by the main process
            return
        print(f"消费: {item}")


if __name__ == '__main__':
    q = Queue()

    prod = Process(target=producer, args=(q,))
    cons = Process(target=consumer, args=(q,))
    prod.start()
    cons.start()

    # Wait for the producer first so the sentinel is queued after every item.
    prod.join()
    q.put(None)
    cons.join()

3.2 Pipe:管道

两个进程之间的直接通信。

python
from multiprocessing import Process, Pipe


def sender(conn):
    """Send two messages followed by a None end-of-stream marker, then close.

    The explicit marker is required: the receiver cannot rely on EOFError,
    because recv() only raises it once *every* handle to this end of the pipe
    is closed. The main process never closes its child_conn, and under the
    fork start method the receiver process inherits a copy as well.
    Waiting for EOF would therefore block forever.
    """
    conn.send("你好")
    conn.send("世界")
    conn.send(None)  # end-of-stream marker
    conn.close()


def receiver(conn):
    """Print each received message until the None marker (or EOF), then close."""
    try:
        while True:
            try:
                msg = conn.recv()
            except EOFError:  # every sending handle has been closed
                break
            if msg is None:  # end-of-stream marker from sender()
                break
            print(f"收到: {msg}")
    finally:
        conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()

    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

四、共享内存

4.1 Value

共享单个值。注意:counter.value += 1 分为"读取、修改、写回"三步,不是原子操作。多个进程同时修改时会丢失更新,需要用counter.get_lock()或单独的Lock保护(见5.1)。

python
from multiprocessing import Process, Value


def increment(counter, times=100000):
    """Add 1 to a shared counter `times` times, safely across processes.

    ``counter.value += 1`` reads the value and then writes it back; it is
    not atomic. Concurrent processes can therefore overwrite each other's
    updates. Holding the lock bundled with the synchronized Value
    (``get_lock()``) makes each read-modify-write indivisible.

    Args:
        counter: A ``multiprocessing.Value`` created with its default lock.
        times: Number of increments (defaults to the original 100000).
    """
    for _ in range(times):
        with counter.get_lock():
            counter.value += 1


if __name__ == '__main__':
    counter = Value('i', 0)  # 'i' = C signed int

    processes = [Process(target=increment, args=(counter,)) for _ in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"结果: {counter.value}")  # 500000 (without the lock, updates can be lost)

4.2 Array

共享数组。

python
from multiprocessing import Process, Array


def modify(arr, index, value):
    """Store value at position index of the given (shared) array."""
    arr[index] = value


if __name__ == '__main__':
    arr = Array('i', [0] * 5)  # 'i' = C signed int; five zeros

    # Worker idx writes idx * 10 into slot idx, so no two workers share a slot.
    workers = [Process(target=modify, args=(arr, idx, idx * 10)) for idx in range(5)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    print(list(arr))  # [0, 10, 20, 30, 40]

4.3 Manager

更灵活的共享对象:Manager可以在进程间共享dict、list等Python对象,但每次访问都要经过进程间通信,速度比Value/Array慢。

python
from multiprocessing import Process, Manager


def worker(shared_dict, shared_list, key, value):
    """Record value under key in the shared dict and append it to the shared list."""
    shared_dict[key] = value
    shared_list.append(value)


if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        jobs = []
        for n in range(5):
            jobs.append(Process(target=worker, args=(shared_dict, shared_list, f"key{n}", n)))

        for job in jobs:
            job.start()
        for job in jobs:
            job.join()

        # Copy into plain objects for printing; the list's append order may
        # vary between runs because the five processes run concurrently.
        print(dict(shared_dict))
        print(list(shared_list))

五、进程同步

5.1 Lock

python
from multiprocessing import Process, Lock, Value


def safe_increment(counter, lock):
    """Add 1 to counter.value 100000 times, holding lock around each update."""
    for _ in range(100000):
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()


if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()

    procs = [Process(target=safe_increment, args=(counter, lock)) for _ in range(5)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

    print(f"结果: {counter.value}")

5.2 Semaphore

python
import time
from multiprocessing import Process, Semaphore


def limited_task(sem, id):
    """Hold one semaphore slot for two seconds, logging start and finish.

    Note: the parameter name ``id`` shadows the builtin; it is kept because
    it is part of this function's signature.
    """
    sem.acquire()
    try:
        print(f"任务 {id} 开始")
        time.sleep(2)
        print(f"任务 {id} 完成")
    finally:
        sem.release()


if __name__ == '__main__':
    sem = Semaphore(3)  # at most 3 tasks run concurrently

    procs = [Process(target=limited_task, args=(sem, n)) for n in range(10)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

六、实战场景

6.1 并行计算

python
import time
from multiprocessing import Pool


def cpu_intensive(n):
    """CPU-bound task: return the sum of i * i for i in range(n)."""
    total = 0
    for i in range(n):
        total += i * i
    return total


if __name__ == '__main__':
    numbers = [10**7] * 8

    # Serial baseline. perf_counter() is a monotonic, high-resolution clock
    # meant for measuring durations; time.time() can jump if the system clock
    # is adjusted during the run.
    start = time.perf_counter()
    serial_results = [cpu_intensive(n) for n in numbers]
    print(f"串行: {time.perf_counter() - start:.2f}秒")

    # Parallel version of the same workload.
    start = time.perf_counter()
    with Pool() as pool:
        parallel_results = pool.map(cpu_intensive, numbers)
    print(f"并行: {time.perf_counter() - start:.2f}秒")

    # Sanity check: both approaches must produce identical results.
    assert serial_results == parallel_results

6.2 批量文件处理

python
from multiprocessing import Pool
from pathlib import Path


def process_file(filepath, encoding="utf-8"):
    """Read one text file and return a summary line with its character count.

    Args:
        filepath: Path (str or pathlib.Path) of the file to read.
        encoding: Text encoding of the file. Defaults to UTF-8 so the result
            does not depend on open()'s locale-dependent default encoding.
            A mismatched default can raise UnicodeDecodeError or miscount
            characters.

    Returns:
        A string of the form "<filepath>: <n> 字符".
    """
    with open(filepath, 'r', encoding=encoding) as f:
        content = f.read()
    # Processing logic goes here.
    return f"{filepath}: {len(content)} 字符"


if __name__ == '__main__':
    files = list(Path("./data").glob("*.txt"))

    with Pool(4) as pool:
        results = pool.map(process_file, files)

    for r in results:
        print(r)

七、常见问题

7.1 Windows上的保护

python
from multiprocessing import Process


def worker():
    """Minimal target function so this snippet runs on its own."""
    print("子进程运行中")


# Required on Windows, and on macOS, where "spawn" is the default start method
# since Python 3.8. Under spawn the child re-imports the main module.
# Without this guard, that import would try to start another Process, and
# multiprocessing raises RuntimeError.
if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    p.join()

7.2 序列化问题

python
from multiprocessing import Pool


def double(x):
    """Return x * 2.

    Defined at module level so it can be pickled by name and sent to the
    pool's worker processes.
    """
    return x * 2


if __name__ == '__main__':
    with Pool() as pool:
        # Wrong: lambdas cannot be pickled, so sending one to the pool fails
        # pool.map(lambda x: x*2, [1,2,3])  # PicklingError

        # Right: pass a plain module-level function
        print(pool.map(double, [1, 2, 3]))  # [2, 4, 6]

八、与threading对比

| 特性 | threading | multiprocessing |
| --- | --- | --- |
| 并发方式 | 线程 | 进程 |
| GIL限制 | 受限 | 不受限 |
| 适合任务 | I/O密集型 | CPU密集型 |
| 内存共享 | 共享内存 | 独立内存 |
| 创建开销 | 小 | 大 |
| 通信方式 | 共享变量 | Queue/Pipe/Manager |

九、总结

multiprocessing模块的核心:

| 组件 | 用途 |
| --- | --- |
| Process | 创建进程 |
| Pool | 进程池 |
| Queue | 进程安全队列 |
| Pipe | 两个进程间的管道 |
| Value/Array | 共享内存 |
| Manager | 共享对象管理器 |
| Lock/Semaphore | 进程同步 |

使用场景:

  • CPU密集型任务(数据处理、图像处理、科学计算)
  • 需要利用多核CPU
  • 批量任务并行处理

记住:CPU密集型用multiprocessing,I/O密集型用threading或asyncio。