Source code for score.asyncio._init

# Copyright © 2017,2018 STRG.AT GmbH, Vienna, Austria
# Copyright © 2019-2023 Necdet Can Ateşman, Vienna, Austria
#
# This file is part of the The SCORE Framework.
#
# The SCORE Framework and all its parts are free software: you can redistribute
# them and/or modify them under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation which is in the
# file named COPYING.LESSER.txt.
#
# The SCORE Framework and all its parts are distributed without any WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. For more details see the GNU Lesser General Public
# License.
#
# If you have not received a copy of the GNU Lesser General Public License see
# http://www.gnu.org/licenses/.
#
# The License-Agreement realised between you as Licensee and STRG.AT GmbH as
# Licenser including the issue of its valid conclusion and its pre- and
# post-contractual effects is governed by the laws of Austria. Any disputes
# concerning this License-Agreement including the issue of its valid conclusion
# and its pre- and post-contractual effects are exclusively decided by the
# competent court, in whose district STRG.AT GmbH has its registered seat, at
# the discretion of STRG.AT GmbH also the competent court, in whose district the
# Licensee has his registered seat, an establishment or assets.

from score.init import (
    ConfiguredModule, InitializationError, parse_bool, parse_time_interval)
import asyncio
import warnings
import threading
import time


defaults = {
    "backend": "builtin",
    "use_global_loop": False,
    "stop_timeout": None,
}


[docs]def init(confdict): """ Initializes this module according to the :ref:`SCORE module initialization guidelines <module_initialization>` with the following configuration keys: :confkey:`backend` :confdefault:`"builtin"` The library to use for creating the event loop. Current valid values are ``uvloop`` and ``builtin``. :confkey:`use_global_loop` :confdefault:`False` Whether the global loop object should be used. The "global" loop is the one returned by :func:`asyncio.get_event_loop()`. :confkey:`stop_timeout` :confdefault:`None` Defines how long the module will wait for all tasks running in the loop to finish when stopping the loop. The value will be interpreted through a call to :func:`score.init.parse_time_interval`. The default value `None` indicates that the module will wait indefinitely. If you want to the loop to terminate immediately, without waiting for tasks at all, you must pass "0". """ conf = defaults.copy() conf.update(confdict) use_global_loop = parse_bool(conf['use_global_loop']) stop_timeout = conf['stop_timeout'] if stop_timeout == 'None': stop_timeout = None if stop_timeout is not None: stop_timeout = parse_time_interval(stop_timeout) if conf['backend'] == 'uvloop': import uvloop if use_global_loop: warnings.warn( 'Ignoring value of "use_global_loop" when using uvloop backend') loop = uvloop.new_event_loop() elif conf['backend'] == 'builtin': if use_global_loop: loop = asyncio.get_event_loop() else: loop = asyncio.new_event_loop() else: import score.asyncio raise InitializationError( score.asyncio, 'Invalid value for "backend": ' + conf['backend']) return ConfiguredAsyncioModule( conf['backend'], use_global_loop, stop_timeout, loop)
[docs]class ConfiguredAsyncioModule(ConfiguredModule): """ This module's :class:`configuration class <score.init.ConfiguredModule>`. """ def __init__(self, backend, use_global_loop, stop_timeout, loop): super().__init__("score.asyncio") self.backend = backend self.use_global_loop = use_global_loop self.stop_timeout = stop_timeout self.loop = loop self.loop_tokens = [] self.loop_lock = threading.RLock() def __del__(self): """ Stops the loop, if it is still running. Will also :meth:`close() <asyncio.AbstractEventLoop.close>` the loop if it is not the global event loop. """ if self.loop.is_running(): event = threading.Event() self.loop.call_soon_threadsafe(self.__stop_loop, event) event.wait() if not self.use_global_loop: self.loop.close()
[docs] def await_(self, coroutine): """ Blocks until given *coroutine* is finished and returns the result (or raises the exception). This method will acquire a :term:`loop token`, schedule the coroutine for execution in the configured event loop, await its termination and return the result (or raise the exception). This is very similar to the builtin method :meth:`asyncio.AbstractEventLoop.run_until_complete`, but will work when different clients try to execute a coroutine simultanously, regardless of the current loop state: >>> import asyncio >>> @asyncio.coroutine ... def foo(): ... return 1 ... >>> @asyncio.coroutine ... def bar(): ... return 1 / 0 ... >>> foo() <generator object foo at 0x7fea86b20e08> >>> score.asyncio.await_(foo()) 1 >>> score.asyncio.await_.(bar()) Traceback (most recent call last): File "<console>", line 1, in <module> File "/home/can/Projects/score/py.asyncio/score/asyncio/_init.py", line 107, in await_ return self.loop.run_until_complete(coroutine) File "/usr/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete return future.result() File "/usr/lib/python3.6/asyncio/coroutines.py", line 210, in coro res = func(*args, **kw) File "<console>", line 3, in bar ZeroDivisionError: division by zero """ with self.start_loop(): result = None exception = None condition = threading.Condition() finished = False def resolve(future): nonlocal exception, result, finished exception = future.exception() if exception is None: result = future.result() with condition: finished = True condition.notify() def create_task(): task = self.loop.create_task(coroutine) task.add_done_callback(resolve) self.loop.call_soon_threadsafe(create_task) with condition: condition.wait_for(lambda: finished) if exception: raise exception return result
[docs] def await_multiple(self, coroutines): """ Just like :meth:`await_`, but awaits the completion of multiple *coroutines*. The return value is different though: the method will provide a list of 2-tuples, where the first value is a *bool* indicating successful execution of the coroutine and the second value is the exception itself or the return value. Example with two coroutines, the first successfully returning ``1``, while the other raising a `ZeroDivisionError`: .. code-block:: python [ (True, 1), (False, ZeroDivisionError('division by zero',)), ] """ results = [] for coroutine in coroutines: # TODO: This code is executing the coroutines sequentially. # It should rather run them inside the same loop. try: result = self.await_(coroutine) except Exception as e: results.append((False, e)) else: results.append((True, result)) return results
[docs] def start_loop(self): """ Makes sure the configured :attr:`loop` is running. Will possibly start the loop in a different thread and return a :term:`loop token`. This method is thread-safe. See :ref:`asyncio_start_loop` for usage details. """ with self.loop_lock: token = LoopToken(self) self.loop_tokens.append(token) if not self.loop.is_running(): self.loop_thread = threading.Thread( target=self.loop.run_forever) self.loop_thread.start() return token
[docs] def release_loop(self, token): """ Releases a previously acquired *token*. See :ref:`asyncio_start_loop` for usage details. """ with self.loop_lock: try: self.loop_tokens.remove(token) except KeyError: return token.held = False if not self.loop_tokens and self.loop.is_running(): event = threading.Event() self.loop.call_soon_threadsafe(self.__stop_loop, event) if not self.loop_tokens: self.loop_thread.join() event.wait()
def __stop_loop(self, event): def stop(future=None): if not self.loop.is_running() or self.loop_tokens: event.set() return pending_tasks = [t for t in asyncio.Task.all_tasks(self.loop) if not t.done()] if not pending_tasks or self.stop_timeout == 0: self.loop.stop() event.set() return all_done = asyncio.shield(asyncio.wait( pending_tasks, loop=self.loop), loop=self.loop) if self.stop_timeout is None: wait_task = self.loop.create_task(all_done) wait_task.add_done_callback(stop) else: timeout = self.stop_timeout - (time.time() - stop_time) if timeout <= 0: self.loop.stop() event.set() wait_task = self.loop.create_task(asyncio.wait_for( all_done, timeout, loop=self.loop)) wait_task.add_done_callback(stop) stop_time = time.time() stop()
[docs]class LoopToken: """ A :term:`token <loop token>` provided by the configured score.asyncio module. The configured asyncio will keep running as long as this token is held. Use :meth:`release` to indicate that you're done using the loop. """ def __init__(self, conf): self.conf = conf self.held = True def __del__(self): if self.held: self.conf.release_loop(self) def __enter__(self): pass def __exit__(self, exc_type, exc_value, traceback): self.conf.release_loop(self)
[docs] def release(self): """ Releases this token. See :ref:`asyncio_start_loop` for details. """ self.conf.release_loop(self)