# encoding: utf-8
__author__ = "Nils Tobias Schmidt"
__email__ = "schmidt89 at informatik.uni-marburg.de"
from datetime import datetime
from fnmatch import fnmatch
from io import BytesIO
import itertools
import os
from os.path import isfile, join, isdir
import sys
from androlyze.loader.exception import ApkImportError
from androlyze.log import log
from androlyze.model.android.Constants import ANDROID_FILE_EXTENSION
from androlyze.model.android.apk.FastApk import FastApk
from androlyze.storage.exception import StorageException
[docs]class ApkImporter(object):
''' Class for loading/importing a list of apk files '''
def __init__(self, apk_paths, storage):
'''
Parameters
----------
apk_paths: iterable<str>
List of apk files (paths) or directories..
storage : RedundantStorage
The storage to use for importing.
'''
self.__apk_paths = self.get_apks_from_list_or_dir(apk_paths)
self.__storage = storage
[docs] def get_apk_paths(self):
# use absolute paths in iterators
self.__apk_paths, new_it = itertools.tee(self.__apk_paths)
return (os.path.abspath(p) for p in new_it)
[docs] def set_apk_paths(self, value):
self.__apk_paths = value
[docs] def del_apk_paths(self):
del self.__apk_paths
[docs] def get_storage(self):
return self.__storage
[docs] def set_storage(self, value):
self.__storage = value
[docs] def del_storage(self):
del self.__storage
storage = property(get_storage, set_storage, del_storage, "StorageInterface : storage for audit results of apks")
apk_paths = property(get_apk_paths, set_apk_paths, del_apk_paths, "iterable<str> : List of apk files (paths)")
[docs] def import_apks(self, copy_apk = False, copy_to_mongodb = False, update = False, tag = None):
''' Import APKs.
Create a storage entry and copy the apk if `copy_apk` and not already in the storage.
Will also set the path of the `Apk`s (the directory to which it got imported)
at least if `copy_apk` is true.
Parameters
----------
copy_apk : bool, optional (default is False)
If true also import the apk file (copy it)
copy_to_mongodb : bool, optional (default is False)
Also import into MongoDB. Needed for the distributed analysis.
update : bool, optional (default is False)
Update apks that have already been imported.
tag : str, optional (default is None)
Tag the apks.
Returns
-------
generator<Apk>
The imported `Apk`s. Even if you don't want the result value,
you have to force the generator to continue until it's empty to get the import process finished.
'''
# get apks from directory
apk_gen = self.apk_paths
for apk_abs_path in apk_gen:
try:
with open(apk_abs_path, "rb") as apk_fh:
# file-like object in memory
# avoids double loading of file (for hashing and copying)
apk_in_memory = BytesIO(apk_fh.read())
apk = self.import_from_flo(apk_in_memory, apk_abs_path,
# copy apk
copy2disk=copy_apk, copy2mongodb=copy_to_mongodb,
update = update, tag = tag)
yield apk
except ApkImportError as e:
log.warn(e)
[docs] def import_from_flo(self, file_like_object, import_path_str = "file-like object", copy2disk = False, copy2mongodb = False, update = False, tag = None):
'''
Import an apk from a `file_like_object` if not already in the storage.
Will also set the path (absolute) of the returned `Apk` (the directory to which it got imported)
at least if `copy2disk` is true.
Also sets the import date and tag.
Parameters
----------
file_like_object
import_path_str : str, optional (default is "file-like object")
Optional string which will be passed to the Exceptions if they get raised.
Describes from which the import failed.
copy2disk : bool, optional (default is False)
If true also import the apk file (copy it)
copy2mongodb : bool, optional (default is False)
Also import into MongoDB. Needed for the distributed analysis.
update : bool, optional (default is False)
Update apks that have already been imported.
tag : str, optional (default is None)
Tag the apks.
Raises
------
ApkImportError
Returns
-------
Apk
If no error occurred.
'''
try:
apk = FastApk.fast_load_from_io(file_like_object, import_path_str, calculate_hash = True)
storage = self.storage
def set_apk_meta(apk):
apk.import_date = datetime.utcnow()
apk.tag = tag
# set apk meta
set_apk_meta(apk)
# set import path as new path for apk file
# needed to have the correct path when creating entry !
# copy to disk and/or db
_id, file_path = storage.copy_apk(apk, file_like_object, copy2db = copy2mongodb, copy2fs = copy2disk)
# set path where file has been copied to
# otherwise use supplied path
if copy2disk:
apk.path = file_path
# create entry in storage
storage.create_entry_for_apk(apk, update, tag)
return apk
except (StorageException, IOError) as e:
raise ApkImportError(e), None, sys.exc_info()[2]
@staticmethod
[docs] def get_apks_from_dir(apk_dir):
'''
Get a list of apk files from the given directory (recursive).
Parameters
----------
apk_dir: str
the directory from which to import the apk files
Returns
-------
generator<str>
Apk file names
'''
for dirpath, _, filenames in os.walk(apk_dir):
filenames.sort()
for _file in filenames:
if ApkImporter.is_apk_file(_file):
yield join(dirpath, _file)
@staticmethod
[docs] def get_apks_from_list(apk_list):
'''
Filter the files in the list by checking if they have the APK file extension.
Parameters
----------
apk_paths : list<str>
List of APK files
Returns
-------
itertools.ifilter<str>
Generator over apk files
'''
return itertools.ifilter(ApkImporter.is_apk_file, apk_list)
@staticmethod
[docs] def get_apks_from_list_or_dir(apks):
''' Get a list of apk files from directory or list by checking the .apk file extension.
Parameters
----------
apks: generator<str>
List of directories or apk files.
Returns
-------
generator<str>
Path to Apks.
Examples
--------
>>> from androlyze.loader.ApkImporter import ApkImporter
>>> ApkImporter.get_apks_from_list_or_dir["foo.apk", "bar.apk"]
>>> ApkImporter.get_apks_from_list_or_dir["foo/", "bar/"]
>>> ApkImporter.get_apks_from_list_or_dir["foo.apk", "apk_dir/"]
'''
generators = []
for apk in apks:
apk_gen = None
if isfile(apk):
apk_gen = ApkImporter.get_apks_from_list([apk])
elif isdir(apk):
apk_gen = ApkImporter.get_apks_from_dir(apk)
if apk_gen is not None:
generators.append(apk_gen)
return itertools.chain(*generators)
@staticmethod
[docs] def is_apk_file(filename):
''' Check if the `filename` has the .apk extension '''
return fnmatch(filename, "*." + ANDROID_FILE_EXTENSION)