about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/gunicorn/util.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/gunicorn/util.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/gunicorn/util.py')
-rw-r--r--.venv/lib/python3.12/site-packages/gunicorn/util.py654
1 files changed, 654 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/gunicorn/util.py b/.venv/lib/python3.12/site-packages/gunicorn/util.py
new file mode 100644
index 00000000..751deea7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gunicorn/util.py
@@ -0,0 +1,654 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gunicorn released under the MIT license.
+# See the NOTICE for more information.
+import ast
+import email.utils
+import errno
+import fcntl
+import html
+import importlib
+import inspect
+import io
+import logging
+import os
+import pwd
+import random
+import re
+import socket
+import sys
+import textwrap
+import time
+import traceback
+import warnings
+
+try:
+    import importlib.metadata as importlib_metadata
+except (ModuleNotFoundError, ImportError):
+    import importlib_metadata
+
+from gunicorn.errors import AppImportError
+from gunicorn.workers import SUPPORTED_WORKERS
+import urllib.parse
+
+REDIRECT_TO = getattr(os, 'devnull', '/dev/null')
+
+# Server and Date aren't technically hop-by-hop
+# headers, but they are in the purview of the
+# origin server which the WSGI spec says we should
+# act like. So we drop them and add our own.
+#
+# In the future, concatenation server header values
+# might be better, but nothing else does it and
+# dropping them is easier.
+hop_headers = set("""
+    connection keep-alive proxy-authenticate proxy-authorization
+    te trailers transfer-encoding upgrade
+    server date
+    """.split())
+
+try:
+    from setproctitle import setproctitle
+
+    def _setproctitle(title):
+        setproctitle("gunicorn: %s" % title)
+except ImportError:
+    def _setproctitle(title):
+        pass
+
+
+def load_entry_point(distribution, group, name):
+    dist_obj = importlib_metadata.distribution(distribution)
+    eps = [ep for ep in dist_obj.entry_points
+           if ep.group == group and ep.name == name]
+    if not eps:
+        raise ImportError("Entry point %r not found" % ((group, name),))
+    return eps[0].load()
+
+
+def load_class(uri, default="gunicorn.workers.sync.SyncWorker",
+               section="gunicorn.workers"):
+    if inspect.isclass(uri):
+        return uri
+    if uri.startswith("egg:"):
+        # uses entry points
+        entry_str = uri.split("egg:")[1]
+        try:
+            dist, name = entry_str.rsplit("#", 1)
+        except ValueError:
+            dist = entry_str
+            name = default
+
+        try:
+            return load_entry_point(dist, section, name)
+        except Exception:
+            exc = traceback.format_exc()
+            msg = "class uri %r invalid or not found: \n\n[%s]"
+            raise RuntimeError(msg % (uri, exc))
+    else:
+        components = uri.split('.')
+        if len(components) == 1:
+            while True:
+                if uri.startswith("#"):
+                    uri = uri[1:]
+
+                if uri in SUPPORTED_WORKERS:
+                    components = SUPPORTED_WORKERS[uri].split(".")
+                    break
+
+                try:
+                    return load_entry_point(
+                        "gunicorn", section, uri
+                    )
+                except Exception:
+                    exc = traceback.format_exc()
+                    msg = "class uri %r invalid or not found: \n\n[%s]"
+                    raise RuntimeError(msg % (uri, exc))
+
+        klass = components.pop(-1)
+
+        try:
+            mod = importlib.import_module('.'.join(components))
+        except Exception:
+            exc = traceback.format_exc()
+            msg = "class uri %r invalid or not found: \n\n[%s]"
+            raise RuntimeError(msg % (uri, exc))
+        return getattr(mod, klass)
+
+
+positionals = (
+    inspect.Parameter.POSITIONAL_ONLY,
+    inspect.Parameter.POSITIONAL_OR_KEYWORD,
+)
+
+
+def get_arity(f):
+    sig = inspect.signature(f)
+    arity = 0
+
+    for param in sig.parameters.values():
+        if param.kind in positionals:
+            arity += 1
+
+    return arity
+
+
+def get_username(uid):
+    """ get the username for a user id"""
+    return pwd.getpwuid(uid).pw_name
+
+
+def set_owner_process(uid, gid, initgroups=False):
+    """ set user and group of workers processes """
+
+    if gid:
+        if uid:
+            try:
+                username = get_username(uid)
+            except KeyError:
+                initgroups = False
+
+        # versions of python < 2.6.2 don't manage unsigned int for
+        # groups like on osx or fedora
+        gid = abs(gid) & 0x7FFFFFFF
+
+        if initgroups:
+            os.initgroups(username, gid)
+        elif gid != os.getgid():
+            os.setgid(gid)
+
+    if uid and uid != os.getuid():
+        os.setuid(uid)
+
+
+def chown(path, uid, gid):
+    os.chown(path, uid, gid)
+
+
+if sys.platform.startswith("win"):
+    def _waitfor(func, pathname, waitall=False):
+        # Perform the operation
+        func(pathname)
+        # Now setup the wait loop
+        if waitall:
+            dirname = pathname
+        else:
+            dirname, name = os.path.split(pathname)
+            dirname = dirname or '.'
+        # Check for `pathname` to be removed from the filesystem.
+        # The exponential backoff of the timeout amounts to a total
+        # of ~1 second after which the deletion is probably an error
+        # anyway.
+        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
+        # required when contention occurs.
+        timeout = 0.001
+        while timeout < 1.0:
+            # Note we are only testing for the existence of the file(s) in
+            # the contents of the directory regardless of any security or
+            # access rights.  If we have made it this far, we have sufficient
+            # permissions to do that much using Python's equivalent of the
+            # Windows API FindFirstFile.
+            # Other Windows APIs can fail or give incorrect results when
+            # dealing with files that are pending deletion.
+            L = os.listdir(dirname)
+            if not L if waitall else name in L:
+                return
+            # Increase the timeout and try again
+            time.sleep(timeout)
+            timeout *= 2
+        warnings.warn('tests may fail, delete still pending for ' + pathname,
+                      RuntimeWarning, stacklevel=4)
+
+    def _unlink(filename):
+        _waitfor(os.unlink, filename)
+else:
+    _unlink = os.unlink
+
+
+def unlink(filename):
+    try:
+        _unlink(filename)
+    except OSError as error:
+        # The filename need not exist.
+        if error.errno not in (errno.ENOENT, errno.ENOTDIR):
+            raise
+
+
+def is_ipv6(addr):
+    try:
+        socket.inet_pton(socket.AF_INET6, addr)
+    except socket.error:  # not a valid address
+        return False
+    except ValueError:  # ipv6 not supported on this platform
+        return False
+    return True
+
+
+def parse_address(netloc, default_port='8000'):
+    if re.match(r'unix:(//)?', netloc):
+        return re.split(r'unix:(//)?', netloc)[-1]
+
+    if netloc.startswith("fd://"):
+        fd = netloc[5:]
+        try:
+            return int(fd)
+        except ValueError:
+            raise RuntimeError("%r is not a valid file descriptor." % fd) from None
+
+    if netloc.startswith("tcp://"):
+        netloc = netloc.split("tcp://")[1]
+    host, port = netloc, default_port
+
+    if '[' in netloc and ']' in netloc:
+        host = netloc.split(']')[0][1:]
+        port = (netloc.split(']:') + [default_port])[1]
+    elif ':' in netloc:
+        host, port = (netloc.split(':') + [default_port])[:2]
+    elif netloc == "":
+        host, port = "0.0.0.0", default_port
+
+    try:
+        port = int(port)
+    except ValueError:
+        raise RuntimeError("%r is not a valid port number." % port)
+
+    return host.lower(), port
+
+
+def close_on_exec(fd):
+    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+    flags |= fcntl.FD_CLOEXEC
+    fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+
+def set_non_blocking(fd):
+    flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
+    fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+def close(sock):
+    try:
+        sock.close()
+    except socket.error:
+        pass
+
+
+try:
+    from os import closerange
+except ImportError:
+    def closerange(fd_low, fd_high):
+        # Iterate through and close all file descriptors.
+        for fd in range(fd_low, fd_high):
+            try:
+                os.close(fd)
+            except OSError:  # ERROR, fd wasn't open to begin with (ignored)
+                pass
+
+
+def write_chunk(sock, data):
+    if isinstance(data, str):
+        data = data.encode('utf-8')
+    chunk_size = "%X\r\n" % len(data)
+    chunk = b"".join([chunk_size.encode('utf-8'), data, b"\r\n"])
+    sock.sendall(chunk)
+
+
+def write(sock, data, chunked=False):
+    if chunked:
+        return write_chunk(sock, data)
+    sock.sendall(data)
+
+
+def write_nonblock(sock, data, chunked=False):
+    timeout = sock.gettimeout()
+    if timeout != 0.0:
+        try:
+            sock.setblocking(0)
+            return write(sock, data, chunked)
+        finally:
+            sock.setblocking(1)
+    else:
+        return write(sock, data, chunked)
+
+
+def write_error(sock, status_int, reason, mesg):
+    html_error = textwrap.dedent("""\
+    <html>
+      <head>
+        <title>%(reason)s</title>
+      </head>
+      <body>
+        <h1><p>%(reason)s</p></h1>
+        %(mesg)s
+      </body>
+    </html>
+    """) % {"reason": reason, "mesg": html.escape(mesg)}
+
+    http = textwrap.dedent("""\
+    HTTP/1.1 %s %s\r
+    Connection: close\r
+    Content-Type: text/html\r
+    Content-Length: %d\r
+    \r
+    %s""") % (str(status_int), reason, len(html_error), html_error)
+    write_nonblock(sock, http.encode('latin1'))
+
+
+def _called_with_wrong_args(f):
+    """Check whether calling a function raised a ``TypeError`` because
+    the call failed or because something in the function raised the
+    error.
+
+    :param f: The function that was called.
+    :return: ``True`` if the call failed.
+    """
+    tb = sys.exc_info()[2]
+
+    try:
+        while tb is not None:
+            if tb.tb_frame.f_code is f.__code__:
+                # In the function, it was called successfully.
+                return False
+
+            tb = tb.tb_next
+
+        # Didn't reach the function.
+        return True
+    finally:
+        # Delete tb to break a circular reference in Python 2.
+        # https://docs.python.org/2/library/sys.html#sys.exc_info
+        del tb
+
+
+def import_app(module):
+    parts = module.split(":", 1)
+    if len(parts) == 1:
+        obj = "application"
+    else:
+        module, obj = parts[0], parts[1]
+
+    try:
+        mod = importlib.import_module(module)
+    except ImportError:
+        if module.endswith(".py") and os.path.exists(module):
+            msg = "Failed to find application, did you mean '%s:%s'?"
+            raise ImportError(msg % (module.rsplit(".", 1)[0], obj))
+        raise
+
+    # Parse obj as a single expression to determine if it's a valid
+    # attribute name or function call.
+    try:
+        expression = ast.parse(obj, mode="eval").body
+    except SyntaxError:
+        raise AppImportError(
+            "Failed to parse %r as an attribute name or function call." % obj
+        )
+
+    if isinstance(expression, ast.Name):
+        name = expression.id
+        args = kwargs = None
+    elif isinstance(expression, ast.Call):
+        # Ensure the function name is an attribute name only.
+        if not isinstance(expression.func, ast.Name):
+            raise AppImportError("Function reference must be a simple name: %r" % obj)
+
+        name = expression.func.id
+
+        # Parse the positional and keyword arguments as literals.
+        try:
+            args = [ast.literal_eval(arg) for arg in expression.args]
+            kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in expression.keywords}
+        except ValueError:
+            # literal_eval gives cryptic error messages, show a generic
+            # message with the full expression instead.
+            raise AppImportError(
+                "Failed to parse arguments as literal values: %r" % obj
+            )
+    else:
+        raise AppImportError(
+            "Failed to parse %r as an attribute name or function call." % obj
+        )
+
+    is_debug = logging.root.level == logging.DEBUG
+    try:
+        app = getattr(mod, name)
+    except AttributeError:
+        if is_debug:
+            traceback.print_exception(*sys.exc_info())
+        raise AppImportError("Failed to find attribute %r in %r." % (name, module))
+
+    # If the expression was a function call, call the retrieved object
+    # to get the real application.
+    if args is not None:
+        try:
+            app = app(*args, **kwargs)
+        except TypeError as e:
+            # If the TypeError was due to bad arguments to the factory
+            # function, show Python's nice error message without a
+            # traceback.
+            if _called_with_wrong_args(app):
+                raise AppImportError(
+                    "".join(traceback.format_exception_only(TypeError, e)).strip()
+                )
+
+            # Otherwise it was raised from within the function, show the
+            # full traceback.
+            raise
+
+    if app is None:
+        raise AppImportError("Failed to find application object: %r" % obj)
+
+    if not callable(app):
+        raise AppImportError("Application object must be callable.")
+    return app
+
+
+def getcwd():
+    # get current path, try to use PWD env first
+    try:
+        a = os.stat(os.environ['PWD'])
+        b = os.stat(os.getcwd())
+        if a.st_ino == b.st_ino and a.st_dev == b.st_dev:
+            cwd = os.environ['PWD']
+        else:
+            cwd = os.getcwd()
+    except Exception:
+        cwd = os.getcwd()
+    return cwd
+
+
+def http_date(timestamp=None):
+    """Return the current date and time formatted for a message header."""
+    if timestamp is None:
+        timestamp = time.time()
+    s = email.utils.formatdate(timestamp, localtime=False, usegmt=True)
+    return s
+
+
+def is_hoppish(header):
+    return header.lower().strip() in hop_headers
+
+
+def daemonize(enable_stdio_inheritance=False):
+    """\
+    Standard daemonization of a process.
+    http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7
+    """
+    if 'GUNICORN_FD' not in os.environ:
+        if os.fork():
+            os._exit(0)
+        os.setsid()
+
+        if os.fork():
+            os._exit(0)
+
+        os.umask(0o22)
+
+        # In both the following any file descriptors above stdin
+        # stdout and stderr are left untouched. The inheritance
+        # option simply allows one to have output go to a file
+        # specified by way of shell redirection when not wanting
+        # to use --error-log option.
+
+        if not enable_stdio_inheritance:
+            # Remap all of stdin, stdout and stderr on to
+            # /dev/null. The expectation is that users have
+            # specified the --error-log option.
+
+            closerange(0, 3)
+
+            fd_null = os.open(REDIRECT_TO, os.O_RDWR)
+            # PEP 446, make fd for /dev/null inheritable
+            os.set_inheritable(fd_null, True)
+
+            # expect fd_null to be always 0 here, but in-case not ...
+            if fd_null != 0:
+                os.dup2(fd_null, 0)
+
+            os.dup2(fd_null, 1)
+            os.dup2(fd_null, 2)
+
+        else:
+            fd_null = os.open(REDIRECT_TO, os.O_RDWR)
+
+            # Always redirect stdin to /dev/null as we would
+            # never expect to need to read interactive input.
+
+            if fd_null != 0:
+                os.close(0)
+                os.dup2(fd_null, 0)
+
+            # If stdout and stderr are still connected to
+            # their original file descriptors we check to see
+            # if they are associated with terminal devices.
+            # When they are we map them to /dev/null so that
+            # are still detached from any controlling terminal
+            # properly. If not we preserve them as they are.
+            #
+            # If stdin and stdout were not hooked up to the
+            # original file descriptors, then all bets are
+            # off and all we can really do is leave them as
+            # they were.
+            #
+            # This will allow 'gunicorn ... > output.log 2>&1'
+            # to work with stdout/stderr going to the file
+            # as expected.
+            #
+            # Note that if using --error-log option, the log
+            # file specified through shell redirection will
+            # only be used up until the log file specified
+            # by the option takes over. As it replaces stdout
+            # and stderr at the file descriptor level, then
+            # anything using stdout or stderr, including having
+            # cached a reference to them, will still work.
+
+            def redirect(stream, fd_expect):
+                try:
+                    fd = stream.fileno()
+                    if fd == fd_expect and stream.isatty():
+                        os.close(fd)
+                        os.dup2(fd_null, fd)
+                except AttributeError:
+                    pass
+
+            redirect(sys.stdout, 1)
+            redirect(sys.stderr, 2)
+
+
+def seed():
+    try:
+        random.seed(os.urandom(64))
+    except NotImplementedError:
+        random.seed('%s.%s' % (time.time(), os.getpid()))
+
+
+def check_is_writable(path):
+    try:
+        with open(path, 'a') as f:
+            f.close()
+    except IOError as e:
+        raise RuntimeError("Error: '%s' isn't writable [%r]" % (path, e))
+
+
+def to_bytestring(value, encoding="utf8"):
+    """Converts a string argument to a byte string"""
+    if isinstance(value, bytes):
+        return value
+    if not isinstance(value, str):
+        raise TypeError('%r is not a string' % value)
+
+    return value.encode(encoding)
+
+
+def has_fileno(obj):
+    if not hasattr(obj, "fileno"):
+        return False
+
+    # check BytesIO case and maybe others
+    try:
+        obj.fileno()
+    except (AttributeError, IOError, io.UnsupportedOperation):
+        return False
+
+    return True
+
+
+def warn(msg):
+    print("!!!", file=sys.stderr)
+
+    lines = msg.splitlines()
+    for i, line in enumerate(lines):
+        if i == 0:
+            line = "WARNING: %s" % line
+        print("!!! %s" % line, file=sys.stderr)
+
+    print("!!!\n", file=sys.stderr)
+    sys.stderr.flush()
+
+
+def make_fail_app(msg):
+    msg = to_bytestring(msg)
+
+    def app(environ, start_response):
+        start_response("500 Internal Server Error", [
+            ("Content-Type", "text/plain"),
+            ("Content-Length", str(len(msg)))
+        ])
+        return [msg]
+
+    return app
+
+
+def split_request_uri(uri):
+    if uri.startswith("//"):
+        # When the path starts with //, urlsplit considers it as a
+        # relative uri while the RFC says we should consider it as abs_path
+        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
+        # We use temporary dot prefix to workaround this behaviour
+        parts = urllib.parse.urlsplit("." + uri)
+        return parts._replace(path=parts.path[1:])
+
+    return urllib.parse.urlsplit(uri)
+
+
+# From six.reraise
+def reraise(tp, value, tb=None):
+    try:
+        if value is None:
+            value = tp()
+        if value.__traceback__ is not tb:
+            raise value.with_traceback(tb)
+        raise value
+    finally:
+        value = None
+        tb = None
+
+
+def bytes_to_str(b):
+    if isinstance(b, str):
+        return b
+    return str(b, 'latin1')
+
+
+def unquote_to_wsgi_str(string):
+    return urllib.parse.unquote_to_bytes(string).decode('latin-1')