目录
- 引言
- 1. multiprocessing 模块的定义和原理
- 1.1 定义
- 1.2 原理
- 1.3 导入
- 2. multiprocessing 的核心组件和功能
- 2.1 进程创建(Process)
- 2.2 进程池(Pool)
- 2.3 进程通信
- 2.4 同步机制
- 2.5 共享内存
- 3. 应用场景
- 4. 示例:多进程爬虫
- 5. 最佳实践
- 6. 注意事项
- 7. 总结
引言
python 的 multiprocessing 模块是一个标准库模块,用于实现多进程并行计算。它通过创建独立的进程,绕过 Python 的全局解释器锁(GIL),在多核 CPU 上实现真正的并行,特别适合 CPU 密集型任务(如数值计算、图像处理)。相比线程(threading 模块),multiprocessing 更适合需要高性能计算的场景。本文将详细介绍 multiprocessing 模块的定义、功能、用法、示例、应用场景、最佳实践和注意事项。
1. multiprocessing 模块的定义和原理
1.1 定义
multiprocessing 是一个跨平台的模块,提供创建和管理进程的 API,支持进程间通信(IPC)、同步机制和共享资源管理。它模仿了 threading 模块的接口,方便开发者从线程迁移到进程。
核心功能:
- 进程创建:创建独立进程,运行指定函数或任务。
- 进程池:管理一组工作进程,分配任务。
- 进程通信:支持管道(
Pipe)、队列(Queue)等 IPC 机制。 - 同步原语:提供锁(
Lock)、信号量(Semaphore)、事件(Event)等。 - 共享内存:支持共享基本数据类型(
Value)和数组(Array)。 - 跨平台:在 Windows、linux、MACOS 上运行一致。
依赖:标准库,无需额外安装。
1.2 原理
- 进程 vs 线程:
- 进程:独立的内存空间,拥有自己的 Python 解释器和 GIL,适合 CPU 密集型任务。
- 线程:共享内存空间,受 GIL 限制,适合 I/O 密集型任务。
- GIL 绕过:每个进程有独立的 GIL,允许多核并行。
- 进程创建:
- Linux/macOS:使用
fork(复制父进程),或spawn(新进程)。 - Windows:始终使用
spawn,启动新解释器。
- Linux/macOS:使用
- 通信开销:进程间通信(如
Queue)比线程慢,需优化设计。
1.3 导入
import multiprocessing
2. multiprocessing 的核心组件和功能
2.1 进程创建(Process)
通过 multiprocessing.Process 创建进程,运行指定函数。
构造函数:
Process(target=None, args=(), kwargs={}, name=None, daemon=None)
target:目标函数。args/kwargs:函数参数。name:进程名称。daemon:是否为守护进程(随主进程退出)。
主要方法:
start():启动进程。join():等待进程结束。terminate():强制终止进程。is_alive():检查进程是否存活。
示例:
import multiprocessing
def worker(num):
print(f"Worker {num} running in process {muljstiprocessing.current_process().name}")
if __name__ == "__main__":
processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
输出(顺序可能不同):
Worker 0 running in process Process-1 Worker 1 running in process Process-2 Worker 2 running in process Process-3
- 说明:创建 3 个进程,每个运行
worker函数。
2.2 进程池(Pool)
Pool 用于管理固定数量的进程,适合并行处理大量任务。
构造函数:
Pool(processes=None, initializer=Nonhttp://www.devze.come, initargs=())
processes:进程数(默认 CPU 核心数)。initialandroidizer:每个进程的初始化函数。initargs:初始化函数参数。
主要方法:
map(func, iterable):并行执行func应用于iterable,返回结果列表。imap(func, iterable):惰性版本,返回迭代器。apply(func, args=(), kwds={}):同步执行单任务。apply_async(func, args=(), kwds={}):异步执行单任务。close():关闭池,禁止新任务。join():等待池内进程完成。
示例:
from multiprocessing import Pool
def square(n):
return n * n
if __name__ == "__main__":
with Pool(processes=4) as pool:
results = pool.map(square, range(10))
print(results) # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
2.3 进程通信
支持 Pipe 和 Queue 实现进程间数据交换。
Pipe
- 双向或单向管道,适合两个进程通信。
构造函数:
Pipe(duplex=True)
- 返回
(conn1, conn2),两个连接对象。 duplex=True:双向;False:单向。
示例:
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello from sender")
conn.close()
def receiver(conn):
print(conn.recv())
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p1 = Process(target=send编程客栈er, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
输出:
Hello from sender
Queue
- 线程和进程安全的队列,适合多生产者/消费者场景。
构造函数:
Queue(maxsize=0)
maxsize:最大容量(0 表示无限制)。
示例:
from multiprocessing import Process, Queue
def producer(queue):
queue.put("Data from producer")
def consumer(queue):
print(queue.get())
if __name__ == "__main__":
queue = Queue()
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
2.4 同步机制
提供锁、信号量等原语,确保进程安全访问共享资源。
Lock
- 互斥锁,防止多个进程同时访问资源。
- 示例:
from multiprocessing import Process, Lock
def printer(lock, msg):
with lock:
print(msg)
if __name__ == "__main__":
lock = Lock()
processes = [Process(target=printer, args=(lock, f"Message {i}")) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
Semaphore
- 控制有限资源的并发访问。
- 示例:
from multiprocessing import Process, Semaphore
def worker(sem, name):
with sem:
print(f"{name} acquired resource")
# 模拟工作
if __name__ == "__main__":
sem = Semaphore(2) # 允许 2 个进程同时访问
processes = [Process(target=worker, args=(sem, f"Worker {i}")) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
Event
- 进程间信号通知。
- 示例:
from multiprocessing import Process, Event
import time
def wait_for_event(event):
event.wait()
print("Event triggered")
if __name__ == "__main__":
event = Event()
p = Process(target=wait_for_event, args=(event,))
p.start()
time.sleep(1)
event.set() # 触发事件
p.join()
2.5 共享内存
通过 Value 和 Array 共享基本数据类型。
- Value:单个共享值。
- Array:共享数组。
示例:
from multiprocessing import Process, Value, Array
def modify(shared_num, shared_arr):
shared_num.value += 1
for i in range(len(shared_arr)):
shared_arr[i] += 1
if __name__ == "__main__":
num = Value("i", 0) # 共享整数
arr = Array("i", [1, 2, 3]) # 共享数组
p = Process(target=modify, args=(num, arr))
p.start()
p.join()
print(num.value) # 输出: 1
print(list(arr)) # 输出: [2, 3, 4]
3. 应用场景
数值计算:
- 并行处理矩阵运算、蒙特卡洛模拟。
- 示例:计算大数组的平方。
图像处理:
- 并行处理图像滤波、特征提取。
- 示例:批量应用卷积滤波。
机器学习:
- 并行训练模型或处理数据预处理。
- 示例:并行特征提取。
数据处理:
- 并行处理 CSV 文件、数据库查询。
- 示例:多进程解析日志文件。
爬虫:
- 并行抓取网页(注意网络限制)。
- 示例:结合
urllib并发下载。
4. 示例:多进程爬虫
结合 urllib 和 Queue 实现并行网页抓取。
示例:
import urllib.request
from multiprocessing import Process, Queue
from urllib.error import URLError
def fetch_url(queue, url):
try:
with urllib.request.urlopen(url) as response:
content = response.read().decode("utf-8")
queue.put((url, len(content)))
except URLError as e:
queue.put((url, str(e)))
def main():
urls = ["https://example.com", "https://python.org", "https://invalid-url"]
queue = Queue()
processes = [Process(target=fetch_url, args=(queue, url)) for url in urls]
for p in processes:
p.start()
for p in processes:
p.join()
while not queue.empty():
url, result = queue.get()
print(f"{url}: {result}")
if __name__ == "__main__":
main()
输出(示例):
https://example.com: 1256 https://python.org: 50000 https://invalid-url: [Errno 11001] getaddrinfo failed
5. 最佳实践
使用 if __name__ == "__main__"::
- 防止 Windows 和某些 Unix 系统重复导入模块。
示例:
if __name__ == "__main__":
p = Process(target=worker)
p.start()
选择进程池:
- 对于批量任务,使用
Pool简化管理。
示例:
with Pool(4) as pool:
results = pool.map(func, data)
优化通信:
- 尽量减少进程间通信,使用共享内存或批量传递数据。
示例:
arr = Array("i", [0] * size)
异常处理:
- 在子进程中捕获异常,通过
Queue或日志返回。
示例:
def worker(queue):
try:
# 工作代码
except Exception as e:
queue.put(str(e))
测试代码:
- 使用
pytest测试多进程行为。
示例:
import pytest
from multiprocessing import Process
def test_process():
def worker():
print("Test")
p = Process(target=worker)
p.start()
p.join()
assert p.exitcode == 0
进程数选择:
- 默认使用 CPU 核心数(
multiprocessing.cpu_count())。
示例:
processes = min(len(tasks), multiprocessing.cpu_count())
6. 注意事项
GIL 限制:
multiprocessing绕过 GIL,适合 CPU 密集型任务;I/O 密集型任务考虑threading或asyncio。
示例:
# I/O 密集型:使用 asyncio
import asyncio
async def fetch():
pass
Windows 兼容性:
- Windows 使用
spawn,需确保代码在if __name__ == "__main__":中。
示例:
if __name__ == "__main__":
main()
资源管理:
- 及时关闭进程和池,释放资源。
示例:
with Pool() as pool:
pool.map(func, data)
序列化开销:
- 传递大数据到子进程(如通过
Queue)可能慢,使用共享内存。
示例:
shared_data = Value("d", 0.0)
调试难度:
- 子进程错误可能不易捕获,使用日志或
Queue返回错误。
示例:
importjs logging logging.basicConfig(level=logging.INFO)
7. 总结
Python 的 multiprocessing 模块是实现多进程并行的强大工具,绕过 GIL,适合 CPU 密集型任务。其核心特点包括:
- 定义:提供进程创建、通信、同步和共享内存的 API。
- 功能:支持
Process、Pool、Queue、Pipe、Lock等。 - 应用:数值计算、图像处理、机器学习、数据处理、爬虫。
- 最佳实践:使用
if __name__ == "__main__":、优化通信、测试代码。
以上就是Python使用multiprocessing模块实现多进程并行计算的详细内容,更多关于Python multiprocessing多进程并行计算的资料请关注编程客栈(www.devze.com)其它相关文章!
加载中,请稍侯......
精彩评论