1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
import datetime
import functools
import logging
import os
import re
import time as time_mod
from collections import namedtuple
from typing import Any, Callable, Dict, Iterable, List, Tuple # noqa
from .abc import AbstractAccessLogger
from .web_request import BaseRequest
from .web_response import StreamResponse
KeyMethod = namedtuple("KeyMethod", "key method")
class AccessLogger(AbstractAccessLogger):
"""Helper object to log access.
Usage:
log = logging.getLogger("spam")
log_format = "%a %{User-Agent}i"
access_logger = AccessLogger(log, log_format)
access_logger.log(request, response, time)
Format:
%% The percent sign
%a Remote IP-address (IP-address of proxy if using reverse proxy)
%t Time when the request was started to process
%P The process ID of the child that serviced the request
%r First line of request
%s Response status code
%b Size of response in bytes, including HTTP headers
%T Time taken to serve the request, in seconds
%Tf Time taken to serve the request, in seconds with floating fraction
in .06f format
%D Time taken to serve the request, in microseconds
%{FOO}i request.headers['FOO']
%{FOO}o response.headers['FOO']
%{FOO}e os.environ['FOO']
"""
LOG_FORMAT_MAP = {
"a": "remote_address",
"t": "request_start_time",
"P": "process_id",
"r": "first_request_line",
"s": "response_status",
"b": "response_size",
"T": "request_time",
"Tf": "request_time_frac",
"D": "request_time_micro",
"i": "request_header",
"o": "response_header",
}
LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
FORMAT_RE = re.compile(r"%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbOD]|Tf?)")
CLEANUP_RE = re.compile(r"(%[^s])")
_FORMAT_CACHE: Dict[str, Tuple[str, List[KeyMethod]]] = {}
def __init__(self, logger: logging.Logger, log_format: str = LOG_FORMAT) -> None:
"""Initialise the logger.
logger is a logger object to be used for logging.
log_format is a string with apache compatible log format description.
"""
super().__init__(logger, log_format=log_format)
_compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
if not _compiled_format:
_compiled_format = self.compile_format(log_format)
AccessLogger._FORMAT_CACHE[log_format] = _compiled_format
self._log_format, self._methods = _compiled_format
def compile_format(self, log_format: str) -> Tuple[str, List[KeyMethod]]:
"""Translate log_format into form usable by modulo formatting
All known atoms will be replaced with %s
Also methods for formatting of those atoms will be added to
_methods in appropriate order
For example we have log_format = "%a %t"
This format will be translated to "%s %s"
Also contents of _methods will be
[self._format_a, self._format_t]
These method will be called and results will be passed
to translated string format.
Each _format_* method receive 'args' which is list of arguments
given to self.log
Exceptions are _format_e, _format_i and _format_o methods which
also receive key name (by functools.partial)
"""
# list of (key, method) tuples, we don't use an OrderedDict as users
# can repeat the same key more than once
methods = list()
for atom in self.FORMAT_RE.findall(log_format):
if atom[1] == "":
format_key1 = self.LOG_FORMAT_MAP[atom[0]]
m = getattr(AccessLogger, "_format_%s" % atom[0])
key_method = KeyMethod(format_key1, m)
else:
format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
m = getattr(AccessLogger, "_format_%s" % atom[2])
key_method = KeyMethod(format_key2, functools.partial(m, atom[1]))
methods.append(key_method)
log_format = self.FORMAT_RE.sub(r"%s", log_format)
log_format = self.CLEANUP_RE.sub(r"%\1", log_format)
return log_format, methods
@staticmethod
def _format_i(
key: str, request: BaseRequest, response: StreamResponse, time: float
) -> str:
if request is None:
return "(no headers)"
# suboptimal, make istr(key) once
return request.headers.get(key, "-")
@staticmethod
def _format_o(
key: str, request: BaseRequest, response: StreamResponse, time: float
) -> str:
# suboptimal, make istr(key) once
return response.headers.get(key, "-")
@staticmethod
def _format_a(request: BaseRequest, response: StreamResponse, time: float) -> str:
if request is None:
return "-"
ip = request.remote
return ip if ip is not None else "-"
@staticmethod
def _format_t(request: BaseRequest, response: StreamResponse, time: float) -> str:
tz = datetime.timezone(datetime.timedelta(seconds=-time_mod.timezone))
now = datetime.datetime.now(tz)
start_time = now - datetime.timedelta(seconds=time)
return start_time.strftime("[%d/%b/%Y:%H:%M:%S %z]")
@staticmethod
def _format_P(request: BaseRequest, response: StreamResponse, time: float) -> str:
return "<%s>" % os.getpid()
@staticmethod
def _format_r(request: BaseRequest, response: StreamResponse, time: float) -> str:
if request is None:
return "-"
return "{} {} HTTP/{}.{}".format(
request.method,
request.path_qs,
request.version.major,
request.version.minor,
)
@staticmethod
def _format_s(request: BaseRequest, response: StreamResponse, time: float) -> int:
return response.status
@staticmethod
def _format_b(request: BaseRequest, response: StreamResponse, time: float) -> int:
return response.body_length
@staticmethod
def _format_T(request: BaseRequest, response: StreamResponse, time: float) -> str:
return str(round(time))
@staticmethod
def _format_Tf(request: BaseRequest, response: StreamResponse, time: float) -> str:
return "%06f" % time
@staticmethod
def _format_D(request: BaseRequest, response: StreamResponse, time: float) -> str:
return str(round(time * 1000000))
def _format_line(
self, request: BaseRequest, response: StreamResponse, time: float
) -> Iterable[Tuple[str, Callable[[BaseRequest, StreamResponse, float], str]]]:
return [(key, method(request, response, time)) for key, method in self._methods]
@property
def enabled(self) -> bool:
"""Check if logger is enabled."""
# Avoid formatting the log line if it will not be emitted.
return self.logger.isEnabledFor(logging.INFO)
def log(self, request: BaseRequest, response: StreamResponse, time: float) -> None:
try:
fmt_info = self._format_line(request, response, time)
values = list()
extra = dict()
for key, value in fmt_info:
values.append(value)
if key.__class__ is str:
extra[key] = value
else:
k1, k2 = key # type: ignore[misc]
dct = extra.get(k1, {}) # type: ignore[var-annotated,has-type]
dct[k2] = value # type: ignore[index,has-type]
extra[k1] = dct # type: ignore[has-type,assignment]
self.logger.info(self._log_format % tuple(values), extra=extra)
except Exception:
self.logger.exception("Error in logging")
|