aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py332
1 files changed, 332 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
new file mode 100644
index 00000000..22d1ee9f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
@@ -0,0 +1,332 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from random import randrange
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Union,
+)
+
+from opentelemetry import trace
+from opentelemetry.context import Context
+from opentelemetry.trace.span import INVALID_SPAN
+from opentelemetry.util.types import Attributes
+
+from .exemplar import Exemplar
+
+
+class ExemplarReservoir(ABC):
+ """ExemplarReservoir provide a method to offer measurements to the reservoir
+ and another to collect accumulated Exemplars.
+
+ Note:
+ The constructor MUST accept ``**kwargs`` that may be set from aggregation
+ parameters.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarreservoir
+ """
+
+ @abstractmethod
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+ """
+ raise NotImplementedError("ExemplarReservoir.offer is not implemented")
+
+ @abstractmethod
+ def collect(self, point_attributes: Attributes) -> List[Exemplar]:
+ """Returns accumulated Exemplars and also resets the reservoir for the next
+ sampling period
+
+ Args:
+ point_attributes: The attributes associated with metric point.
+
+ Returns:
+ a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned
+ exemplars contain the attributes that were filtered out by the aggregator,
+ but recorded alongside the original measurement.
+ """
+ raise NotImplementedError(
+ "ExemplarReservoir.collect is not implemented"
+ )
+
+
+class ExemplarBucket:
+ def __init__(self) -> None:
+ self.__value: Union[int, float] = 0
+ self.__attributes: Attributes = None
+ self.__time_unix_nano: int = 0
+ self.__span_id: Optional[int] = None
+ self.__trace_id: Optional[int] = None
+ self.__offered: bool = False
+
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+ """
+ self.__value = value
+ self.__time_unix_nano = time_unix_nano
+ self.__attributes = attributes
+ span = trace.get_current_span(context)
+ if span != INVALID_SPAN:
+ span_context = span.get_span_context()
+ self.__span_id = span_context.span_id
+ self.__trace_id = span_context.trace_id
+
+ self.__offered = True
+
+ def collect(self, point_attributes: Attributes) -> Optional[Exemplar]:
+ """May return an Exemplar and resets the bucket for the next sampling period."""
+ if not self.__offered:
+ return None
+
+ # filters out attributes from the measurement that are already included in the metric data point
+ # See the specification for more details:
+ # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar
+ filtered_attributes = (
+ {
+ k: v
+ for k, v in self.__attributes.items()
+ if k not in point_attributes
+ }
+ if self.__attributes
+ else None
+ )
+
+ exemplar = Exemplar(
+ filtered_attributes,
+ self.__value,
+ self.__time_unix_nano,
+ self.__span_id,
+ self.__trace_id,
+ )
+ self.__reset()
+ return exemplar
+
+ def __reset(self) -> None:
+ """Reset the bucket state after a collection cycle."""
+ self.__value = 0
+ self.__attributes = {}
+ self.__time_unix_nano = 0
+ self.__span_id = None
+ self.__trace_id = None
+ self.__offered = False
+
+
+class BucketIndexError(ValueError):
+ """An exception raised when the bucket index cannot be found."""
+
+
+class FixedSizeExemplarReservoirABC(ExemplarReservoir):
+ """Abstract class for a reservoir with fixed size."""
+
+ def __init__(self, size: int, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self._size: int = size
+ self._reservoir_storage: Mapping[int, ExemplarBucket] = defaultdict(
+ ExemplarBucket
+ )
+
+ def collect(self, point_attributes: Attributes) -> List[Exemplar]:
+ """Returns accumulated Exemplars and also resets the reservoir for the next
+ sampling period
+
+ Args:
+ point_attributes: The attributes associated with metric point.
+
+ Returns:
+ a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned
+ exemplars contain the attributes that were filtered out by the aggregator,
+ but recorded alongside the original measurement.
+ """
+ exemplars = [
+ e
+ for e in (
+ bucket.collect(point_attributes)
+ for _, bucket in sorted(self._reservoir_storage.items())
+ )
+ if e is not None
+ ]
+ self._reset()
+ return exemplars
+
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+ """
+ try:
+ index = self._find_bucket_index(
+ value, time_unix_nano, attributes, context
+ )
+
+ self._reservoir_storage[index].offer(
+ value, time_unix_nano, attributes, context
+ )
+ except BucketIndexError:
+ # Ignore invalid bucket index
+ pass
+
+ @abstractmethod
+ def _find_bucket_index(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> int:
+ """Determines the bucket index for the given measurement.
+
+ It should be implemented by subclasses based on specific strategies.
+
+ Args:
+ value: Measured value
+ time_unix_nano: Measurement instant
+ attributes: Measurement attributes
+ context: Measurement context
+
+ Returns:
+ The bucket index
+
+ Raises:
+ BucketIndexError: If no bucket index can be found.
+ """
+
+ def _reset(self) -> None:
+ """Reset the reservoir by resetting any stateful logic after a collection cycle."""
+
+
+class SimpleFixedSizeExemplarReservoir(FixedSizeExemplarReservoirABC):
+ """This reservoir uses an uniformly-weighted sampling algorithm based on the number
+ of samples the reservoir has seen so far to determine if the offered measurements
+ should be sampled.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
+ """
+
+ def __init__(self, size: int = 1, **kwargs) -> None:
+ super().__init__(size, **kwargs)
+ self._measurements_seen: int = 0
+
+ def _reset(self) -> None:
+ super()._reset()
+ self._measurements_seen = 0
+
+ def _find_bucket_index(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> int:
+ self._measurements_seen += 1
+ if self._measurements_seen < self._size:
+ return self._measurements_seen - 1
+
+ index = randrange(0, self._measurements_seen)
+ if index < self._size:
+ return index
+
+ raise BucketIndexError("Unable to find the bucket index.")
+
+
+class AlignedHistogramBucketExemplarReservoir(FixedSizeExemplarReservoirABC):
+ """This Exemplar reservoir takes a configuration parameter that is the
+ configuration of a Histogram. This implementation keeps the last seen measurement
+ that falls within a histogram bucket.
+
+ Reference:
+ https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alignedhistogrambucketexemplarreservoir
+ """
+
+ def __init__(self, boundaries: Sequence[float], **kwargs) -> None:
+ super().__init__(len(boundaries) + 1, **kwargs)
+ self._boundaries: Sequence[float] = boundaries
+
+ def offer(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> None:
+ """Offers a measurement to be sampled."""
+ index = self._find_bucket_index(
+ value, time_unix_nano, attributes, context
+ )
+ self._reservoir_storage[index].offer(
+ value, time_unix_nano, attributes, context
+ )
+
+ def _find_bucket_index(
+ self,
+ value: Union[int, float],
+ time_unix_nano: int,
+ attributes: Attributes,
+ context: Context,
+ ) -> int:
+ for index, boundary in enumerate(self._boundaries):
+ if value <= boundary:
+ return index
+ return len(self._boundaries)
+
+
+ExemplarReservoirBuilder = Callable[[Dict[str, Any]], ExemplarReservoir]
+ExemplarReservoirBuilder.__doc__ = """ExemplarReservoir builder.
+
+It may receive the Aggregation parameters it is bounded to; e.g.
+the _ExplicitBucketHistogramAggregation will provide the boundaries.
+"""