Python数字取证 基于日志的人工制品的调查
到现在为止,我们已经看到了如何使用Python在Windows中获得工件。在这一章中,让我们学习一下使用Python调查基于日志的工件。
简介
基于日志的人工制品是信息的宝库,对数字取证专家来说非常有用。尽管我们有各种监控软件来收集信息,但从它们中解析出有用信息的主要问题是我们需要大量的数据。
各种基于日志的工件和在Python中进行调查
在本节中,让我们讨论各种基于日志的工件和它们在Python中的调查 —
时间戳
时间戳传达了日志中活动的数据和时间。它是任何日志文件的重要元素之一。请注意,这些数据和时间值可以有多种格式。
下面显示的Python脚本将把原始日期时间作为输入,并提供一个格式化的时间戳作为其输出。
对于这个脚本,我们需要遵循以下步骤– 1.
- 首先,设置参数,这些参数将接收原始数据值以及数据来源和数据类型。
-
现在,提供一个类,为不同日期格式的数据提供通用接口。
Python代码
让我们看看如何使用Python代码来达到这个目的 —
首先,导入以下Python模块 –
from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime as dt
from datetime import timedelta
现在像往常一样,我们需要为命令行处理程序提供参数。这里它将接受三个参数,第一个是要处理的日期值,第二个是该日期值的来源,第三个是它的类型:
if __name__ == '__main__':
parser = ArgumentParser('Timestamp Log-based artifact')
parser.add_argument("date_value", help="Raw date value to parse")
parser.add_argument(
"source", help = "Source format of date",choices = ParseDate.get_supported_formats())
parser.add_argument(
"type", help = "Data type of input value",choices = ('number', 'hex'), default = 'int')
args = parser.parse_args()
date_parser = ParseDate(args.date_value, args.source, args.type)
date_parser.run()
print(date_parser.timestamp)
Now, we need to define a class which will accept the arguments for date value, date source, and the value type −
class ParseDate(object):
def __init__(self, date_value, source, data_type):
self.date_value = date_value
self.source = source
self.data_type = data_type
self.timestamp = None
现在我们将定义一个方法,就像main()方法一样充当控制器:
def run(self):
if self.source == 'unix-epoch':
self.parse_unix_epoch()
elif self.source == 'unix-epoch-ms':
self.parse_unix_epoch(True)
elif self.source == 'windows-filetime':
self.parse_windows_filetime()
@classmethod
def get_supported_formats(cls):
return ['unix-epoch', 'unix-epoch-ms', 'windows-filetime']
现在,我们需要定义两个方法,分别处理Unix的纪元时间和FILETIME —
def parse_unix_epoch(self, milliseconds=False):
if self.data_type == 'hex':
conv_value = int(self.date_value)
if milliseconds:
conv_value = conv_value / 1000.0
elif self.data_type == 'number':
conv_value = float(self.date_value)
if milliseconds:
conv_value = conv_value / 1000.0
else:
print("Unsupported data type '{}' provided".format(self.data_type))
sys.exit('1')
ts = dt.fromtimestamp(conv_value)
self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
def parse_windows_filetime(self):
if self.data_type == 'hex':
microseconds = int(self.date_value, 16) / 10.0
elif self.data_type == 'number':
microseconds = float(self.date_value) / 10
else:
print("Unsupported data type '{}' provided".format(self.data_type))
sys.exit('1')
ts = dt(1601, 1, 1) + timedelta(microseconds=microseconds)
self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
运行上述脚本后,通过提供一个时间戳,我们可以得到易读格式的转换值。
网络服务器日志
从数字取证专家的角度来看,网络服务器日志是另一个重要的人工制品,因为它们可以得到有用的用户统计数据,以及关于用户和地理位置的信息。以下是Python脚本,它将在处理网络服务器日志后创建一个电子表格,以方便分析信息。
首先,我们需要导入以下Python模块—
from __future__ import print_function
from argparse import ArgumentParser, FileType
import re
import shlex
import logging
import sys
import csv
logger = logging.getLogger(__file__)
现在,我们需要定义将从日志中解析的模式:
iis_log_format = [
("date", re.compile(r"\d{4}-\d{2}-\d{2}")),
("time", re.compile(r"\d\d:\d\d:\d\d")),
("s-ip", re.compile(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|)){4}")),
("cs-method", re.compile(
r"(GET)|(POST)|(PUT)|(DELETE)|(OPTIONS)|(HEAD)|(CONNECT)")),
("cs-uri-stem", re.compile(r"([A-Za-z0-1/\.-]*)")),
("cs-uri-query", re.compile(r"([A-Za-z0-1/\.-]*)")),
("s-port", re.compile(r"\d*")),
("cs-username", re.compile(r"([A-Za-z0-1/\.-]*)")),
("c-ip", re.compile(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|)){4}")),
("cs(User-Agent)", re.compile(r".*")),
("sc-status", re.compile(r"\d*")),
("sc-substatus", re.compile(r"\d*")),
("sc-win32-status", re.compile(r"\d*")),
("time-taken", re.compile(r"\d*"))]
现在,为命令行处理程序提供一个参数。这里它将接受两个参数,第一个是要处理的IIS日志,第二个是想要的CSV文件路径。
if __name__ == '__main__':
parser = ArgumentParser('Parsing Server Based Logs')
parser.add_argument('iis_log', help = "Path to IIS Log",type = FileType('r'))
parser.add_argument('csv_report', help = "Path to CSV report")
parser.add_argument('-l', help = "Path to processing log",default=__name__ + '.log')
args = parser.parse_args()
logger.setLevel(logging.DEBUG)
msg_fmt = logging.Formatter(
"%(asctime)-15s %(funcName)-10s ""%(levelname)-8s %(message)s")
strhndl = logging.StreamHandler(sys.stdout)
strhndl.setFormatter(fmt = msg_fmt)
fhndl = logging.FileHandler(args.log, mode = 'a')
fhndl.setFormatter(fmt = msg_fmt)
logger.addHandler(strhndl)
logger.addHandler(fhndl)
logger.info("Starting IIS Parsing ")
logger.debug("Supplied arguments: {}".format(", ".join(sys.argv[1:])))
logger.debug("System " + sys.platform)
logger.debug("Version " + sys.version)
main(args.iis_log, args.csv_report, logger)
iologger.info("IIS Parsing Complete")
现在我们需要定义main()方法,该方法将处理大量日志信息的脚本 –
def main(iis_log, report_file, logger):
parsed_logs = []
for raw_line in iis_log:
line = raw_line.strip()
log_entry = {}
if line.startswith("#") or len(line) == 0:
continue
if '\"' in line:
line_iter = shlex.shlex(line_iter)
else:
line_iter = line.split(" ")
for count, split_entry in enumerate(line_iter):
col_name, col_pattern = iis_log_format[count]
if col_pattern.match(split_entry):
log_entry[col_name] = split_entry
else:
logger.error("Unknown column pattern discovered. "
"Line preserved in full below")
logger.error("Unparsed Line: {}".format(line))
parsed_logs.append(log_entry)
logger.info("Parsed {} lines".format(len(parsed_logs)))
cols = [x[0] for x in iis_log_format]
logger.info("Creating report file: {}".format(report_file))
write_csv(report_file, cols, parsed_logs)
logger.info("Report created")
最后,我们需要定义一个方法,将输出写入电子表格—-。
def write_csv(outfile, fieldnames, data):
with open(outfile, 'w', newline="") as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames)
csvfile.writeheader()
csvfile.writerows(data)
运行上述脚本后,我们将在电子表格中得到基于网络服务器的日志。
使用YARA扫描重要文件
YARA(Yet Another Recursive Algorithm)是一个模式匹配工具,用于恶意软件识别和事件响应。我们将使用YARA来扫描文件。在下面的Python脚本中,我们将使用YARA。
我们可以通过以下命令来安装YARA —
pip install YARA
我们可以按照下面给出的步骤使用YARA规则来扫描文件 –
- 首先,设置并编译YARA规则
-
然后,扫描单个文件,然后在目录中迭代,处理单个文件。
-
最后,我们将把结果导出到CSV。
Python代码
让我们看看如何使用Python代码来达到这个目的–
首先,我们需要导入以下Python模块 –
from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import os
import csv
import yara
接下来,为命令行处理程序提供参数。注意,这里它将接受两个参数–第一个是YARA规则的路径,第二个是要扫描的文件。
if __name__ == '__main__':
parser = ArgumentParser('Scanning files by YARA')
parser.add_argument(
'yara_rules',help = "Path to Yara rule to scan with. May be file or folder path.")
parser.add_argument('path_to_scan',help = "Path to file or folder to scan")
parser.add_argument('--output',help = "Path to output a CSV report of scan results")
args = parser.parse_args()
main(args.yara_rules, args.path_to_scan, args.output)
现在我们将定义main()函数,它将接受yara规则的路径和要扫描的文件 –
def main(yara_rules, path_to_scan, output):
if os.path.isdir(yara_rules):
yrules = yara.compile(yara_rules)
else:
yrules = yara.compile(filepath=yara_rules)
if os.path.isdir(path_to_scan):
match_info = process_directory(yrules, path_to_scan)
else:
match_info = process_file(yrules, path_to_scan)
columns = ['rule_name', 'hit_value', 'hit_offset', 'file_name',
'rule_string', 'rule_tag']
if output is None:
write_stdout(columns, match_info)
else:
write_csv(output, columns, match_info)
现在,定义一个方法,该方法将遍历目录,并将结果传递给另一个方法进行进一步的处理–
def process_directory(yrules, folder_path):
match_info = []
for root, _, files in os.walk(folder_path):
for entry in files:
file_entry = os.path.join(root, entry)
match_info += process_file(yrules, file_entry)
return match_info
接下来,定义两个函数。注意,首先我们将使用 match() 方法来处理 对象 ,如果用户没有指定任何输出文件,另一个函数将向控制台报告该匹配信息。观察下面显示的代码 –
def process_file(yrules, file_path):
match = yrules.match(file_path)
match_info = []
for rule_set in match:
for hit in rule_set.strings:
match_info.append({
'file_name': file_path,
'rule_name': rule_set.rule,
'rule_tag': ",".join(rule_set.tags),
'hit_offset': hit[0],
'rule_string': hit[1],
'hit_value': hit[2]
})
return match_info
def write_stdout(columns, match_info):
for entry in match_info:
for col in columns:
print("{}: {}".format(col, entry[col]))
print("=" * 30)
最后,我们将定义一个方法,将输出写入CSV文件,如下图所示。
def write_csv(outfile, fieldnames, data):
with open(outfile, 'w', newline="") as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames)
csvfile.writeheader()
csvfile.writerows(data)
一旦你成功运行上述脚本,我们可以在命令行提供适当的参数,并可以生成CSV报告。