about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py39
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py50
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py134
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py332
4 files changed, 555 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py
new file mode 100644
index 00000000..ee93dd18
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py
@@ -0,0 +1,39 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .exemplar import Exemplar
+from .exemplar_filter import (
+    AlwaysOffExemplarFilter,
+    AlwaysOnExemplarFilter,
+    ExemplarFilter,
+    TraceBasedExemplarFilter,
+)
+from .exemplar_reservoir import (
+    AlignedHistogramBucketExemplarReservoir,
+    ExemplarReservoir,
+    ExemplarReservoirBuilder,
+    SimpleFixedSizeExemplarReservoir,
+)
+
+__all__ = [
+    "Exemplar",
+    "ExemplarFilter",
+    "AlwaysOffExemplarFilter",
+    "AlwaysOnExemplarFilter",
+    "TraceBasedExemplarFilter",
+    "AlignedHistogramBucketExemplarReservoir",
+    "ExemplarReservoir",
+    "ExemplarReservoirBuilder",
+    "SimpleFixedSizeExemplarReservoir",
+]
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py
new file mode 100644
index 00000000..95582e16
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py
@@ -0,0 +1,50 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import dataclasses
+from typing import Optional, Union
+
+from opentelemetry.util.types import Attributes
+
+
+@dataclasses.dataclass(frozen=True)
+class Exemplar:
+    """A representation of an exemplar, which is a sample input measurement.
+
+    Exemplars also hold information about the environment when the measurement
+    was recorded, for example the span and trace ID of the active span when the
+    exemplar was recorded.
+
+    Attributes
+        trace_id: (optional) The trace associated with a recording
+        span_id: (optional) The span associated with a recording
+        time_unix_nano: The time of the observation
+        value: The recorded value
+        filtered_attributes: A set of filtered attributes which provide additional insight into the Context when the observation was made.
+
+    References:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#exemplars
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar
+    """
+
+    # TODO Fix doc - if using valid Google `Attributes:` key, the attributes are duplicated
+    # one will come from napoleon extension and the other from autodoc extension. This
+    # will raise an sphinx error of duplicated object description
+    # See https://github.com/sphinx-doc/sphinx/issues/8664
+
+    filtered_attributes: Attributes
+    value: Union[int, float]
+    time_unix_nano: int
+    span_id: Optional[int] = None
+    trace_id: Optional[int] = None
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py
new file mode 100644
index 00000000..8961d101
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py
@@ -0,0 +1,134 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import Union
+
+from opentelemetry import trace
+from opentelemetry.context import Context
+from opentelemetry.trace.span import INVALID_SPAN
+from opentelemetry.util.types import Attributes
+
+
+class ExemplarFilter(ABC):
+    """``ExemplarFilter`` determines which measurements are eligible for becoming an
+    ``Exemplar``.
+
+    Exemplar filters are used to filter measurements before attempting to store them
+    in a reservoir.
+
+    Reference:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarfilter
+    """
+
+    @abstractmethod
+    def should_sample(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> bool:
+        """Returns whether or not a reservoir should attempt to filter a measurement.
+
+        Args:
+            value: The value of the measurement
+            timestamp: A timestamp that best represents when the measurement was taken
+            attributes: The complete set of measurement attributes
+            context: The Context of the measurement
+        """
+        raise NotImplementedError(
+            "ExemplarFilter.should_sample is not implemented"
+        )
+
+
+class AlwaysOnExemplarFilter(ExemplarFilter):
+    """An ExemplarFilter which makes all measurements eligible for being an Exemplar.
+
+    Reference:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alwayson
+    """
+
+    def should_sample(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> bool:
+        """Returns whether or not a reservoir should attempt to filter a measurement.
+
+        Args:
+            value: The value of the measurement
+            timestamp: A timestamp that best represents when the measurement was taken
+            attributes: The complete set of measurement attributes
+            context: The Context of the measurement
+        """
+        return True
+
+
+class AlwaysOffExemplarFilter(ExemplarFilter):
+    """An ExemplarFilter which makes no measurements eligible for being an Exemplar.
+
+    Using this ExemplarFilter is as good as disabling Exemplar feature.
+
+    Reference:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alwaysoff
+    """
+
+    def should_sample(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> bool:
+        """Returns whether or not a reservoir should attempt to filter a measurement.
+
+        Args:
+            value: The value of the measurement
+            timestamp: A timestamp that best represents when the measurement was taken
+            attributes: The complete set of measurement attributes
+            context: The Context of the measurement
+        """
+        return False
+
+
+class TraceBasedExemplarFilter(ExemplarFilter):
+    """An ExemplarFilter which makes those measurements eligible for being an Exemplar,
+    which are recorded in the context of a sampled parent span.
+
+    Reference:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#tracebased
+    """
+
+    def should_sample(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> bool:
+        """Returns whether or not a reservoir should attempt to filter a measurement.
+
+        Args:
+            value: The value of the measurement
+            timestamp: A timestamp that best represents when the measurement was taken
+            attributes: The complete set of measurement attributes
+            context: The Context of the measurement
+        """
+        span = trace.get_current_span(context)
+        if span == INVALID_SPAN:
+            return False
+        return span.get_span_context().trace_flags.sampled
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
new file mode 100644
index 00000000..22d1ee9f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
@@ -0,0 +1,332 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from random import randrange
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Union,
+)
+
+from opentelemetry import trace
+from opentelemetry.context import Context
+from opentelemetry.trace.span import INVALID_SPAN
+from opentelemetry.util.types import Attributes
+
+from .exemplar import Exemplar
+
+
+class ExemplarReservoir(ABC):
+    """ExemplarReservoir provide a method to offer measurements to the reservoir
+    and another to collect accumulated Exemplars.
+
+    Note:
+        The constructor MUST accept ``**kwargs`` that may be set from aggregation
+        parameters.
+
+    Reference:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarreservoir
+    """
+
+    @abstractmethod
+    def offer(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> None:
+        """Offers a measurement to be sampled.
+
+        Args:
+            value: Measured value
+            time_unix_nano: Measurement instant
+            attributes: Measurement attributes
+            context: Measurement context
+        """
+        raise NotImplementedError("ExemplarReservoir.offer is not implemented")
+
+    @abstractmethod
+    def collect(self, point_attributes: Attributes) -> List[Exemplar]:
+        """Returns accumulated Exemplars and also resets the reservoir for the next
+        sampling period
+
+        Args:
+            point_attributes: The attributes associated with metric point.
+
+        Returns:
+            a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned
+            exemplars contain the attributes that were filtered out by the aggregator,
+            but recorded alongside the original measurement.
+        """
+        raise NotImplementedError(
+            "ExemplarReservoir.collect is not implemented"
+        )
+
+
+class ExemplarBucket:
+    def __init__(self) -> None:
+        self.__value: Union[int, float] = 0
+        self.__attributes: Attributes = None
+        self.__time_unix_nano: int = 0
+        self.__span_id: Optional[int] = None
+        self.__trace_id: Optional[int] = None
+        self.__offered: bool = False
+
+    def offer(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> None:
+        """Offers a measurement to be sampled.
+
+        Args:
+            value: Measured value
+            time_unix_nano: Measurement instant
+            attributes: Measurement attributes
+            context: Measurement context
+        """
+        self.__value = value
+        self.__time_unix_nano = time_unix_nano
+        self.__attributes = attributes
+        span = trace.get_current_span(context)
+        if span != INVALID_SPAN:
+            span_context = span.get_span_context()
+            self.__span_id = span_context.span_id
+            self.__trace_id = span_context.trace_id
+
+        self.__offered = True
+
+    def collect(self, point_attributes: Attributes) -> Optional[Exemplar]:
+        """May return an Exemplar and resets the bucket for the next sampling period."""
+        if not self.__offered:
+            return None
+
+        # filters out attributes from the measurement that are already included in the metric data point
+        # See the specification for more details:
+        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar
+        filtered_attributes = (
+            {
+                k: v
+                for k, v in self.__attributes.items()
+                if k not in point_attributes
+            }
+            if self.__attributes
+            else None
+        )
+
+        exemplar = Exemplar(
+            filtered_attributes,
+            self.__value,
+            self.__time_unix_nano,
+            self.__span_id,
+            self.__trace_id,
+        )
+        self.__reset()
+        return exemplar
+
+    def __reset(self) -> None:
+        """Reset the bucket state after a collection cycle."""
+        self.__value = 0
+        self.__attributes = {}
+        self.__time_unix_nano = 0
+        self.__span_id = None
+        self.__trace_id = None
+        self.__offered = False
+
+
+class BucketIndexError(ValueError):
+    """An exception raised when the bucket index cannot be found."""
+
+
+class FixedSizeExemplarReservoirABC(ExemplarReservoir):
+    """Abstract class for a reservoir with fixed size."""
+
+    def __init__(self, size: int, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self._size: int = size
+        self._reservoir_storage: Mapping[int, ExemplarBucket] = defaultdict(
+            ExemplarBucket
+        )
+
+    def collect(self, point_attributes: Attributes) -> List[Exemplar]:
+        """Returns accumulated Exemplars and also resets the reservoir for the next
+        sampling period
+
+        Args:
+            point_attributes: The attributes associated with metric point.
+
+        Returns:
+            a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned
+            exemplars contain the attributes that were filtered out by the aggregator,
+            but recorded alongside the original measurement.
+        """
+        exemplars = [
+            e
+            for e in (
+                bucket.collect(point_attributes)
+                for _, bucket in sorted(self._reservoir_storage.items())
+            )
+            if e is not None
+        ]
+        self._reset()
+        return exemplars
+
+    def offer(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> None:
+        """Offers a measurement to be sampled.
+
+        Args:
+            value: Measured value
+            time_unix_nano: Measurement instant
+            attributes: Measurement attributes
+            context: Measurement context
+        """
+        try:
+            index = self._find_bucket_index(
+                value, time_unix_nano, attributes, context
+            )
+
+            self._reservoir_storage[index].offer(
+                value, time_unix_nano, attributes, context
+            )
+        except BucketIndexError:
+            # Ignore invalid bucket index
+            pass
+
+    @abstractmethod
+    def _find_bucket_index(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> int:
+        """Determines the bucket index for the given measurement.
+
+        It should be implemented by subclasses based on specific strategies.
+
+        Args:
+            value: Measured value
+            time_unix_nano: Measurement instant
+            attributes: Measurement attributes
+            context: Measurement context
+
+        Returns:
+            The bucket index
+
+        Raises:
+            BucketIndexError: If no bucket index can be found.
+        """
+
+    def _reset(self) -> None:
+        """Reset the reservoir by resetting any stateful logic after a collection cycle."""
+
+
+class SimpleFixedSizeExemplarReservoir(FixedSizeExemplarReservoirABC):
+    """This reservoir uses an uniformly-weighted sampling algorithm based on the number
+    of samples the reservoir has seen so far to determine if the offered measurements
+    should be sampled.
+
+    Reference:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
+    """
+
+    def __init__(self, size: int = 1, **kwargs) -> None:
+        super().__init__(size, **kwargs)
+        self._measurements_seen: int = 0
+
+    def _reset(self) -> None:
+        super()._reset()
+        self._measurements_seen = 0
+
+    def _find_bucket_index(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> int:
+        self._measurements_seen += 1
+        if self._measurements_seen < self._size:
+            return self._measurements_seen - 1
+
+        index = randrange(0, self._measurements_seen)
+        if index < self._size:
+            return index
+
+        raise BucketIndexError("Unable to find the bucket index.")
+
+
+class AlignedHistogramBucketExemplarReservoir(FixedSizeExemplarReservoirABC):
+    """This Exemplar reservoir takes a configuration parameter that is the
+    configuration of a Histogram. This implementation keeps the last seen measurement
+    that falls within a histogram bucket.
+
+    Reference:
+        https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alignedhistogrambucketexemplarreservoir
+    """
+
+    def __init__(self, boundaries: Sequence[float], **kwargs) -> None:
+        super().__init__(len(boundaries) + 1, **kwargs)
+        self._boundaries: Sequence[float] = boundaries
+
+    def offer(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> None:
+        """Offers a measurement to be sampled."""
+        index = self._find_bucket_index(
+            value, time_unix_nano, attributes, context
+        )
+        self._reservoir_storage[index].offer(
+            value, time_unix_nano, attributes, context
+        )
+
+    def _find_bucket_index(
+        self,
+        value: Union[int, float],
+        time_unix_nano: int,
+        attributes: Attributes,
+        context: Context,
+    ) -> int:
+        for index, boundary in enumerate(self._boundaries):
+            if value <= boundary:
+                return index
+        return len(self._boundaries)
+
+
+ExemplarReservoirBuilder = Callable[[Dict[str, Any]], ExemplarReservoir]
+ExemplarReservoirBuilder.__doc__ = """ExemplarReservoir builder.
+
+It may receive the Aggregation parameters it is bounded to; e.g.
+the _ExplicitBucketHistogramAggregation will provide the boundaries.
+"""