RxPY – 创建可观察变量
create
这个方法用于创建一个可观察的对象。它将有一个观察者方法,即
- on_next() – 当Observable发出一个项目时,这个函数被调用。
-
on_completed() – 当Observable完成时,该函数被调用。
-
on_error() – 当Observable上发生错误时,这个函数被调用。
这里,是一个工作实例 –
testrx.py
from rx import create
def test_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error occured")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
这里,是创建的可观察的 输出 -
E:\pyrx>python testrx.py
Got - Hello
Job Done!
empty
这个观察变量不会输出任何东西,而是直接发射出完整的状态。
语法
empty()
返回值
它将返回一个没有元素的可观察变量。
例子
from rx import empty
test = empty()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py
Job Done!
never
此方法创建一个永远不会达到完整状态的观测器。
语法
never()
返回值
它将返回一个永远不会完成的可观察变量。
例子
from rx import never
test = never()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
It does not show any output.
throw
此方法将创建一个抛出错误的可观察变量。
语法
throw(exception)
参数
exception:一个有错误细节的对象。
返回值
返回一个带有错误细节的可观察对象。
例子
from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py
Error: There is an Error!
from_
这个方法将把给定的数组或对象转换成一个可观察的对象。
语法
from_(iterator)
参数
迭代器:这是一个对象或数组。
返回值
这将返回给定迭代器的可观察数据。
例子
from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
Job Done!
interval
此方法将给出超时后产生的一系列数值。
语法
interval(period)
参数
period: 开始整数序列。
返回值
它返回一个包含所有值的顺序的可观察变量。
例子
import rx
from rx import operators as ops
rx.interval(1).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
输出
E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64
The value is 81
The value is 100
The value is 121
The value is 144
The value is 169
The value is 196
The value is 225
The value is 256
The value is 289
The value is 324
The value is 361
The value is 400
just
这个方法将把给定的值转换成一个可观察值。
语法
just(value)
参数
value:将被转换为一个可观察变量。
返回值
它将返回一个具有给定值的可观察变量。
例子
from rx import just
test = just([15, 25,50, 55])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py
The value is [15, 25, 50, 55]
Job Done!
range
此方法将根据给定的输入给出一个整数范围。
语法
range(start, stop=None)
参数
start: 第一个值,范围将从该值开始。
stop:可选的,该范围要停止的最后一个值。
返回值
这将返回一个基于所给输入的整数值的可观察值。
例子
from rx import range
test = range(0,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!
repeat_value
这个方法将创建一个观察变量,它将按照给定的计数重复给定的值。
语法
repeat_value(value=None, repeat_count=None)
参数
value: 可选。要重复的值。
repeat_count:可选。要重复给定值的次数。
返回值
它将返回一个观察变量,该变量将按照给定的次数重复给定的值。
例子
from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!
start
这个方法接收一个函数作为输入,并返回一个将从输入函数返回值的可观察变量。
语法
start(func)
参数
func:一个将被调用的函数。
返回值
它返回一个将具有输入函数返回值的可观察变量。
例子
from rx import start
test = start(lambda : "Hello World")
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py
The value is Hello World
Job Done!
timer
这个方法将在超时完成后依次发射数值。
语法
timer(duetime)
参数
duetime: 时间,在这个时间之后,它应该发出第一个值。
返回值
它将返回一个包含在duetime之后发出的值的可观察变量。
例子
import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
输出
E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64