aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py39
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py50
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py134
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py332
4 files changed, 555 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py
new file mode 100644
index 00000000..ee93dd18
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py
@@ -0,0 +1,39 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .exemplar import Exemplar
+from .exemplar_filter import (
+ AlwaysOffExemplarFilter,
+ AlwaysOnExemplarFilter,
+ ExemplarFilter,
+ TraceBasedExemplarFilter,
+)
+from .exemplar_reservoir import (
+ AlignedHistogramBucketExemplarReservoir,
+ ExemplarReservoir,
+ ExemplarReservoirBuilder,
+ SimpleFixedSizeExemplarReservoir,
+)
+
+__all__ = [
+ "Exemplar",
+ "ExemplarFilter",
+ "AlwaysOffExemplarFilter",
+ "AlwaysOnExemplarFilter",
+ "TraceBasedExemplarFilter",
+ "AlignedHistogramBucketExemplarReservoir",
+ "ExemplarReservoir",
+ "ExemplarReservoirBuilder",
+ "SimpleFixedSizeExemplarReservoir",
+]
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py
new file mode 100644
index 00000000..95582e16
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py
@@ -0,0 +1,50 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import dataclasses
+from typing import Optional, Union
+
+from opentelemetry.util.types import Attributes
+
+
+@dataclasses.dataclass(frozen=True)
+class Exemplar:
+ """A representation of an exemplar, which is a sample input measurement.
+
+ Exemplars also hold information about the environment when the measurement
+ was recorded, for example the span and trace ID of the active span when the
+ exemplar was recorded.
+
+ Attributes
+ trace_id: (optional) The trace associated with a recording
+ span_id: (optional) The span associated with a recording
+ time_unix_nano: The time of the observation
+ value: The recorded value
+ filtered_attributes: A set of filtered attributes which provide additional insight into the Context when the observation was made.
+
+ References:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#exemplars
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar
+ """
+
+ # TODO Fix doc - if using valid Google `Attributes:` key, the attributes are duplicated
+ # one will come from napoleon extension and the other from autodoc extension. This
+ # will raise an sphinx error of duplicated object description
+ # See https://github.com/sphinx-doc/sphinx/issues/8664
+
+ filtered_attributes: Attributes
+ value: Union[int, float]
+ time_unix_nano: int
+ span_id: Optional[int] = None
+ trace_id: Optional[int] = None
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py
new file mode 100644
index 00000000..8961d101
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py
@@ -0,0 +1,134 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import Union
+
+from opentelemetry import trace
+from opentelemetry.context import Context
+from opentelemetry.trace.span import INVALID_SPAN
+from opentelemetry.util.types import Attributes
+
+
+class ExemplarFilter(ABC):
+ """``ExemplarFilter`` determines which measurements are eligible for becoming an
+ ``Exemplar``.
+
+ Exemplar filters are used to filter measurements before attempting to store them
+ in a reservoir.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarfilter
+ """
+
+ @abstractmethod
+ def should_sample(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> bool:
+ """Returns whether or not a reservoir should attempt to filter a measurement.
+
+ Args:
+ value: The value of the measurement
+ timestamp: A timestamp that best represents when the measurement was taken
+ attributes: The complete set of measurement attributes
+ context: The Context of the measurement
+ """
+ raise NotImplementedError(
+ "ExemplarFilter.should_sample is not implemented"
+ )
+
+
+class AlwaysOnExemplarFilter(ExemplarFilter):
+ """An ExemplarFilter which makes all measurements eligible for being an Exemplar.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alwayson
+ """
+
+ def should_sample(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> bool:
+ """Returns whether or not a reservoir should attempt to filter a measurement.
+
+ Args:
+ value: The value of the measurement
+ timestamp: A timestamp that best represents when the measurement was taken
+ attributes: The complete set of measurement attributes
+ context: The Context of the measurement
+ """
+ return True
+
+
+class AlwaysOffExemplarFilter(ExemplarFilter):
+ """An ExemplarFilter which makes no measurements eligible for being an Exemplar.
+
+ Using this ExemplarFilter is as good as disabling Exemplar feature.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alwaysoff
+ """
+
+ def should_sample(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> bool:
+ """Returns whether or not a reservoir should attempt to filter a measurement.
+
+ Args:
+ value: The value of the measurement
+ timestamp: A timestamp that best represents when the measurement was taken
+ attributes: The complete set of measurement attributes
+ context: The Context of the measurement
+ """
+ return False
+
+
+class TraceBasedExemplarFilter(ExemplarFilter):
+ """An ExemplarFilter which makes those measurements eligible for being an Exemplar,
+ which are recorded in the context of a sampled parent span.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#tracebased
+ """
+
+ def should_sample(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> bool:
+ """Returns whether or not a reservoir should attempt to filter a measurement.
+
+ Args:
+ value: The value of the measurement
+ timestamp: A timestamp that best represents when the measurement was taken
+ attributes: The complete set of measurement attributes
+ context: The Context of the measurement
+ """
+ span = trace.get_current_span(context)
+ if span == INVALID_SPAN:
+ return False
+ return span.get_span_context().trace_flags.sampled
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
new file mode 100644
index 00000000..22d1ee9f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
@@ -0,0 +1,332 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from random import randrange
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Union,
+)
+
+from opentelemetry import trace
+from opentelemetry.context import Context
+from opentelemetry.trace.span import INVALID_SPAN
+from opentelemetry.util.types import Attributes
+
+from .exemplar import Exemplar
+
+
+class ExemplarReservoir(ABC):
+ """ExemplarReservoir provide a method to offer measurements to the reservoir
+ and another to collect accumulated Exemplars.
+
+ Note:
+ The constructor MUST accept ``**kwargs`` that may be set from aggregation
+ parameters.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarreservoir
+ """
+
+ @abstractmethod
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+ """
+ raise NotImplementedError("ExemplarReservoir.offer is not implemented")
+
+ @abstractmethod
+ def collect(self, point_attributes: Attributes) -> List[Exemplar]:
+ """Returns accumulated Exemplars and also resets the reservoir for the next
+ sampling period
+
+ Args:
+ point_attributes: The attributes associated with metric point.
+
+ Returns:
+ a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned
+ exemplars contain the attributes that were filtered out by the aggregator,
+ but recorded alongside the original measurement.
+ """
+ raise NotImplementedError(
+ "ExemplarReservoir.collect is not implemented"
+ )
+
+
+class ExemplarBucket:
+ def __init__(self) -> None:
+ self.__value: Union[int, float] = 0
+ self.__attributes: Attributes = None
+ self.__time_unix_nano: int = 0
+ self.__span_id: Optional[int] = None
+ self.__trace_id: Optional[int] = None
+ self.__offered: bool = False
+
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+ """
+ self.__value = value
+ self.__time_unix_nano = time_unix_nano
+ self.__attributes = attributes
+ span = trace.get_current_span(context)
+ if span != INVALID_SPAN:
+ span_context = span.get_span_context()
+ self.__span_id = span_context.span_id
+ self.__trace_id = span_context.trace_id
+
+ self.__offered = True
+
+ def collect(self, point_attributes: Attributes) -> Optional[Exemplar]:
+ """May return an Exemplar and resets the bucket for the next sampling period."""
+ if not self.__offered:
+ return None
+
+ # filters out attributes from the measurement that are already included in the metric data point
+ # See the specification for more details:
+ # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar
+ filtered_attributes = (
+ {
+ k: v
+ for k, v in self.__attributes.items()
+ if k not in point_attributes
+ }
+ if self.__attributes
+ else None
+ )
+
+ exemplar = Exemplar(
+ filtered_attributes,
+ self.__value,
+ self.__time_unix_nano,
+ self.__span_id,
+ self.__trace_id,
+ )
+ self.__reset()
+ return exemplar
+
+ def __reset(self) -> None:
+ """Reset the bucket state after a collection cycle."""
+ self.__value = 0
+ self.__attributes = {}
+ self.__time_unix_nano = 0
+ self.__span_id = None
+ self.__trace_id = None
+ self.__offered = False
+
+
+class BucketIndexError(ValueError):
+ """An exception raised when the bucket index cannot be found."""
+
+
+class FixedSizeExemplarReservoirABC(ExemplarReservoir):
+ """Abstract class for a reservoir with fixed size."""
+
+ def __init__(self, size: int, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self._size: int = size
+ self._reservoir_storage: Mapping[int, ExemplarBucket] = defaultdict(
+ ExemplarBucket
+ )
+
+ def collect(self, point_attributes: Attributes) -> List[Exemplar]:
+ """Returns accumulated Exemplars and also resets the reservoir for the next
+ sampling period
+
+ Args:
+ point_attributes: The attributes associated with metric point.
+
+ Returns:
+ a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned
+ exemplars contain the attributes that were filtered out by the aggregator,
+ but recorded alongside the original measurement.
+ """
+ exemplars = [
+ e
+ for e in (
+ bucket.collect(point_attributes)
+ for _, bucket in sorted(self._reservoir_storage.items())
+ )
+ if e is not None
+ ]
+ self._reset()
+ return exemplars
+
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+ """
+ try:
+ index = self._find_bucket_index(
+ value, time_unix_nano, attributes, context
+ )
+
+ self._reservoir_storage[index].offer(
+ value, time_unix_nano, attributes, context
+ )
+ except BucketIndexError:
+ # Ignore invalid bucket index
+ pass
+
+ @abstractmethod
+ def _find_bucket_index(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> int:
+ """Determines the bucket index for the given measurement.
+
+ It should be implemented by subclasses based on specific strategies.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+
+ Returns:
+ The bucket index
+
+ Raises:
+ BucketIndexError: If no bucket index can be found.
+ """
+
+ def _reset(self) -> None:
+ """Reset the reservoir by resetting any stateful logic after a collection cycle."""
+
+
+class SimpleFixedSizeExemplarReservoir(FixedSizeExemplarReservoirABC):
+ """This reservoir uses an uniformly-weighted sampling algorithm based on the number
+ of samples the reservoir has seen so far to determine if the offered measurements
+ should be sampled.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
+ """
+
+ def __init__(self, size: int = 1, **kwargs) -> None:
+ super().__init__(size, **kwargs)
+ self._measurements_seen: int = 0
+
+ def _reset(self) -> None:
+ super()._reset()
+ self._measurements_seen = 0
+
+ def _find_bucket_index(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> int:
+ self._measurements_seen += 1
+ if self._measurements_seen < self._size:
+ return self._measurements_seen - 1
+
+ index = randrange(0, self._measurements_seen)
+ if index < self._size:
+ return index
+
+ raise BucketIndexError("Unable to find the bucket index.")
+
+
+class AlignedHistogramBucketExemplarReservoir(FixedSizeExemplarReservoirABC):
+ """This Exemplar reservoir takes a configuration parameter that is the
+ configuration of a Histogram. This implementation keeps the last seen measurement
+ that falls within a histogram bucket.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alignedhistogrambucketexemplarreservoir
+ """
+
+ def __init__(self, boundaries: Sequence[float], **kwargs) -> None:
+ super().__init__(len(boundaries) + 1, **kwargs)
+ self._boundaries: Sequence[float] = boundaries
+
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled."""
+ index = self._find_bucket_index(
+ value, time_unix_nano, attributes, context
+ )
+ self._reservoir_storage[index].offer(
+ value, time_unix_nano, attributes, context
+ )
+
+ def _find_bucket_index(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> int:
+ for index, boundary in enumerate(self._boundaries):
+ if value <= boundary:
+ return index
+ return len(self._boundaries)
+
+
+ExemplarReservoirBuilder = Callable[[Dict[str, Any]], ExemplarReservoir]
+ExemplarReservoirBuilder.__doc__ = """ExemplarReservoir builder.
+
+It may receive the Aggregation parameters it is bounded to; e.g.
+the _ExplicitBucketHistogramAggregation will provide the boundaries.
+"""