RxPY – 使用Scheduler的并发性
RxPy的一个重要特征是并发性,即允许任务并行执行。为了实现这一点,我们有两个操作符subscribe_on()和observe_on(),它们将与一个调度器一起工作,该调度器将决定订阅任务的执行。
下面是一个工作实例,它显示了对subscibe_on()、observe_on()和调度器的需求。
例子
在上述例子中,我有2个任务:任务1和任务2。任务的执行是有顺序的。当第一个任务完成后,第二个任务才开始。
输出
RxPy支持许多调度器,在这里,我们将使用ThreadPoolScheduler。ThreadPoolScheduler主要尝试管理可用的CPU线程。
在前面的例子中,我们将使用一个多处理模块,它将为我们提供cpu_count。这个数字将被交给ThreadPoolScheduler,它将根据可用的线程来管理任务的并行工作。
下面是一个工作实例
在上面的例子中,我有2个任务,cpu_count是4,由于任务是2,而我们可用的线程是4,所以两个任务可以并行启动。
输出
如果你看到输出结果,这两个任务都是平行启动的。
现在,考虑一种情况,即任务多于CPU数量,即CPU数量为4,任务为5。在这种情况下,我们需要检查是否有任何线程在任务完成后有空闲,这样,它就可以被分配给队列中的新任务。
为此,我们可以使用observe_on()操作符,它将观察调度器是否有线程空闲。下面是一个使用observe_on()的工作例子
例子
输出
如果你看到输出,在任务4完成的那一刻,线程被交给了下一个任务,即任务5,同样开始执行。