RxPY – 与主题一起工作
主体是一个可观察的序列,也是一个可以组播的观察者,即与许多已订阅的观察者交谈。
我们将讨论以下关于主题的话题 —
- 创建一个主题
- 订阅一个主题
- 将数据传递给主体
- 行为主体(BehaviorSubject)
- 回放主体
- 异步主体
创建一个主题
为了处理一个主题,我们需要导入主题,如下图所示
from rx.subject import Subject
你可以按以下方式创建一个主客体-
subject_test = Subject()
该对象是一个观察者,它有三个方法 –
- on_next(value)
- on_error(error)和
- on_completed()。
订阅一个主题
你可以在主题上创建多个订阅,如下图所示
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
传递数据给主题
你可以使用on_next(value)方法向创建的主题传递数据,如下所示
subject_test.on_next("A")
subject_test.on_next("B")
这些数据将被传递给所有的订阅,添加到主题上。
这里,是一个关于主题的工作实例。
例子
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
subject_test 对象是通过调用 Subject() 创建的。subject_test对象有对on_next(value)、on_error(error)和on_completed()方法的引用。上述例子的输出如下所示
输出
E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B
我们可以使用on_completed()方法,来停止主题的执行,如下图所示。
例子
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
一旦我们调用complete,后面调用的下一个方法就不会被调用。
输出
E:\pyrx>python testrx.py
The value is A
The value is A
现在让我们看看,如何调用on_error(error)方法。
例子
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))
输出
E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!
BehaviorSubject
BehaviorSubject在被调用时将给你最新的值。你可以创建行为主体,如下图所示
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
这里,是一个使用行为主体的工作实例
例子
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
输出
E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject
Repaysubject
Repaysubject类似于行为主体,它可以缓冲数值并将其重放给新的订阅者。这里,是一个重放主体的工作实例。
例子
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)
在重放主题上使用的缓冲值是2。因此,最后两个值将被缓冲并用于调用新的用户。
输出
E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5
AsyncSubject
在AsyncSubject的情况下,最后调用的值会传递给订阅者,而且只有在调用complete()方法后才会完成。
例子
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
输出
E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2