diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py | 257 |
1 files changed, 257 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py new file mode 100644 index 00000000..71a6403a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py @@ -0,0 +1,257 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections.abc import Mapping +from os import environ +from re import IGNORECASE as RE_IGNORECASE +from re import compile as re_compile +from re import search +from typing import Callable, Iterable, overload +from urllib.parse import urlparse, urlunparse + +from opentelemetry.semconv.trace import SpanAttributes + +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS = ( + "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS" +) +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST = ( + "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST" +) +OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE = ( + "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE" +) + +OTEL_PYTHON_INSTRUMENTATION_HTTP_CAPTURE_ALL_METHODS = ( + "OTEL_PYTHON_INSTRUMENTATION_HTTP_CAPTURE_ALL_METHODS" +) + +# List of recommended metrics attributes +_duration_attrs = { + SpanAttributes.HTTP_METHOD, + SpanAttributes.HTTP_HOST, + SpanAttributes.HTTP_SCHEME, + SpanAttributes.HTTP_STATUS_CODE, + SpanAttributes.HTTP_FLAVOR, + SpanAttributes.HTTP_SERVER_NAME, + SpanAttributes.NET_HOST_NAME, + SpanAttributes.NET_HOST_PORT, +} + +_active_requests_count_attrs = { + SpanAttributes.HTTP_METHOD, + SpanAttributes.HTTP_HOST, + SpanAttributes.HTTP_SCHEME, + SpanAttributes.HTTP_FLAVOR, + SpanAttributes.HTTP_SERVER_NAME, +} + + +class ExcludeList: + """Class to exclude certain paths (given as a list of regexes) from tracing requests""" + + def __init__(self, excluded_urls: Iterable[str]): + self._excluded_urls = excluded_urls + if self._excluded_urls: + self._regex = re_compile("|".join(excluded_urls)) + + def url_disabled(self, url: str) -> bool: + return bool(self._excluded_urls and search(self._regex, url)) + + +class SanitizeValue: + """Class to sanitize (remove sensitive data from) certain headers (given as a list of regexes)""" + + def __init__(self, sanitized_fields: Iterable[str]): + self._sanitized_fields = sanitized_fields + if self._sanitized_fields: + self._regex = re_compile("|".join(sanitized_fields), RE_IGNORECASE) + + def sanitize_header_value(self, header: str, value: str) -> str: + return ( + "[REDACTED]" + if (self._sanitized_fields and search(self._regex, header)) + else value + ) + + def sanitize_header_values( + self, + headers: Mapping[str, str | list[str]], + header_regexes: list[str], + normalize_function: Callable[[str], str], + ) -> dict[str, list[str]]: + values: dict[str, list[str]] = {} + + if header_regexes: + header_regexes_compiled = re_compile( + "|".join(header_regexes), + RE_IGNORECASE, + ) + + for header_name, header_value in headers.items(): + if header_regexes_compiled.fullmatch(header_name): + key = normalize_function(header_name.lower()) + if isinstance(header_value, str): + values[key] = [ + self.sanitize_header_value( + header_name, header_value + ) + ] + else: + values[key] = [ + self.sanitize_header_value(header_name, value) + for value in header_value + ] + + return values + + +_root = r"OTEL_PYTHON_{}" + + +def get_traced_request_attrs(instrumentation: str) -> list[str]: + traced_request_attrs = environ.get( + _root.format(f"{instrumentation}_TRACED_REQUEST_ATTRS") + ) + if traced_request_attrs: + return [ + traced_request_attr.strip() + for traced_request_attr in traced_request_attrs.split(",") + ] + return [] + + +def get_excluded_urls(instrumentation: str) -> ExcludeList: + # Get instrumentation-specific excluded URLs. If not set, retrieve them + # from generic variable. + excluded_urls = environ.get( + _root.format(f"{instrumentation}_EXCLUDED_URLS"), + environ.get(_root.format("EXCLUDED_URLS"), ""), + ) + + return parse_excluded_urls(excluded_urls) + + +def parse_excluded_urls(excluded_urls: str) -> ExcludeList: + """ + Small helper to put an arbitrary url list inside an ExcludeList + """ + if excluded_urls: + excluded_url_list = [ + excluded_url.strip() for excluded_url in excluded_urls.split(",") + ] + else: + excluded_url_list = [] + + return ExcludeList(excluded_url_list) + + +def remove_url_credentials(url: str) -> str: + """Given a string url, remove the username and password only if it is a valid url""" + + try: + parsed = urlparse(url) + if all([parsed.scheme, parsed.netloc]): # checks for valid url + parsed_url = urlparse(url) + _, _, netloc = parsed.netloc.rpartition("@") + return urlunparse( + ( + parsed_url.scheme, + netloc, + parsed_url.path, + parsed_url.params, + parsed_url.query, + parsed_url.fragment, + ) + ) + except ValueError: # an unparsable url was passed + pass + return url + + +def normalise_request_header_name(header: str) -> str: + key = header.lower().replace("-", "_") + return f"http.request.header.{key}" + + +def normalise_response_header_name(header: str) -> str: + key = header.lower().replace("-", "_") + return f"http.response.header.{key}" + + +@overload +def sanitize_method(method: str) -> str: ... + + +@overload +def sanitize_method(method: None) -> None: ... + + +def sanitize_method(method: str | None) -> str | None: + if method is None: + return None + method = method.upper() + if ( + environ.get(OTEL_PYTHON_INSTRUMENTATION_HTTP_CAPTURE_ALL_METHODS) + or + # Based on https://www.rfc-editor.org/rfc/rfc7231#section-4.1 and https://www.rfc-editor.org/rfc/rfc5789#section-2. + method + in [ + "GET", + "HEAD", + "POST", + "PUT", + "DELETE", + "CONNECT", + "OPTIONS", + "TRACE", + "PATCH", + ] + ): + return method + return "_OTHER" + + +def get_custom_headers(env_var: str) -> list[str]: + custom_headers = environ.get(env_var, None) + if custom_headers: + return [ + custom_headers.strip() + for custom_headers in custom_headers.split(",") + ] + return [] + + +def _parse_active_request_count_attrs(req_attrs): + active_requests_count_attrs = { + key: req_attrs[key] + for key in _active_requests_count_attrs.intersection(req_attrs.keys()) + } + return active_requests_count_attrs + + +def _parse_duration_attrs(req_attrs): + duration_attrs = { + key: req_attrs[key] + for key in _duration_attrs.intersection(req_attrs.keys()) + } + return duration_attrs + + +def _parse_url_query(url: str): + parsed_url = urlparse(url) + path = parsed_url.path + query_params = parsed_url.query + return path, query_params |