Python concurrent.futures map

Python concurrent.futures map

在Python中,concurrent.futures模块提供了一种简单且高效的方式来实现并发编程。其中的map方法可以让我们方便地并行执行一个函数,将函数应用到一个可迭代对象的每个元素上,并返回结果。

使用map方法并发执行函数

首先,我们需要导入concurrent.futures模块,并创建一个ThreadPoolExecutor对象。然后,我们可以使用map方法来并发执行一个函数。

import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(square, numbers)

for result in results:
    print(result)

Output:

Python concurrent.futures map

在上面的示例中,我们定义了一个square函数,然后创建了一个包含一些数字的列表numbers。我们使用ThreadPoolExecutor创建了一个线程池,并调用map方法并发执行square函数。最后,我们打印出每个元素的平方值。

控制并发度

map方法还可以接受一个max_workers参数,用来控制并发度,即同时执行的线程数。

import concurrent.futures

def cube(x):
    return x * x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(cube, numbers)

for result in results:
    print(result)

Output:

Python concurrent.futures map

在上面的示例中,我们将max_workers参数设置为2,这样最多只会有两个线程同时执行。这可以帮助我们控制并发度,避免资源过度占用。

异常处理

在并发执行函数时,有时候我们需要处理函数可能抛出的异常。map方法会将异常捕获并存储在result中,我们可以在迭代结果时检查异常。

import concurrent.futures

def divide(x):
    return 10 / x

numbers = [1, 2, 0, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(divide, numbers)

for result in results:
    try:
        print(result)
    except ZeroDivisionError:
        print("Error: division by zero")

在上面的示例中,我们定义了一个divide函数,其中有一个除零的操作。当除数为0时,会抛出ZeroDivisionError异常。我们在迭代结果时,使用try-except语句来捕获异常并处理。

使用多个参数

map方法还支持函数接受多个参数的情况。我们可以将多个可迭代对象传递给map方法,然后在函数中按顺序接收参数。

import concurrent.futures

def add(x, y):
    return x + y

numbers1 = [1, 2, 3, 4, 5]
numbers2 = [5, 4, 3, 2, 1]

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(add, numbers1, numbers2)

for result in results:
    print(result)

Output:

Python concurrent.futures map

在上面的示例中,我们定义了一个add函数,接受两个参数并返回它们的和。我们将两个包含相同长度的列表传递给map方法,然后在函数中按顺序接收参数。

使用lambda表达式

除了定义函数外,我们还可以使用lambda表达式来传递函数给map方法。

import concurrent.futures

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(lambda x: x * 2, numbers)

for result in results:
    print(result)

Output:

Python concurrent.futures map

在上面的示例中,我们使用lambda表达式定义了一个函数,将每个元素乘以2。然后将这个lambda函数传递给map方法,并并发执行。

使用ProcessPoolExecutor

除了ThreadPoolExecutor外,concurrent.futures模块还提供了ProcessPoolExecutor类,可以使用多进程来执行函数。

import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(square, numbers)

for result in results:
    print(result)

在上面的示例中,我们使用ProcessPoolExecutor创建了一个进程池,并调用map方法并发执行square函数。这样可以充分利用多核处理器的性能。

使用wait方法

除了map方法外,concurrent.futures模块还提供了wait方法,可以等待所有任务完成。

import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(square, num) for num in numbers]
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

for result in results:
    print(result)

Output:

Python concurrent.futures map

在上面的示例中,我们使用submit方法提交每个任务,并将返回的Future对象存储在列表中。然后使用as_completed函数等待所有任务完成,并获取结果。

使用Executor.map方法

Executor对象还提供了map方法,可以直接在Executor对象上调用map方法,并发执行函数。

import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(square, numbers))

for result in results:
    print(result)

Output:

Python concurrent.futures map

在上面的示例中,我们直接在ThreadPoolExecutor对象上调用map方法,并发执行square函数。最后将结果存储在列表中,并打印出来。

使用concurrent.futures.wait方法

concurrent.futures模块还提供了wait方法,可以等待一组Future对象完成。

import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(square, num) for num in numbers]
    done, not_done = concurrent.futures.wait(futures)

for future in done:
    print(future.result())

Output:

Python concurrent.futures map

在上面的示例中,我们使用submit方法提交每个任务,并将返回的Future对象存储在列表中。然后使用wait方法等待所有任务完成,并打印出结果。

使用concurrent.futures.as_completed方法

concurrent.futures模块还提供了as_completed方法,可以在Future对象完成时获取结果。

import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(square, num) for num in numbers]

for future in concurrent.futures.as_completed(futures):
    print(future.result())

Output:

Python concurrent.futures map

在上面的示例中,我们使用submit方法提交每个任务,并将返回的Future对象存储在列表中。然后使用as_completed方法在Future对象完成时获取结果并打印出来。

使用concurrent.futures.Future对象

concurrent.futures模块中的Future对象表示一个异步操作的结果。我们可以手动创建Future对象,并设置其结果。

import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = concurrent.futures.Future()
    future.set_result(square(5))

print(future.result())

Output:

Python concurrent.futures map

在上面的示例中,我们手动创建了一个Future对象,并使用set_result方法设置其结果为square(5)的返回值。然后我们可以通过result方法获取结果并打印出来。

使用concurrent.futures.wait方法超时

wait方法还支持设置超时时间,可以等待一组Future对象完成,如果超时则返回未完成的Future对象。

import concurrent.futures
import time

def square(x):
    time.sleep(2)
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(square, num) for num in numbers]
    done, not_done = concurrent.futures.wait(futures, timeout=3)

for future in done:
    print(future.result())

for future in not_done:
    print("Task did not complete in time")

Output:

Python concurrent.futures map

在上面的示例中,我们定义了一个square函数,其中有一个2秒的延迟。我们使用wait方法等待所有任务完成,设置超时时间为3秒。如果任务在超时时间内未完成,则会打印出相应的提示信息。

使用concurrent.futures.ThreadPoolExecutorshutdown方法

在使用ThreadPoolExecutor完成任务后,我们可以调用shutdown方法关闭线程池。

import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(square, numbers)

for result in results:
    print(result)

executor.shutdown()

Output:

Python concurrent.futures map

在上面的示例中,我们在ThreadPoolExecutor对象执行完任务后调用了shutdown方法,关闭了线程池。这样可以释放资源并结束程序。

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程