aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py')
-rw-r--r--.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py373
1 files changed, 373 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py b/.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py
new file mode 100644
index 00000000..c9c42345
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gunicorn/workers/gthread.py
@@ -0,0 +1,373 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gunicorn released under the MIT license.
+# See the NOTICE for more information.
+
+# design:
+# A threaded worker accepts connections in the main loop, accepted
+# connections are added to the thread pool as a connection job.
+# Keepalive connections are put back in the loop waiting for an event.
+# If no event happen after the keep alive timeout, the connection is
+# closed.
+# pylint: disable=no-else-break
+
+from concurrent import futures
+import errno
+import os
+import selectors
+import socket
+import ssl
+import sys
+import time
+from collections import deque
+from datetime import datetime
+from functools import partial
+from threading import RLock
+
+from . import base
+from .. import http
+from .. import util
+from .. import sock
+from ..http import wsgi
+
+
+class TConn(object):
+
+ def __init__(self, cfg, sock, client, server):
+ self.cfg = cfg
+ self.sock = sock
+ self.client = client
+ self.server = server
+
+ self.timeout = None
+ self.parser = None
+ self.initialized = False
+
+ # set the socket to non blocking
+ self.sock.setblocking(False)
+
+ def init(self):
+ self.initialized = True
+ self.sock.setblocking(True)
+
+ if self.parser is None:
+ # wrap the socket if needed
+ if self.cfg.is_ssl:
+ self.sock = sock.ssl_wrap_socket(self.sock, self.cfg)
+
+ # initialize the parser
+ self.parser = http.RequestParser(self.cfg, self.sock, self.client)
+
+ def set_timeout(self):
+ # set the timeout
+ self.timeout = time.time() + self.cfg.keepalive
+
+ def close(self):
+ util.close(self.sock)
+
+
+class ThreadWorker(base.Worker):
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.worker_connections = self.cfg.worker_connections
+ self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
+ # initialise the pool
+ self.tpool = None
+ self.poller = None
+ self._lock = None
+ self.futures = deque()
+ self._keep = deque()
+ self.nr_conns = 0
+
+ @classmethod
+ def check_config(cls, cfg, log):
+ max_keepalived = cfg.worker_connections - cfg.threads
+
+ if max_keepalived <= 0 and cfg.keepalive:
+ log.warning("No keepalived connections can be handled. " +
+ "Check the number of worker connections and threads.")
+
+ def init_process(self):
+ self.tpool = self.get_thread_pool()
+ self.poller = selectors.DefaultSelector()
+ self._lock = RLock()
+ super().init_process()
+
+ def get_thread_pool(self):
+ """Override this method to customize how the thread pool is created"""
+ return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
+
+ def handle_quit(self, sig, frame):
+ self.alive = False
+ # worker_int callback
+ self.cfg.worker_int(self)
+ self.tpool.shutdown(False)
+ time.sleep(0.1)
+ sys.exit(0)
+
+ def _wrap_future(self, fs, conn):
+ fs.conn = conn
+ self.futures.append(fs)
+ fs.add_done_callback(self.finish_request)
+
+ def enqueue_req(self, conn):
+ conn.init()
+ # submit the connection to a worker
+ fs = self.tpool.submit(self.handle, conn)
+ self._wrap_future(fs, conn)
+
+ def accept(self, server, listener):
+ try:
+ sock, client = listener.accept()
+ # initialize the connection object
+ conn = TConn(self.cfg, sock, client, server)
+
+ self.nr_conns += 1
+ # wait until socket is readable
+ with self._lock:
+ self.poller.register(conn.sock, selectors.EVENT_READ,
+ partial(self.on_client_socket_readable, conn))
+ except EnvironmentError as e:
+ if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
+ errno.EWOULDBLOCK):
+ raise
+
+ def on_client_socket_readable(self, conn, client):
+ with self._lock:
+ # unregister the client from the poller
+ self.poller.unregister(client)
+
+ if conn.initialized:
+ # remove the connection from keepalive
+ try:
+ self._keep.remove(conn)
+ except ValueError:
+ # race condition
+ return
+
+ # submit the connection to a worker
+ self.enqueue_req(conn)
+
+ def murder_keepalived(self):
+ now = time.time()
+ while True:
+ with self._lock:
+ try:
+ # remove the connection from the queue
+ conn = self._keep.popleft()
+ except IndexError:
+ break
+
+ delta = conn.timeout - now
+ if delta > 0:
+ # add the connection back to the queue
+ with self._lock:
+ self._keep.appendleft(conn)
+ break
+ else:
+ self.nr_conns -= 1
+ # remove the socket from the poller
+ with self._lock:
+ try:
+ self.poller.unregister(conn.sock)
+ except EnvironmentError as e:
+ if e.errno != errno.EBADF:
+ raise
+ except KeyError:
+ # already removed by the system, continue
+ pass
+ except ValueError:
+ # already removed by the system continue
+ pass
+
+ # close the socket
+ conn.close()
+
+ def is_parent_alive(self):
+ # If our parent changed then we shut down.
+ if self.ppid != os.getppid():
+ self.log.info("Parent changed, shutting down: %s", self)
+ return False
+ return True
+
+ def run(self):
+ # init listeners, add them to the event loop
+ for sock in self.sockets:
+ sock.setblocking(False)
+ # a race condition during graceful shutdown may make the listener
+ # name unavailable in the request handler so capture it once here
+ server = sock.getsockname()
+ acceptor = partial(self.accept, server)
+ self.poller.register(sock, selectors.EVENT_READ, acceptor)
+
+ while self.alive:
+ # notify the arbiter we are alive
+ self.notify()
+
+ # can we accept more connections?
+ if self.nr_conns < self.worker_connections:
+ # wait for an event
+ events = self.poller.select(1.0)
+ for key, _ in events:
+ callback = key.data
+ callback(key.fileobj)
+
+ # check (but do not wait) for finished requests
+ result = futures.wait(self.futures, timeout=0,
+ return_when=futures.FIRST_COMPLETED)
+ else:
+ # wait for a request to finish
+ result = futures.wait(self.futures, timeout=1.0,
+ return_when=futures.FIRST_COMPLETED)
+
+ # clean up finished requests
+ for fut in result.done:
+ self.futures.remove(fut)
+
+ if not self.is_parent_alive():
+ break
+
+ # handle keepalive timeouts
+ self.murder_keepalived()
+
+ self.tpool.shutdown(False)
+ self.poller.close()
+
+ for s in self.sockets:
+ s.close()
+
+ futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
+
+ def finish_request(self, fs):
+ if fs.cancelled():
+ self.nr_conns -= 1
+ fs.conn.close()
+ return
+
+ try:
+ (keepalive, conn) = fs.result()
+ # if the connection should be kept alived add it
+ # to the eventloop and record it
+ if keepalive and self.alive:
+ # flag the socket as non blocked
+ conn.sock.setblocking(False)
+
+ # register the connection
+ conn.set_timeout()
+ with self._lock:
+ self._keep.append(conn)
+
+ # add the socket to the event loop
+ self.poller.register(conn.sock, selectors.EVENT_READ,
+ partial(self.on_client_socket_readable, conn))
+ else:
+ self.nr_conns -= 1
+ conn.close()
+ except Exception:
+ # an exception happened, make sure to close the
+ # socket.
+ self.nr_conns -= 1
+ fs.conn.close()
+
+ def handle(self, conn):
+ keepalive = False
+ req = None
+ try:
+ req = next(conn.parser)
+ if not req:
+ return (False, conn)
+
+ # handle the request
+ keepalive = self.handle_request(req, conn)
+ if keepalive:
+ return (keepalive, conn)
+ except http.errors.NoMoreData as e:
+ self.log.debug("Ignored premature client disconnection. %s", e)
+
+ except StopIteration as e:
+ self.log.debug("Closing connection. %s", e)
+ except ssl.SSLError as e:
+ if e.args[0] == ssl.SSL_ERROR_EOF:
+ self.log.debug("ssl connection closed")
+ conn.sock.close()
+ else:
+ self.log.debug("Error processing SSL request.")
+ self.handle_error(req, conn.sock, conn.client, e)
+
+ except EnvironmentError as e:
+ if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
+ self.log.exception("Socket error processing request.")
+ else:
+ if e.errno == errno.ECONNRESET:
+ self.log.debug("Ignoring connection reset")
+ elif e.errno == errno.ENOTCONN:
+ self.log.debug("Ignoring socket not connected")
+ else:
+ self.log.debug("Ignoring connection epipe")
+ except Exception as e:
+ self.handle_error(req, conn.sock, conn.client, e)
+
+ return (False, conn)
+
+ def handle_request(self, req, conn):
+ environ = {}
+ resp = None
+ try:
+ self.cfg.pre_request(self, req)
+ request_start = datetime.now()
+ resp, environ = wsgi.create(req, conn.sock, conn.client,
+ conn.server, self.cfg)
+ environ["wsgi.multithread"] = True
+ self.nr += 1
+ if self.nr >= self.max_requests:
+ if self.alive:
+ self.log.info("Autorestarting worker after current request.")
+ self.alive = False
+ resp.force_close()
+
+ if not self.alive or not self.cfg.keepalive:
+ resp.force_close()
+ elif len(self._keep) >= self.max_keepalived:
+ resp.force_close()
+
+ respiter = self.wsgi(environ, resp.start_response)
+ try:
+ if isinstance(respiter, environ['wsgi.file_wrapper']):
+ resp.write_file(respiter)
+ else:
+ for item in respiter:
+ resp.write(item)
+
+ resp.close()
+ finally:
+ request_time = datetime.now() - request_start
+ self.log.access(resp, req, environ, request_time)
+ if hasattr(respiter, "close"):
+ respiter.close()
+
+ if resp.should_close():
+ self.log.debug("Closing connection.")
+ return False
+ except EnvironmentError:
+ # pass to next try-except level
+ util.reraise(*sys.exc_info())
+ except Exception:
+ if resp and resp.headers_sent:
+ # If the requests have already been sent, we should close the
+ # connection to indicate the error.
+ self.log.exception("Error handling request")
+ try:
+ conn.sock.shutdown(socket.SHUT_RDWR)
+ conn.sock.close()
+ except EnvironmentError:
+ pass
+ raise StopIteration()
+ raise
+ finally:
+ try:
+ self.cfg.post_request(self, req, environ, resp)
+ except Exception:
+ self.log.exception("Exception in post_request hook")
+
+ return True