diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/gunicorn/workers')
9 files changed, 1620 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/__init__.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/__init__.py new file mode 100644 index 00000000..ae753e1c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +# supported gunicorn workers. +SUPPORTED_WORKERS = { + "sync": "gunicorn.workers.sync.SyncWorker", + "eventlet": "gunicorn.workers.geventlet.EventletWorker", + "gevent": "gunicorn.workers.ggevent.GeventWorker", + "gevent_wsgi": "gunicorn.workers.ggevent.GeventPyWSGIWorker", + "gevent_pywsgi": "gunicorn.workers.ggevent.GeventPyWSGIWorker", + "tornado": "gunicorn.workers.gtornado.TornadoWorker", + "gthread": "gunicorn.workers.gthread.ThreadWorker", +} diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/base.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/base.py new file mode 100644 index 00000000..f321dd2d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/base.py @@ -0,0 +1,275 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import io +import os +import signal +import sys +import time +import traceback +from datetime import datetime +from random import randint +from ssl import SSLError + +from gunicorn import util +from gunicorn.http.errors import ( + ForbiddenProxyRequest, InvalidHeader, + InvalidHeaderName, InvalidHTTPVersion, + InvalidProxyLine, InvalidRequestLine, + InvalidRequestMethod, InvalidSchemeHeaders, + LimitRequestHeaders, LimitRequestLine, +) +from gunicorn.http.wsgi import Response, default_environ +from gunicorn.reloader import reloader_engines +from gunicorn.workers.workertmp import WorkerTmp + + +class Worker(object): + + SIGNALS = [getattr(signal, "SIG%s" % x) for x in ( + "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split() + )] + + PIPE = [] + + def __init__(self, age, ppid, sockets, app, timeout, cfg, log): + """\ + This is called pre-fork so it shouldn't do anything to the + current process. If there's a need to make process wide + changes you'll want to do that in ``self.init_process()``. + """ + self.age = age + self.pid = "[booting]" + self.ppid = ppid + self.sockets = sockets + self.app = app + self.timeout = timeout + self.cfg = cfg + self.booted = False + self.aborted = False + self.reloader = None + + self.nr = 0 + + if cfg.max_requests > 0: + jitter = randint(0, cfg.max_requests_jitter) + self.max_requests = cfg.max_requests + jitter + else: + self.max_requests = sys.maxsize + + self.alive = True + self.log = log + self.tmp = WorkerTmp(cfg) + + def __str__(self): + return "<Worker %s>" % self.pid + + def notify(self): + """\ + Your worker subclass must arrange to have this method called + once every ``self.timeout`` seconds. If you fail in accomplishing + this task, the master process will murder your workers. + """ + self.tmp.notify() + + def run(self): + """\ + This is the mainloop of a worker process. You should override + this method in a subclass to provide the intended behaviour + for your particular evil schemes. + """ + raise NotImplementedError() + + def init_process(self): + """\ + If you override this method in a subclass, the last statement + in the function should be to call this method with + super().init_process() so that the ``run()`` loop is initiated. + """ + + # set environment' variables + if self.cfg.env: + for k, v in self.cfg.env.items(): + os.environ[k] = v + + util.set_owner_process(self.cfg.uid, self.cfg.gid, + initgroups=self.cfg.initgroups) + + # Reseed the random number generator + util.seed() + + # For waking ourselves up + self.PIPE = os.pipe() + for p in self.PIPE: + util.set_non_blocking(p) + util.close_on_exec(p) + + # Prevent fd inheritance + for s in self.sockets: + util.close_on_exec(s) + util.close_on_exec(self.tmp.fileno()) + + self.wait_fds = self.sockets + [self.PIPE[0]] + + self.log.close_on_exec() + + self.init_signals() + + # start the reloader + if self.cfg.reload: + def changed(fname): + self.log.info("Worker reloading: %s modified", fname) + self.alive = False + os.write(self.PIPE[1], b"1") + self.cfg.worker_int(self) + time.sleep(0.1) + sys.exit(0) + + reloader_cls = reloader_engines[self.cfg.reload_engine] + self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files, + callback=changed) + + self.load_wsgi() + if self.reloader: + self.reloader.start() + + self.cfg.post_worker_init(self) + + # Enter main run loop + self.booted = True + self.run() + + def load_wsgi(self): + try: + self.wsgi = self.app.wsgi() + except SyntaxError as e: + if not self.cfg.reload: + raise + + self.log.exception(e) + + # fix from PR #1228 + # storing the traceback into exc_tb will create a circular reference. + # per https://docs.python.org/2/library/sys.html#sys.exc_info warning, + # delete the traceback after use. + try: + _, exc_val, exc_tb = sys.exc_info() + self.reloader.add_extra_file(exc_val.filename) + + tb_string = io.StringIO() + traceback.print_tb(exc_tb, file=tb_string) + self.wsgi = util.make_fail_app(tb_string.getvalue()) + finally: + del exc_tb + + def init_signals(self): + # reset signaling + for s in self.SIGNALS: + signal.signal(s, signal.SIG_DFL) + # init new signaling + signal.signal(signal.SIGQUIT, self.handle_quit) + signal.signal(signal.SIGTERM, self.handle_exit) + signal.signal(signal.SIGINT, self.handle_quit) + signal.signal(signal.SIGWINCH, self.handle_winch) + signal.signal(signal.SIGUSR1, self.handle_usr1) + signal.signal(signal.SIGABRT, self.handle_abort) + + # Don't let SIGTERM and SIGUSR1 disturb active requests + # by interrupting system calls + signal.siginterrupt(signal.SIGTERM, False) + signal.siginterrupt(signal.SIGUSR1, False) + + if hasattr(signal, 'set_wakeup_fd'): + signal.set_wakeup_fd(self.PIPE[1]) + + def handle_usr1(self, sig, frame): + self.log.reopen_files() + + def handle_exit(self, sig, frame): + self.alive = False + + def handle_quit(self, sig, frame): + self.alive = False + # worker_int callback + self.cfg.worker_int(self) + time.sleep(0.1) + sys.exit(0) + + def handle_abort(self, sig, frame): + self.alive = False + self.cfg.worker_abort(self) + sys.exit(1) + + def handle_error(self, req, client, addr, exc): + request_start = datetime.now() + addr = addr or ('', -1) # unix socket case + if isinstance(exc, ( + InvalidRequestLine, InvalidRequestMethod, + InvalidHTTPVersion, InvalidHeader, InvalidHeaderName, + LimitRequestLine, LimitRequestHeaders, + InvalidProxyLine, ForbiddenProxyRequest, + InvalidSchemeHeaders, + SSLError, + )): + + status_int = 400 + reason = "Bad Request" + + if isinstance(exc, InvalidRequestLine): + mesg = "Invalid Request Line '%s'" % str(exc) + elif isinstance(exc, InvalidRequestMethod): + mesg = "Invalid Method '%s'" % str(exc) + elif isinstance(exc, InvalidHTTPVersion): + mesg = "Invalid HTTP Version '%s'" % str(exc) + elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)): + mesg = "%s" % str(exc) + if not req and hasattr(exc, "req"): + req = exc.req # for access log + elif isinstance(exc, LimitRequestLine): + mesg = "%s" % str(exc) + elif isinstance(exc, LimitRequestHeaders): + reason = "Request Header Fields Too Large" + mesg = "Error parsing headers: '%s'" % str(exc) + status_int = 431 + elif isinstance(exc, InvalidProxyLine): + mesg = "'%s'" % str(exc) + elif isinstance(exc, ForbiddenProxyRequest): + reason = "Forbidden" + mesg = "Request forbidden" + status_int = 403 + elif isinstance(exc, InvalidSchemeHeaders): + mesg = "%s" % str(exc) + elif isinstance(exc, SSLError): + reason = "Forbidden" + mesg = "'%s'" % str(exc) + status_int = 403 + + msg = "Invalid request from ip={ip}: {error}" + self.log.warning(msg.format(ip=addr[0], error=str(exc))) + else: + if hasattr(req, "uri"): + self.log.exception("Error handling request %s", req.uri) + status_int = 500 + reason = "Internal Server Error" + mesg = "" + + if req is not None: + request_time = datetime.now() - request_start + environ = default_environ(req, client, self.cfg) + environ['REMOTE_ADDR'] = addr[0] + environ['REMOTE_PORT'] = str(addr[1]) + resp = Response(req, client, self.cfg) + resp.status = "%s %s" % (status_int, reason) + resp.response_length = len(mesg) + self.log.access(resp, req, environ, request_time) + + try: + util.write_error(client, status_int, reason, mesg) + except Exception: + self.log.debug("Failed to send error message.") + + def handle_winch(self, sig, fname): + # Ignore SIGWINCH in worker. Fixes a crash on OpenBSD. + self.log.debug("worker: SIGWINCH ignored.") diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/base_async.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/base_async.py new file mode 100644 index 00000000..b059a7cb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/base_async.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +from datetime import datetime +import errno +import socket +import ssl +import sys + +from gunicorn import http +from gunicorn.http import wsgi +from gunicorn import util +from gunicorn.workers import base + +ALREADY_HANDLED = object() + + +class AsyncWorker(base.Worker): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.worker_connections = self.cfg.worker_connections + + def timeout_ctx(self): + raise NotImplementedError() + + def is_already_handled(self, respiter): + # some workers will need to overload this function to raise a StopIteration + return respiter == ALREADY_HANDLED + + def handle(self, listener, client, addr): + req = None + try: + parser = http.RequestParser(self.cfg, client, addr) + try: + listener_name = listener.getsockname() + if not self.cfg.keepalive: + req = next(parser) + self.handle_request(listener_name, req, client, addr) + else: + # keepalive loop + proxy_protocol_info = {} + while True: + req = None + with self.timeout_ctx(): + req = next(parser) + if not req: + break + if req.proxy_protocol_info: + proxy_protocol_info = req.proxy_protocol_info + else: + req.proxy_protocol_info = proxy_protocol_info + self.handle_request(listener_name, req, client, addr) + except http.errors.NoMoreData as e: + self.log.debug("Ignored premature client disconnection. %s", e) + except StopIteration as e: + self.log.debug("Closing connection. %s", e) + except ssl.SSLError: + # pass to next try-except level + util.reraise(*sys.exc_info()) + except EnvironmentError: + # pass to next try-except level + util.reraise(*sys.exc_info()) + except Exception as e: + self.handle_error(req, client, addr, e) + except ssl.SSLError as e: + if e.args[0] == ssl.SSL_ERROR_EOF: + self.log.debug("ssl connection closed") + client.close() + else: + self.log.debug("Error processing SSL request.") + self.handle_error(req, client, addr, e) + except EnvironmentError as e: + if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN): + self.log.exception("Socket error processing request.") + else: + if e.errno == errno.ECONNRESET: + self.log.debug("Ignoring connection reset") + elif e.errno == errno.ENOTCONN: + self.log.debug("Ignoring socket not connected") + else: + self.log.debug("Ignoring EPIPE") + except Exception as e: + self.handle_error(req, client, addr, e) + finally: + util.close(client) + + def handle_request(self, listener_name, req, sock, addr): + request_start = datetime.now() + environ = {} + resp = None + try: + self.cfg.pre_request(self, req) + resp, environ = wsgi.create(req, sock, addr, + listener_name, self.cfg) + environ["wsgi.multithread"] = True + self.nr += 1 + if self.nr >= self.max_requests: + if self.alive: + self.log.info("Autorestarting worker after current request.") + self.alive = False + + if not self.alive or not self.cfg.keepalive: + resp.force_close() + + respiter = self.wsgi(environ, resp.start_response) + if self.is_already_handled(respiter): + return False + try: + if isinstance(respiter, environ['wsgi.file_wrapper']): + resp.write_file(respiter) + else: + for item in respiter: + resp.write(item) + resp.close() + finally: + request_time = datetime.now() - request_start + self.log.access(resp, req, environ, request_time) + if hasattr(respiter, "close"): + respiter.close() + if resp.should_close(): + raise StopIteration() + except StopIteration: + raise + except EnvironmentError: + # If the original exception was a socket.error we delegate + # handling it to the caller (where handle() might ignore it) + util.reraise(*sys.exc_info()) + except Exception: + if resp and resp.headers_sent: + # If the requests have already been sent, we should close the + # connection to indicate the error. + self.log.exception("Error handling request") + try: + sock.shutdown(socket.SHUT_RDWR) + sock.close() + except EnvironmentError: + pass + raise StopIteration() + raise + finally: + try: + self.cfg.post_request(self, req, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") + return True diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/geventlet.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/geventlet.py new file mode 100644 index 00000000..c42ed118 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/geventlet.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +from functools import partial +import sys + +try: + import eventlet +except ImportError: + raise RuntimeError("eventlet worker requires eventlet 0.24.1 or higher") +else: + from packaging.version import parse as parse_version + if parse_version(eventlet.__version__) < parse_version('0.24.1'): + raise RuntimeError("eventlet worker requires eventlet 0.24.1 or higher") + +from eventlet import hubs, greenthread +from eventlet.greenio import GreenSocket +import eventlet.wsgi +import greenlet + +from gunicorn.workers.base_async import AsyncWorker +from gunicorn.sock import ssl_wrap_socket + +# ALREADY_HANDLED is removed in 0.30.3+ now it's `WSGI_LOCAL.already_handled: bool` +# https://github.com/eventlet/eventlet/pull/544 +EVENTLET_WSGI_LOCAL = getattr(eventlet.wsgi, "WSGI_LOCAL", None) +EVENTLET_ALREADY_HANDLED = getattr(eventlet.wsgi, "ALREADY_HANDLED", None) + + +def _eventlet_socket_sendfile(self, file, offset=0, count=None): + # Based on the implementation in gevent which in turn is slightly + # modified from the standard library implementation. + if self.gettimeout() == 0: + raise ValueError("non-blocking sockets are not supported") + if offset: + file.seek(offset) + blocksize = min(count, 8192) if count else 8192 + total_sent = 0 + # localize variable access to minimize overhead + file_read = file.read + sock_send = self.send + try: + while True: + if count: + blocksize = min(count - total_sent, blocksize) + if blocksize <= 0: + break + data = memoryview(file_read(blocksize)) + if not data: + break # EOF + while True: + try: + sent = sock_send(data) + except BlockingIOError: + continue + else: + total_sent += sent + if sent < len(data): + data = data[sent:] + else: + break + return total_sent + finally: + if total_sent > 0 and hasattr(file, 'seek'): + file.seek(offset + total_sent) + + +def _eventlet_serve(sock, handle, concurrency): + """ + Serve requests forever. + + This code is nearly identical to ``eventlet.convenience.serve`` except + that it attempts to join the pool at the end, which allows for gunicorn + graceful shutdowns. + """ + pool = eventlet.greenpool.GreenPool(concurrency) + server_gt = eventlet.greenthread.getcurrent() + + while True: + try: + conn, addr = sock.accept() + gt = pool.spawn(handle, conn, addr) + gt.link(_eventlet_stop, server_gt, conn) + conn, addr, gt = None, None, None + except eventlet.StopServe: + sock.close() + pool.waitall() + return + + +def _eventlet_stop(client, server, conn): + """ + Stop a greenlet handling a request and close its connection. + + This code is lifted from eventlet so as not to depend on undocumented + functions in the library. + """ + try: + try: + client.wait() + finally: + conn.close() + except greenlet.GreenletExit: + pass + except Exception: + greenthread.kill(server, *sys.exc_info()) + + +def patch_sendfile(): + # As of eventlet 0.25.1, GreenSocket.sendfile doesn't exist, + # meaning the native implementations of socket.sendfile will be used. + # If os.sendfile exists, it will attempt to use that, failing explicitly + # if the socket is in non-blocking mode, which the underlying + # socket object /is/. Even the regular _sendfile_use_send will + # fail in that way; plus, it would use the underlying socket.send which isn't + # properly cooperative. So we have to monkey-patch a working socket.sendfile() + # into GreenSocket; in this method, `self.send` will be the GreenSocket's + # send method which is properly cooperative. + if not hasattr(GreenSocket, 'sendfile'): + GreenSocket.sendfile = _eventlet_socket_sendfile + + +class EventletWorker(AsyncWorker): + + def patch(self): + hubs.use_hub() + eventlet.monkey_patch() + patch_sendfile() + + def is_already_handled(self, respiter): + # eventlet >= 0.30.3 + if getattr(EVENTLET_WSGI_LOCAL, "already_handled", None): + raise StopIteration() + # eventlet < 0.30.3 + if respiter == EVENTLET_ALREADY_HANDLED: + raise StopIteration() + return super().is_already_handled(respiter) + + def init_process(self): + self.patch() + super().init_process() + + def handle_quit(self, sig, frame): + eventlet.spawn(super().handle_quit, sig, frame) + + def handle_usr1(self, sig, frame): + eventlet.spawn(super().handle_usr1, sig, frame) + + def timeout_ctx(self): + return eventlet.Timeout(self.cfg.keepalive or None, False) + + def handle(self, listener, client, addr): + if self.cfg.is_ssl: + client = ssl_wrap_socket(client, self.cfg) + super().handle(listener, client, addr) + + def run(self): + acceptors = [] + for sock in self.sockets: + gsock = GreenSocket(sock) + gsock.setblocking(1) + hfun = partial(self.handle, gsock) + acceptor = eventlet.spawn(_eventlet_serve, gsock, hfun, + self.worker_connections) + + acceptors.append(acceptor) + eventlet.sleep(0.0) + + while self.alive: + self.notify() + eventlet.sleep(1.0) + + self.notify() + t = None + try: + with eventlet.Timeout(self.cfg.graceful_timeout) as t: + for a in acceptors: + a.kill(eventlet.StopServe()) + for a in acceptors: + a.wait() + except eventlet.Timeout as te: + if te != t: + raise + for a in acceptors: + a.kill() diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/ggevent.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/ggevent.py new file mode 100644 index 00000000..2125a32d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/ggevent.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import os +import sys +from datetime import datetime +from functools import partial +import time + +try: + import gevent +except ImportError: + raise RuntimeError("gevent worker requires gevent 1.4 or higher") +else: + from packaging.version import parse as parse_version + if parse_version(gevent.__version__) < parse_version('1.4'): + raise RuntimeError("gevent worker requires gevent 1.4 or higher") + +from gevent.pool import Pool +from gevent.server import StreamServer +from gevent import hub, monkey, socket, pywsgi + +import gunicorn +from gunicorn.http.wsgi import base_environ +from gunicorn.sock import ssl_context +from gunicorn.workers.base_async import AsyncWorker + +VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__) + + +class GeventWorker(AsyncWorker): + + server_class = None + wsgi_handler = None + + def patch(self): + monkey.patch_all() + + # patch sockets + sockets = [] + for s in self.sockets: + sockets.append(socket.socket(s.FAMILY, socket.SOCK_STREAM, + fileno=s.sock.fileno())) + self.sockets = sockets + + def notify(self): + super().notify() + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + sys.exit(0) + + def timeout_ctx(self): + return gevent.Timeout(self.cfg.keepalive, False) + + def run(self): + servers = [] + ssl_args = {} + + if self.cfg.is_ssl: + ssl_args = {"ssl_context": ssl_context(self.cfg)} + + for s in self.sockets: + s.setblocking(1) + pool = Pool(self.worker_connections) + if self.server_class is not None: + environ = base_environ(self.cfg) + environ.update({ + "wsgi.multithread": True, + "SERVER_SOFTWARE": VERSION, + }) + server = self.server_class( + s, application=self.wsgi, spawn=pool, log=self.log, + handler_class=self.wsgi_handler, environ=environ, + **ssl_args) + else: + hfun = partial(self.handle, s) + server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args) + if self.cfg.workers > 1: + server.max_accept = 1 + + server.start() + servers.append(server) + + while self.alive: + self.notify() + gevent.sleep(1.0) + + try: + # Stop accepting requests + for server in servers: + if hasattr(server, 'close'): # gevent 1.0 + server.close() + if hasattr(server, 'kill'): # gevent < 1.0 + server.kill() + + # Handle current requests until graceful_timeout + ts = time.time() + while time.time() - ts <= self.cfg.graceful_timeout: + accepting = 0 + for server in servers: + if server.pool.free_count() != server.pool.size: + accepting += 1 + + # if no server is accepting a connection, we can exit + if not accepting: + return + + self.notify() + gevent.sleep(1.0) + + # Force kill all active the handlers + self.log.warning("Worker graceful timeout (pid:%s)", self.pid) + for server in servers: + server.stop(timeout=1) + except Exception: + pass + + def handle(self, listener, client, addr): + # Connected socket timeout defaults to socket.getdefaulttimeout(). + # This forces to blocking mode. + client.setblocking(1) + super().handle(listener, client, addr) + + def handle_request(self, listener_name, req, sock, addr): + try: + super().handle_request(listener_name, req, sock, addr) + except gevent.GreenletExit: + pass + except SystemExit: + pass + + def handle_quit(self, sig, frame): + # Move this out of the signal handler so we can use + # blocking calls. See #1126 + gevent.spawn(super().handle_quit, sig, frame) + + def handle_usr1(self, sig, frame): + # Make the gevent workers handle the usr1 signal + # by deferring to a new greenlet. See #1645 + gevent.spawn(super().handle_usr1, sig, frame) + + def init_process(self): + self.patch() + hub.reinit() + super().init_process() + + +class GeventResponse(object): + + status = None + headers = None + sent = None + + def __init__(self, status, headers, clength): + self.status = status + self.headers = headers + self.sent = clength + + +class PyWSGIHandler(pywsgi.WSGIHandler): + + def log_request(self): + start = datetime.fromtimestamp(self.time_start) + finish = datetime.fromtimestamp(self.time_finish) + response_time = finish - start + resp_headers = getattr(self, 'response_headers', {}) + resp = GeventResponse(self.status, resp_headers, self.response_length) + if hasattr(self, 'headers'): + req_headers = self.headers.items() + else: + req_headers = [] + self.server.log.access(resp, req_headers, self.environ, response_time) + + def get_environ(self): + env = super().get_environ() + env['gunicorn.sock'] = self.socket + env['RAW_URI'] = self.path + return env + + +class PyWSGIServer(pywsgi.WSGIServer): + pass + + +class GeventPyWSGIWorker(GeventWorker): + "The Gevent StreamServer based workers." + server_class = PyWSGIServer + wsgi_handler = PyWSGIHandler diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py new file mode 100644 index 00000000..c9c42345 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py @@ -0,0 +1,373 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +# design: +# A threaded worker accepts connections in the main loop, accepted +# connections are added to the thread pool as a connection job. +# Keepalive connections are put back in the loop waiting for an event. +# If no event happen after the keep alive timeout, the connection is +# closed. +# pylint: disable=no-else-break + +from concurrent import futures +import errno +import os +import selectors +import socket +import ssl +import sys +import time +from collections import deque +from datetime import datetime +from functools import partial +from threading import RLock + +from . import base +from .. import http +from .. import util +from .. import sock +from ..http import wsgi + + +class TConn(object): + + def __init__(self, cfg, sock, client, server): + self.cfg = cfg + self.sock = sock + self.client = client + self.server = server + + self.timeout = None + self.parser = None + self.initialized = False + + # set the socket to non blocking + self.sock.setblocking(False) + + def init(self): + self.initialized = True + self.sock.setblocking(True) + + if self.parser is None: + # wrap the socket if needed + if self.cfg.is_ssl: + self.sock = sock.ssl_wrap_socket(self.sock, self.cfg) + + # initialize the parser + self.parser = http.RequestParser(self.cfg, self.sock, self.client) + + def set_timeout(self): + # set the timeout + self.timeout = time.time() + self.cfg.keepalive + + def close(self): + util.close(self.sock) + + +class ThreadWorker(base.Worker): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.worker_connections = self.cfg.worker_connections + self.max_keepalived = self.cfg.worker_connections - self.cfg.threads + # initialise the pool + self.tpool = None + self.poller = None + self._lock = None + self.futures = deque() + self._keep = deque() + self.nr_conns = 0 + + @classmethod + def check_config(cls, cfg, log): + max_keepalived = cfg.worker_connections - cfg.threads + + if max_keepalived <= 0 and cfg.keepalive: + log.warning("No keepalived connections can be handled. " + + "Check the number of worker connections and threads.") + + def init_process(self): + self.tpool = self.get_thread_pool() + self.poller = selectors.DefaultSelector() + self._lock = RLock() + super().init_process() + + def get_thread_pool(self): + """Override this method to customize how the thread pool is created""" + return futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + + def handle_quit(self, sig, frame): + self.alive = False + # worker_int callback + self.cfg.worker_int(self) + self.tpool.shutdown(False) + time.sleep(0.1) + sys.exit(0) + + def _wrap_future(self, fs, conn): + fs.conn = conn + self.futures.append(fs) + fs.add_done_callback(self.finish_request) + + def enqueue_req(self, conn): + conn.init() + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self._wrap_future(fs, conn) + + def accept(self, server, listener): + try: + sock, client = listener.accept() + # initialize the connection object + conn = TConn(self.cfg, sock, client, server) + + self.nr_conns += 1 + # wait until socket is readable + with self._lock: + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_client_socket_readable, conn)) + except EnvironmentError as e: + if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, + errno.EWOULDBLOCK): + raise + + def on_client_socket_readable(self, conn, client): + with self._lock: + # unregister the client from the poller + self.poller.unregister(client) + + if conn.initialized: + # remove the connection from keepalive + try: + self._keep.remove(conn) + except ValueError: + # race condition + return + + # submit the connection to a worker + self.enqueue_req(conn) + + def murder_keepalived(self): + now = time.time() + while True: + with self._lock: + try: + # remove the connection from the queue + conn = self._keep.popleft() + except IndexError: + break + + delta = conn.timeout - now + if delta > 0: + # add the connection back to the queue + with self._lock: + self._keep.appendleft(conn) + break + else: + self.nr_conns -= 1 + # remove the socket from the poller + with self._lock: + try: + self.poller.unregister(conn.sock) + except EnvironmentError as e: + if e.errno != errno.EBADF: + raise + except KeyError: + # already removed by the system, continue + pass + except ValueError: + # already removed by the system continue + pass + + # close the socket + conn.close() + + def is_parent_alive(self): + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + return False + return True + + def run(self): + # init listeners, add them to the event loop + for sock in self.sockets: + sock.setblocking(False) + # a race condition during graceful shutdown may make the listener + # name unavailable in the request handler so capture it once here + server = sock.getsockname() + acceptor = partial(self.accept, server) + self.poller.register(sock, selectors.EVENT_READ, acceptor) + + while self.alive: + # notify the arbiter we are alive + self.notify() + + # can we accept more connections? + if self.nr_conns < self.worker_connections: + # wait for an event + events = self.poller.select(1.0) + for key, _ in events: + callback = key.data + callback(key.fileobj) + + # check (but do not wait) for finished requests + result = futures.wait(self.futures, timeout=0, + return_when=futures.FIRST_COMPLETED) + else: + # wait for a request to finish + result = futures.wait(self.futures, timeout=1.0, + return_when=futures.FIRST_COMPLETED) + + # clean up finished requests + for fut in result.done: + self.futures.remove(fut) + + if not self.is_parent_alive(): + break + + # handle keepalive timeouts + self.murder_keepalived() + + self.tpool.shutdown(False) + self.poller.close() + + for s in self.sockets: + s.close() + + futures.wait(self.futures, timeout=self.cfg.graceful_timeout) + + def finish_request(self, fs): + if fs.cancelled(): + self.nr_conns -= 1 + fs.conn.close() + return + + try: + (keepalive, conn) = fs.result() + # if the connection should be kept alived add it + # to the eventloop and record it + if keepalive and self.alive: + # flag the socket as non blocked + conn.sock.setblocking(False) + + # register the connection + conn.set_timeout() + with self._lock: + self._keep.append(conn) + + # add the socket to the event loop + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_client_socket_readable, conn)) + else: + self.nr_conns -= 1 + conn.close() + except Exception: + # an exception happened, make sure to close the + # socket. + self.nr_conns -= 1 + fs.conn.close() + + def handle(self, conn): + keepalive = False + req = None + try: + req = next(conn.parser) + if not req: + return (False, conn) + + # handle the request + keepalive = self.handle_request(req, conn) + if keepalive: + return (keepalive, conn) + except http.errors.NoMoreData as e: + self.log.debug("Ignored premature client disconnection. %s", e) + + except StopIteration as e: + self.log.debug("Closing connection. %s", e) + except ssl.SSLError as e: + if e.args[0] == ssl.SSL_ERROR_EOF: + self.log.debug("ssl connection closed") + conn.sock.close() + else: + self.log.debug("Error processing SSL request.") + self.handle_error(req, conn.sock, conn.client, e) + + except EnvironmentError as e: + if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN): + self.log.exception("Socket error processing request.") + else: + if e.errno == errno.ECONNRESET: + self.log.debug("Ignoring connection reset") + elif e.errno == errno.ENOTCONN: + self.log.debug("Ignoring socket not connected") + else: + self.log.debug("Ignoring connection epipe") + except Exception as e: + self.handle_error(req, conn.sock, conn.client, e) + + return (False, conn) + + def handle_request(self, req, conn): + environ = {} + resp = None + try: + self.cfg.pre_request(self, req) + request_start = datetime.now() + resp, environ = wsgi.create(req, conn.sock, conn.client, + conn.server, self.cfg) + environ["wsgi.multithread"] = True + self.nr += 1 + if self.nr >= self.max_requests: + if self.alive: + self.log.info("Autorestarting worker after current request.") + self.alive = False + resp.force_close() + + if not self.alive or not self.cfg.keepalive: + resp.force_close() + elif len(self._keep) >= self.max_keepalived: + resp.force_close() + + respiter = self.wsgi(environ, resp.start_response) + try: + if isinstance(respiter, environ['wsgi.file_wrapper']): + resp.write_file(respiter) + else: + for item in respiter: + resp.write(item) + + resp.close() + finally: + request_time = datetime.now() - request_start + self.log.access(resp, req, environ, request_time) + if hasattr(respiter, "close"): + respiter.close() + + if resp.should_close(): + self.log.debug("Closing connection.") + return False + except EnvironmentError: + # pass to next try-except level + util.reraise(*sys.exc_info()) + except Exception: + if resp and resp.headers_sent: + # If the requests have already been sent, we should close the + # connection to indicate the error. + self.log.exception("Error handling request") + try: + conn.sock.shutdown(socket.SHUT_RDWR) + conn.sock.close() + except EnvironmentError: + pass + raise StopIteration() + raise + finally: + try: + self.cfg.post_request(self, req, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") + + return True diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/gtornado.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/gtornado.py new file mode 100644 index 00000000..28506119 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/gtornado.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import os +import sys + +try: + import tornado +except ImportError: + raise RuntimeError("You need tornado installed to use this worker.") +import tornado.web +import tornado.httpserver +from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.wsgi import WSGIContainer +from gunicorn.workers.base import Worker +from gunicorn import __version__ as gversion +from gunicorn.sock import ssl_context + + +# Tornado 5.0 updated its IOLoop, and the `io_loop` arguments to many +# Tornado functions have been removed in Tornado 5.0. Also, they no +# longer store PeriodCallbacks in ioloop._callbacks. Instead we store +# them on our side, and use stop() on them when stopping the worker. +# See https://www.tornadoweb.org/en/stable/releases/v5.0.0.html#backwards-compatibility-notes +# for more details. +TORNADO5 = tornado.version_info >= (5, 0, 0) + + +class TornadoWorker(Worker): + + @classmethod + def setup(cls): + web = sys.modules.pop("tornado.web") + old_clear = web.RequestHandler.clear + + def clear(self): + old_clear(self) + if "Gunicorn" not in self._headers["Server"]: + self._headers["Server"] += " (Gunicorn/%s)" % gversion + web.RequestHandler.clear = clear + sys.modules["tornado.web"] = web + + def handle_exit(self, sig, frame): + if self.alive: + super().handle_exit(sig, frame) + + def handle_request(self): + self.nr += 1 + if self.alive and self.nr >= self.max_requests: + self.log.info("Autorestarting worker after current request.") + self.alive = False + + def watchdog(self): + if self.alive: + self.notify() + + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + self.alive = False + + def heartbeat(self): + if not self.alive: + if self.server_alive: + if hasattr(self, 'server'): + try: + self.server.stop() + except Exception: + pass + self.server_alive = False + else: + if TORNADO5: + for callback in self.callbacks: + callback.stop() + self.ioloop.stop() + else: + if not self.ioloop._callbacks: + self.ioloop.stop() + + def init_process(self): + # IOLoop cannot survive a fork or be shared across processes + # in any way. When multiple processes are being used, each process + # should create its own IOLoop. We should clear current IOLoop + # if exists before os.fork. + IOLoop.clear_current() + super().init_process() + + def run(self): + self.ioloop = IOLoop.instance() + self.alive = True + self.server_alive = False + + if TORNADO5: + self.callbacks = [] + self.callbacks.append(PeriodicCallback(self.watchdog, 1000)) + self.callbacks.append(PeriodicCallback(self.heartbeat, 1000)) + for callback in self.callbacks: + callback.start() + else: + PeriodicCallback(self.watchdog, 1000, io_loop=self.ioloop).start() + PeriodicCallback(self.heartbeat, 1000, io_loop=self.ioloop).start() + + # Assume the app is a WSGI callable if its not an + # instance of tornado.web.Application or is an + # instance of tornado.wsgi.WSGIApplication + app = self.wsgi + + if tornado.version_info[0] < 6: + if not isinstance(app, tornado.web.Application) or \ + isinstance(app, tornado.wsgi.WSGIApplication): + app = WSGIContainer(app) + elif not isinstance(app, WSGIContainer) and \ + not isinstance(app, tornado.web.Application): + app = WSGIContainer(app) + + # Monkey-patching HTTPConnection.finish to count the + # number of requests being handled by Tornado. This + # will help gunicorn shutdown the worker if max_requests + # is exceeded. + httpserver = sys.modules["tornado.httpserver"] + if hasattr(httpserver, 'HTTPConnection'): + old_connection_finish = httpserver.HTTPConnection.finish + + def finish(other): + self.handle_request() + old_connection_finish(other) + httpserver.HTTPConnection.finish = finish + sys.modules["tornado.httpserver"] = httpserver + + server_class = tornado.httpserver.HTTPServer + else: + + class _HTTPServer(tornado.httpserver.HTTPServer): + + def on_close(instance, server_conn): + self.handle_request() + super(_HTTPServer, instance).on_close(server_conn) + + server_class = _HTTPServer + + if self.cfg.is_ssl: + if TORNADO5: + server = server_class(app, ssl_options=ssl_context(self.cfg)) + else: + server = server_class(app, io_loop=self.ioloop, + ssl_options=ssl_context(self.cfg)) + else: + if TORNADO5: + server = server_class(app) + else: + server = server_class(app, io_loop=self.ioloop) + + self.server = server + self.server_alive = True + + for s in self.sockets: + s.setblocking(0) + if hasattr(server, "add_socket"): # tornado > 2.0 + server.add_socket(s) + elif hasattr(server, "_sockets"): # tornado 2.0 + server._sockets[s.fileno()] = s + + server.no_keep_alive = self.cfg.keepalive <= 0 + server.start(num_processes=1) + + self.ioloop.start() diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/sync.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/sync.py new file mode 100644 index 00000000..39a209f0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/sync.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. +# + +from datetime import datetime +import errno +import os +import select +import socket +import ssl +import sys + +from gunicorn import http +from gunicorn.http import wsgi +from gunicorn import sock +from gunicorn import util +from gunicorn.workers import base + + +class StopWaiting(Exception): + """ exception raised to stop waiting for a connection """ + + +class SyncWorker(base.Worker): + + def accept(self, listener): + client, addr = listener.accept() + client.setblocking(1) + util.close_on_exec(client) + self.handle(listener, client, addr) + + def wait(self, timeout): + try: + self.notify() + ret = select.select(self.wait_fds, [], [], timeout) + if ret[0]: + if self.PIPE[0] in ret[0]: + os.read(self.PIPE[0], 1) + return ret[0] + + except select.error as e: + if e.args[0] == errno.EINTR: + return self.sockets + if e.args[0] == errno.EBADF: + if self.nr < 0: + return self.sockets + else: + raise StopWaiting + raise + + def is_parent_alive(self): + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + return False + return True + + def run_for_one(self, timeout): + listener = self.sockets[0] + while self.alive: + self.notify() + + # Accept a connection. If we get an error telling us + # that no connection is waiting we fall down to the + # select which is where we'll wait for a bit for new + # workers to come give us some love. + try: + self.accept(listener) + # Keep processing clients until no one is waiting. This + # prevents the need to select() for every client that we + # process. + continue + + except EnvironmentError as e: + if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, + errno.EWOULDBLOCK): + raise + + if not self.is_parent_alive(): + return + + try: + self.wait(timeout) + except StopWaiting: + return + + def run_for_multiple(self, timeout): + while self.alive: + self.notify() + + try: + ready = self.wait(timeout) + except StopWaiting: + return + + if ready is not None: + for listener in ready: + if listener == self.PIPE[0]: + continue + + try: + self.accept(listener) + except EnvironmentError as e: + if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, + errno.EWOULDBLOCK): + raise + + if not self.is_parent_alive(): + return + + def run(self): + # if no timeout is given the worker will never wait and will + # use the CPU for nothing. This minimal timeout prevent it. + timeout = self.timeout or 0.5 + + # self.socket appears to lose its blocking status after + # we fork in the arbiter. Reset it here. + for s in self.sockets: + s.setblocking(0) + + if len(self.sockets) > 1: + self.run_for_multiple(timeout) + else: + self.run_for_one(timeout) + + def handle(self, listener, client, addr): + req = None + try: + if self.cfg.is_ssl: + client = sock.ssl_wrap_socket(client, self.cfg) + parser = http.RequestParser(self.cfg, client, addr) + req = next(parser) + self.handle_request(listener, req, client, addr) + except http.errors.NoMoreData as e: + self.log.debug("Ignored premature client disconnection. %s", e) + except StopIteration as e: + self.log.debug("Closing connection. %s", e) + except ssl.SSLError as e: + if e.args[0] == ssl.SSL_ERROR_EOF: + self.log.debug("ssl connection closed") + client.close() + else: + self.log.debug("Error processing SSL request.") + self.handle_error(req, client, addr, e) + except EnvironmentError as e: + if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN): + self.log.exception("Socket error processing request.") + else: + if e.errno == errno.ECONNRESET: + self.log.debug("Ignoring connection reset") + elif e.errno == errno.ENOTCONN: + self.log.debug("Ignoring socket not connected") + else: + self.log.debug("Ignoring EPIPE") + except Exception as e: + self.handle_error(req, client, addr, e) + finally: + util.close(client) + + def handle_request(self, listener, req, client, addr): + environ = {} + resp = None + try: + self.cfg.pre_request(self, req) + request_start = datetime.now() + resp, environ = wsgi.create(req, client, addr, + listener.getsockname(), self.cfg) + # Force the connection closed until someone shows + # a buffering proxy that supports Keep-Alive to + # the backend. + resp.force_close() + self.nr += 1 + if self.nr >= self.max_requests: + self.log.info("Autorestarting worker after current request.") + self.alive = False + respiter = self.wsgi(environ, resp.start_response) + try: + if isinstance(respiter, environ['wsgi.file_wrapper']): + resp.write_file(respiter) + else: + for item in respiter: + resp.write(item) + resp.close() + finally: + request_time = datetime.now() - request_start + self.log.access(resp, req, environ, request_time) + if hasattr(respiter, "close"): + respiter.close() + except EnvironmentError: + # pass to next try-except level + util.reraise(*sys.exc_info()) + except Exception: + if resp and resp.headers_sent: + # If the requests have already been sent, we should close the + # connection to indicate the error. + self.log.exception("Error handling request") + try: + client.shutdown(socket.SHUT_RDWR) + client.close() + except EnvironmentError: + pass + raise StopIteration() + raise + finally: + try: + self.cfg.post_request(self, req, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/workertmp.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/workertmp.py new file mode 100644 index 00000000..cc79ecd6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/workertmp.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import os +import platform +import tempfile + +from gunicorn import util + +PLATFORM = platform.system() +IS_CYGWIN = PLATFORM.startswith('CYGWIN') + + +class WorkerTmp(object): + + def __init__(self, cfg): + old_umask = os.umask(cfg.umask) + fdir = cfg.worker_tmp_dir + if fdir and not os.path.isdir(fdir): + raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir) + fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir) + os.umask(old_umask) + + # change the owner and group of the file if the worker will run as + # a different user or group, so that the worker can modify the file + if cfg.uid != os.geteuid() or cfg.gid != os.getegid(): + util.chown(name, cfg.uid, cfg.gid) + + # unlink the file so we don't leak temporary files + try: + if not IS_CYGWIN: + util.unlink(name) + # In Python 3.8, open() emits RuntimeWarning if buffering=1 for binary mode. + # Because we never write to this file, pass 0 to switch buffering off. + self._tmp = os.fdopen(fd, 'w+b', 0) + except Exception: + os.close(fd) + raise + + self.spinner = 0 + + def notify(self): + self.spinner = (self.spinner + 1) % 2 + os.fchmod(self._tmp.fileno(), self.spinner) + + def last_update(self): + return os.fstat(self._tmp.fileno()).st_ctime + + def fileno(self): + return self._tmp.fileno() + + def close(self): + return self._tmp.close() |