# encoding: utf-8
__author__ = "Nils Tobias Schmidt"
__email__ = "schmidt89 at informatik.uni-marburg.de"
import ConfigParser
from os.path import abspath, expanduser
import sys
from androlyze.settings.exception import ConfigFileNotFoundError, ConfigError
[docs]class Settings(object):
''' This class is a wrapper around a `ConfigParser` to retrieve and write settings in a more convenient way.
For (over)writing settings, be sure to use the with construct! Otherwise the file handle will not be closed.
Examples
--------
>>> # just for doctest passing tests (file needs to exist)
>>> f = open("test.conf", "w")
>>> f.close()
>>> # the entries
>>> entry = ("SectionName", "Option1")
>>> entry2 = ("SectionName", "Option2")
>>> entry3 = ("SectionName2", "Option1")
>>> # If you want to write some settings use the with construct to close the file after writing!
>>> s = None
>>> try:
... with Settings("test.conf") as s:
... s[entry] = "value1"
... s[entry2] = "value2"
... s[entry3] = "value1"
... except ConfigFileNotFoundError as e:
... pass
... finally:
... print s
Settings: SectionName : [('option1', 'value1'), ('option2', 'value2')], SectionName2 : [('option1', 'value1')]
>>> # read settings
>>> try:
... print s[entry]
... print len(s)
... print entry in s
... for section, items in s:
... print '%s : %s' % (section, items)
... except ConfigFileNotFoundError:
... pass
value1
3
True
SectionName : [('option1', 'value1'), ('option2', 'value2')]
SectionName2 : [('option1', 'value1')]
'''
def __init__(self, config_path, default_path = None, *args, **kwargs):
'''
The file under `config_path` has to exists (at least if no `default_path` has been given).
Parameters
----------
config_path : str
The path to the config file
default_path : str, optional (default is None).
The path to the config file with default values.
None means don't load default values.
Raises
------
ConfigFileNotFoundError
If the config file could not be opened.
'''
super(Settings, self).__init__()
self.__config_parser = ConfigParser.ConfigParser(*args, **kwargs)
self.__config_path = config_path
# file pointer for writing
self.__fp_write = None
# try to open the config files
try:
if default_path is not None:
with open(default_path, "r") as fp:
self.__config_parser.readfp(fp)
try:
with open(config_path, "r") as fp:
self.__config_parser.readfp(fp)
except IOError:
# only throw exception that config has not been found if no default config specified
if default_path is None:
raise
except (IOError,ConfigParser.Error) as e:
raise ConfigFileNotFoundError(abspath(config_path), e), None, sys.exc_info()[2]
def __str__(self):
return 'Settings: %s' % '\n\t'.join(('%s : %s' % (section, key_val_list) for section, key_val_list in self))
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self.__config_path)
[docs] def get_config_parser(self):
return self.__config_parser
[docs] def set_config_parser(self, value):
self.__config_parser = value
[docs] def del_config_parser(self):
del self.__config_parser
config_parser = property(get_config_parser, set_config_parser, del_config_parser, "ConfigParser : the parser for the config file")
############################################################
#---With support
############################################################
def __enter__(self):
try:
# open file
self.__fp_write = open(self.__config_path, "w")
return self
except IOError as e:
raise ConfigFileNotFoundError(abspath(self.__config_path), e), None, sys.exc_info()[2]
def __exit__(self, _type, value, traceback):
try:
self.__config_parser.write(self.__fp_write)
except IOError as e:
raise ConfigFileNotFoundError(abspath(self.__config_path), e), None, sys.exc_info()[2]
finally:
# close file pointer
self.__fp_write.close()
############################################################
#---Container magic methods
############################################################
def __len__(self):
''' Count number of entries for each section and sum it up '''
return sum((len(items) for _, items in self))
def __getitem__(self, key, default = 'default_val'):
'''
Get an value from a section and option.
Parameters
----------
key : tuple<str, str>
First argument is the section and the second the option
default : str, optional (default is 'default_val')
If not found, return the `default` value.
If value is the default, an exception will be raised.
You can also use None as a default value.
Raises
------
ConfigError
If `default` not customized.
'''
section, option = key
try:
return self.config_parser.get(section, option)
except ConfigParser.Error as e:
# no default value given
if default == 'default_val':
raise ConfigError('%s: %s' % (e, self.__config_path)), None, sys.exc_info()[2]
return default
def __setitem__(self, key, value):
'''
Set a value for a section and option.
Parameters
----------
key : tuple<str, str>
First argument is the section and the second the option
Raises
------
ConfigError
'''
try:
section, option = key
cp = self.config_parser
# create section if not already present
if not cp.has_section(section):
cp.add_section(section)
self.config_parser.set(section, option, value)
except ConfigParser.Error as e:
raise ConfigError(e), None, sys.exc_info()[2]
def __delitem__(self, key):
'''Delete the option from the section but not the section!.
Parameters
----------
key : tuple<str, str>
First argument is the section and the second the option
Raises
------
ConfigError
'''
section, option = key
try:
self.config_parser.remove_option(section, option)
except ConfigParser.Error as e:
raise ConfigError(e), None, sys.exc_info()[2]
def __iter__(self):
''' Get a generator over the settings.
Returns
-------
generator<tuple<str, list< tuple<str, str> >>>
Generator over settings with the section as first entry and list of tuples with option and value as second.
Raises
------
ConfigError
'''
cp = self.config_parser
try:
section = cp.sections()
for s in section:
yield s, cp.items(s)
except ConfigParser.Error as e:
raise ConfigError(e), None, sys.exc_info()[2]
def __contains__(self, item):
''' Check if the section has the specified option.
Parameters
----------
key : tuple<str, str>
First argument is the section and the second the option
'''
try:
return self[item]
except (ConfigParser.Error, KeyError):
return False
return True
############################################################
#--Other return types
############################################################
[docs] def get_int(self, key, **kwargs):
'''
Get an int value from a section and option.
Parameters
----------
key : tuple<str, str>
First argument is the section and the second the option
Other Parameters
----------------
default : str, optional (default is 'default_val')
If not found, return the `default` value.
If value is the default, an exception will be raised.
You can also use None as a default value.
Raises
------
ConfigError
If `default` not customized.
'''
res = self.__getitem__(key, **kwargs)
# probably default value
if res is None:
return res
return int(res.strip())
[docs] def get_bool(self, key, **kwargs):
'''
Get an boolean value from a section and option.
Parameters
----------
key : tuple<str, str>
First argument is the section and the second the option
Other Parameters
----------------
default : str, optional (default is 'default_val')
If not found, return the `default` value.
If value is the default, an exception will be raised.
You can also use None as a default value.
Raises
------
ConfigError
If `default` not customized.
'''
res = self.__getitem__(key, **kwargs)
if isinstance(res, str):
return res.lower().strip() in ('true', 'y', 'yes')
# probably default value
return res
[docs] def get_list(self, key, **kwargs):
'''
Get an list value from a section and option.
Values have to be comma separated (without " or ').
Parameters
----------
key : tuple<str, str>
First argument is the section and the second the option
Other Parameters
----------------
default : str, optional (default is 'default_val')
If not found, return the `default` value.
If value is the default, an exception will be raised.
You can also use None as a default value.
Raises
------
ConfigError
If `default` not customized.
'''
res = self.__getitem__(key, **kwargs)
if isinstance(res, str):
# split into values and return list
return map(lambda x : x.strip(), res.split(","))
# probably default value
return res
############################################################
#---AndroLyzeLab specific
############################################################
[docs] def get_mongodb_settings(self):
''' Get mongodb settings.
Username and password are set to None if not given '''
import androlyze.settings as s
mongodb_name = self[(s.SECTION_RESULT_DB, s.KEY_RESULT_DB_NAME)]
mongodb_ip = self[(s.SECTION_RESULT_DB, s.KEY_RESULT_DB_IP)]
mongodb_port = self.get_int((s.SECTION_RESULT_DB, s.KEY_RESULT_DB_PORT))
mongodb_username = self.__getitem__((s.SECTION_RESULT_DB, s.KEY_RESULT_DB_AUTH_USERNAME), default = None)
mongodb_passwd = self.__getitem__((s.SECTION_RESULT_DB, s.KEY_RESULT_DB_AUTH_PASSWD), default = None)
mongodb_use_ssl = self.get_bool((s.SECTION_RESULT_DB, s.KEY_RESULT_DB_USE_SSL))
mongodb_ca_cert = self[(s.SECTION_RESULT_DB, s.KEY_RESULT_DB_CA_CERT)]
return mongodb_name, mongodb_ip, mongodb_port, mongodb_username, mongodb_passwd, mongodb_use_ssl, mongodb_ca_cert
[docs] def get_s3_settings(self):
''' Get S3 settings '''
import androlyze.settings as s
aws_access_key_id = self[(s.SECTION_S3_STORAGE, s.KEY_S3_STORAGE_AWS_ACCESS_KEY_ID)]
aws_secret_access_key = self[(s.SECTION_S3_STORAGE, s.KEY_S3_STORAGE_AWS_SECRET_ACCESS_KEY)]
aws_apk_bucket = self[(s.SECTION_S3_STORAGE, s.KEY_S3_STORAGE_AWS_APK_BUCKET)]
aws_s3_host = self.__getitem__((s.SECTION_S3_STORAGE, s.KEY_S3_STORAGE_AWS_HOST_URL), default = None)
return aws_access_key_id, aws_secret_access_key, aws_apk_bucket, aws_s3_host
[docs] def get_apk_storage_engine(self):
''' Get the apk storage engine. See keys settings.APK_STORAGE_ENGINE_* '''
import androlyze.settings as s
return self[(s.SECTION_APK_DISTRIBUTED_STORAGE, s.KEY_APK_STORAGE_ENGINE)]
[docs] def get_celery_broker_ssl_opts(self):
''' Create dictionary which can be directly passed to `BROKER_USE_SSL` celery config '''
from androlyze.settings import SECTION_BROKER, KEY_BROKER_USE_SSL,\
KEY_BROKER_SSL_CA_CERT, KEY_BROKER_SSL_CLIENT_AUTH,\
KEY_BROKER_SSL_CLIENT_KEYFILE, KEY_BROKER_SSL_CLIENT_CERT
import ssl as s
ssl = {}
if self.get_bool((SECTION_BROKER, KEY_BROKER_USE_SSL)):
ssl['ca_certs'] = expanduser(self[(SECTION_BROKER, KEY_BROKER_SSL_CA_CERT)])
ssl['cert_reqs'] = s.CERT_NONE
# client authentication
if self.get_bool((SECTION_BROKER, KEY_BROKER_SSL_CLIENT_AUTH)):
ssl['keyfile'] = expanduser(self[(SECTION_BROKER, KEY_BROKER_SSL_CLIENT_KEYFILE)])
ssl['certfile'] = expanduser(self[(SECTION_BROKER, KEY_BROKER_SSL_CLIENT_CERT)])
ssl['cert_reqs'] = s.CERT_REQUIRED
return ssl
[docs] def script_hash_validation_enabled(self):
# prevent import error
from androlyze.settings import SECTION_ANALYSIS, KEY_ANALYSIS_SCRIPT_HASH_VALIDATION
return self[(SECTION_ANALYSIS, KEY_ANALYSIS_SCRIPT_HASH_VALIDATION)]