Source code for score.serve.worker.asyncio

import abc
import threading
import asyncio
from .worker import Worker
import concurrent.futures

try:
    from asyncio import run_coroutine_threadsafe
except ImportError:

    def run_coroutine_threadsafe(coro, loop):
        future = concurrent.futures.Future()

        def done(task_future):
            exception = task_future.exception()
            if exception:
                future.set_exception(exception)
            else:
                future.set_result(task_future.result())

        def queue_task():
            try:
                # 'async' has become a keyword starting with
                # python 3.5, leading to parse errors for this file if
                # we write `asyncio.async`. That's why we're using
                # getattr() here.
                task_future = getattr(asyncio, 'async')(coro, loop=loop)
                task_future.add_done_callback(done)
            except Exception as exc:
                if future.set_running_or_notify_cancel():
                    future.set_exception(exc)
                raise

        loop.call_soon_threadsafe(queue_task)
        return future

try:
    from types import coroutine
except ImportError:
    from asyncio import coroutine


[docs]class AsyncioWorker(Worker): """ A specialized worker for :mod:`asyncio` servers. This base class will add a layer of abstraction to eliminate threading. Subclasses can override the functions :meth:`_prepare`, :meth:`_start`, :meth:`_pause`, :meth:`_stop` and :meth:`_cleanup`. These functions will be called inside a running event loop (which can be accessed as ``self.loop``) and can be regular functions or :term:`coroutines <coroutine>`. Example implementation: .. code-block:: python class EchoServer(AsyncioWorker): async def _start(self): self.server = yield from self.loop.create_server(myserver) def _pause(self): self.server.close() """ loop = None
[docs] def prepare(self): if self.loop is None: self.loop = asyncio.new_event_loop() event = threading.Event() threading.Thread(target=self.__start_loop, args=(event,)).start() event.wait() event.clear() future = run_coroutine_threadsafe(self.__prepare(), self.loop) future.add_done_callback(lambda future: event.set()) event.wait() exception = future.exception() if exception: raise exception
[docs] def start(self): event = threading.Event() future = run_coroutine_threadsafe(self.__start(), self.loop) future.add_done_callback(lambda future: event.set()) event.wait() exception = future.exception() if exception: raise exception
[docs] def pause(self): event = threading.Event() future = run_coroutine_threadsafe(self.__pause(), self.loop) future.add_done_callback(lambda future: event.set()) event.wait() exception = future.exception() if exception: raise exception
[docs] def stop(self): def stop_loop(future): self.loop.call_soon_threadsafe(self.__stop_loop, event) event = threading.Event() future = run_coroutine_threadsafe(self.__stop(), self.loop) future.add_done_callback(stop_loop) event.wait() exception = future.exception() if exception: raise exception
[docs] def cleanup(self, exception): if not self.loop.is_running(): return def stop_loop(future): self.loop.call_soon_threadsafe(self.__stop_loop, event) event = threading.Event() future = run_coroutine_threadsafe( self.__cleanup(exception), self.loop) future.add_done_callback(stop_loop) event.wait()
def _prepare(self): """ Equivalent of :meth:`Worker.prepare`. This function will be called inside a running event loop. """ pass @abc.abstractmethod def _start(self): """ Equivalent of :meth:`Worker.start`. This function will be called inside a running event loop. """ pass @abc.abstractmethod def _pause(self): """ Equivalent of :meth:`Worker.pause`. This function will be called inside a running event loop. """ pass def _stop(self): """ Equivalent of :meth:`Worker.stop`. This function will be called inside a running event loop. """ pass @abc.abstractmethod def _cleanup(self, exception): """ Equivalent of :meth:`Worker.cleanup`. This function will be called inside a running event loop. """ pass def __start_loop(self, event): event.set() self.loop.run_forever() @coroutine def __prepare(self): result = self._prepare() if asyncio.iscoroutine(result): result = yield from result @coroutine def __start(self): result = self._start() if asyncio.iscoroutine(result): result = yield from result @coroutine def __pause(self): result = self._pause() if asyncio.iscoroutine(result): result = yield from result @coroutine def __stop(self): result = self._stop() if asyncio.iscoroutine(result): result = yield from result @coroutine def __cleanup(self, exception): result = self._cleanup(exception) if asyncio.iscoroutine(result): result = yield from result def __stop_loop(self, event): if not self.loop.is_running(): event.set() return def stop(future=None): if hasattr(asyncio, 'all_tasks'): all_tasks = asyncio.all_tasks(loop=self.loop) else: all_tasks = asyncio.Task.all_tasks(loop=self.loop) pending_tasks = [t for t in all_tasks if not t.done()] if pending_tasks: task = pending_tasks.pop() task.add_done_callback(stop) else: self.loop.stop() event.set() stop()