Source code for score.serve.service

# Copyright © 2015-2018 STRG.AT GmbH, Vienna, Austria
# Copyright © 2020-2023 Necdet Can Ateşman, Vienna, Austria
#
# This file is part of the The SCORE Framework.
#
# The SCORE Framework and all its parts are free software: you can redistribute
# them and/or modify them under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation which is in
# the file named COPYING.LESSER.txt.
#
# The SCORE Framework and all its parts are distributed without any WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. For more details see the GNU Lesser General Public
# License.
#
# If you have not received a copy of the GNU Lesser General Public License see
# http://www.gnu.org/licenses/.
#
# The License-Agreement realised between you as Licensee and STRG.AT GmbH as
# Licenser including the issue of its valid conclusion and its pre- and
# post-contractual effects is governed by the laws of Austria. Any disputes
# concerning this License-Agreement including the issue of its valid conclusion
# and its pre- and post-contractual effects are exclusively decided by the
# competent court, in whose district STRG.AT GmbH has its registered seat, at
# the discretion of STRG.AT GmbH also the competent court, in whose district
# the Licensee has his registered seat, an establishment or assets.

import enum
import time
import threading
import logging


log = logging.getLogger(__name__)


@enum.unique
class ServiceState(str, enum.Enum):
    STOPPED = 'stopped'
    STARTING = 'starting'
    RUNNING = 'running'
    PAUSING = 'pausing'
    PREPARING = 'preparing'
    PAUSED = 'paused'
    STOPPING = 'stopping'
    EXCEPTION = 'exception'


STOPPED = ServiceState.STOPPED
STARTING = ServiceState.STARTING
RUNNING = ServiceState.RUNNING
PAUSING = ServiceState.PAUSING
PREPARING = ServiceState.PREPARING
PAUSED = ServiceState.PAUSED
STOPPING = ServiceState.STOPPING
EXCEPTION = ServiceState.EXCEPTION


intermediate_states = {
    RUNNING: PAUSED,
    STOPPED: PAUSED,
}


[docs]class Service: """ A wrapper around workers, that you can use to control your workers without worrying about threading. """ State = globals()['ServiceState'] def __init__(self, name, worker): self.name = name self.worker = worker self.exception = None self.state_listeners = set() self.next_transition = None self.state_lock = threading.RLock() self._target_state = None self._next_state = None self._state = STOPPED self._transition = None self.state_timestamp = time.time() worker.service = self
[docs] def start(self): """ Makes sure the worker ends up in the ``RUNNING`` state eventually. """ self._transition_to(RUNNING)
[docs] def pause(self): """ Makes sure the worker ends up in the ``PAUSED`` state eventually. """ self._transition_to(PAUSED)
[docs] def prepare(self): """ An alias for :meth:`pause` for ensuring compaitibility with the :class:`Worker` API. """ self._transition_to(PAUSED)
[docs] def stop(self): """ Makes sure the worker ends up in the ``STOPPED`` state eventually. """ self._transition_to(STOPPED)
[docs] def register_state_change_listener(self, callback): """ Registers a `callable` that will be invoked whenever the state of the worker changes. The *callback* will receive three arguments: * this service, * the old state and * the new (current) state. Note, that due to the nature of threading, it is possible that the Service is already in another state than the one provided as the third argument. """ self.state_listeners.add(callback)
[docs] def unregister_state_change_listener(self, callback): """ Removes a previously registered listener. """ self.state_listeners.discard(callback)
def _transition_to(self, target_state): with self.state_lock: if self._state == EXCEPTION: log.debug('_transition_to(%s) -> EXCEPTION' % target_state) return if self._state == target_state: self._target_state = None self._next_state = None log.debug('_transition_to(%s) -> NOP' % target_state) return if (self._target_state == target_state and self._transition and self._transition[1] == target_state): # already transitioning to give state self._target_state = None self._next_state = None log.debug('_transition_to(%s) -> in progress' % target_state) return transition = (self._state, target_state) if transition in self.worker._state_transitions: log.debug('_transition_to(%s) -> transition initiated' % target_state) self._target_state = None self._next_state = None if target_state == RUNNING: self.state = STARTING elif target_state == STOPPED: self.state = STOPPING elif target_state == PAUSED and self.state == STOPPED: self.state = PREPARING elif target_state == PAUSED: self.state = PAUSING self._target_state = target_state funcname = self.worker._state_transitions[transition] callback = getattr(self.worker, funcname) self._transition = transition threading.Thread(target=self._execute_transition, args=(transition, callback)).start() return if target_state in intermediate_states: log.debug('_transition_to(%s) -> intermediate(%s)' % ( target_state, intermediate_states[target_state])) self._transition_to(intermediate_states[target_state]) self._next_state = target_state else: log.debug('_transition_to(%s) -> queued' % target_state) self._next_state = target_state def _execute_transition(self, transition, callback): log.debug('_execute_transition(%s, %s)' % (str(transition), callback.__name__)) state_timestamp = self.state_timestamp try: callback() with self.state_lock: if self._transition == transition: self._transition = None if state_timestamp >= self.state_timestamp: self.state = transition[1] except Exception as exception: self.set_exception(exception) def set_exception(self, exception): with self.state_lock: if self._state == EXCEPTION: return old_state = self._state self._state = EXCEPTION self.exception = exception self.state_timestamp = time.time() self.worker.cleanup(exception) self._state_changed(old_state, EXCEPTION) def _state_changed(self, old, new): log.debug('changed state: %s -> %s' % (old, new)) if new == EXCEPTION: log.exception(self.exception) for callback in self.state_listeners: callback(self, old, new) for callback in self.worker.state_listeners: callback(self, old, new) with self.state_lock: if self._next_state: log.debug(' next state: %s' % (self._next_state)) next_state = self._next_state self._next_state = None self._transition_to(next_state) elif self._target_state and self._state != self._target_state: log.debug(' target state: %s' % (self._target_state)) self._transition_to(self._target_state) @property def state(self): return self._state @state.setter def state(self, new_state): with self.state_lock: if new_state == self._state or self._state == EXCEPTION: return old_state = self._state self._state = new_state self.state_timestamp = time.time() self._state_changed(old_state, new_state)