Source code for androlyze.analyze.parallel.ParallelAnalyzer


# encoding: utf-8

__author__ = "Nils Tobias Schmidt"
__email__ = "schmidt89 at informatik.uni-marburg.de"

from multiprocessing import JoinableQueue as Queue
from multiprocessing import cpu_count
import os
import signal

from androlyze.analyze import AnalyzeUtil
from androlyze.analyze.BaseAnalyzer import BaseAnalyzer
from androlyze.analyze.parallel import STOP_SENTINEL
from androlyze.analyze.parallel.AnalysisStatsView import AnalysisStatsView
from androlyze.analyze.parallel.Worker import Worker
from androlyze.log.Log import log
from androlyze.util import Util


[docs]class ParallelAnalyzer(BaseAnalyzer): ''' Parallel analyzer which uses the `multiprocessing` module. ''' def __init__(self, storage, script_list, script_hashes, min_script_needs, apks_or_paths, concurrency = None): ''' See :py:method`.BaseAnalyzer.__init__` for details on the first attributes. Other Parameters ---------------- concurrency : int, optional (default is number of cpu cores) Number of workers to spawn. ''' super(ParallelAnalyzer, self).__init__(storage, script_list, script_hashes, min_script_needs, apks_or_paths) # parallelization parameters if concurrency is None: concurrency = cpu_count() self.__concurrency = concurrency log.info("concurrency: %s", self.concurrency) log.info("Using processes") # parallel stuff, concerning processes self.__work_queue = Queue() self.__work_queue.cancel_join_thread() self.__workers = [] self.__analyzed_apks = Queue()
[docs] def get_analyzed_apks(self): return self.__analyzed_apks
[docs] def set_analyzed_apks(self, value): self.__analyzed_apks = value
[docs] def del_analyzed_apks(self): del self.__analyzed_apks
[docs] def get_work_queue(self): return self.__work_queue
[docs] def get_concurrency(self): return self.__concurrency
[docs] def get_workers(self): return self.__workers
[docs] def set_workers(self, value): self.__workers = value
[docs] def del_workers(self): del self.__workers
analyzed_apks = property(get_analyzed_apks, set_analyzed_apks, del_analyzed_apks, "Queue<FastAPK> : Yet analyzed APKs") concurrency = property(get_concurrency, None, None, "int : Number of workers to spawn.") workers = property(get_workers, set_workers, del_workers, "list<Worker> : List of workers.") work_queue = property(get_work_queue, None, None, "Queue<str> : Queue with paths to apks which shall be analyzed.") def _analyze(self): ''' See doc of :py:method:BaseAnalyzer.analyze`. ''' try: work_queue = self.work_queue # create worker pool log.debug("starting %s workers ...", self.concurrency) for _ in range(self.concurrency): p = Worker(self.script_list, self.script_hashes, self.min_script_needs, work_queue, self.storage, self.cnt_analyzed_apks, self.analyzed_apks, self.storage_results) self.workers.append(p) p.daemon = True # start workers for p in self.workers: p.start() # queue has size limit -> start workers first then enqueue items log.info("Loading apk paths into work queue ...") for apk_stuff in AnalyzeUtil.apk_gen(self.apks_or_paths): # task is apk with all scripts work_queue.put(apk_stuff) for _ in range(self.concurrency): # signal end-of-work work_queue.put(STOP_SENTINEL) # progress view for cli av = AnalysisStatsView(self.cnt_analyzed_apks, self._cnt_apks, self.analyzed_apks) av.daemon = True av.start() # block until workers finished work_queue.join() av.terminate() log.debug("joined on work queue ...") return self.cnt_analyzed_apks.value # try hot shutdown first except KeyboardInterrupt: log.warn("Hot shutdown ... ") try: log.warn("clearing work queue ... ") Util.clear_queue(work_queue) log.warn("cleared work queue ... ") for _ in range(self.concurrency): # signal end-of-work work_queue.put(STOP_SENTINEL) for worker in self.workers: worker.join() log.warn("waited for all workers ... ") return self.cnt_analyzed_apks.value # if user really wants make a cold shutdown -> kill processes except KeyboardInterrupt: log.warn("Cold shutdown ... ") log.warn("Hard shutdown wanted! Killing all workers!") # kill processes via SIGINT -> send CTRL-C for w in self.workers: try: os.kill(w.pid, signal.SIGINT) except: pass return self.cnt_analyzed_apks.value