Source code for score.serve.worker.worker

import abc
from ..service import Service


# Module-level shorthands for the members of Service.State used below.
STOPPED = Service.State.STOPPED
STARTING = Service.State.STARTING
RUNNING = Service.State.RUNNING
PAUSING = Service.State.PAUSING
PREPARING = Service.State.PREPARING
PAUSED = Service.State.PAUSED
STOPPING = Service.State.STOPPING
EXCEPTION = Service.State.EXCEPTION


# Maps a worker method name to the state that method has to end in. The
# `transitions` decorator looks the end state up here when none is given
# explicitly, and rejects an explicit end state that disagrees with it.
final_states = {
    'start': RUNNING,
    'stop': STOPPED,
    'pause': PAUSED,
    'prepare': PAUSED,
}

# (from, to) pairs that the `transitions` decorator refuses with a
# ValueError when they are passed in explicitly.
# NOTE(review): presumably these are the steps the Service performs on its
# own once a worker method returns -- confirm against Service.
invalid_transitions = {
    (STARTING, RUNNING),
    (STOPPING, STOPPED),
    (PAUSING, PAUSED),
    (PREPARING, PAUSED),
}


def transitions(state1, state2=None):
    """
    This annotation can add additional transitions to a class. If your worker
    is capable of going from RUNNING to STOPPED, for example, you can add the
    additional transition to a new class method:

    .. code-block:: python

        class MyWorker(Worker):

            @transitions(Service.State.RUNNING, Service.State.STOPPED)
            def kill():
                # ...

    :param state1: The state the decorated function starts from.
    :param state2: The state the decorated function ends in. When omitted
        (``None``), the end state is looked up in ``final_states`` using the
        name of the decorated function.
    :return: A decorator that records ``(state1, end_state)`` in the
        function's ``transitions`` set and returns the function unchanged.
    :raises ValueError: If start and end state are equal, or the pair is
        listed in ``invalid_transitions``. This is checked right away for an
        explicit *state2*, and again once an omitted *state2* was resolved.
    :raises Exception: If no end state can be determined, if the end state
        contradicts ``final_states`` for that function name, or if the
        function was already registered with a different end state.
    """

    def validate(start, end):
        if start == end or (start, end) in invalid_transitions:
            raise ValueError('Invalid transition: %s' % str((start, end)))

    # Fail as early as possible when both states were given explicitly.
    validate(state1, state2)

    def wrapper(func):
        name = func.__name__
        # Resolve into a local: rebinding the closed-over ``state2`` would
        # leak the end state of this function into the next function that
        # is decorated with the same decorator object.
        end_state = state2
        if end_state is None:
            try:
                end_state = final_states[name]
            except KeyError:
                raise Exception(
                    'Function %s() has no end state for transition from %s' %
                    (name, state1)) from None
            # The up-front check could not see the inferred end state.
            validate(state1, end_state)
        elif name in final_states and final_states[name] != end_state:
            raise Exception('Function %s() must transition to %s' %
                            (name, final_states[name]))
        if not hasattr(func, 'transitions'):
            func.transitions = set()
        # A single function may start from several states, but it must
        # always end in the same one.
        for transition in func.transitions:
            if end_state != transition[1]:
                raise Exception(
                    'Function %s() cannot transition to both %s and %s' %
                    (name, end_state, transition[1]))
        func.transitions.add((state1, end_state))
        return func

    return wrapper
class WorkerMeta(abc.ABCMeta):
    """
    Metaclass collecting the state transitions implemented by a class.

    Every callable class member carrying a ``transitions`` attribute (a
    collection of ``(from_state, to_state)`` tuples) is registered in the
    class attribute ``_state_transitions``, a dict mapping each transition
    tuple to the name of the member implementing it.
    """

    def __init__(cls, name, parents, members):
        """
        Build ``cls._state_transitions`` from *members* and the parents.

        :param name: Name of the class being created.
        :param parents: Tuple of base classes.
        :param members: Namespace dict of the class body.
        :raises Exception: If two members of the class body claim the same
            transition.
        """
        transitions = {}
        # The loop variable must not be called ``name``: that would clobber
        # the class name that is handed on to ABCMeta.__init__ below.
        for member_name, func in members.items():
            if not callable(func) or not hasattr(func, 'transitions'):
                continue
            for transition in func.transitions:
                if transition in transitions:
                    raise Exception(
                        'Transition %s already registered as function %s' %
                        (str(transition), transitions[transition]))
                transitions[transition] = member_name
        if not parents:
            # A class without bases starts out with the four default
            # transitions; anything declared in the class body wins.
            defaults = {
                (STOPPED, PAUSED): 'prepare',
                (PAUSED, RUNNING): 'start',
                (RUNNING, PAUSED): 'pause',
                (PAUSED, STOPPED): 'stop',
            }
            defaults.update(transitions)
            transitions = defaults
        if transitions:
            # Extend the table of the first parent that has one. A class
            # adding nothing keeps inheriting its parent's table through
            # normal attribute lookup, so nothing is stored for it.
            for parent in parents:
                if hasattr(parent, '_state_transitions'):
                    merged = parent._state_transitions.copy()
                    merged.update(transitions)
                    transitions = merged
                    break
            # Also mirrored into the namespace dict handed on below.
            members['_state_transitions'] = transitions
            cls._state_transitions = transitions
        super().__init__(name, parents, members)
class Worker(metaclass=WorkerMeta):
    """
    The implementation of a single service. A worker gets wrapped in a
    :class:`Service` object before it is started.
    """

    # Class-level default. register_state_change_listener() binds a fresh
    # per-instance set before adding to an empty one, so the methods of this
    # class never add anything to this shared set.
    state_listeners = set()

    @property
    def state(self):
        """
        The state reported by ``self.service``.

        NOTE(review): ``service`` is not assigned anywhere in this class;
        presumably the wrapping Service sets it -- confirm.
        """
        service = self.service
        return service.state

    @abc.abstractmethod
    def prepare(self):
        """
        Performs the transition from STOPPED to PAUSED.
        """

    @abc.abstractmethod
    def start(self):
        """
        Performs the transition from PAUSED to RUNNING.
        """

    @abc.abstractmethod
    def stop(self):
        """
        Performs the transition from PAUSED to STOPPED.
        """

    @abc.abstractmethod
    def pause(self):
        """
        Performs the transition from RUNNING to PAUSED.
        """

    @abc.abstractmethod
    def cleanup(self, exception):
        """
        Called when an exception occurred. Due to the nature of threading,
        it is not entirely clear which state the worker was in when this
        specific exception was raised.
        """

    def register_state_change_listener(self, callback):
        """
        Registers a `callable` to be invoked whenever the state of this
        worker changes. The *callback* receives three arguments:

        * a :class:`Service <score.serve.service.Service>` wrapping this
          worker,
        * the old :class:`state <score.serve.service.State>` and
        * the new (current) state.

        Note that, due to the nature of threading, the Service may already
        be in a state other than the one passed as the third argument.
        """
        listeners = self.state_listeners
        if not listeners:
            # Nothing registered (yet): bind a new set on this instance.
            listeners = self.state_listeners = set()
        listeners.add(callback)

    def unregister_state_change_listener(self, callback):
        """
        Removes a previously registered listener. Unknown callbacks are
        ignored.
        """
        listeners = self.state_listeners
        listeners.discard(callback)