Source code for score.serve.worker.socketserver

import abc
import functools
import select
import socket
import socketserver
import threading

from .worker import Worker, transitions
from ..service import Service


[docs]class SocketServerWorker(Worker): """ A specialized worker for handling :mod:`socketserver` objects. You only need to implement the function ``_mkserver`` in subclasses. That function must return a :class:`socketserver.BaseServer` instance. The Worker will then perform the equivalent of calling its :meth:`serve_forever <socketserver.BaseServer.serve_forever>` method. """ final_states = ( Service.State.STOPPED, Service.State.STOPPING, Service.State.EXCEPTION) running_states = ( Service.State.STARTING, Service.State.RUNNING) def __init__(self): self.__server = None self.__num_running = 0 self.__request_lock = threading.Condition() def prepare(self): self.__intr_pair = socket.socketpair() server = self._mkserver() assert isinstance(server, socketserver.BaseServer) self.__server = server original_shutdown = server.shutdown_request @functools.wraps(server.shutdown_request) def shutdown_request(*args, **kwargs): original_shutdown(*args, **kwargs) with self.__request_lock: self.__num_running -= 1 self.__request_lock.notify() server.shutdown_request = shutdown_request threading.Thread(target=self._loop).start() def start(self): self._interrupt_loop() @transitions(Service.State.PREPARING) @transitions(Service.State.RUNNING) def stop(self): self._interrupt_loop() def pause(self): self._interrupt_loop() def cleanup(self, exception): if self.__server: try: self.__server.server_close() self.__server = None except: pass def _loop(self): if not self.__intr_pair: return while self.state not in self.final_states: try: with self.__request_lock: if self.state in self.running_states: sockets = (self.__server.socket, self.__intr_pair[0],) else: sockets = (self.__intr_pair[0],) r, w, e = select.select(sockets, [], []) if self.__intr_pair[0] in r: self.__intr_pair[0].recv(2**10) continue except InterruptedError: continue if self.__server.socket in r: self._process_request() self.__server.server_close() self.__server = None def _process_request(self): server = self.__server with self.__request_lock: if self.state in self.final_states: # we're stopping, abort operation return request, client_address = server.get_request() self.__num_running += 1 if not server.verify_request(request, client_address): return try: server.process_request(request, client_address) except: server.handle_error(request, client_address) server.shutdown_request(request) def _interrupt_loop(self): with self.__request_lock: if not self.__intr_pair: return self.__intr_pair[1].send(b'0') while self.__num_running: self.__request_lock.wait() @abc.abstractmethod def _mkserver(self): pass