about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py257
1 files changed, 257 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py
new file mode 100644
index 00000000..71a6403a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/util/http/__init__.py
@@ -0,0 +1,257 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from collections.abc import Mapping
+from os import environ
+from re import IGNORECASE as RE_IGNORECASE
+from re import compile as re_compile
+from re import search
+from typing import Callable, Iterable, overload
+from urllib.parse import urlparse, urlunparse
+
+from opentelemetry.semconv.trace import SpanAttributes
+
+OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS = (
+    "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS"
+)
+OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST = (
+    "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST"
+)
+OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE = (
+    "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE"
+)
+
+OTEL_PYTHON_INSTRUMENTATION_HTTP_CAPTURE_ALL_METHODS = (
+    "OTEL_PYTHON_INSTRUMENTATION_HTTP_CAPTURE_ALL_METHODS"
+)
+
+# List of recommended metrics attributes
+_duration_attrs = {
+    SpanAttributes.HTTP_METHOD,
+    SpanAttributes.HTTP_HOST,
+    SpanAttributes.HTTP_SCHEME,
+    SpanAttributes.HTTP_STATUS_CODE,
+    SpanAttributes.HTTP_FLAVOR,
+    SpanAttributes.HTTP_SERVER_NAME,
+    SpanAttributes.NET_HOST_NAME,
+    SpanAttributes.NET_HOST_PORT,
+}
+
+_active_requests_count_attrs = {
+    SpanAttributes.HTTP_METHOD,
+    SpanAttributes.HTTP_HOST,
+    SpanAttributes.HTTP_SCHEME,
+    SpanAttributes.HTTP_FLAVOR,
+    SpanAttributes.HTTP_SERVER_NAME,
+}
+
+
+class ExcludeList:
+    """Class to exclude certain paths (given as a list of regexes) from tracing requests"""
+
+    def __init__(self, excluded_urls: Iterable[str]):
+        self._excluded_urls = excluded_urls
+        if self._excluded_urls:
+            self._regex = re_compile("|".join(excluded_urls))
+
+    def url_disabled(self, url: str) -> bool:
+        return bool(self._excluded_urls and search(self._regex, url))
+
+
+class SanitizeValue:
+    """Class to sanitize (remove sensitive data from) certain headers (given as a list of regexes)"""
+
+    def __init__(self, sanitized_fields: Iterable[str]):
+        self._sanitized_fields = sanitized_fields
+        if self._sanitized_fields:
+            self._regex = re_compile("|".join(sanitized_fields), RE_IGNORECASE)
+
+    def sanitize_header_value(self, header: str, value: str) -> str:
+        return (
+            "[REDACTED]"
+            if (self._sanitized_fields and search(self._regex, header))
+            else value
+        )
+
+    def sanitize_header_values(
+        self,
+        headers: Mapping[str, str | list[str]],
+        header_regexes: list[str],
+        normalize_function: Callable[[str], str],
+    ) -> dict[str, list[str]]:
+        values: dict[str, list[str]] = {}
+
+        if header_regexes:
+            header_regexes_compiled = re_compile(
+                "|".join(header_regexes),
+                RE_IGNORECASE,
+            )
+
+            for header_name, header_value in headers.items():
+                if header_regexes_compiled.fullmatch(header_name):
+                    key = normalize_function(header_name.lower())
+                    if isinstance(header_value, str):
+                        values[key] = [
+                            self.sanitize_header_value(
+                                header_name, header_value
+                            )
+                        ]
+                    else:
+                        values[key] = [
+                            self.sanitize_header_value(header_name, value)
+                            for value in header_value
+                        ]
+
+        return values
+
+
+_root = r"OTEL_PYTHON_{}"
+
+
+def get_traced_request_attrs(instrumentation: str) -> list[str]:
+    traced_request_attrs = environ.get(
+        _root.format(f"{instrumentation}_TRACED_REQUEST_ATTRS")
+    )
+    if traced_request_attrs:
+        return [
+            traced_request_attr.strip()
+            for traced_request_attr in traced_request_attrs.split(",")
+        ]
+    return []
+
+
+def get_excluded_urls(instrumentation: str) -> ExcludeList:
+    # Get instrumentation-specific excluded URLs. If not set, retrieve them
+    # from generic variable.
+    excluded_urls = environ.get(
+        _root.format(f"{instrumentation}_EXCLUDED_URLS"),
+        environ.get(_root.format("EXCLUDED_URLS"), ""),
+    )
+
+    return parse_excluded_urls(excluded_urls)
+
+
+def parse_excluded_urls(excluded_urls: str) -> ExcludeList:
+    """
+    Small helper to put an arbitrary url list inside an ExcludeList
+    """
+    if excluded_urls:
+        excluded_url_list = [
+            excluded_url.strip() for excluded_url in excluded_urls.split(",")
+        ]
+    else:
+        excluded_url_list = []
+
+    return ExcludeList(excluded_url_list)
+
+
+def remove_url_credentials(url: str) -> str:
+    """Given a string url, remove the username and password only if it is a valid url"""
+
+    try:
+        parsed = urlparse(url)
+        if all([parsed.scheme, parsed.netloc]):  # checks for valid url
+            parsed_url = urlparse(url)
+            _, _, netloc = parsed.netloc.rpartition("@")
+            return urlunparse(
+                (
+                    parsed_url.scheme,
+                    netloc,
+                    parsed_url.path,
+                    parsed_url.params,
+                    parsed_url.query,
+                    parsed_url.fragment,
+                )
+            )
+    except ValueError:  # an unparsable url was passed
+        pass
+    return url
+
+
+def normalise_request_header_name(header: str) -> str:
+    key = header.lower().replace("-", "_")
+    return f"http.request.header.{key}"
+
+
+def normalise_response_header_name(header: str) -> str:
+    key = header.lower().replace("-", "_")
+    return f"http.response.header.{key}"
+
+
+@overload
+def sanitize_method(method: str) -> str: ...
+
+
+@overload
+def sanitize_method(method: None) -> None: ...
+
+
+def sanitize_method(method: str | None) -> str | None:
+    if method is None:
+        return None
+    method = method.upper()
+    if (
+        environ.get(OTEL_PYTHON_INSTRUMENTATION_HTTP_CAPTURE_ALL_METHODS)
+        or
+        # Based on https://www.rfc-editor.org/rfc/rfc7231#section-4.1 and https://www.rfc-editor.org/rfc/rfc5789#section-2.
+        method
+        in [
+            "GET",
+            "HEAD",
+            "POST",
+            "PUT",
+            "DELETE",
+            "CONNECT",
+            "OPTIONS",
+            "TRACE",
+            "PATCH",
+        ]
+    ):
+        return method
+    return "_OTHER"
+
+
+def get_custom_headers(env_var: str) -> list[str]:
+    custom_headers = environ.get(env_var, None)
+    if custom_headers:
+        return [
+            custom_headers.strip()
+            for custom_headers in custom_headers.split(",")
+        ]
+    return []
+
+
+def _parse_active_request_count_attrs(req_attrs):
+    active_requests_count_attrs = {
+        key: req_attrs[key]
+        for key in _active_requests_count_attrs.intersection(req_attrs.keys())
+    }
+    return active_requests_count_attrs
+
+
+def _parse_duration_attrs(req_attrs):
+    duration_attrs = {
+        key: req_attrs[key]
+        for key in _duration_attrs.intersection(req_attrs.keys())
+    }
+    return duration_attrs
+
+
+def _parse_url_query(url: str):
+    parsed_url = urlparse(url)
+    path = parsed_url.path
+    query_params = parsed_url.query
+    return path, query_params