diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/uvicorn/supervisors/multiprocess.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/uvicorn/supervisors/multiprocess.py | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/uvicorn/supervisors/multiprocess.py b/.venv/lib/python3.12/site-packages/uvicorn/supervisors/multiprocess.py new file mode 100644 index 00000000..153b3d65 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/uvicorn/supervisors/multiprocess.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import logging +import os +import signal +import threading +from multiprocessing.context import SpawnProcess +from socket import socket +from types import FrameType +from typing import Callable + +import click + +from uvicorn._subprocess import get_subprocess +from uvicorn.config import Config + +HANDLED_SIGNALS = ( + signal.SIGINT, # Unix signal 2. Sent by Ctrl+C. + signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`. +) + +logger = logging.getLogger("uvicorn.error") + + +class Multiprocess: + def __init__( + self, + config: Config, + target: Callable[[list[socket] | None], None], + sockets: list[socket], + ) -> None: + self.config = config + self.target = target + self.sockets = sockets + self.processes: list[SpawnProcess] = [] + self.should_exit = threading.Event() + self.pid = os.getpid() + + def signal_handler(self, sig: int, frame: FrameType | None) -> None: + """ + A signal handler that is registered with the parent process. + """ + self.should_exit.set() + + def run(self) -> None: + self.startup() + self.should_exit.wait() + self.shutdown() + + def startup(self) -> None: + message = "Started parent process [{}]".format(str(self.pid)) + color_message = "Started parent process [{}]".format( + click.style(str(self.pid), fg="cyan", bold=True) + ) + logger.info(message, extra={"color_message": color_message}) + + for sig in HANDLED_SIGNALS: + signal.signal(sig, self.signal_handler) + + for _idx in range(self.config.workers): + process = get_subprocess( + config=self.config, target=self.target, sockets=self.sockets + ) + process.start() + self.processes.append(process) + + def shutdown(self) -> None: + for process in self.processes: + process.terminate() + process.join() + + message = "Stopping parent process [{}]".format(str(self.pid)) + color_message = "Stopping parent process [{}]".format( + click.style(str(self.pid), fg="cyan", bold=True) + ) + logger.info(message, extra={"color_message": color_message}) |