RxPY 使用 Observables 进行工作
下面提到的主题将在本章中进行详细学习。
- 创建 Observables
-
订阅并执行 Observable
创建 observables
要创建 observable,我们将使用 create() 方法,并将函数传递给它,该函数具有以下内容。
- on_next() - 当 Observable 发出一个项时,调用该函数。
-
on_completed() - 当 Observable 完成时,调用该函数。
-
on_error() - 当 Observable 发生错误时,调用该函数。
要使用 create() 方法,首先按如下所示导入该方法 –
from rx import create
这里是一个工作示例,用于创建一个可观察对象 −
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error")
observer.on_completed()
source = create(test_observable).
订阅并执行一个Observable
要订阅一个Observable,我们需要使用subscribe()函数,并传递on_next、on_error和on_completed的回调函数。
下面是一个工作示例:
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
subscribe()方法负责执行可观察对象。必须将回调函数 on_next 、 on_error 以及 on_completed 传递给subscribe方法。而subscribe方法会执行test_observable()函数。
不强制要求将所有三个回调函数传递给subscribe()方法。可以根据需要传递on_next()、on_error()和on_completed()。
lambda函数用于on_next、on_error和on_completed。它将接收参数并执行给定的表达式。
下面是创建的可观察对象的输出−
E:\pyrx>python testrx.py
Got - Hello
Job Done!