"""Volume scanner for artifact definitions."""
import logging
import os
import yaml
from dfimagetools import artifact_filters
from dfimagetools import environment_variables
from dfimagetools import windows_registry
from dfvfs.helpers import file_system_searcher as dfvfs_file_system_searcher
from dfvfs.helpers import volume_scanner as dfvfs_volume_scanner
from dfvfs.helpers import windows_path_resolver as dfvfs_windows_path_resolver
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import resolver as dfvfs_resolver
from dfwinreg import registry as dfwinreg_registry
from dtfabric import errors as dtfabric_errors
from dtfabric.runtime import fabric as dtfabric_fabric
from artifactsrc import resource_file
[docs]
class CheckResults:
"""Check results.
Attributes:
data_formats (set[str]): data formats that were found.
number_of_file_entries (int): number of file entries that were found.
"""
[docs]
def __init__(self):
"""Initializes check results."""
super().__init__()
self.data_formats = set()
self.number_of_file_entries = 0
[docs]
class ArtifactDefinitionsVolumeScanner(dfvfs_volume_scanner.VolumeScanner):
"""Artifact definitions volume scanner."""
# Preserve the absolute path value of __file__ in case it is changed
# at run-time.
_CHECKS_DEFINITIONS_FILE = os.path.join(
os.path.dirname(__file__), "data", "checks.yaml"
)
_DEFINITION_FILES_PATH = os.path.dirname(__file__)
_SYSTEM_DIRECTORY_FIND_SPECS = [
dfvfs_file_system_searcher.FindSpec(
case_sensitive=False, location="/sbin", location_separator="/"
),
dfvfs_file_system_searcher.FindSpec(
case_sensitive=False, location="/System/Library", location_separator="/"
),
dfvfs_file_system_searcher.FindSpec(
case_sensitive=False,
location="\\Windows\\System32",
location_separator="\\",
),
dfvfs_file_system_searcher.FindSpec(
case_sensitive=False, location="\\WINNT\\System32", location_separator="\\"
),
dfvfs_file_system_searcher.FindSpec(
case_sensitive=False,
location="\\WINNT35\\System32",
location_separator="\\",
),
dfvfs_file_system_searcher.FindSpec(
case_sensitive=False, location="\\WTSRV\\System32", location_separator="\\"
),
]
# We need to check for both forward and backward slashes since the path
# specification will be dfVFS back-end dependent.
_WINDOWS_SYSTEM_DIRECTORIES = set(
[
"/windows/system32",
"\\windows\\system32",
"/winnt/system32",
"\\winnt\\system32",
"/winnt35/system32",
"\\winnt35\\system32",
"/wtsrv/system32",
"\\wtsrv\\system32",
]
)
_WINDOWS_DIRECTORIES = frozenset(
[
"C:\\Windows",
"C:\\WINNT",
"C:\\WTSRV",
"C:\\WINNT35",
]
)
_FORMAT_VERSION_STRING = {
"bplist": "bplist 0x{format_version:s}",
"esedb": "esedb 0x{format_version:x}",
"evt": "evt {major_format_version:d}.{minor_format_version:d}",
"evtx": "evtx {major_format_version:d}.{minor_format_version:d}",
"job": "job {format_version:d}",
"regf": "regf {major_format_version:d}.{minor_format_version:d}",
"scca": "scca {format_version:d}",
}
[docs]
def __init__(self, artifacts_registry, mediator=None):
"""Initializes an artifact definitions volume scanner.
Args:
artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifact
definitions registry.
mediator (Optional[dfvfs.VolumeScannerMediator]): a volume scanner
mediator.
"""
super().__init__(mediator=mediator)
self._ascii_codepage = "cp1252"
self._artifacts_registry = artifacts_registry
self._checks_definitions = None
self._data_location = os.path.join("data")
self._data_type_fabric = self._ReadDataTypeFabricDefinitionFile("formats.yaml")
self._data_type_maps = {}
self._environment_variables = []
self._file_system = None
self._file_system_searcher = None
self._filter_generator = None
self._mount_point = None
self._path_resolver = None
self._preferred_language_identifier = "en-US"
self._windows_directory = None
self._windows_registry = None
def _DetermineDataFormat(self, names, file_object):
"""Determines the data format.
Args:
names (list[str]): names of data formats to check.
file_object (file): file-like object.
Returns:
str: data format identifier or None if the data format could not
be determined.
"""
for name in names:
format_data_type_map = self._GetDataTypeMap(name)
layout = getattr(format_data_type_map, "layout", None)
if not layout:
continue
layout_element_definition = layout[0]
if layout_element_definition.offset is None:
continue
data_type_map = self._GetDataTypeMap(layout_element_definition.data_type)
structure_values = self._ReadStructureFromFileObject(
file_object, layout_element_definition.offset, data_type_map
)
if structure_values:
format_string = self._FORMAT_VERSION_STRING.get(name, name)
return format_string.format(**structure_values.__dict__)
return None
def _GetDataTypeMap(self, name):
"""Retrieves a data type map defined by the definition file.
The data type maps are cached for reuse.
Args:
name (str): name of the data type as defined by the definition file.
Returns:
dtfabric.DataTypeMap: data type map which contains a data type definition,
such as a structure, that can be mapped onto binary data.
"""
data_type_map = self._data_type_maps.get(name, None)
if not data_type_map:
data_type_map = self._data_type_fabric.CreateDataTypeMap(name)
self._data_type_maps[name] = data_type_map
return data_type_map
def _OpenMessageResourceFile(self, windows_path):
"""Opens the message resource file specified by the Windows path.
Args:
windows_path (str): Windows path containing the message resource
filename.
Returns:
MessageResourceFile: message resource file or None.
"""
path_spec = self._path_resolver.ResolvePath(windows_path)
if path_spec is None:
return None
return self._OpenMessageResourceFileByPathSpec(path_spec)
def _OpenMessageResourceFileByPathSpec(self, path_spec):
"""Opens the message resource file specified by the path specification.
Args:
path_spec (dfvfs.PathSpec): path specification.
Returns:
MessageResourceFile: message resource file or None.
"""
windows_path = self._path_resolver.GetWindowsPath(path_spec)
if windows_path is None:
logging.warning("Unable to retrieve Windows path.")
try:
file_object = dfvfs_resolver.Resolver.OpenFileObject(path_spec)
except IOError as exception:
logging.warning(
f"Unable to open: {path_spec.comparable:s} with error: {exception!s}"
)
file_object = None
if file_object is None:
return None
message_file = resource_file.MessageResourceFile(
windows_path,
ascii_codepage=self._ascii_codepage,
preferred_language_identifier=self._preferred_language_identifier,
)
message_file.OpenFileObject(file_object)
return message_file
def _ReadChecksDefinitions(self):
"""Reads the checks definitions from checks.yaml.
Returns:
list[dict[str, object]]: checks definitions.
"""
check_definitions = {}
with open(self._CHECKS_DEFINITIONS_FILE, "r", encoding="utf-8") as file_object:
for check_definition in yaml.safe_load_all(file_object):
name = check_definition.get("name", None)
if name:
check_definitions[name.lower()] = check_definition
return check_definitions
def _ReadDataTypeFabricDefinitionFile(self, filename):
"""Reads a dtFabric definition file.
Args:
filename (str): name of the dtFabric definition file.
Returns:
dtfabric.DataTypeFabric: data type fabric which contains the data format
data type maps of the data type definition, such as a structure, that
can be mapped onto binary data or None if no filename is provided.
"""
if not filename:
return None
path = os.path.join(self._DEFINITION_FILES_PATH, filename)
with open(path, "rb") as file_object:
definition = file_object.read()
return dtfabric_fabric.DataTypeFabric(yaml_definition=definition)
def _ReadStructureFromFileObject(self, file_object, file_offset, data_type_map):
"""Reads a structure from a file-like object.
This method currently only supports fixed-size structures.
Args:
file_object (file): a file-like object to parse.
file_offset (int): offset of the structure data relative to the start
of the file-like object.
data_type_map (dtfabric.DataTypeMap): data type map of the structure.
Returns:
object: structure values object or None if the structure cannot be read.
"""
structure_values = None
data_size = data_type_map.GetSizeHint()
if data_size:
file_object.seek(file_offset, os.SEEK_SET)
try:
data = file_object.read(data_size)
structure_values = data_type_map.MapByteStream(data)
except (
dtfabric_errors.ByteStreamTooSmallError,
dtfabric_errors.MappingError,
):
pass
return structure_values
[docs]
def CheckArtifactDefinition(self, artifact_definition):
"""Checks if an artifact definition on a storage media image.
Args:
artifact_definition (artifacts.ArtifactDefinition): artifact definition.
Returns:
CheckResults: check results.
"""
check_result = CheckResults()
if self._checks_definitions is None:
self._checks_definitions = self._ReadChecksDefinitions()
find_specs = list(
self._filter_generator.GetFindSpecs([artifact_definition.name])
)
if find_specs:
path_specs = list(self._file_system_searcher.Find(find_specs=find_specs))
check_result.number_of_file_entries = len(path_specs)
check_definition = self._checks_definitions.get(
artifact_definition.name.lower(), None
)
if check_definition:
for path_spec in path_specs:
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
if file_entry.size > 0:
file_object = file_entry.GetFileObject()
if file_object:
formats = check_definition.get("formats", [])
data_format = self._DetermineDataFormat(
formats, file_object
)
check_result.data_formats.add(data_format or "unknown")
return check_result
[docs]
def GetWindowsVersion(self):
"""Determines the Windows version from kernel executable file.
Returns:
str: Windows version or None otherwise.
"""
# Window NT variants.
kernel_executable_path = "\\".join(
[self._windows_directory, "System32", "ntoskrnl.exe"]
)
message_file = self._OpenMessageResourceFile(kernel_executable_path)
if not message_file:
# Window 9x variants.
kernel_executable_path = "\\".join(
[self._windows_directory, "System32", "\\kernel32.dll"]
)
message_file = self._OpenMessageResourceFile(kernel_executable_path)
if not message_file:
return None
return message_file.file_version
[docs]
def ScanForOperatingSystemVolumes(self, source_path, options=None):
"""Scans for volumes containing an operating system.
Args:
source_path (str): source path.
options (Optional[dfvfs.VolumeScannerOptions]): volume scanner options.
If None the default volume scanner options are used, which are defined
in the VolumeScannerOptions class.
Returns:
bool: True if a volume with an operating system was found.
Raises:
ScannerError: if the source path does not exists, or if the source path
is not a file or directory, or if the format of or within the source
file is not supported.
"""
if not options:
options = dfvfs_volume_scanner.VolumeScannerOptions()
scan_context = self._ScanSource(source_path)
self._source_path = source_path
self._source_type = scan_context.source_type
base_path_specs = self._GetBasePathSpecs(scan_context, options)
if (
not base_path_specs
or scan_context.source_type == dfvfs_definitions.SOURCE_TYPE_FILE
):
return False
for path_spec in base_path_specs:
file_system = dfvfs_resolver.Resolver.OpenFileSystem(path_spec)
if path_spec.type_indicator == dfvfs_definitions.TYPE_INDICATOR_OS:
mount_point = path_spec
else:
mount_point = path_spec.parent
file_system_searcher = dfvfs_file_system_searcher.FileSystemSearcher(
file_system, mount_point
)
system_directories = []
for system_directory_path_spec in file_system_searcher.Find(
find_specs=self._SYSTEM_DIRECTORY_FIND_SPECS
):
relative_path = file_system_searcher.GetRelativePath(
system_directory_path_spec
)
if relative_path:
system_directories.append(relative_path.lower())
if system_directories or len(base_path_specs) == 1:
self._file_system_searcher = file_system_searcher
self._file_system = file_system
self._mount_point = mount_point
if self._WINDOWS_SYSTEM_DIRECTORIES.intersection(set(system_directories)):
path_resolver = dfvfs_windows_path_resolver.WindowsPathResolver(
file_system, mount_point
)
# TODO: determine Windows directory based on system directories.
windows_directory = None
for windows_path in self._WINDOWS_DIRECTORIES:
windows_path_spec = path_resolver.ResolvePath(windows_path)
if windows_path_spec is not None:
windows_directory = windows_path
break
if windows_directory:
path_resolver.SetEnvironmentVariable(
"SystemRoot", windows_directory
)
path_resolver.SetEnvironmentVariable("WinDir", windows_directory)
registry_file_reader = (
windows_registry.StorageMediaImageWindowsRegistryFileReader(
file_system, path_resolver
)
)
winregistry = dfwinreg_registry.WinRegistry(
registry_file_reader=registry_file_reader
)
collector = (
environment_variables.WindowsEnvironmentVariablesCollector()
)
self._environment_variables = list(collector.Collect(winregistry))
self._path_resolver = path_resolver
self._windows_directory = windows_directory
self._windows_registry = winregistry
if system_directories:
# TODO: on Mac OS prevent detecting the Recovery volume.
break
self._filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator(
self._artifacts_registry, self._environment_variables, []
)
return True