欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > Python的并行任务(进程池、线程池)

Python的并行任务(进程池、线程池)

2024/10/24 2:01:06 来源:https://blog.csdn.net/qq_39065491/article/details/140935227  浏览:    关键词:Python的并行任务(进程池、线程池)

python的并行任务(进程池、线程池)

在Python中,进程(Process)和线程(Thread)是并发编程的两种主要方式,它们各自适用于不同的场景。了解何时使用进程或线程,可以帮助你更有效地设计并发程序。
使用内置基本库concurrent.futures来实现并发,简单使用这个模块,包括并行线程和并行进程执行器 。

进程(Process)适用场景:

  • CPU密集型任务:当任务主要是计算密集型时,使用进程通常比线程更有效。因为Python的全局解释器锁(GIL)限制了同一时刻只有一个线程可以执行Python字节码。对于CPU密集型任务,使用多进程可以绕过GIL的限制,充分利用多核CPU的计算能力。
  • 需要隔离的应用:如果程序的不同部分需要高度的隔离性(例如,不同的内存空间、文件描述符等),使用进程是更好的选择。进程之间的通信(IPC)虽然比线程间通信(如共享内存)复杂,但提供了更高的安全性。
  • 跨平台兼容性:虽然Python的线程在大多数平台上都能很好地工作,但在某些平台上(特别是Windows),线程的行为可能与预期不同。在这些情况下,使用进程可能是一个更可靠的解决方案。

线程(Thread)适用场景:

  • I/O密集型任务:当任务主要是等待I/O操作(如文件读写、网络请求等)完成时,使用线程是更合适的选择。因为线程之间的切换成本较低,可以更有效地利用等待时间。
  • 需要快速响应的应用:对于需要快速响应用户输入或网络请求的应用,使用线程可以更快地处理这些请求,因为线程之间的切换比进程快得多。
  • 共享数据:如果多个任务需要频繁地访问和修改共享数据,使用线程可能更方便,因为线程可以共享进程的内存空间。然而,这也需要小心处理数据同步和竞争条件的问题。

注意:

  • 全局解释器锁(GIL):Python的GIL限制了同一时刻只有一个线程可以执行Python字节码。这意味着,对于CPU密集型任务,使用多线程可能不会带来性能上的提升。然而,对于I/O密集型任务,多线程仍然可以显著提高程序的效率。
  • 线程和进程的选择:在选择使用线程还是进程时,需要综合考虑任务的性质、系统的资源以及程序的复杂度。对于简单的I/O密集型任务,线程可能是更好的选择;而对于复杂的、需要高度隔离的或CPU密集型任务,进程可能更合适。
  • 并发库:Python提供了多种并发编程的库,如threading(用于线程)、multiprocessing(用于进程)以及concurrent.futures(提供了更高级的并发执行框架)。你可以根据具体需求选择合适的库来实现你的并发程序。

1. 并行线程ThreadPoolExecutor

import concurrent.futures
import numpy as np  
import time  # 定义矩阵计算  
def complex_matrix_operation(n, iterations):  """  执行复杂矩阵运算来测试CPU性能。  参数:  n (int): 矩阵的大小,即n x n的矩阵。  iterations (int): 重复矩阵运算的次数。  返回:  float: 最终矩阵的迹(对角线元素之和)。  """  # 初始化随机矩阵  A = np.random.rand(n, n)  B = np.random.rand(n, n)  # 开始计时  start_time = time.time()  # 执行多次矩阵乘法  result = np.eye(n)  # 初始化为对角线矩阵  for _ in range(iterations): # 矩阵乘积result = np.dot(result, np.dot(A, B))  # 计算并返回迹  trace = np.trace(result)  # 结束计时  end_time = time.time()  print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")  return trace  if __name__ == '__main__':# 调用函数  n = 16  # 最大矩阵大小  iterations = 16  # 迭代次数  # 开始时间start_time = time.time()  # 创建线程池执行器 最多16线程with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:# 提交任务到线程池futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]# 使用wait函数等待所有任务完成# done, not_done = concurrent.futures.wait(futures, timeout=None)# for future in done:#     result = future.result()#     print('wait:',result)# # 使用as_completed函数按照完成顺序获取结果# for future in concurrent.futures.as_completed(futures):#     result = future.result()#     print('completed:',result)# 结束计时  end_time = time.time()  print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")  

执行结果:

计算完成 0,耗时: 0.000437 秒
计算完成 1,耗时: 0.001399 秒
计算完成 2,耗时: 0.001689 秒
计算完成 4,耗时: 0.005548 秒
计算完成 3,耗时: 0.006699 秒
计算完成 6,耗时: 0.000553 秒
计算完成 5,耗时: 0.010176 秒
计算完成 10,耗时: 0.000951 秒
计算完成 9,耗时: 0.000789 秒
计算完成 11,耗时: 0.001857 秒
计算完成 7,耗时: 0.006189 秒
计算完成 15,耗时: 0.004257 秒
计算完成 13,耗时: 0.004155 秒
计算完成 14,耗时: 0.007222 秒
计算完成 12,耗时: 0.006338 秒
计算完成 8,耗时: 0.007463 秒
计算完成,总共耗时: 0.030619

从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。

2. 并行进程ProcessPoolExecutor

import concurrent.futures
import numpy as np  
import time  # 定义矩阵计算  
def complex_matrix_operation(n, iterations):  """  执行复杂矩阵运算来测试CPU性能。  参数:  n (int): 矩阵的大小,即n x n的矩阵。  iterations (int): 重复矩阵运算的次数。  返回:  float: 最终矩阵的迹(对角线元素之和)。  """  # 初始化随机矩阵  A = np.random.rand(n, n)  B = np.random.rand(n, n)  # 开始计时  start_time = time.time()  # 执行多次矩阵乘法  result = np.eye(n)  # 初始化为对角线矩阵  for _ in range(iterations): # 矩阵乘积result = np.dot(result, np.dot(A, B))  # 计算并返回迹  trace = np.trace(result)  # 结束计时  end_time = time.time()  print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")  return trace  if __name__ == '__main__':# 调用函数  n = 16  # 最大矩阵大小  iterations = 16  # 迭代次数  # 开始时间start_time = time.time()  # 创建线程池执行器 最多16进程with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:# 提交任务到线程池futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]# 使用wait函数等待所有任务完成# done, not_done = concurrent.futures.wait(futures, timeout=None)# for future in done:#     result = future.result()#     print('wait:',result)# # 使用as_completed函数按照完成顺序获取结果# for future in concurrent.futures.as_completed(futures):#     result = future.result()#     print('completed:',result)# 结束计时  end_time = time.time()  print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")  

执行结果:

计算完成 1,耗时: 0.000953 秒计算完成 2,耗时: 0.000691 秒
计算完成 0,耗时: 0.000837 秒计算完成 3,耗时: 0.000993 秒
计算完成 6,耗时: 0.001096 秒计算完成 8,耗时: 0.000336 秒计算完成 7,耗时: 0.000955 秒计算完成 4,耗时: 0.000751 秒计算完成 9,耗时: 0.000916 秒计算完成 13,耗时: 0.000758 秒计算完成 12,耗时: 0.001723 秒计算完成 11,耗时: 0.000999 秒计算完成 10,耗时: 0.001201 秒计算完成 14,耗时: 0.001045 秒计算完成 5,耗时: 0.001212 秒计算完成 15,耗时: 0.000172 秒计算完成,总共耗时: 0.432435

从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。
进程用时比线程更多 ,如果把n调到256 ,差距更大。
ThreadPoolExecutor

计算完成,总共耗时: 4.962590

ProcessPoolExecutor

计算完成,总共耗时: 8.072993

3.concurrent.futures 模块

concurrent.futures模块除了提供ThreadPoolExecutor和ProcessPoolExecutor之外,还提供了一些其他的函数来执行异步任务和处理结果。以下是一些常用的concurrent.futures模块函数:

(1)concurrent.futures.as_completed

concurrent.futures.as_completed(fs, timeout=None):
  • 接收一个可迭代的Future对象集合fs,返回一个生成器,在每个Future对象完成时产生结果。
  • 可选地指定timeout参数,用于限制等待结果的最长时间。

(2)concurrent.futures.wait

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
  • 接收一个可迭代的Future对象集合fs,等待所有的Future对象完成。
  • 可选地指定timeout参数,用于限制等待结果的最长时间。
  • 可选地指定return_when参数,用于指定何时返回结果,可选值包括FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETED。

4.Executor 基类

concurrent.futures模块中的Executor是一个抽象基类,用于表示执行器对象。它定义了一些共同的方法和行为,用于管理并发执行的任务。但要通过它的子类调用,而不是直接调用。

Executor类并不直接实例化,而是通过具体的子类如ThreadPoolExecutor和ProcessPoolExecutor来创建实例。

常用的Executor类方法:

(1)submit(fn, *args, **kwargs): 提交一个可调用对象和它的参数给执行器,返回一个Future对象,表示该任务的未来结果。

(2)map(fn, *iterables, timeout=None): 批量提交任务,并按原始迭代器的顺序返回结果。它类似于内置函数map(),但是可以异步地并发执行任务。

(3)shutdown(wait=True): 关闭执行器,不再接受新的任务。如果wait参数为True(默认值),则在所有任务完成后再关闭执行器。

(4)submit_to_executor(fn, executor, *args, **kwargs): 将任务提交给指定的执行器对象,并返回一个Future对象。

(5)map_to_executor(fn, executor, *iterables, timeout=None): 将任务批量提交给指定的执行器对象,并返回结果。

这些方法使得在执行任务时更加方便和灵活。可以根据具体的需求选择合适的方法和执行器类型。

5.Future 对象

concurrent.futures.Future是concurrent.futures模块中的一个类,用于表示一个异步任务的未来结果。它将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。

以下是concurrent.futures.Future的一些常用方法:

(1)result(): 等待并返回异步任务的结果。如果任务尚未完成,该方法会阻塞直到任务完成并返回结果。

(2)done(): 判断异步任务是否已经完成,返回布尔值。

(3)cancel(): 取消异步任务的执行。如果任务已经开始执行或已经完成,则无法取消。

(4)add_done_callback(fn): 添加一个回调函数,当异步任务完成时会调用该回调函数。

参考文章

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com