# Copyright © 2015-2018 STRG.AT GmbH, Vienna, Austria
#
# This file is part of the The SCORE Framework.
#
# The SCORE Framework and all its parts are free software: you can redistribute
# them and/or modify them under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation which is in the
# file named COPYING.LESSER.txt.
#
# The SCORE Framework and all its parts are distributed without any WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. For more details see the GNU Lesser General Public
# License.
#
# If you have not received a copy of the GNU Lesser General Public License see
# http://www.gnu.org/licenses/.
#
# The License-Agreement realised between you as Licensee and STRG.AT GmbH as
# Licenser including the issue of its valid conclusion and its pre- and
# post-contractual effects is governed by the laws of Austria. Any disputes
# concerning this License-Agreement including the issue of its valid conclusion
# and its pre- and post-contractual effects are exclusively decided by the
# competent court, in whose district STRG.AT GmbH has its registered seat, at
# the discretion of STRG.AT GmbH also the competent court, in whose district the
# Licensee has his registered seat, an establishment or assets.
import abc
import configparser
import importlib
from inspect import signature, Parameter
import logging
import pkgutil
import sys
from .config import parse_list, parse_config_file
from .exceptions import InitializationError, ConfigurationError
from .dependency import DependencySolver
from collections import OrderedDict
from types import ModuleType
log = logging.getLogger(__name__)
[docs]def init(confdict, *, overrides={}, init_logging=True, finalize=True):
"""
This function automates the process of initializing all other modules. It
will operate on given *confdict*, which is expected to be a
:class:`configparser.ConfigParser` object or a 2-dimensional `dict` mapping
names of modules to their respective :term:`confdicts <confdict>`. The
recommended way of acquiring such a confdict is through
:func:`.parse_config_file`, but any 2-dimensional `dict` is fine.
The *confdict* should also contain the configuration for this module, which
interprets the configuration key ``modules`` (which should be accessible as
``confdict['score.init']['modules']``):
:confkey:`modules`
A list of module names that shall be initialized. If this value is
missing, you will end up with an empty :class:`.ConfiguredScore` object.
The provided *overrides* will be integrated into the actual *confdict*
prior to initialization. While the confdict is assumed to be retrieved from
external resources (like a configuration file), this parameter aims to make
programmatic adjustment of the configuration a bit easier.
The final parameter *init_logging* makes sure python's own logging
facility is initialized with the provided configuration, too.
This function returns a :class:`.ConfiguredScore` object.
"""
if init_logging and 'formatters' in confdict:
import logging.config
# the fileConfig() function below expects a RawConfigParser instance;
# this function, however, has no such limitation -> convert the confdict
# if it is not an object of that type
if isinstance(confdict, configparser.RawConfigParser):
parser = confdict
else:
parser = configparser.RawConfigParser()
parser.read_dict(confdict)
logging.config.fileConfig(parser, disable_existing_loggers=False)
if isinstance(confdict, configparser.RawConfigParser):
_confdict = OrderedDict()
for section in confdict:
_confdict[section] = OrderedDict()
for k, v in confdict[section].items():
_confdict[section][k] = v
else:
_confdict = confdict
for section in overrides:
if section not in _confdict:
_confdict[section] = OrderedDict()
for key, value in overrides[section].items():
_confdict[section][key] = value
try:
paths = _confdict['score.init']['autoimport']
except KeyError:
pass
else:
_perform_autoimport(parse_list(paths))
return _init(_confdict, finalize)
def _perform_autoimport(paths):
if isinstance(paths, str):
return _perform_autoimport([paths])
for path in paths:
__import__(path)
module = sys.modules.get(path)
try:
module.__path__
except AttributeError:
# not a package
continue
for importer, modname, ispkg in pkgutil.walk_packages(module.__path__):
if modname[0] == '_':
continue
if ispkg:
_perform_autoimport('%s.%s' % (path, modname))
else:
__import__('%s.%s' % (path, modname))
def _init(confdict, finalize=True):
try:
modconf = parse_list(confdict['score.init']['modules'])
except KeyError:
# TODO: issue a warning through the warnings module
return ConfiguredScore(confdict, dict(), dict())
modules, dependency_aliases = _collect_modules(modconf)
dependency_map = _collect_dependencies(modules, dependency_aliases)
initialized = dict()
sorted_aliases = _sort_modules(
dependency_map, dependency_aliases, 'initialization')
for alias in sorted_aliases:
modname = modules[alias]
module_dependencies = dependency_map[alias]
modconf = OrderedDict()
if alias in confdict:
modconf = confdict[alias]
for key in confdict:
if key.startswith('%s:' % alias):
key_prefix = key[len(alias)+1:] + '.'
modconf.update((key_prefix + k, v)
for k, v in confdict[key].items())
kwargs = {}
for dep in module_dependencies:
try:
dependency_alias = dependency_aliases[alias][dep]
except KeyError:
kwargs[dep] = initialized[dep]
else:
kwargs[dep] = initialized[dependency_alias]
log.debug('Initializing %s as %s' % (modname, alias))
conf = importlib.import_module(modname).init(modconf, **kwargs)
if not isinstance(conf, ConfiguredModule):
raise InitializationError(
__package__,
'%s initializer did not return ConfiguredModule but %s' %
(alias, repr(conf)))
initialized[alias] = conf
score = ConfiguredScore(confdict, initialized, dependency_aliases)
if finalize:
score._finalize()
return score
[docs]def init_from_file(file, *, overrides={}, init_logging=True):
"""
Reads configuration from given *file* using
:func:`.config.parse_config_file` and initializes score using :func:`.init`.
See the documentation of :func:`.init` for a description of all keyword
arguments.
"""
return init(parse_config_file(file, return_configparser=init_logging),
overrides=overrides,
init_logging=init_logging)
[docs]def init_logging_from_file(file):
"""
Just the part of :func:`.init_from_file` that would initialize logging.
"""
import logging.config
confdict = parse_config_file(file, return_configparser=True)
if 'formatters' in confdict:
logging.config.fileConfig(confdict, disable_existing_loggers=False)
def _collect_modules(modconf):
modules = OrderedDict()
dependency_aliases = {}
for line in modconf:
parts = line.split(':', 2)
if len(parts) == 2:
module, alias = parts
elif '.' in line:
module = line
alias = line[line.rindex('.') + 1:]
else:
module = alias = line
if '(' in alias:
assignments = alias[alias.index('('):].strip(' ()').split(',')
alias = alias[:alias.index('(')].strip()
if '(' in module:
module = module[:module.index('(')].strip()
dependency_aliases[alias] = {}
for assignment in assignments:
key, value = assignment.split('=')
dependency_aliases[alias][key.strip()] = value.strip()
modules[alias] = module
return modules, dependency_aliases
def _import(module_name):
"""
Will import and return a python package with given *module_name*. If the
module cannot be found, this function will instead return `None`.
"""
try:
return importlib.import_module(module_name)
except ImportError as e:
if e.name != module_name:
raise
return None
def _collect_dependencies(modules, dependency_aliases):
missing = []
dependency_map = {}
for alias, modname in modules.items():
if modname == 'score.init':
continue
module = _import(modname)
if not module:
missing.append(modname)
continue
if not hasattr(module, 'init'):
raise InitializationError(
__package__,
'Cannot initialize %s: it has no init() function' % modname)
if not callable(module.init):
raise InitializationError(
__package__,
'Cannot initialize %s: its init is not a function' % modname)
module_dependencies = []
sig = signature(module.init)
for i, (param_name, param) in enumerate(sig.parameters.items()):
if i == 0:
# this should be the confdict
continue
module_dependencies.append(
(param_name, param.default != Parameter.empty))
dependency_map[alias] = module_dependencies
if missing:
raise ConfigurationError(
__package__,
'Could not find the following modules:\n - ' +
'\n - '.join(missing))
_remove_missing_optional_dependencies(
modules, dependency_map, dependency_aliases)
return dependency_map
def _remove_missing_optional_dependencies(modules, dependency_map,
dependency_aliases):
missing = {}
for alias, module_dependencies in dependency_map.items():
newdeps = []
for dependency, is_optional in module_dependencies:
try:
dependency_alias = dependency_aliases[alias][dependency]
except KeyError:
dependency_alias = dependency
if dependency_alias in modules:
newdeps.append(dependency)
continue
if is_optional:
continue
if dependency not in missing:
missing[dependency] = []
missing[dependency].append(alias)
dependency_map[alias] = newdeps
if not missing:
return
msglist = []
for dependency, dependants in missing.items():
msglist.append('%s (required by %s)' %
(dependency, ', '.join(dependants)))
raise ConfigurationError(
__package__,
'Could not find the following dependencies:\n - ' +
'\n - '.join(msglist))
def _sort_modules(dependency_map, dependency_aliases, operation):
depsolv = DependencySolver()
for alias, module_dependencies in dependency_map.items():
depsolv.add(alias)
for dep in module_dependencies:
if dep == 'score':
continue
if alias in dependency_aliases and dep in dependency_aliases[alias]:
dep = dependency_aliases[alias][dep]
depsolv.add(alias, dep)
return depsolv.solve()