From 59242df7d62d42e95217c5913dbd077d36678f82 Mon Sep 17 00:00:00 2001 From: Eli Uriegas Date: Fri, 27 Jan 2017 19:34:21 -0600 Subject: [PATCH] Move serve_multiple, fix tests (#357) * Move serve_multiple, remove stop_events, fix tests Moves serve_multiple out of the app, removes stop_event (adds a deprecation warning, but it also wasn't doing anything) fixes multiprocessing tests so that they don't freeze pytest's runner. Other notes: Also moves around some imports so that they are better optimized as well. * Re-add in stop_event, maybe it wasn't so bad! * Get rid of unused warnings import --- sanic/sanic.py | 68 +++----------------------- sanic/server.py | 67 ++++++++++++++++++++++++-- tests/test_logging.py | 14 ++++-- tests/test_multiprocessing.py | 89 +++++++---------------------------- 4 files changed, 94 insertions(+), 144 deletions(-) diff --git a/sanic/sanic.py b/sanic/sanic.py index e02f878028..1ed14b7f76 100644 --- a/sanic/sanic.py +++ b/sanic/sanic.py @@ -1,22 +1,18 @@ +import logging from asyncio import get_event_loop from collections import deque from functools import partial from inspect import isawaitable, stack, getmodulename -from multiprocessing import Process, Event -from signal import signal, SIGTERM, SIGINT from traceback import format_exc -import logging from .config import Config from .exceptions import Handler +from .exceptions import ServerError from .log import log from .response import HTTPResponse from .router import Router -from .server import serve, HttpProtocol +from .server import serve, serve_multiple, HttpProtocol from .static import register as static_register -from .exceptions import ServerError -from socket import socket, SOL_SOCKET, SO_REUSEADDR -from os import set_inheritable class Sanic: @@ -358,9 +354,7 @@ def run(self, host="127.0.0.1", port=8000, debug=False, before_start=None, if workers == 1: serve(**server_settings) else: - log.info('Spinning up {} workers...'.format(workers)) - - self.serve_multiple(server_settings, workers, stop_event) + serve_multiple(server_settings, workers, stop_event) except Exception as e: log.exception( @@ -369,13 +363,7 @@ def run(self, host="127.0.0.1", port=8000, debug=False, before_start=None, log.info("Server Stopped") def stop(self): - """ - This kills the Sanic - """ - if self.processes is not None: - for process in self.processes: - process.terminate() - self.sock.close() + """This kills the Sanic""" get_event_loop().stop() async def create_server(self, host="127.0.0.1", port=8000, debug=False, @@ -414,8 +402,7 @@ async def create_server(self, host="127.0.0.1", port=8000, debug=False, ("before_server_start", "before_start", before_start, False), ("after_server_start", "after_start", after_start, False), ("before_server_stop", "before_stop", before_stop, True), - ("after_server_stop", "after_stop", after_stop, True), - ): + ("after_server_stop", "after_stop", after_stop, True)): listeners = [] for blueprint in self.blueprints.values(): listeners += blueprint.listeners[event_name] @@ -438,46 +425,3 @@ async def create_server(self, host="127.0.0.1", port=8000, debug=False, log.info('Goin\' Fast @ {}://{}:{}'.format(proto, host, port)) return await serve(**server_settings) - - def serve_multiple(self, server_settings, workers, stop_event=None): - """ - Starts multiple server processes simultaneously. Stops on interrupt - and terminate signals, and drains connections when complete. - - :param server_settings: kw arguments to be passed to the serve function - :param workers: number of workers to launch - :param stop_event: if provided, is used as a stop signal - :return: - """ - if server_settings.get('loop', None) is not None: - log.warning("Passing a loop will be deprecated in version 0.4.0" - " https://github.com/channelcat/sanic/pull/335" - " has more information.", DeprecationWarning) - server_settings['reuse_port'] = True - - # Create a stop event to be triggered by a signal - if stop_event is None: - stop_event = Event() - signal(SIGINT, lambda s, f: stop_event.set()) - signal(SIGTERM, lambda s, f: stop_event.set()) - - self.sock = socket() - self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) - self.sock.bind((server_settings['host'], server_settings['port'])) - set_inheritable(self.sock.fileno(), True) - server_settings['sock'] = self.sock - server_settings['host'] = None - server_settings['port'] = None - - self.processes = [] - for _ in range(workers): - process = Process(target=serve, kwargs=server_settings) - process.daemon = True - process.start() - self.processes.append(process) - - for process in self.processes: - process.join() - - # the above processes will block this until they're stopped - self.stop() diff --git a/sanic/server.py b/sanic/server.py index 1b19c78445..dc3acc4224 100644 --- a/sanic/server.py +++ b/sanic/server.py @@ -1,11 +1,18 @@ import asyncio +import os import traceback from functools import partial from inspect import isawaitable -from signal import SIGINT, SIGTERM +from multiprocessing import Process, Event +from os import set_inheritable +from signal import SIGTERM, SIGINT +from signal import signal as signal_func +from socket import socket, SOL_SOCKET, SO_REUSEADDR from time import time + from httptools import HttpRequestParser from httptools.parser.errors import HttpParserError + from .exceptions import ServerError try: @@ -17,7 +24,6 @@ from .request import Request from .exceptions import RequestTimeout, PayloadTooLarge, InvalidUsage - current_time = None @@ -31,6 +37,7 @@ class CIDict(dict): This does not maintain the inputted case when calling items() or keys() in favor of speed, since headers are case insensitive """ + def get(self, key, default=None): return super().get(key.casefold(), default) @@ -56,7 +63,7 @@ class HttpProtocol(asyncio.Protocol): '_total_request_size', '_timeout_handler', '_last_communication_time') def __init__(self, *, loop, request_handler, error_handler, - signal=Signal(), connections={}, request_timeout=60, + signal=Signal(), connections=set(), request_timeout=60, request_max_size=None): self.loop = loop self.transport = None @@ -328,7 +335,7 @@ def serve(host, port, request_handler, error_handler, before_start=None, try: http_server = loop.run_until_complete(server_coroutine) - except Exception: + except: log.exception("Unable to start server") return @@ -339,10 +346,12 @@ def serve(host, port, request_handler, error_handler, before_start=None, for _signal in (SIGINT, SIGTERM): loop.add_signal_handler(_signal, loop.stop) + pid = os.getpid() try: + log.info('Starting worker [{}]'.format(pid)) loop.run_forever() finally: - log.info("Stop requested, draining connections...") + log.info("Stopping worker [{}]".format(pid)) # Run the on_stop function if provided trigger_events(before_stop, loop) @@ -362,3 +371,51 @@ def serve(host, port, request_handler, error_handler, before_start=None, trigger_events(after_stop, loop) loop.close() + + +def serve_multiple(server_settings, workers, stop_event=None): + """ + Starts multiple server processes simultaneously. Stops on interrupt + and terminate signals, and drains connections when complete. + + :param server_settings: kw arguments to be passed to the serve function + :param workers: number of workers to launch + :param stop_event: if provided, is used as a stop signal + :return: + """ + if server_settings.get('loop', None) is not None: + log.warning("Passing a loop will be deprecated in version 0.4.0" + " https://github.com/channelcat/sanic/pull/335" + " has more information.", DeprecationWarning) + server_settings['reuse_port'] = True + + sock = socket() + sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + sock.bind((server_settings['host'], server_settings['port'])) + set_inheritable(sock.fileno(), True) + server_settings['sock'] = sock + server_settings['host'] = None + server_settings['port'] = None + + if stop_event is None: + stop_event = Event() + + signal_func(SIGINT, lambda s, f: stop_event.set()) + signal_func(SIGTERM, lambda s, f: stop_event.set()) + + processes = [] + for _ in range(workers): + process = Process(target=serve, kwargs=server_settings) + process.daemon = True + process.start() + processes.append(process) + + for process in processes: + process.join() + + # the above processes will block this until they're stopped + for process in processes: + process.terminate() + sock.close() + + asyncio.get_event_loop().stop() diff --git a/tests/test_logging.py b/tests/test_logging.py index b3e3c1fcd4..ec326276d8 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,4 +1,5 @@ import asyncio +import uuid from sanic.response import text from sanic import Sanic from io import StringIO @@ -9,10 +10,11 @@ function: %(funcName)s(); \ message: %(message)s''' + def test_log(): log_stream = StringIO() for handler in logging.root.handlers[:]: - logging.root.removeHandler(handler) + logging.root.removeHandler(handler) logging.basicConfig( format=logging_format, level=logging.DEBUG, @@ -20,14 +22,16 @@ def test_log(): ) log = logging.getLogger() app = Sanic('test_logging') + rand_string = str(uuid.uuid4()) + @app.route('/') def handler(request): - log.info('hello world') + log.info(rand_string) return text('hello') request, response = sanic_endpoint_test(app) - log_text = log_stream.getvalue().strip().split('\n')[-3] - assert log_text == "module: test_logging; function: handler(); message: hello world" + log_text = log_stream.getvalue() + assert rand_string in log_text -if __name__ =="__main__": +if __name__ == "__main__": test_log() diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index e39c3d24f2..7f68c1b6ce 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -1,81 +1,26 @@ -from multiprocessing import Array, Event, Process -from time import sleep, time -from ujson import loads as json_loads - -import pytest +import multiprocessing +import random +import signal from sanic import Sanic -from sanic.response import json -from sanic.utils import local_request, HOST, PORT - +from sanic.utils import HOST, PORT -# ------------------------------------------------------------ # -# GET -# ------------------------------------------------------------ # -# TODO: Figure out why this freezes on pytest but not when -# executed via interpreter -@pytest.mark.skip( - reason="Freezes with pytest not on interpreter") def test_multiprocessing(): - app = Sanic('test_json') - - response = Array('c', 50) - @app.route('/') - async def handler(request): - return json({"test": True}) - - stop_event = Event() - async def after_start(*args, **kwargs): - http_response = await local_request('get', '/') - response.value = http_response.text.encode() - stop_event.set() - - def rescue_crew(): - sleep(5) - stop_event.set() - - rescue_process = Process(target=rescue_crew) - rescue_process.start() - - app.serve_multiple({ - 'host': HOST, - 'port': PORT, - 'after_start': after_start, - 'request_handler': app.handle_request, - 'request_max_size': 100000, - }, workers=2, stop_event=stop_event) - - rescue_process.terminate() - - try: - results = json_loads(response.value) - except: - raise ValueError("Expected JSON response but got '{}'".format(response)) - - assert results.get('test') == True - -@pytest.mark.skip( - reason="Freezes with pytest not on interpreter") -def test_drain_connections(): - app = Sanic('test_json') + """Tests that the number of children we produce is correct""" + # Selects a number at random so we can spot check + num_workers = random.choice(range(2, multiprocessing.cpu_count() * 2 + 1)) + app = Sanic('test_multiprocessing') + process_list = set() - @app.route('/') - async def handler(request): - return json({"test": True}) + def stop_on_alarm(*args): + for process in multiprocessing.active_children(): + process_list.add(process.pid) + process.terminate() - stop_event = Event() - async def after_start(*args, **kwargs): - http_response = await local_request('get', '/') - stop_event.set() + signal.signal(signal.SIGALRM, stop_on_alarm) + signal.alarm(1) + app.run(HOST, PORT, workers=num_workers) - start = time() - app.serve_multiple({ - 'host': HOST, - 'port': PORT, - 'after_start': after_start, - 'request_handler': app.handle_request, - }, workers=2, stop_event=stop_event) - end = time() + assert len(process_list) == num_workers - assert end - start < 0.05