Source code for score.serve.worker.simple

import abc
import threading

from .worker import Worker


[docs]class SimpleWorker(Worker): """ A simplified worker base class, that hides all the ugly threading logic. You can subclass this and implement a ``loop`` function that periodically checks this.running: .. code-block:: python class Spammer(SimpleWorker): def loop(): while self.running: print('spam!') time.sleep(1) """ def prepare(self): self.__lock = threading.Lock() self.__thread = None def start(self): self.__thread = threading.Thread(target=self.__loop).start() def pause(self): with self.__lock: self.__running = False if self.__thread: self.__thread.join() self.__thread = None def stop(self): self.pause() def __loop(self): with self.__lock: self.__running = True try: self.loop() except Exception as e: self.service.set_exception(e) def cleanup(self, exception): pass @property def running(self): with self.__lock: return self.__running @abc.abstractmethod def loop(self): pass