about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiohttp/web_log.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/web_log.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohttp/web_log.py216
1 files changed, 216 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/web_log.py b/.venv/lib/python3.12/site-packages/aiohttp/web_log.py
new file mode 100644
index 00000000..d5ea2bee
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/web_log.py
@@ -0,0 +1,216 @@
+import datetime
+import functools
+import logging
+import os
+import re
+import time as time_mod
+from collections import namedtuple
+from typing import Any, Callable, Dict, Iterable, List, Tuple  # noqa
+
+from .abc import AbstractAccessLogger
+from .web_request import BaseRequest
+from .web_response import StreamResponse
+
+KeyMethod = namedtuple("KeyMethod", "key method")
+
+
+class AccessLogger(AbstractAccessLogger):
+    """Helper object to log access.
+
+    Usage:
+        log = logging.getLogger("spam")
+        log_format = "%a %{User-Agent}i"
+        access_logger = AccessLogger(log, log_format)
+        access_logger.log(request, response, time)
+
+    Format:
+        %%  The percent sign
+        %a  Remote IP-address (IP-address of proxy if using reverse proxy)
+        %t  Time when the request was started to process
+        %P  The process ID of the child that serviced the request
+        %r  First line of request
+        %s  Response status code
+        %b  Size of response in bytes, including HTTP headers
+        %T  Time taken to serve the request, in seconds
+        %Tf Time taken to serve the request, in seconds with floating fraction
+            in .06f format
+        %D  Time taken to serve the request, in microseconds
+        %{FOO}i  request.headers['FOO']
+        %{FOO}o  response.headers['FOO']
+        %{FOO}e  os.environ['FOO']
+
+    """
+
+    LOG_FORMAT_MAP = {
+        "a": "remote_address",
+        "t": "request_start_time",
+        "P": "process_id",
+        "r": "first_request_line",
+        "s": "response_status",
+        "b": "response_size",
+        "T": "request_time",
+        "Tf": "request_time_frac",
+        "D": "request_time_micro",
+        "i": "request_header",
+        "o": "response_header",
+    }
+
+    LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
+    FORMAT_RE = re.compile(r"%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbOD]|Tf?)")
+    CLEANUP_RE = re.compile(r"(%[^s])")
+    _FORMAT_CACHE: Dict[str, Tuple[str, List[KeyMethod]]] = {}
+
+    def __init__(self, logger: logging.Logger, log_format: str = LOG_FORMAT) -> None:
+        """Initialise the logger.
+
+        logger is a logger object to be used for logging.
+        log_format is a string with apache compatible log format description.
+
+        """
+        super().__init__(logger, log_format=log_format)
+
+        _compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
+        if not _compiled_format:
+            _compiled_format = self.compile_format(log_format)
+            AccessLogger._FORMAT_CACHE[log_format] = _compiled_format
+
+        self._log_format, self._methods = _compiled_format
+
+    def compile_format(self, log_format: str) -> Tuple[str, List[KeyMethod]]:
+        """Translate log_format into form usable by modulo formatting
+
+        All known atoms will be replaced with %s
+        Also methods for formatting of those atoms will be added to
+        _methods in appropriate order
+
+        For example we have log_format = "%a %t"
+        This format will be translated to "%s %s"
+        Also contents of _methods will be
+        [self._format_a, self._format_t]
+        These method will be called and results will be passed
+        to translated string format.
+
+        Each _format_* method receive 'args' which is list of arguments
+        given to self.log
+
+        Exceptions are _format_e, _format_i and _format_o methods which
+        also receive key name (by functools.partial)
+
+        """
+        # list of (key, method) tuples, we don't use an OrderedDict as users
+        # can repeat the same key more than once
+        methods = list()
+
+        for atom in self.FORMAT_RE.findall(log_format):
+            if atom[1] == "":
+                format_key1 = self.LOG_FORMAT_MAP[atom[0]]
+                m = getattr(AccessLogger, "_format_%s" % atom[0])
+                key_method = KeyMethod(format_key1, m)
+            else:
+                format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
+                m = getattr(AccessLogger, "_format_%s" % atom[2])
+                key_method = KeyMethod(format_key2, functools.partial(m, atom[1]))
+
+            methods.append(key_method)
+
+        log_format = self.FORMAT_RE.sub(r"%s", log_format)
+        log_format = self.CLEANUP_RE.sub(r"%\1", log_format)
+        return log_format, methods
+
+    @staticmethod
+    def _format_i(
+        key: str, request: BaseRequest, response: StreamResponse, time: float
+    ) -> str:
+        if request is None:
+            return "(no headers)"
+
+        # suboptimal, make istr(key) once
+        return request.headers.get(key, "-")
+
+    @staticmethod
+    def _format_o(
+        key: str, request: BaseRequest, response: StreamResponse, time: float
+    ) -> str:
+        # suboptimal, make istr(key) once
+        return response.headers.get(key, "-")
+
+    @staticmethod
+    def _format_a(request: BaseRequest, response: StreamResponse, time: float) -> str:
+        if request is None:
+            return "-"
+        ip = request.remote
+        return ip if ip is not None else "-"
+
+    @staticmethod
+    def _format_t(request: BaseRequest, response: StreamResponse, time: float) -> str:
+        tz = datetime.timezone(datetime.timedelta(seconds=-time_mod.timezone))
+        now = datetime.datetime.now(tz)
+        start_time = now - datetime.timedelta(seconds=time)
+        return start_time.strftime("[%d/%b/%Y:%H:%M:%S %z]")
+
+    @staticmethod
+    def _format_P(request: BaseRequest, response: StreamResponse, time: float) -> str:
+        return "<%s>" % os.getpid()
+
+    @staticmethod
+    def _format_r(request: BaseRequest, response: StreamResponse, time: float) -> str:
+        if request is None:
+            return "-"
+        return "{} {} HTTP/{}.{}".format(
+            request.method,
+            request.path_qs,
+            request.version.major,
+            request.version.minor,
+        )
+
+    @staticmethod
+    def _format_s(request: BaseRequest, response: StreamResponse, time: float) -> int:
+        return response.status
+
+    @staticmethod
+    def _format_b(request: BaseRequest, response: StreamResponse, time: float) -> int:
+        return response.body_length
+
+    @staticmethod
+    def _format_T(request: BaseRequest, response: StreamResponse, time: float) -> str:
+        return str(round(time))
+
+    @staticmethod
+    def _format_Tf(request: BaseRequest, response: StreamResponse, time: float) -> str:
+        return "%06f" % time
+
+    @staticmethod
+    def _format_D(request: BaseRequest, response: StreamResponse, time: float) -> str:
+        return str(round(time * 1000000))
+
+    def _format_line(
+        self, request: BaseRequest, response: StreamResponse, time: float
+    ) -> Iterable[Tuple[str, Callable[[BaseRequest, StreamResponse, float], str]]]:
+        return [(key, method(request, response, time)) for key, method in self._methods]
+
+    @property
+    def enabled(self) -> bool:
+        """Check if logger is enabled."""
+        # Avoid formatting the log line if it will not be emitted.
+        return self.logger.isEnabledFor(logging.INFO)
+
+    def log(self, request: BaseRequest, response: StreamResponse, time: float) -> None:
+        try:
+            fmt_info = self._format_line(request, response, time)
+
+            values = list()
+            extra = dict()
+            for key, value in fmt_info:
+                values.append(value)
+
+                if key.__class__ is str:
+                    extra[key] = value
+                else:
+                    k1, k2 = key  # type: ignore[misc]
+                    dct = extra.get(k1, {})  # type: ignore[var-annotated,has-type]
+                    dct[k2] = value  # type: ignore[index,has-type]
+                    extra[k1] = dct  # type: ignore[has-type,assignment]
+
+            self.logger.info(self._log_format % tuple(values), extra=extra)
+        except Exception:
+            self.logger.exception("Error in logging")