Python 多态与类型匹配,一些函数式语言提供了多种方式来灵活处理静态函数类型定义,但问题是许多函数的数据类型是完全抽象的。以统计函数为例,只要保证做除法得到的商是numbers.Real
的子类(例如Decimal
、Fraction
或者float
类型),其函数定义对整数和实数是完全一致的。这些函数式语言提供了复杂的类型和类型匹配机制,使得编译器可以通过统一的抽象定义处理多种数据类型。Python不存在这个问题,也不需要类型匹配。
Python的实现方法与静态类型的函数式语言借助于(可能是)复杂的语言特征截然相反。基于参与运算的数据类型,Python动态确定运算符的最终实现方式。使用Python编写的函数本就是抽象的,不与任何数据类型绑定,Python运行时根据对象类型确定使用何种实现。关于从运算符到具体处理方法名称之间的映射细节,可参阅Python语言参考手册的3.3.7节以及类库中numbers
模块的具体实现。
这意味着编译器并不能保证函数的输入数据和输出数据的类型是正确的,通常使用单元测试和mypy工具检测类型。
特殊情况下,当确实需要根据数据类型决定处理方法时,有如下两种解决方案:
- 使用
isinstance()
函数确定具体属于哪种情况; - 创建
numbers.Number
或者NamedTuple
的子类并实现符合具体情况的多态方法。
有时需要两种方法兼用来纳入需要转换的所有数据类型,并通过cast()
函数为mypy显式指定数据类型。
前面等级排序的例子基于输入值是简单数据对的假设,虽然斯皮尔曼等级相关系数的函数定义满足该要求,但实践中经常需要计算多个变量间的所有相关系数。
下面抽象化等级排序,首先定义如下命名元组,包含等级值和原始数据项。
from typing import NamedTuple, Tuple, Any
class Rank_Data(NamedTuple):
rank_seq: Tuple[float]
raw: Any
这个类的典型应用场景如下所示:
>>> data = {'key1': 1, 'key2': 2}
>>> r = Rank_Data((2, 7), data)
>>> r.rank_seq[0]
2
>>>r.raw
{'key1': 1, 'key2': 2}
原始数据的每一行是一个字典,每个数据项包含两个等级值。应用既可以获取等级值,也可以获取原始数据本身。
然后为等级排序函数添加一些语法糖。前面许多例子使用了for
语句处理可迭代对象和集合,但这里不会过多地使用for
语句。在一些函数中,我们显式使用iter()
函数将集合转化为可迭代对象,这里使用isinstance()
函数进行类型检测,如下所示:
def some_function(seq_or_iter: Union[Sequence, Iterator]):
if isinstance(seq_or_iter, Sequence):
yield from some_function(iter(seq_or_iter), key)
return
# Do the real work of the function using the Iterator
通过类型检查序列和迭代器对象间的微小差别。这里使用iter()
函数将序列对象转化为迭代器,然后递归调用自身来处理数据。
这里使用了Union[Sequence, Iterator]
表示等级排序的数据结构,源数据排序后才能获得等级值,最简单的方法是用list()
函数将迭代器转化为序列。虽然仍使用isinstance()
函数,但不像前面那样基于序列生成迭代器,而是将迭代器转化为序列。
应尽量把等级排序函数定义得抽象一些。下面两个表达式定义了输入数据:
Source = Union[Rank_Data, Any]
Union[Sequence[Source], Iterator[Source]]
两类输入数据对应四种组合:
Sequence[Rank_Data]
Sequence[Any]
Iterator[Rank_Data]
Iterator[Any]
下面的rank_data()
函数分三种情况处理上面四种组合。
from typing import (
Callable, Sequence, Iterator, Union, Iterable,
TypeVar, cast, Union
)
K_ = TypeVar("K_") # Some comparable key type used for ranking.
Source = Union[Rank_Data, Any]
def rank_data(
seq_or_iter: Union[Sequence[Source], Iterator[Source]],
key: Callable[[Rank_Data], K_] = lambda obj: cast(K_, obj)
) -> Iterable[Rank_Data]:
if isinstance(seq_or_iter, Iterator):
# Iterator? Materialize a sequence object
yield from rank_data(list(seq_or_iter), key)
return
data: Sequence[Rank_Data]
if isinstance(seq_or_iter[0], Rank_Data):
# Collection of Rank_Data is what we prefer.
data = seq_or_iter
else:
# Convert to Rank_Data and process.
empty_ranks: Tuple[float] = cast(Tuple[float], ())
data = list(
Rank_Data(empty_ranks, raw_data)
for raw_data in cast(Sequence[Source], seq_or_iter)
)
for r, rd in rerank(data, key):
new_ranks = cast(
Tuple[float],
rd.rank_seq + cast(Tuple[float], (r,)))
yield Rank_Data(new_ranks, rd.raw)
前面把四类数据结构分为三种情况,下面详细说明。
- 如果输入是迭代器(没有实现
__getitem__()
方法的对象),把它实例化为一个列表对象,就都可以处理Rank_Data
或其他类型的源数据了。这种情况包括Iterable[Rank_Data]
和Iterable[Any]
类型对象。 - 如果输入是
Sequence[Any]
类型对象,把这个未知对象包装进一个带有空等级值的Rank_Data
元组中,创建一个Sequence[Rank_Data]
对象。 - 最后,如果输入是
Sequence[Rank_Data]
类型对象,添加新的等级排序值到原有的Rank_Data
容器中。
第一种情况递归调用rank_data()
自身,另外两种情况使用rerank()
函数基于算出的等级值构建新的Rank_Data
对象。结构复杂的源数据将包含多个等级排序值。
为了消除等级元组数据类型的模糊性,需要使用复杂的cast()
表达式。可以用mypy工具提供的reveal_type()
函数调试编译器的类型推断。
rerank()
函数与前面定义的rank()
函数略有不同,它返回包括等级值与源数据的二元组。
def rerank(
rank_data_iter: Iterable[Rank_Data],
key: Callable[[Rank_Data], K_]
) -> Iterator[Tuple[float, Rank_Data]]:
sorted_iter = iter(
sorted(
rank_data_iter, key=lambda obj: key(obj.raw)
)
)
# Apply ranker to head, *tail = sorted(rank_data_iter)
head = next(sorted_iter)
yield from ranker(sorted_iter, 0, [head], key)
rerank()
的实现思路是对Rank_Data
序列进行排序,其头部元素head
用作ranker()
函数的种子,ranker()
函数在剩余项中查找与头部元素匹配(等级计算标准相同)的其他对象,从而计算出一组匹配对象的等级值。
ranker()
函数接收一组排序后的可迭代对象、一个基准值和一个等级值最小的排序对象集合,返回结果是一个由等级值和相应Rank_Data
对象组成的二元组序列。
def ranker(
sorted_iter: Iterator[Rank_Data],
base: float,
same_rank_seq: List[Rank_Data],
key: Callable[[Rank_Data], K_]
) -> Iterator[Tuple[float, Rank_Data]]:
try:
value = next(sorted_iter)
except StopIteration:
dups = len(same_rank_seq)
yield from yield_sequence(
(base + 1 + base + dups) / 2, iter(same_rank_seq))
return
if key(value.raw) == key(same_rank_seq[0].raw):
yield from ranker(
sorted_iter, base, same_rank_seq + [value], key)
else:
dups = len(same_rank_seq)
yield from yield_sequence(
(base + 1 + base + dups) / 2, iter(same_rank_seq))
yield from ranker(
sorted_iter, base + dups, [value], key)
首先从排序后的Rank_Data
组成的序列中取一个对象,如果出现StopIteration
异常,说明数据源已空,没有待处理元素了。基于拥有相同等级值的same_rank_seq
返回最终结果。
如果能取到下一个对象,就用key()
函数计算键值。如果与same_rank_seq
中元素的键值相同,把它追加到当前相同等级值序列中。基于sorted_iter
中的剩余数据、当前等级值、包含了head
的新same_rank_seq
以及key()
函数计算最终结果。
如果当前对象的键值与相同等级值集合中元素的key
值不一致,则返回结果包含两部分。第一部分来自same_rank_seq
中包含的相同等级值序列。第二部分来自排序后集合中的剩余元素,其基准值在当前相同等级值序列的基础上增加,初始化新的相同等级序列键值,key()
函数保持不变。
ranker()
函数使用yield_sequence()
输出结果,如下所示:
def yield_sequence(
rank: float,
same_rank_iter: Iterator[Rank_Data]
) -> Iterator[Tuple[float, Rank_Data]]:
head = next(same_rank_iter)
yield rank, head
yield from yield_sequence(rank, same_rank_iter)
该实现强调了算法的递归含义,实践中应使用for
语句进行优化。
运用尾调用优化技术将递归优化为循环时,首先要做好单元测试,确保递归版本通过单元测试后再开始优化。
接下来用前面定义的函数为数据标记(以及再次标记)等级值,从由简单标量组成的集合开始。
>>> scalars= [0.8, 1.2, 1.2, 2.3, 18]
>>> list(rank_data(scalars))
[Rank_Data(rank_seq=(1.0,), raw=0.8),
Rank_Data(rank_seq=(2.5,), raw=1.2),
Rank_Data(rank_seq=(2.5,), raw=1.2),
Rank_Data(rank_seq=(4.0,), raw=2.3),
Rank_Data(rank_seq=(5.0,), raw=18)]
源数据变成了Rank_Data
对象的raw
属性。
处理更复杂的数据时会出现多个等级值。下面是一个二元组序列:
>>> pairs = ((2, 0.8), (3, 1.2), (5, 1.2), (7, 2.3), (11, 18))
>>> rank_x = list(rank_data(pairs, key=lambda x:x[0]))
>>> rank_x
[Rank_Data(rank_seq=(1.0,), raw=(2, 0.8)),
Rank_Data(rank_seq=(2.0,), raw=(3, 1.2)),
Rank_Data(rank_seq=(3.0,), raw=(5, 1.2)),
Rank_Data(rank_seq=(4.0,), raw=(7, 2.3)),
Rank_Data(rank_seq=(5.0,), raw=(11, 18))]
>>> rank_xy = list(rank_data(rank_x, key=lambda x:x[1]))
>>> rank_xy
[Rank_Data(rank_seq=(1.0, 1.0), raw=(2, 0.8)),
Rank_Data(rank_seq=(2.0, 2.5), raw=(3, 1.2)),
Rank_Data(rank_seq=(3.0, 2.5), raw=(5, 1.2)),
Rank_Data(rank_seq=(4.0, 4.0), raw=(7, 2.3)),
Rank_Data(rank_seq=(5.0, 5.0), raw=(11, 18))]
首先定义了二元组集合,然后将二元组生成的Rank_Data
对象序列标记等级值赋给rank_x
,接着对Rank_Data
对象序列再次标记等级值,并赋给rank_xy
。
对rank_corr()
函数稍做修改,就可以基于Rank_Data
对象的rank_seq
属性计算任意序列的等级相关度了,具体修改留给读者练习。