Python 解析日志文件之收集行数据,解析大量文件的第一步是读取每个文件并生成一个简单的行序列。由于日志文件是以.gzip
格式保存的,因此需要使用gzip.open()
函数打开每个文件,而不是使用io.open()
函数或__builtins__.open()
函数。
如下面的命令片段所示,函数local_gzip()
会从本地缓存的文件中读取行数据。
from typing import Iterator
def local_gzip(pattern: str) -> Iterator[Iterator[str]]:
zip_logs = glob.glob(pattern)
for zip_file in zip_logs:
with gzip.open(zip_file, "rb") as log:
yield (
line.decode('us-ascii').rstrip()
for line in log)
上述函数遍历了与给定模式匹配的所有文件。对于每个文件,生成的值是一个生成器函数,它会遍历该文件中的所有行数据。前面已经封装了一些东西,其中包括文件通配符匹配,打开以.gzip
格式压缩的日志文件的实现细节,以及将一个文件分解为不包含任何尾部换行符(\n
)的行序列。
这里的基本设计模式是生成的值是每个文件的生成器表达式。可以把上述函数重写为一个函数和一个映射,该映射将这个特定函数应用于每个文件。对于需要识别个别文件的罕见情况,这种方法非常有用。在某些情况下,可以使用yield from
对其进行优化,这会使得所有日志文件形似单个行数据流。
还有其他几种方法可以得到类似的输出,例如下面给出了上述示例中内层for
循环的一个替代版本,其中的line_iter()
函数也能得到给定文件的行数据。
def line_iter(zip_file: str) -> Iterator[str]:
log = gzip.open(zip_file, "rb")
return (line.decode('us-ascii').rstrip() for line in log)
函数line_iter()
使用了gzip.open()
函数和一些行清理操作。可以通过映射将line_iter()
函数应用于模式匹配的所有文件,如下所示:
map(line_iter, glob.glob(pattern))
尽管这个替代版本很简洁,但其缺点是打开的文件对象必须一直等待,直到没有更多文件引用才能被当作垃圾进行回收。在处理大量文件时,这会产生不必要的开销。出于这个原因,我们将考虑使用之前讲的local_gzip()
函数。
上述映射的替代方案有一个明显优势:它和multiprocessing
模块相得益彰。我们可以创建一个工作池并将任务(如文件读取)映射至该进程池中。这样做让我们可以并行读取这些文件,而打开的文件对象也将作为独立进程的一部分。
这个设计的扩展包含了第二个函数,它使用FTP传输来自Web主机的文件。当从Web服务器收集到文件后,便可以使用local_gzip()
函数来分析它们了。
函数access_iter()
会使用local_gzip()
函数的结果来为描述文件访问的源文件中的每一行创建命名元组。