RxPY 使用 Subject

RxPY 使用 Subject

Subject 是一个可观察序列,同时也是一个可以多播的观察者,即可以与订阅的多个观察者进行通信。

我们将讨论以下关于 subject 的主题 −

  • 创建一个 subject
  • 订阅一个 subject
  • 将数据传递给 subject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

创建一个 subject

要使用 subject,我们需要导入 Subject,如下所示−

from rx.subject import Subject

你可以按照以下方式创建一个主语-宾语 –

subject_test = Subject()

这个对象是一个观察者,它有三个方法:−

  • on_next(value)
  • on_error(error) 和
  • on_completed()

订阅一个主题

你可以像下面所示创建多个对该主题的订阅−

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

传递数据给Subject

您可以使用on_next(value)方法将数据传递给已创建的Subject,如下所示 –

subject_test.on_next("A")
subject_test.on_next("B")

这些数据将传递给所有订阅,并添加到主题中。

这里是一个主题的工作示例。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test对象是通过调用Subject()创建的。subject_test对象引用了on_next(value)、on_error(error)和on_completed()方法。上述示例的输出如下所示:

输出

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我们可以使用on_completed()方法,如下所示停止subject的执行。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我们调用complete,下一个被调用的方法将不会被启用。

输出

E:\pyrx>python testrx.py
The value is A
The value is A

让我们现在来看一下如何调用on_error(error)方法。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

输出

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

在调用时,BehaviorSubject将提供最新的值。您可以按照下面所示的方式创建行为主题(BehaviorSubject) −

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

这里是一个使用Behaviour Subject的工作示例

示例

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

输出

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

ReplaySubject

ReplaySubject与BehaviorSubject类似,它可以缓冲值并向新的订阅者重播相同的值。下面是ReplaySubject的一个工作示例。

示例

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

缓冲值是2,用于重放主题。因此,最后两个值将被缓冲并用于调用新的订阅者。

输出

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

在AsyncSubject的情况下,被调用的最后一个值将传递给订阅者,并且只有在调用complete()方法之后才完成。

示例

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

输出

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程