Python 使用多种元组结构代替状态类,前面的例子使用了打包-拆包设计模式处理不可变元组和命名元组,其核心思想是用不可变对象包含其他不可变对象,来代替面向对象范式中的可变实例变量。
斯皮尔曼等级相关系数是一种用于表征两组变量相关度的统计量。比较两组变量的等级,由于变量值可能在尺度上有差异,所以它不比较具体的值,而比较相对顺序。有关该算法的更多信息,可参考维基百科。
计算斯皮尔曼等级相关系数时需要给每个样本赋一个等级值,使用enumerate(sorted())
可以实现。对给定的可能存在相关性的两组数据,分别将每组数据转换为一系列等级值,然后计算相关度。
这里使用“打包-拆包”设计模式实现算法,首先用等级值打包数据,便于后续计算相关系数。
前面介绍了如何解析数据集。从数据集中提取4份样本的做法如下:
>>> from Chapter_3.ch03_ex5 import (
... series, head_map_filter, row_iter)
>>> with open("Anscombe.txt") as source:
... data = list(head_map_filter(row_iter(source)))
得到的数据集每行包含4个序列,series()
函数从所有行中提取指定序列,返回结果是个二元组,如果是命名元组会更好。
代表每对数据的命名元组如下:
from typing import NamedTuple
class Pair(NamedTuple):
x: float
y: float
下面引入一个变换函数,将匿名元组转换为命名元组。
from typing import Callable, List, Tuple, Iterable
RawPairIter = Iterable[Tuple[float, float]]
pairs: Callable[[RawPairIter], List[Pair]] \
= lambda source: list(Pair(*row) for row in source)
RawPairIter
类型代表series()
函数返回的中间输出结果:一个输出二元组的可迭代序列。pairs
匿名函数接收一个可迭代序列,返回一个由Pair
命名元组组成的列表。
使用pairs()
函数和series()
函数从源数据中抽取数据对,如下所示:
>>> series_I = pairs(series(0, data))
>>> series_II = pairs(series(1, data))
>>> series_III = pairs(series(2, data))
>>> series_IV = pairs(series(3, data))
每个序列是一个由Pair
对象组成的列表,每个Pair
对象包含x
和y
两个属性,数据如下所示:
[Pair(x=10.0, y=8.04),
Pair(x=8.0, y=6.95),
...,
Pair(x=5.0, y=5.68)]
为了便于计算等级,需要构造一个包含等级值和原始数据对的组合对象,该二元组的类型定义如下所示:
from typing import Tuple
RankedPair = Tuple[int, Pair]
Pair
是前面定义的命名元组,RankedPair
是包含一个整数和一个Pair
对象的二元组的类型别名。
如下所示的生成器函数将包含Pair
的可迭代集合转换为RankedPair
:
from typing import Iterable, Iterator
def rank_y(pairs: Iterable[Pair]) -> Iterator[RankedPair]:
return enumerate(sorted(pairs, key=lambda p: p.y))
对RankedPair
对象应用enumerate()
函数创建迭代器。按Pair
对象的y
值排序。将每个Pair
对象和一个等级值包装成一个二元组。
更复杂的实现如下:
Rank2Pair = Tuple[int, RankedPair]
def rank_x(
ranked_pairs: Iterable[RankedPair]
) -> Iterator[Rank2Pair]:
return enumerate(
sorted(ranked_pairs, key=lambda rank: rank[1].x)
)
将RankedPair
对象包装进Rand2Pair
对象中。第二次包装生成了一个包含二元组的二元组,这个复杂的数据结构表明类型别名能为被处理的数据提供有效的类型提示。
y_rank = list(rank_y(series_I))
的运行结果如下:
[(0, Pair(x=8.0, y=5.25)),
(1, Pair(x=8.0, y=5.56)),
...,
(10, Pair(x=19.0, y=12.5))
]
为了计算相关度,需要使用rank_x()
函数和rank_y()
函数,xy_rank = list(rank_x (y_rank))
的值是由深度嵌套对象组成的列表,如下所示:
[(0, (0, Pair(x=4.0, y=4.26))),
(1, (2, Pair(x=5.0, y=5.68))),
...,
(10, (9, Pair(x=14.0, y=9.96)))
]
这样就可以基于x
和y
的等级值,而不是原始Pair
对象计算等级序列的相关度了。
要提取两个等级值,需要两个复杂表达式。对于数据集中每个标记了等级值的样本r
,需要比较r[0]
和r[1][0]
,这是与前面包装过程相对应的拆包过程。有时也将这类函数称为选择器函数,因为它们从复杂的数据结构中选择数据项。
为了避免对r[0]
和r[1][0]
的复杂引用,可以创建如下所示的选择器函数:
x_rank = lambda ranked: ranked[0]
y_rank = lambda ranked: ranked[1][0]
raw = lambda ranked: ranked[1][1]
这样就可以通过x_rank_(r)
和y_rank_(r)
来计算相关度了,引用表达式比原来的版本易读。
总的处理策略包含两部分操作:包装和拆包,rank_x()
函数和rank_y()
函数包装Pair
对象,用等级值和原始数据创建元组。通过创建复杂度逐渐增加的数据结构,避免了使用有状态的类定义。
为什么要创建深层嵌套元组呢?原因很简单:惰性求值。对元组进行拆包,生成新的扁平元组很花时间,包装已有元组则简单多了。使用扁平数据结构能大幅降低后续处理的复杂度,下面对已有的处理逻辑做如下改进。
- 平铺数据结构,
rank_x()
函数和rank_y()
函数的类型标示显示了其复杂度,一个在Tuple[int, Pair]
上迭代,另一个在Tuple[int, RankedPair]
上迭代。 -
enumerate()
函数不能正确计算有多个相同值序列的等级值。如果样本中有两个值大小相同,相应的等级值应相同。值的大小应该是这些相同值位置的平均数,例如序列[0.8, 1.2, 1.2, 2.3, 18]
的等级值是1, 2.5, 2.5, 4, 5
,处在第2位和第3位的两个相同值的等级值是它们位置的平均值:2.5。
下面通过编写更智能的等级计算函数完成上述优化。