RxPY 使用调度器实现并发
RxPy的一个重要特性是并发,即允许任务并行执行。为了实现这一点,我们有两个操作符subscribe_on()和observe_on(),它们将与一个调度器一起工作,该调度器将决定被订阅任务的执行方式。
这里是一个工作示例,展示了使用subscribe_on()、observe_on()和调度器的需求。
示例
在上面的示例中,我有两个任务:任务1和任务2。任务的执行顺序是顺序执行的。第二个任务只有在第一个任务完成后才开始。
输出
RxPy支持多种调度器,在这里,我们将使用ThreadPoolScheduler。ThreadPoolScheduler主要尝试根据可用的CPU线程进行管理。
在之前的示例中,我们将使用multiprocessing模块来获取cpu_count。计数将被传递给ThreadPoolScheduler,它将基于可用的线程并行处理任务。
下面是一个实例:
在上面的示例中,我有2个任务,cpu_count是4。由于任务数是2,可用的线程数是4,所以这两个任务可以并行开始。
输出
如果你看到输出,那么两个任务都是并行开始的。
现在,考虑一个情景,即任务数多于CPU数量,如CPU数量为4而任务为5。在这种情况下,我们需要检查任务完成后是否有任何线程空闲,以便将其分配给队列中的新任务。
为此,我们可以使用observe_on()操作符来观察调度程序是否有任何空闲线程。下面是一个使用observe_on()的工作示例:
示例
输出
如果您看到输出,一旦任务4完成,线程就会转移到下一个任务,即任务5,并开始执行。