python starmap 如何监控进度

python starmap 如何监控进度

python starmap 如何监控进度

在使用 Python 进行并行计算时,我们经常会使用 multiprocessing 模块中的 Pool 类,其中的 starmap 函数可以方便地将任务分配给多个进程并行执行。但是,当任务较为耗时时,我们希望能够监控任务的执行进度,以及输出一些有关任务进度的信息。

本文将介绍如何使用 starmap 函数进行并行计算,并同时实现对任务进度的监控和输出。

1. 导入模块和定义任务函数

首先,我们需要导入 multiprocessing 模块和其他可能需要用到的模块,同时定义任务函数 task_func。以下是一个简单的示例任务函数:

import time

def task_func(a, b):
    # 模拟耗时任务
    time.sleep(1)
    return a + b
Python

2. 定义进度监控函数

我们可以定义一个函数 progress_monitor 来监控任务的执行进度并输出相应信息。该函数接受三个参数:已完成任务数量、总任务数量和开始执行任务的时间。

import time

def progress_monitor(num_done, total_num, start_time):
    elapsed_time = time.time() - start_time
    progress = num_done / total_num * 100
    print(f"任务进度:{num_done}/{total_num},已完成 {progress:.2f}%,已用时 {elapsed_time:.2f} 秒")
Python

在该函数中,我们通过计算已完成任务数量和总任务数量的比例来得到任务的完成进度。同时也计算了从开始执行任务到现在的总用时,以便后续输出。

3. 使用 starmap 进行并行计算

接下来,我们可以使用 Pool 类和 starmap 函数来对任务进行并行计算。以下是一个使用 starmap 的示例:

import multiprocessing

pool = multiprocessing.Pool()

# 定义任务参数列表
task_args = [(1, 2), (3, 4), (5, 6), (7, 8)]

start_time = time.time()

# 使用 starmap 并行执行任务
results = pool.starmap(task_func, task_args)

# 输出结果
print(results)

end_time = time.time()

# 计算并输出总耗时
total_time = end_time - start_time
print(f"所有任务已完成,总耗时:{total_time:.2f} 秒")
Python

在以上示例中,我们首先创建了一个 Pool 对象,然后定义了一个包含多个任务参数的列表 task_args,其中每个任务参数都是一个元组。在使用 starmap 函数时,将任务函数 task_func 和任务参数列表 task_args 作为参数传入,就可以实现并行计算。

需要注意的是,虽然我们可以直接在进程池中使用 starmap 函数来执行任务,但我们并没有使用类似 for result in results 的循环来迭代结果,因为这样会阻塞主线程,并且无法在执行任务的同时监控任务进度。

4. 监控任务进度并输出

在以上示例中,我们已经预先定义了进度监控函数 progress_monitor。为了能够在任务执行过程中不断输出任务进度,我们需要对 starmap 进行一些修改。

首先,我们可以使用 partial 函数先固定住 task_args 作为 starmap 的第二个参数,这样我们在调用 starmap 时可以只传入任务函数 task_func,从而方便后续修改。

from functools import partial

# 使用 partial 函数固定住 task_args
partial_starmap = partial(pool.starmap, task_args=task_args)

# 使用 partial_starmap 并行执行任务
results = partial_starmap(task_func)
Python

接下来,我们可以通过在任务函数中调用进度监控函数来实现任务进度的监控和输出。具体地,在任务函数的开始和结束前分别调用 progress_monitor 函数,并传入相应的参数。

def task_func(a, b):
    # 调用进度监控函数,输出任务开始信息
    progress_monitor(0, len(task_args), start_time)

    # 模拟耗时任务
    time.sleep(1)

    # 调用进度监控函数,输出任务完成信息
    progress_monitor(1, len(task_args), start_time)

    return a + b
Python

在上述代码中,我们在任务函数开始之前调用 progress_monitor,并传入已完成任务数量为 0,总任务数量为 len(task_args),以及任务开始时间。在任务函数结束之前,我们再次调用 progress_monitor,并传入已完成任务数量为 1,总任务数量不变,仍为 len(task_args),以及任务开始时间。

这样,当每个任务开始时,都会输出任务开始信息;当每个任务结束时,都会输出任务完成信息,包括任务进度、总用时等。

5. 完整示例代码及输出

以下是完整的示例代码,包括导入模块、定义任务函数、定义进度监控函数、使用 starmap 进行并行计算等部分。

import multiprocessing
from functools import partial
import time

def task_func(a, b):
    # 调用进度监控函数,输出任务开始信息
    progress_monitor(0, len(task_args), start_time)

    # 模拟耗时任务
    time.sleep(1)

    # 调用进度监控函数,输出任务完成信息
    progress_monitor(1, len(task_args), start_time)

    return a + b

def progress_monitor(num_done, total_num, start_time):
    elapsed_time = time.time() - start_time
    progress = num_done / total_num * 100
    print(f"任务进度:{num_done}/{total_num},已完成 {progress:.2f}%,已用时 {elapsed_time:.2f} 秒")

if __name__ == '__main__':
    pool = multiprocessing.Pool()

    # 定义任务参数列表
    task_args = [(1, 2), (3, 4), (5, 6), (7, 8)]

    start_time = time.time()

    # 使用 partial 函数固定住 task_args
    partial_starmap = partial(pool.starmap, task_args=task_args)

    # 使用 partial_starmap 并行执行任务
    results = partial_starmap(task_func)

    # 输出结果
    print(results)

    end_time = time.time()

    # 计算并输出总耗时
    total_time = end_time - start_time
    print(f"所有任务已完成,总耗时:{total_time:.2f} 秒")
Python

以下是上述代码运行的示例输出:

任务进度:0/4,已完成 0.00%,已用时 0.00 
任务进度:1/4,已完成 25.00%,已用时 1.00 
任务进度:2/4,已完成 50.00%,已用时 2.00 
任务进度:3/4,已完成 75.00%,已用时 3.00 
任务进度:4/4,已完成 100.00%,已用时 4.00 
[3, 7, 11, 15]
所有任务已完成,总耗时:4.00 
Python

以上输出展示了每个任务开始时的进度信息,以及所有任务完成时的总耗时信息。根据进度信息,我们可以清楚地了解每个任务的执行进度,并且总耗时也得到了准确的统计。

这样,我们就成功地实现了使用 starmap 进行并行计算,并同时监控任务进度和输出相关信息的功能。通过这种方式,我们可以更好地了解任务执行的情况,并对任务的整体进展进行掌控。

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程

登录

注册