Source code for score.es._init

# Copyright © 2015-2017 STRG.AT GmbH, Vienna, Austria
#
# This file is part of the The SCORE Framework.
#
# The SCORE Framework and all its parts are free software: you can redistribute
# them and/or modify them under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation which is in the
# file named COPYING.LESSER.txt.
#
# The SCORE Framework and all its parts are distributed without any WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. For more details see the GNU Lesser General Public
# License.
#
# If you have not received a copy of the GNU Lesser General Public License see
# http://www.gnu.org/licenses/.
#
# The License-Agreement realised between you as Licensee and STRG.AT GmbH as
# Licenser including the issue of its valid conclusion and its pre- and
# post-contractual effects is governed by the laws of Austria. Any disputes
# concerning this License-Agreement including the issue of its valid conclusion
# and its pre- and post-contractual effects are exclusively decided by the
# competent court, in whose district STRG.AT GmbH has its registered seat, at
# the discretion of STRG.AT GmbH also the competent court, in whose district the
# Licensee has his registered seat, an establishment or assets.

from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import NotFoundError
from score.init import ConfiguredModule, parse_list, parse_bool, extract_conf
from sqlalchemy import event
from time import time
import inspect
import logging
from functools import partial


# Logger for this module, named after the module itself.
log = logging.getLogger(__name__)

# Fallback configuration: init() starts from a copy of this mapping and
# overlays the caller-supplied confdict on top of it.
#   ctx.member  -- name under which the es proxy is registered on the context
#   keep_source -- whether the `_source` field stays enabled in the mappings
defaults = {'ctx.member': 'es', 'keep_source': False}


def init(confdict, db, ctx=None):
    """
    Initializes this module according to :ref:`our module initialization
    guidelines <module_initialization>` with the following configuration keys:

    :confkey:`args.hosts`
        A list of hosts (as read by :func:`score.init.parse_list`) to pass to
        the :class:`Elasticsearch <elasticsearch.Elasticsearch>` constructor.

    :confkey:`args.*`
        Any other arguments to be passed to the :class:`Elasticsearch
        <elasticsearch.Elasticsearch>` constructor.

    :confkey:`index` :confdefault:`score`
        The index to use in all operations.

    :confkey:`keep_source` :confdefault:`False`
        Whether the `_source` field should be enabled. The default is `False`,
        since the canonical representation of all objects is to be found in
        the score.db database and should be retrieved from there.

    :confkey:`ctx.member` :confdefault:`es`
        The name of the :term:`context member`, that should be registered with
        the configured :mod:`score.ctx` module (if there is one). The default
        value allows one to conveniently query the index:

        >>> for knight in ctx.es.query(User, 'name:sir*'):
        ...     print(knight.name)
    """
    conf = dict(defaults)
    conf.update(confdict)

    # Everything below the "args." prefix goes to the Elasticsearch
    # constructor; values arriving as strings are parsed into real types.
    es_kwargs = extract_conf(conf, 'args.')
    if 'hosts' in es_kwargs:
        es_kwargs['hosts'] = parse_list(es_kwargs['hosts'])
    for flag in ('verify_certs', 'use_ssl'):
        if flag in es_kwargs:
            es_kwargs[flag] = parse_bool(es_kwargs[flag])
    client = Elasticsearch(**es_kwargs)

    conf.setdefault('index', 'score')
    es_conf = ConfiguredEsModule(
        db, client, conf['index'], parse_bool(conf['keep_source']))
    _register_flush_listeners(db, es_conf)

    # The context member is optional: it is skipped when no ctx module was
    # passed in, or when the member name is None (or the string 'None').
    member_name = conf['ctx.member']
    if ctx and member_name not in (None, 'None'):
        ctx.register(member_name, lambda ctx: CtxProxy(es_conf, ctx))
    return es_conf
def _register_flush_listeners(db, es_conf):
    """
    Attaches ``before_flush`` / ``after_flush`` listeners to ``db.Session``
    which mirror database changes into the index managed by *es_conf*.
    """
    # Work lists shared between the two listeners: filled before the flush,
    # consumed right after it.
    to_insert = []
    to_delete = []

    def before_flush(session, flush_context, instances):
        """
        Collects new and altered objects in ``to_insert`` and deleted objects
        in ``to_delete``.

        Indexing itself has to wait until ``after_flush``, because new objects
        have no id yet at this point. The collecting cannot move there,
        though, since the optional *instances* argument is only handed to
        this listener.
        """
        nonlocal to_insert, to_delete

        def relevant(obj):
            """
            Tells whether *obj* must be handled during this flush: it needs a
            matching score.es configuration (i.e. a :term:`top-most es
            class`) and has to be part of the flushed *instances*, unless
            that list is empty.
            """
            if not es_conf.get_es_class(obj):
                return False
            return not instances or obj in instances

        created = [obj for obj in session.new if relevant(obj)]
        # Only dirty objects with actual alterations count, as described in
        # the sqlalchemy docs to Session.dirty:
        # http://docs.sqlalchemy.org/en/latest/orm/session_api.html#sqlalchemy.orm.session.Session.dirty
        altered = [obj for obj in session.dirty
                   if relevant(obj) and session.is_modified(obj)]
        to_insert = created + altered
        to_delete = [obj for obj in session.deleted if relevant(obj)]

    def after_flush(session, flush_context):
        """
        Pushes the changes collected by ``before_flush`` into the index.
        """
        for obj in to_insert:
            es_conf.insert(obj)
        for obj in to_delete:
            es_conf.delete(obj)

    event.listen(db.Session, 'before_flush', before_flush)
    event.listen(db.Session, 'after_flush', after_flush)
class ConfiguredEsModule(ConfiguredModule):
    """
    This module's :class:`configuration class <score.init.ConfiguredModule>`.
    """

    def __init__(self, db, es, index, keep_source):
        self.db = db
        self.es = es
        self.index = index
        self.keep_source = keep_source
        # Cache: database class -> function turning an object into its json
        # body, filled lazily by _object2json().
        self._converters = {}

    def insert(self, object_):
        """
        Inserts an *object_* into the index.
        """
        body = self._object2json(object_)
        # '_type' and '_id' are passed as separate arguments to es.index()
        # and must not be part of the document body.
        doc_type = body.pop('_type')
        del body['_id']
        self.es.index(
            index=self.index,
            doc_type=doc_type,
            body=body,
            id=object_.id)

    def _object2json(self, object_):
        """
        Converts given *object_* to the JSON representation required for
        indexing.
        """
        cls = object_.__class__
        if cls not in self._converters:
            self._converters[cls] = self._mkconverter(cls)
        return self._converters[cls](object_)

    def _mkconverter(self, cls):
        """
        Generates a function for efficiently converting an object of given
        class *cls* to its json representation as returned by
        :meth:`._object2json`.
        """
        es_cls = self.get_es_class(cls)
        bodytpl = {
            'class': [],
            'concrete_class': cls.__score_db__['type_name'],
            '_type': es_cls.__score_db__['type_name'],
        }
        getters = {}
        # Walk from the concrete class up to (and including) its top-most es
        # class, recording every type name and every indexed member.
        while cls:
            bodytpl['class'].append(cls.__score_db__['type_name'])
            if hasattr(cls, '__score_es__'):
                for member in cls.__score_es__:
                    if member in bodytpl:
                        # never overwrite one of the reserved template keys
                        continue
                    converter = None
                    if '__convert__' in cls.__score_es__[member]:
                        converter = cls.__score_es__[member]['__convert__']
                    getters[member] = self.__mkmembergetter(member, converter)
            if cls == es_cls:
                break
            cls = cls.__score_db__['parent']

        def converter(object_):
            body = bodytpl.copy()
            body['_id'] = object_.id
            for member, getter in getters.items():
                body[member] = getter(object_)
            return body
        return converter

    def __mkmembergetter(self, member, converter=None):
        """
        Helper function for _mkconverter: Will return a function that
        retrieves a member value and optionally converts it with given
        converter.

        A *converter* accepting two positional arguments receives the member
        value and the object itself, any other converter just the value.
        """
        if converter is None:
            return lambda object_: getattr(object_, member)
        # inspect.getargspec() was removed in python 3.11; inspect.signature()
        # additionally copes with annotations and keyword-only parameters.
        try:
            parameters = inspect.signature(converter).parameters.values()
        except (TypeError, ValueError):
            # No introspectable signature (e.g. some builtins): fall back to
            # calling the converter with the value only.
            positional = 1
        else:
            positional = sum(
                1 for parameter in parameters
                if parameter.kind in (parameter.POSITIONAL_ONLY,
                                      parameter.POSITIONAL_OR_KEYWORD))
        if positional == 2:
            def getter(object_):
                return converter(getattr(object_, member), object_)
        else:
            def getter(object_):
                return converter(getattr(object_, member))
        return getter

    def delete(self, object_):
        """
        Removes an *object_* from the index. Objects missing from the index
        are silently ignored.
        """
        es_cls = self.get_es_class(object_)
        try:
            self.es.delete(
                index=self.index,
                doc_type=es_cls.__score_db__['type_name'],
                id=object_.id)
        except NotFoundError:
            # nothing to remove: the object was never indexed
            pass

    def query(self, ctx, class_, query, *,
              analyze_wildcard=False, offset=0, limit=10):
        """
        Executes a lucene *query* on the index and yields objects of given
        *class_*, retrieved from the database. It is also possible to provide
        multiple classes, in which case the same query will be performed on
        `multiple types at once`_.

        The *query* can be provided as a string, or as a `query DSL`_. The
        parameter *analyze_wildcard* is passed to
        :meth:`elasticsearch.Elasticsearch.search`, whereas *offset* and
        *limit* are mapped to *from_* and *size* respectively.

        .. _query DSL: http://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
        .. _multiple types at once: https://www.elastic.co/guide/en/elasticsearch/guide/master/multi-index-multi-type.html
        """
        classes = [class_] if isinstance(class_, type) else class_
        doctypes = []
        doctype2class = {}
        for cls in classes:
            typename = self.get_es_class(cls).__score_db__['type_name']
            doctype2class[typename] = cls
            doctypes.append(typename)
        kwargs = {
            'index': self.index,
            'analyze_wildcard': analyze_wildcard,
            'fields': '_id',
            'doc_type': ','.join(doctypes),
            'from_': offset,
            'size': limit,
        }
        if isinstance(query, str):
            kwargs['q'] = query
        else:
            kwargs['body'] = {'query': query}
        session = getattr(ctx, self.db.ctx_member)
        result = self.es.search(**kwargs)
        # Consecutive hits of the same class are loaded from the database
        # in one go.
        current_class = None
        ids = []
        for hit in result['hits']['hits']:
            hit_class = doctype2class[hit['_type']]
            if hit_class != current_class:
                if current_class:
                    yield from session.by_ids(current_class, ids)
                current_class = hit_class
                ids = []
            ids.append(int(hit['_id']))
        # Without any hits there is neither a class nor an id to look up.
        if ids:
            yield from session.by_ids(current_class, ids)

    def get_es_class(self, object_):
        """
        Returns the :term:`top-most es class` of an *object_*, which must
        either be a database class, or an object thereof. The result is
        `None`, if no class in the hierarchy provides a __score_es__
        declaration.
        """
        cls = object_ if isinstance(object_, type) else object_.__class__
        if not hasattr(self, '_es_classes'):
            self._es_classes = {}
        if cls in self._es_classes:
            return self._es_classes[cls]
        # Walk the complete parent chain, *including* its root, and remember
        # the last (i.e. top-most) class exposing __score_es__.
        result = None
        current = cls
        while current:
            if hasattr(current, '__score_es__'):
                result = current
            current = current.__score_db__['parent']
        self._es_classes[cls] = result
        return result

    def classes(self):
        """
        Provides a list of :term:`top-most classes <top-most es class>` with a
        __score_es__ declaration.
        """
        if hasattr(self, '_classes'):
            return self._classes
        found = []

        def recurse(cls):
            if hasattr(cls, '__score_es__'):
                # top-most es class found, its subclasses are not of interest
                found.append(cls)
                return
            for c in cls.__subclasses__():
                recurse(c)
        recurse(self.db.Base)
        # only cache the list once it was collected completely
        self._classes = found
        return self._classes

    def refresh(self, ctx):
        """
        Re-inserts every object into the lucene index. Note that this
        operation might take a very long time, depending on the number of
        objects.
        """
        def generator():
            session = getattr(ctx, self.db.ctx_member)
            for cls in self.classes():
                start = time()
                log.debug('indexing %s', cls)
                for obj in session.query(cls).yield_per(100):
                    # the bulk helper reads '_index', '_type' and '_id' from
                    # the action itself
                    body = self._object2json(obj)
                    body['_index'] = self.index
                    yield body
                log.debug('indexed %s in %fs', cls, time() - start)
        helpers.bulk(self.es, generator())

    def destroy(self):
        """
        Completely deletes the whole index. A missing index is not considered
        an error.
        """
        self.es.indices.delete(index=self.index, ignore=404)

    def create(self, destroy=True):
        """
        Creates the elasticsearch index and registers all mappings. If the
        parameter *destroy* is left at its default value, the index will be
        :meth:`destroyed <.destroy>` first.

        If the index is not deleted first, this function will raise an
        exception if the new mapping contradicts an existing mapping in the
        index.
        """
        if destroy:
            self.destroy()
        # ignore=400 tolerates the error response for an existing index
        self.es.indices.create(index=self.index, ignore=400)
        for cls in self.classes():
            key = cls.__score_db__['type_name']
            properties = self._collect_properties(cls)
            properties['class'] = {
                'type': 'string', 'index': 'not_analyzed'}
            properties['concrete_class'] = {
                'type': 'string', 'index': 'not_analyzed'}
            mapping = {key: {'properties': properties}}
            if not self.keep_source:
                mapping[key]['_source'] = {'enabled': False}
            self.es.indices.put_mapping(
                index=self.index,
                doc_type=key,
                body=mapping)

    def _collect_properties(self, cls):
        """
        Gathers the mapping properties declared in the __score_es__ members
        of *cls* and all of its subclasses. Declarations of subclasses
        replace those of their parents; the ``__convert__`` entries are
        stripped, as they are not part of the mapping.
        """
        properties = {}

        def recurse(cls):
            if hasattr(cls, '__score_es__'):
                for member in cls.__score_es__:
                    definition = cls.__score_es__[member].copy()
                    definition.pop('__convert__', None)
                    properties[member] = definition
            for c in cls.__subclasses__():
                recurse(c)
        recurse(cls)
        return properties
class CtxProxy:
    """
    Thin wrapper around a ConfiguredEsModule that remembers a context object
    and hands it to those functions of the configuration which require one.
    Code reaching the configuration through the context itself thus does not
    have to repeat the *ctx* parameter. Instead of writing

    >>> ctx.es.query(ctx, User, ...

    the second context object can simply be left out:

    >>> ctx.es.query(User, ...
    """

    def __init__(self, conf, ctx):
        self._conf = conf
        self._ctx = ctx

    def __getattr__(self, attr):
        # Only invoked for names missing on the proxy itself: look them up
        # on the wrapped configuration.
        target = getattr(self._conf, attr)
        if attr not in ('query', 'refresh'):
            return target
        # These expect the context as first argument: bind it up front.
        return partial(target, self._ctx)