RxPY 使用 Observables 进行工作

RxPY 使用 Observables 进行工作

下面提到的主题将在本章中进行详细学习。

  • 创建 Observables

  • 订阅并执行 Observable

创建 observables

要创建 observable,我们将使用 create() 方法,并将函数传递给它,该函数具有以下内容。

  • on_next() - 当 Observable 发出一个项时,调用该函数。

  • on_completed() - 当 Observable 完成时,调用该函数。

  • on_error() - 当 Observable 发生错误时,调用该函数。

要使用 create() 方法,首先按如下所示导入该方法 –

from rx import create

这里是一个工作示例,用于创建一个可观察对象 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

订阅并执行一个Observable

要订阅一个Observable,我们需要使用subscribe()函数,并传递on_next、on_error和on_completed的回调函数。

下面是一个工作示例:

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe()方法负责执行可观察对象。必须将回调函数 on_nexton_error 以及 on_completed 传递给subscribe方法。而subscribe方法会执行test_observable()函数。

不强制要求将所有三个回调函数传递给subscribe()方法。可以根据需要传递on_next()、on_error()和on_completed()。

lambda函数用于on_next、on_error和on_completed。它将接收参数并执行给定的表达式。

下面是创建的可观察对象的输出−

E:\pyrx>python testrx.py
Got - Hello
Job Done!

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程