Python 多进程池进行并发处理,可以使用multiprocessing
模块创建一个Pool
处理对象,并将任务分配给进程池中的各个进程。我们将利用操作系统来交错各个进程之间的执行。如果每个进程都混合了I/O和计算,那么处理器会处于满负荷工作状态。在进程等待I/O完成的同时,其他进程可以执行各自的计算。当I/O完成后,进程将启动并与其他进程竞争处理器时间。
将任务映射到独立进程的方法如下所示:
import multiprocessing
with multiprocessing.Pool(4) as workers:
workers.map(analysis, glob.glob(pattern))
上面创建了一个含4个独立进程的Pool
对象,并将这个Pool
对象赋给了workers
变量。然后使用该进程池将analysis
函数映射到了一个待完成的可迭代任务队列。该可迭代队列会为workers
池中的每个进程分配任务。在本例中,该队列是使用glob.glob(pattern)
属性后得到的,是一个文件名序列。
当analysis()
函数返回结果时,创建Pool
对象的父进程便可以收集这些结果了。这允许我们创建多个并发构建的Counter
对象,并将它们合并为单个的复合结果。
如果在进程池中启动 P 个进程,那么整个应用程序将包括p+1个进程,由一个父进程和 P 个子进程组成。由于父进程在子进程池启动之后几乎无事可做,因此这种做法通常很有效。通常我们会将worker分配至单独的CPU(或计算核心),而父进程则与Pool
对象中的一个子进程共享一个CPU。
常规的Linux父/子进程规则适用于由该模块创建的子进程。如果父进程在正确收集到子进程最终状态之前崩溃了,那么便会留下仍在运行的僵尸进程。鉴于此,进程的
Pool
对象会作为上下文管理器。当通过with
语句使用该进程池时,在上下文结束后子进程都会适时终止。
默认情况下,一个Pool
对象会根据multiprocessing.cpu_count()
函数的值创建多个worker。该数值通常是最优的,因此简单使用属性multiprocessing.Pool() as workers:
就足够了。
在某些情况下,创建比CPU个数更多的worker也会有帮助。当每个worker都需要进行I/O密集型处理时,这种情况便会存在。让许多worker进程等待I/O完成可以缩短应用程序的运行时间。
如果给定的Pool
对象中有 P 个worker,那么这种映射可以将处理器时间几乎缩减至连续处理这些日志所需时间的 (\frac{1}{p}) 。实际上,Pool
对象中父进程和子进程之间的通信存在一定开销。如果将任务细分为非常小的并发单元,那么这些开销会限制并发的效率。
多进程的Pool
对象有4个类map()
方法,分别是map()
、imap()
、imap_unordered()
和starmap()
函数,它们负责在进程池中分配任务。这4个函数是一个通用模式的变体,该模式将函数赋给进程池中的每个函数,并将数据映射至该函数。它们分配任务和收集结果的方式不同。
map(function, iterable
)方法将迭代项分配给进程池中的每个worker。结果按照分配Pool
对象时的顺序进行收集,以保留原始顺序。
imap(function, iterable)
方法比map()
“懒惰”。默认情况下,它会把每个单独的迭代项发送给下一个可用的worker。这可能会引入额外的通信开销,因此建议使用大于1的块。
imap_unordered(function, iterable)
方法与imap()
方法类似,但它不会保留结果的顺序。允许乱序处理映射意味着一旦进程完成处理便收集结果,否则必须按序收集结果。
starmap(function, iterable)
方法类似于itertools.starmap()
函数。每个迭代项必须是元组,并且为了让元组中的每个值成为位置参数,使用了*
修饰符将其传递给函数。实际上,它执行的是function(*iterable[0])
,function(*iterable[1])
等函数。
上述映射模式的一个变体如下所示:
import multiprocessing
pattern = "*.gz"
combined = Counter()
with multiprocessing.Pool() as workers:
result_iter = workers.imap_unordered(
analysis, glob.glob(pattern))
for result in result_iter:
combined.update(result)
这样就创建了一个Counter()
函数,用于合并进程池中每个worker
的结果。我们基于可用的CPU创建了一个子进程池,并使用Pool
对象作为上下文管理器。然后将analysis()
函数映射至文件匹配模式中的每个文件。通过analysis()
函数得到的所有Counter
对象最终合并成了一个计数器。
该版本分析一批日志文件花费了大约68秒。使用多个并发进程可大幅缩减分析日志的用时。单进程的基准运行时间是150秒。如果想确定让系统满负荷运作所需的worker数量,需要在更大的进程池中进行额外的试验。
我们使用multiprocessing
模块中的Pool.map()
函数创建了一个双层“映射-归约”过程。第一层是analysis()
函数,它对单个日志文件进行映射和归约,然后用更高层的归约操作合并这些归约结果。
使用apply()
发送单个请求
除了map()
函数的变体,也可以使用apply(function, *args, **kw)
方法向工作池传递值。map()
方法实际上只是一个封装了apply()
方法的for
循环。例如,可以使用以下命令:
list(
workers.apply(analysis, f)
for f in glob.glob(pattern)
)
尚不清楚这样做能否显著带来改进,但可以用map()
函数来表示需要做的几乎任何事。
使用map_async()
、starmap_async()
和starmap_async()
等函数
函数map_async()
、starmap_async()
和starmap_async()
负责将任务分配给Pool
对象中的子进程,当子进程完成处理后便从子进程收集结果。这会导致子进程必须等待父进程来收集结果。函数_async()
的变体不会等待子进程完成。这些函数会返回一个可供查询的对象,用于从子进程中获取单独的结果。
使用map_async()
方法的一个变体如下:
import multiprocessing
pattern = "*.gz"
combined = Counter()
with multiprocessing.Pool() as workers:
results = workers.map_async(
analysis, glob.glob(pattern))
data = results.get()
for c in data:
combined.update(c)
这样就创建了一个Counter()
函数,用于合并进程池中每个worker的结果。我们基于可用的CPU创建了一个子进程池,并使用Pool
对象作为上下文管理器。然后将analysis()
函数映射至文件匹配模式中的每个文件。函数map_async()
返回的是一个MapResult
对象,可以借此查询结果和工作池的整体状态。这个例子使用了get()
方法来获取Couter
对象序列。
函数analysis()
生成的Counter
对象最终组合成了单个Counter
对象。这种聚合汇总了这些日志文件的所有信息。这种处理方式并不比上一个例子快,但使用map_async()
函数允许父进程在等待子进程完成的同时处理一些额外的工作。
更复杂的多进程架构
multiprocessing
包支持各种架构。我们可以创建跨多个服务器的多进程结构,并提供官方身份认证技术以设立必要的安全等级。可以使用队列和管道将对象从一个进程传递到另一个进程,也可以在进程间共享内存。还可以在进程之间共享底层锁,来同步对共享资源(如文件)访问。
这些架构中,大多数都涉及显式管理多个工作进程的状态,特别是使用锁和共享内存,它们在本质上是命令式的,并不适合函数式编程方法。
在一定程度上,可以用函数式的方法处理队列和管道。我们的目标是将设计分解为生产者函数和消费者函数。生产者函数可以创建对象并将它们插入队列中。消费者函数会从队列中提取对象并进行处理,它可能还会将中间结果放入另一个队列中。这样就创建了一个并发处理网络,其工作负载分布于不同的进程之间。使用pycsp
包可以简化进程间基于队列的消息交换。更多相关信息,请访问https://pypi.python.org/pypi/pycsp。
这种设计技术适用于复杂应用程序服务器的设计。各个子进程可以存在于服务器的整个生命周期中,并发处理各自的请求。
使用concurrent.futures
模块
除了multiprocessing
包,还可以利用concurrent.futures
模块。该模块也提供了一种将数据映射到并发的线程池或进程池的方法。该模块的API相对简单,并且在许多方面都与multiprocessing.Pool()
函数的接口类似。
它们之间的相似度如下所示:
from concurrent.futures import ProcessPoolExecutor
pool_size = 4
pattern = "*.gz"
combined = Counter()
with ProcessPoolExecutor(max_workers=pool_size) as workers:
for result in workers.map(analysis, glob.glob(pattern)):
combined.update(result)
这个例子和先前那些例子之间最主要的不同在于使用了一个concurrent.futures.ProcessPoolExecutor
对象的实例,而不是multiprocessing.Pool
方法。这里的基本设计模式是使用可用的工作池将analysis()
函数映射到文件名列表。将生成的Counter
对象合并来创建最终结果。
模块concurrent.futures
的性能与multiprocessing
模块几乎相同。
使用concurrent.futures
线程池
模块concurrent.futures
为应用程序提供了第二种执行器。可以创建并使用ThreadPoolExecutor
对象来代替concurrent.futures.ProcessPoolExecutor
对象,这会在单个进程内创建一个线程池。
线程池的语法与使用ProcessPoolExecutor
对象几乎一致,然而性能却相差很大。在这个分析日志文件的示例中,I/O占据了主要任务。由于进程中的所有线程都受到统一的操作系统调度限制,因此多线程日志文件分析的整体性能与串行处理日志文件的性能差不多。
使用示例中的日志文件和一台运行macOS X的小型四核笔记本电脑,下面的结果显示了共享I/O资源的线程与进程之间的性能差异。
- 使用
concurrent.futures
线程池,运行时间为168秒。 - 使用进程池,运行时间为68秒。
在这两种情况下,Pool
对象的大小都为4。单进程和单线程的基准运行时间为150秒,引入更多的线程反而拖慢了运行速度。对于执行大量输入和输出的程序来说,这是较为典型的结论。多线程可能更适于处理用户界面:线程长时间处于空闲状态,或者等待用户移动鼠标和触摸屏幕。
使用threading
模块和queue
模块
Python的threading
包有许多支持构建命令式应用程序的构造体。该模块的重点不在于编写函数式应用程序。我们可以利用queue
模块中线程安全的队列将对象从一个线程传递到另一个线程。
模块threading
中并没有简单的方法能将任务分配至各个线程,其API并不太适合函数式编程。
与multiprocessing
模块中更为原始的特性一样,可以尝试隐藏锁与队列的状态特性和命令特性。然而,使用concurrent.futures
模块中的ThreadPoolExecutor
方法似乎更简便。ProcessPoolExecutor.map()
方法有一个非常易用的接口,可用于并发处理集合元素。
使用map()
函数原语分配任务似乎很符合函数式编程的要求,因此要多关注concurrent.futures
模块,它最适于编写并发函数式应用程序。
设计并发处理
从函数式编程的角度看,可以使用以下3种方法将map()
函数的概念应用于数据项的并发处理:
multiprocessing.Pool
concurrent.futures.ProcessPoolExecutor
concurrent.futures.ThreadPoolExecutor
这些方法同我们与之交互的方式几乎相同,因为这3种方法都有一个map()
方法将某个函数应用于可迭代集合数据项,这与其他函数式编程技术非常相称。并发线程和并发进程因性质不同,性能也有所不同。
随着设计逐步完善,日志分析应用程序可分为以下两大块。
- 底层解析:几乎所有日志分析应用程序都可以使用的通用解析。
- 高层分析应用:针对具体应用程序需求的过滤和归约处理。
底层解析可以分解为如下4个阶段。
- 从多个源日志文件读取所有行数据。这是从文件名到行序列的
local_gzip()
映射。 - 通过文件集合中每一行日志条目创建的简单命名元组。这是从文本行到
Access
对象的access_iter()
映射。 - 解析更复杂字段(如日期和URL)的详细信息。这是从
Access
对象到AccessDetails
对象的access_detail_iter()
映射。 - 从日志中排除不感兴趣的路径名。也可以将其视作只接收感兴趣的路径名。与其说是映射操作,它更像是过滤器,一个捆绑成
path_filter()
函数的过滤器集合。
前面定义了一个完整的analysis()
函数来解析和分析给定的日志文件。该函数使用高层的过滤器和归约器来处理底层的解析结果,同时它也适用于使用通配符的文件集合。
考虑到所涉及的映射个数,有多种方法能将该问题分解为可以映射到线程池或进程池的任务。可以考虑将以下映射方法作为设计备选方案。
- 将
analysis()
函数映射至各个文件。本章示例统一使用了该方法。 - 从整个
analysis()
函数中重构出local_gzip()
函数,就可以将修改后的analysis()
函数映射到local_gzip()
函数的结果上。 - 从整个
analysis()
函数中重构出access_iter(local_gzip(pattern))
函数,就可以将修改后的analysis()
函数映射到可迭代的Access
对象序列上。 - 将
access_detail_iter(access-iter(local_gzip(pattern)))
函数重构为单独的迭代函数,然后将path_filter()
函数与高层的过滤器和归约器映射到可迭代的AccessDetail
对象序列上。 - 还可以将底层解析重构为与高层分析相分离的函数,将分析过滤器和归约器映射到底层解析的输出上。
所有这些用于重构示例应用程序的方法都相对比较简单。运用函数式编程技术的好处是整个过程中的每个部分都可以定义为一个映射。这在考察不同架构以确定最佳设计时非常实用。
但是在这种情况下,需要将I/O处理尽量分配给更多的CPU或计算核心。大部分可行的重构方法都会在父进程中执行所有I/O,而只将计算部分分配给多个并发进程,所以不会带来什么益处,因此采用尽可能将I/O分配到更多计算核心的映射方法。
最小化进程之间传递的数据量通常是很重要的。在这个例子中,我们只提供给每个工作进程以简短的文件名字符串。相比压缩每个日志文件得到的10MB数据详情,生成的Counter
对象要小得多。通过剔除只出现一次的数据项,或者限制应用程序只保留频率最高的20项,便可以进一步减小每个Counter
对象。
可以自由地重新组织这个应用程序的设计并不意味着应该这么做。我们可以运行一些基准测试实验来验证猜测,即日志文件的解析主要是由读取文件的时间决定的。