Python concurrent.futures map
在Python中,concurrent.futures模块提供了一种简单且高效的方式来实现并发编程。其中的map
方法可以让我们方便地并行执行一个函数,将函数应用到一个可迭代对象的每个元素上,并返回结果。
使用map
方法并发执行函数
首先,我们需要导入concurrent.futures
模块,并创建一个ThreadPoolExecutor
对象。然后,我们可以使用map
方法来并发执行一个函数。
import concurrent.futures
def square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(square, numbers)
for result in results:
print(result)
Output:
在上面的示例中,我们定义了一个square
函数,然后创建了一个包含一些数字的列表numbers
。我们使用ThreadPoolExecutor
创建了一个线程池,并调用map
方法并发执行square
函数。最后,我们打印出每个元素的平方值。
控制并发度
map
方法还可以接受一个max_workers
参数,用来控制并发度,即同时执行的线程数。
import concurrent.futures
def cube(x):
return x * x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(cube, numbers)
for result in results:
print(result)
Output:
在上面的示例中,我们将max_workers
参数设置为2,这样最多只会有两个线程同时执行。这可以帮助我们控制并发度,避免资源过度占用。
异常处理
在并发执行函数时,有时候我们需要处理函数可能抛出的异常。map
方法会将异常捕获并存储在result
中,我们可以在迭代结果时检查异常。
import concurrent.futures
def divide(x):
return 10 / x
numbers = [1, 2, 0, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(divide, numbers)
for result in results:
try:
print(result)
except ZeroDivisionError:
print("Error: division by zero")
在上面的示例中,我们定义了一个divide
函数,其中有一个除零的操作。当除数为0时,会抛出ZeroDivisionError
异常。我们在迭代结果时,使用try-except
语句来捕获异常并处理。
使用多个参数
map
方法还支持函数接受多个参数的情况。我们可以将多个可迭代对象传递给map
方法,然后在函数中按顺序接收参数。
import concurrent.futures
def add(x, y):
return x + y
numbers1 = [1, 2, 3, 4, 5]
numbers2 = [5, 4, 3, 2, 1]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(add, numbers1, numbers2)
for result in results:
print(result)
Output:
在上面的示例中,我们定义了一个add
函数,接受两个参数并返回它们的和。我们将两个包含相同长度的列表传递给map
方法,然后在函数中按顺序接收参数。
使用lambda表达式
除了定义函数外,我们还可以使用lambda表达式来传递函数给map
方法。
import concurrent.futures
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(lambda x: x * 2, numbers)
for result in results:
print(result)
Output:
在上面的示例中,我们使用lambda表达式定义了一个函数,将每个元素乘以2。然后将这个lambda函数传递给map
方法,并并发执行。
使用ProcessPoolExecutor
除了ThreadPoolExecutor
外,concurrent.futures
模块还提供了ProcessPoolExecutor
类,可以使用多进程来执行函数。
import concurrent.futures
def square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(square, numbers)
for result in results:
print(result)
在上面的示例中,我们使用ProcessPoolExecutor
创建了一个进程池,并调用map
方法并发执行square
函数。这样可以充分利用多核处理器的性能。
使用wait
方法
除了map
方法外,concurrent.futures
模块还提供了wait
方法,可以等待所有任务完成。
import concurrent.futures
def square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(square, num) for num in numbers]
results = [future.result() for future in concurrent.futures.as_completed(futures)]
for result in results:
print(result)
Output:
在上面的示例中,我们使用submit
方法提交每个任务,并将返回的Future
对象存储在列表中。然后使用as_completed
函数等待所有任务完成,并获取结果。
使用Executor.map
方法
Executor
对象还提供了map
方法,可以直接在Executor
对象上调用map
方法,并发执行函数。
import concurrent.futures
def square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = list(executor.map(square, numbers))
for result in results:
print(result)
Output:
在上面的示例中,我们直接在ThreadPoolExecutor
对象上调用map
方法,并发执行square
函数。最后将结果存储在列表中,并打印出来。
使用concurrent.futures.wait
方法
concurrent.futures
模块还提供了wait
方法,可以等待一组Future
对象完成。
import concurrent.futures
def square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(square, num) for num in numbers]
done, not_done = concurrent.futures.wait(futures)
for future in done:
print(future.result())
Output:
在上面的示例中,我们使用submit
方法提交每个任务,并将返回的Future
对象存储在列表中。然后使用wait
方法等待所有任务完成,并打印出结果。
使用concurrent.futures.as_completed
方法
concurrent.futures
模块还提供了as_completed
方法,可以在Future
对象完成时获取结果。
import concurrent.futures
def square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(square, num) for num in numbers]
for future in concurrent.futures.as_completed(futures):
print(future.result())
Output:
在上面的示例中,我们使用submit
方法提交每个任务,并将返回的Future
对象存储在列表中。然后使用as_completed
方法在Future
对象完成时获取结果并打印出来。
使用concurrent.futures.Future
对象
concurrent.futures
模块中的Future
对象表示一个异步操作的结果。我们可以手动创建Future
对象,并设置其结果。
import concurrent.futures
def square(x):
return x * x
with concurrent.futures.ThreadPoolExecutor() as executor:
future = concurrent.futures.Future()
future.set_result(square(5))
print(future.result())
Output:
在上面的示例中,我们手动创建了一个Future
对象,并使用set_result
方法设置其结果为square(5)
的返回值。然后我们可以通过result
方法获取结果并打印出来。
使用concurrent.futures.wait
方法超时
wait
方法还支持设置超时时间,可以等待一组Future
对象完成,如果超时则返回未完成的Future
对象。
import concurrent.futures
import time
def square(x):
time.sleep(2)
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(square, num) for num in numbers]
done, not_done = concurrent.futures.wait(futures, timeout=3)
for future in done:
print(future.result())
for future in not_done:
print("Task did not complete in time")
Output:
在上面的示例中,我们定义了一个square
函数,其中有一个2秒的延迟。我们使用wait
方法等待所有任务完成,设置超时时间为3秒。如果任务在超时时间内未完成,则会打印出相应的提示信息。
使用concurrent.futures.ThreadPoolExecutor
的shutdown
方法
在使用ThreadPoolExecutor
完成任务后,我们可以调用shutdown
方法关闭线程池。
import concurrent.futures
def square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(square, numbers)
for result in results:
print(result)
executor.shutdown()
Output:
在上面的示例中,我们在ThreadPoolExecutor
对象执行完任务后调用了shutdown
方法,关闭了线程池。这样可以释放资源并结束程序。