RxPY 创建 Observable

RxPY 创建 Observable

create

这个方法用于创建 Observable。它将包含观察者方法,即:

  • on_next() − 当 Observable 发出一个项时被调用的函数。

  • on_completed() − 当 Observable 完成时被调用的函数。

  • on_error() − 当 Observable 发生错误时被调用的函数。

这里是一个工作示例 −

testrx.py

from rx import create
def test_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error occured")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

这里是创建的可观察对象的 输出

E:\pyrx>python testrx.py
Got - Hello
Job Done!

empty

此可观察对象不会输出任何内容,直接发出完成状态。

语法

empty()

返回值

它将返回一个没有元素的可观察对象。

示例

from rx import empty
test = empty()
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

输出

E:\pyrx>python testrx.py
Job Done!

never

该方法创建一个永远不会达到完成状态的可观察对象。

语法

never()

返回值

它将返回一个永远不会完成的可观察对象。

示例

from rx import never
test = never()
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

结果

It does not show any output.

throw

这个方法将创建一个可观察对象,将抛出一个错误。

语法

throw(exception)

参数

exception:具有错误详情的对象。

返回值

返回一个带有错误详情的可观察对象。

示例

from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

输出

E:\pyrx>python testrx.py
Error: There is an Error!

from_

该方法将给定的数组或对象转换为可观察对象。

语法

from_(iterator)

参数

iterator: 这是一个对象或数组。

返回值

这将返回给定迭代器的可观测对象。

示例

from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

输出

E:\pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
Job Done!

interval

该方法在超时后会产生一系列的值。

语法

interval(period)

参数

period: 开始整数序列的点。

返回值

它返回一个按顺序排列的包含所有值的可观察对象。

示例

import rx
from rx import operators as ops
rx.interval(1).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

输出

E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64
The value is 81
The value is 100
The value is 121
The value is 144
The value is 169
The value is 196
The value is 225
The value is 256
The value is 289
The value is 324
The value is 361
The value is 400

just

此方法将给定的值转换为一个可观察对象。

语法

just(value)

参数

value: 要转换为observable的值。

返回值

它将返回一个带有给定值的observable。

示例

from rx import just
test = just([15, 25,50, 55])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

输出

E:\pyrx>python testrx.py
The value is [15, 25, 50, 55]
Job Done!

range

此方法将根据给定的输入给出一系列整数。

语法

range(start, stop=None)

参数

start: 范围的起始值。

stop: 可选,范围的结束值。

返回值

根据给定的输入,将返回一个含有整数值的可观察对象。

示例

from rx import range
test = range(0,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

输出

E:\pyrx>python testrx.py
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!

repeat_value

此方法将创建一个可观察对象,根据给定的次数重复给定的值。

语法

repeat_value(value=None, repeat_count=None)

参数

value:可选项。要重复的值。

repeat_count:可选项。要重复给定值的次数。

返回值

它将返回一个可观察对象,根据给定的计数重复给定的值。

示例

from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

输出

E:\pyrx>python testrx.py
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!

start

此方法接受一个函数作为输入,并返回一个可观察对象,该对象将返回来自输入函数的值。

语法

start(func)

参数

func:将被调用的函数。

返回值

它返回一个可观察对象,该对象将具有输入函数的返回值。

示例

from rx import start
test = start(lambda : "Hello World")
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

输出

E:\pyrx>python testrx.py
The value is Hello World
Job Done!

timer

此方法将在超时结束后按顺序发出值。

语法

timer(duetime)

参数

duetime:应该发出第一个值的时间。

返回值

它将返回一个在duetime之后发出值的可观察对象。

示例

import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

输出

E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程