Source code for score.init.config.helpers

# Copyright © 2015-2018 STRG.AT GmbH, Vienna, Austria
#
# This file is part of the The SCORE Framework.
#
# The SCORE Framework and all its parts are free software: you can redistribute
# them and/or modify them under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation which is in the
# file named COPYING.LESSER.txt.
#
# The SCORE Framework and all its parts are distributed without any WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. For more details see the GNU Lesser General Public
# License.
#
# If you have not received a copy of the GNU Lesser General Public License see
# http://www.gnu.org/licenses/.
#
# The License-Agreement realised between you as Licensee and STRG.AT GmbH as
# Licenser including the issue of its valid conclusion and its pre- and
# post-contractual effects is governed by the laws of Austria. Any disputes
# concerning this License-Agreement including the issue of its valid conclusion
# and its pre- and post-contractual effects are exclusively decided by the
# competent court, in whose district STRG.AT GmbH has its registered seat, at
# the discretion of STRG.AT GmbH also the competent court, in whose district the
# Licensee has his registered seat, an establishment or assets.

import warnings
import collections
import configparser
import importlib
import json
import os
import re
import datetime
from collections import OrderedDict


[docs]def parse_bool(value): """ Converts a string value to a boolean. This function will accept the same strings as the default configuration of python's :mod:`configparser` module. """ if not isinstance(value, str): return bool(value) try: return configparser.ConfigParser.BOOLEAN_STATES[value.lower()] except KeyError: raise ValueError('"%s" does not describe a boolean' % value)
time_interval_multipliers = { 'ms': 0.001, 'millisecond': 0.001, 'milliseconds': 0.001, 's': 1, 'sec': 1, 'second': 1, 'seconds': 1, 'm': 60, 'min': 60, 'minute': 60, 'minutes': 60, 'h': 60 * 60, 'hour': 60 * 60, 'hours': 60 * 60, 'd': 60 * 60 * 24, 'day': 60 * 60 * 24, 'days': 60 * 60 * 24, }
[docs]def parse_datetime(value): """ Returns the given value as a `datetime` without timezone information. The following formats are currently supported: * ``YYYY-MM-DD HH:II`` * ``YYYY-MM-DD HH:II:SS`` * ``YYYY-MM-DD HH:II:SS.microsecond`` * timestamp """ if re.match(r'^\d+$', value): return datetime.datetime.fromtimestamp(int(value)) regex = re.compile( r''' ^\s* (?P<year> \d{4}) \s*-\s* (?P<month> \d{2}) \s*-\s* (?P<day> \d{2}) \s+ (?P<hour> \d{2}) \s*:\s* (?P<minute> \d{2}) (\s*:\s* (?P<second> \d{2}))? (\. (?P<micros> \d+))? \s*$ ''', re.VERBOSE) match = regex.match(value) if not match: raise ValueError('"%s" does not describe a valid datetime' % value) return datetime.datetime( int(match.group('year')), int(match.group('month')), int(match.group('day')), int(match.group('hour')), int(match.group('minute')), int(match.group('second')) if match.group('second') else 0, int(match.group('micros')) if match.group('micros') else 0, )
[docs]def parse_time_interval(value): """ Converts a human readable time interval string to a float in seconds. >>> parse_time_interval('3s') 3.0 >>> parse_time_interval('5 milliseconds') 0.005 >>> parse_time_interval('1 minute') 60.0 >>> parse_time_interval('2 hours') 7200.0 >>> parse_time_interval('365days') 31536000.0 """ if value == 0: return value value = value.strip() if value == '0': return float(0) match = re.match(r'^(\d+)\s*([a-z]+)$', value.lower()) if match is None or match.group(2) not in time_interval_multipliers: raise ValueError('"%s" does not describe a valid time interval' % value) return float(match.group(1)) * time_interval_multipliers[match.group(2)]
[docs]def parse_dotted_path(value): """ Converts a dotted python path to the denoted object. The following will return the :func:`randint` function from the :mod:`random` module, for example:: parse_dotted_path('random.randint') """ if not isinstance(value, str): return value if '.' not in value: raise ValueError('"%s" does not describe a valid dotted path' % value) module, classname = value.rsplit('.', 1) return getattr(importlib.import_module(module), classname)
[docs]def parse_call(value, args=tuple(), kwargs={}): """ Parses a string containing a function call or an object construction. The given *value* is expected to call the path to a python object (as interpreted by :func:`parse_dotted_path`), followed by an opening parenthesis, arguments and keywords separated by commas, and a closing parenthesis. This will look a lot like real python code, but the actual invocation will be enriched with the *args* and *kwargs* given to this function. If this function is invoked like the following ... >>> parse_call('foo.Test(3, ovr=b)', (1, 2), kwargs={'ovr': 'c', 'bar': 4}) ... it will invoke and return the result of the following code: >>> foo.Test(1, 2, '3', ovr='b', bar=4) Note that all arguments (and keyword arguments) in the *value* string will be passed as strings! """ if '(' not in value: cls = value else: args = list(args) kwargs = kwargs.copy() cls, argstr = value.rstrip(')').split('(') if argstr: for argstr in map(lambda s: s.strip(), argstr.split(',')): if '=' in argstr: k, v = argstr.split('=') kwargs[k] = v else: args.append(argstr) cls = parse_dotted_path(cls) return cls(*args, **kwargs)
[docs]def parse_list(value): """ Converts a string value to a corresponding list of strings. Substrings are assumed to be delimited by newline characters. """ if isinstance(value, list): return value return list(filter(None, (part.strip() for part in value.split('\n'))))
[docs]def parse_host_port(value, fallback=None): """ Extracts a host and a port definition from given *value*. Valid values are: - hostname - hostname:port The return value will be a 2-tuple containing the hostname and the port. If the given *value* is empty, or contains no port definition, these values can be dropped in from a give *fallback* value, which can have the same format as defined for the first parameter (a `str`), or the same format as the return value (a `tuple`). The following call would return ``('example.com', 5109)``: >>> parse_host_port('example.com', 'localhost:5109') """ if not value: if isinstance(fallback, str): return parse_host_port(fallback) parts = value.split(':') if len(parts) == 1: if isinstance(fallback, str): fallback = parse_host_port(fallback) if fallback and len(fallback) > 1: parts.append(fallback[1]) if len(parts) < 2: raise ValueError('Missing port definition') return parts[0], int(parts[1])
[docs]def parse_object(confdict, key, args=tuple(), kwargs={}): """ Creates an object from a :term:`confdict`. This function either expects a string accepted by :func:`parse_call`, or a more verbose and flexible configuration. If the given value in the *confdict* contains an opening parenthesis, it is assumed to be in the terse format, in which case it will be parsed by parse_call. In any other case, the function assumes that the *confdict* contains a class name under the specified *key*, which will be parsed using :func:`parse_dotted_path`. The function will then extract all other keys from *confdict* that start with the same *key*, process them and pass the resulting `dict` to the class's contructor as keyword arguments. It is possible to provide additional arguments to the constructor as *args*. The aforementioned processing phase will replace all multi-line values with arrays using :func:`.parse_list`. For example, the following configuration:: versionmanager = score.webassets.versioning.Mercurial versionmanager.folder = /usr/share/versionmanager versionmanager.repos = /var/www/project /var/www/library1 /var/www/library2 will invoke the constructor like this:: Mercurial(folder="/usr/share/versionmanager", repos=[ "/var/www/project", "/var/www/library1", "/var/www/library2", ]) """ if key not in confdict: raise ValueError('"%s" not found in confdict' % key) if '(' in confdict[key]: return parse_call(confdict[key], args, kwargs) cls = parse_dotted_path(confdict[key]) kwargs = kwargs.copy() for key, value in extract_conf(confdict, key + '.').items(): if '\n' in value: value = parse_list(value) kwargs[key] = value return cls(*args, **kwargs)
[docs]def parse_json(value): """ Converts a string value to a python object. Currently this function is just a wrapper around :func:`json.loads`. """ return json.loads(value, object_pairs_hook=collections.OrderedDict)
def init_object(*args, **kwargs): """ Backward-compatibility for :func:`.parse_object`. """ warnings.warn('The function init_object was renamed to parse_object', DeprecationWarning, stacklevel=2) return parse_object(*args, **kwargs)
[docs]def init_cache_folder(confdict, key, autopurge=False): """ Initializes the cache folder described in the :term:`confdict` with the given *key*. This function will make sure that the folder exists and is writable and return its absolute path. If *autopurge* is `True`, it will further write the whole *confdict* into a file called :file:`__conf__` in the folder to detect changes to the *confdict*. If the function thus detects a confdict change during the next initialization, it will delete the contents of the folder, assuming that its contents have become obsolete. """ if key not in confdict: raise ValueError('"%s" not found in confdict' % key) folder = confdict[key] os.makedirs(folder, exist_ok=True) folder = os.path.realpath(folder) if not os.access(folder, os.R_OK | os.W_OK): raise ValueError( 'Configured cache folder "%s" is not writable' % folder) if not autopurge: return folder confdict = OrderedDict(confdict.items()) del confdict[key] confitems = list(confdict.items()) confitems.sort() confstr = str(confitems) conffile = os.path.join(folder, '__conf__') try: oldconfstr = open(conffile, 'r').read() except OSError: pass else: if confstr != oldconfstr: for root, dirs, files in os.walk(folder, topdown=False): for name in files: os.remove(os.path.join(root, name)) for name in dirs: os.rmdir(os.path.join(root, name)) open(conffile, 'w').write(confstr) return folder
[docs]def extract_conf(configuration, prefix, defaults=dict()): """ This function can be used to extract :term:`confdict` values with a given *prefix*. When called with the *prefix* ``spam.``, for example, it will return all values in the *confdict* that start with that string. If a *defaults* `dict` is present, it will be used as the base for the return value. >>> defaults = { ... 'eggs': 'Spam and eggs', ... } >>> conf = { ... 'spam.eggs': 'Eggs with Spam!', ... 'spam.bacon.eggs': 'Spam, bacon and eggs', ... 'bacon.spam': 'Bacon and Spam' ... } >>> extract_conf(conf, 'spam.', defaults) {'eggs': 'Eggs with Spam!', 'bacon.eggs': 'Spam, bacon and eggs'} """ conf = OrderedDict(defaults.items()) for key, value in configuration.items(): if key.startswith(prefix): conf[key[len(prefix):]] = value return conf