aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py1475
1 files changed, 1475 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py
new file mode 100644
index 00000000..8443d951
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py
@@ -0,0 +1,1475 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=too-many-lines
+
+from abc import ABC, abstractmethod
+from bisect import bisect_left
+from enum import IntEnum
+from functools import partial
+from logging import getLogger
+from math import inf
+from threading import Lock
+from typing import (
+ Callable,
+ Generic,
+ List,
+ Optional,
+ Sequence,
+ Type,
+ TypeVar,
+)
+
+from opentelemetry.metrics import (
+ Asynchronous,
+ Counter,
+ Histogram,
+ Instrument,
+ ObservableCounter,
+ ObservableGauge,
+ ObservableUpDownCounter,
+ Synchronous,
+ UpDownCounter,
+ _Gauge,
+)
+from opentelemetry.sdk.metrics._internal.exemplar import (
+ Exemplar,
+ ExemplarReservoirBuilder,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
+ Buckets,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import (
+ Mapping,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import (
+ ExponentMapping,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.logarithm_mapping import (
+ LogarithmMapping,
+)
+from opentelemetry.sdk.metrics._internal.measurement import Measurement
+from opentelemetry.sdk.metrics._internal.point import Buckets as BucketsPoint
+from opentelemetry.sdk.metrics._internal.point import (
+ ExponentialHistogramDataPoint,
+ HistogramDataPoint,
+ NumberDataPoint,
+ Sum,
+)
+from opentelemetry.sdk.metrics._internal.point import Gauge as GaugePoint
+from opentelemetry.sdk.metrics._internal.point import (
+ Histogram as HistogramPoint,
+)
+from opentelemetry.util.types import Attributes
+
+_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint)
+
+_logger = getLogger(__name__)
+
+
+class AggregationTemporality(IntEnum):
+ """
+ The temporality to use when aggregating data.
+
+ Can be one of the following values:
+ """
+
+ UNSPECIFIED = 0
+ DELTA = 1
+ CUMULATIVE = 2
+
+
+class _Aggregation(ABC, Generic[_DataPointVarT]):
+ def __init__(
+ self,
+ attributes: Attributes,
+ reservoir_builder: ExemplarReservoirBuilder,
+ ):
+ self._lock = Lock()
+ self._attributes = attributes
+ self._reservoir = reservoir_builder()
+ self._previous_point = None
+
+ @abstractmethod
+ def aggregate(
+ self, measurement: Measurement, should_sample_exemplar: bool = True
+ ) -> None:
+ """Aggregate a measurement.
+
+ Args:
+ measurement: Measurement to aggregate
+ should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not.
+ """
+
+ @abstractmethod
+ def collect(
+ self,
+ collection_aggregation_temporality: AggregationTemporality,
+ collection_start_nano: int,
+ ) -> Optional[_DataPointVarT]:
+ pass
+
+ def _collect_exemplars(self) -> Sequence[Exemplar]:
+ """Returns the collected exemplars.
+
+ Returns:
+ The exemplars collected by the reservoir
+ """
+ return self._reservoir.collect(self._attributes)
+
+ def _sample_exemplar(
+ self, measurement: Measurement, should_sample_exemplar: bool
+ ) -> None:
+ """Offer the measurement to the exemplar reservoir for sampling.
+
+ It should be called within the each :ref:`aggregate` call.
+
+ Args:
+ measurement: The new measurement
+ should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not.
+ """
+ if should_sample_exemplar:
+ self._reservoir.offer(
+ measurement.value,
+ measurement.time_unix_nano,
+ measurement.attributes,
+ measurement.context,
+ )
+
+
+class _DropAggregation(_Aggregation):
+ def aggregate(
+ self, measurement: Measurement, should_sample_exemplar: bool = True
+ ) -> None:
+ pass
+
+ def collect(
+ self,
+ collection_aggregation_temporality: AggregationTemporality,
+ collection_start_nano: int,
+ ) -> Optional[_DataPointVarT]:
+ pass
+
+
+class _SumAggregation(_Aggregation[Sum]):
+ def __init__(
+ self,
+ attributes: Attributes,
+ instrument_is_monotonic: bool,
+ instrument_aggregation_temporality: AggregationTemporality,
+ start_time_unix_nano: int,
+ reservoir_builder: ExemplarReservoirBuilder,
+ ):
+ super().__init__(attributes, reservoir_builder)
+
+ self._start_time_unix_nano = start_time_unix_nano
+ self._instrument_aggregation_temporality = (
+ instrument_aggregation_temporality
+ )
+ self._instrument_is_monotonic = instrument_is_monotonic
+
+ self._value = None
+
+ self._previous_collection_start_nano = self._start_time_unix_nano
+ self._previous_value = 0
+
+ def aggregate(
+ self, measurement: Measurement, should_sample_exemplar: bool = True
+ ) -> None:
+ with self._lock:
+ if self._value is None:
+ self._value = 0
+
+ self._value = self._value + measurement.value
+
+ self._sample_exemplar(measurement, should_sample_exemplar)
+
+ def collect(
+ self,
+ collection_aggregation_temporality: AggregationTemporality,
+ collection_start_nano: int,
+ ) -> Optional[NumberDataPoint]:
+ """
+ Atomically return a point for the current value of the metric and
+ reset the aggregation value.
+
+ Synchronous instruments have a method which is called directly with
+ increments for a given quantity:
+
+ For example, an instrument that counts the amount of passengers in
+ every vehicle that crosses a certain point in a highway:
+
+ synchronous_instrument.add(2)
+ collect(...) # 2 passengers are counted
+ synchronous_instrument.add(3)
+ collect(...) # 3 passengers are counted
+ synchronous_instrument.add(1)
+ collect(...) # 1 passenger is counted
+
+ In this case the instrument aggregation temporality is DELTA because
+ every value represents an increment to the count,
+
+ Asynchronous instruments have a callback which returns the total value
+ of a given quantity:
+
+ For example, an instrument that measures the amount of bytes written to
+ a certain hard drive:
+
+ callback() -> 1352
+ collect(...) # 1352 bytes have been written so far
+ callback() -> 2324
+ collect(...) # 2324 bytes have been written so far
+ callback() -> 4542
+ collect(...) # 4542 bytes have been written so far
+
+ In this case the instrument aggregation temporality is CUMULATIVE
+ because every value represents the total of the measurement.
+
+ There is also the collection aggregation temporality, which is passed
+ to this method. The collection aggregation temporality defines the
+ nature of the returned value by this aggregation.
+
+ When the collection aggregation temporality matches the
+ instrument aggregation temporality, then this method returns the
+ current value directly:
+
+ synchronous_instrument.add(2)
+ collect(DELTA) -> 2
+ synchronous_instrument.add(3)
+ collect(DELTA) -> 3
+ synchronous_instrument.add(1)
+ collect(DELTA) -> 1
+
+ callback() -> 1352
+ collect(CUMULATIVE) -> 1352
+ callback() -> 2324
+ collect(CUMULATIVE) -> 2324
+ callback() -> 4542
+ collect(CUMULATIVE) -> 4542
+
+ When the collection aggregation temporality does not match the
+ instrument aggregation temporality, then a conversion is made. For this
+ purpose, this aggregation keeps a private attribute,
+ self._previous_value.
+
+ When the instrument is synchronous:
+
+ self._previous_value is the sum of every previously
+ collected (delta) value. In this case, the returned (cumulative) value
+ will be:
+
+ self._previous_value + value
+
+ synchronous_instrument.add(2)
+ collect(CUMULATIVE) -> 2
+ synchronous_instrument.add(3)
+ collect(CUMULATIVE) -> 5
+ synchronous_instrument.add(1)
+ collect(CUMULATIVE) -> 6
+
+ Also, as a diagram:
+
+ time ->
+
+ self._previous_value
+ |-------------|
+
+ value (delta)
+ |----|
+
+ returned value (cumulative)
+ |------------------|
+
+ When the instrument is asynchronous:
+
+ self._previous_value is the value of the previously
+ collected (cumulative) value. In this case, the returned (delta) value
+ will be:
+
+ value - self._previous_value
+
+ callback() -> 1352
+ collect(DELTA) -> 1352
+ callback() -> 2324
+ collect(DELTA) -> 972
+ callback() -> 4542
+ collect(DELTA) -> 2218
+
+ Also, as a diagram:
+
+ time ->
+
+ self._previous_value
+ |-------------|
+
+ value (cumulative)
+ |------------------|
+
+ returned value (delta)
+ |----|
+ """
+
+ with self._lock:
+ value = self._value
+ self._value = None
+
+ if (
+ self._instrument_aggregation_temporality
+ is AggregationTemporality.DELTA
+ ):
+ # This happens when the corresponding instrument for this
+ # aggregation is synchronous.
+ if (
+ collection_aggregation_temporality
+ is AggregationTemporality.DELTA
+ ):
+ previous_collection_start_nano = (
+ self._previous_collection_start_nano
+ )
+ self._previous_collection_start_nano = (
+ collection_start_nano
+ )
+
+ if value is None:
+ return None
+
+ return NumberDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=previous_collection_start_nano,
+ time_unix_nano=collection_start_nano,
+ value=value,
+ )
+
+ if value is None:
+ value = 0
+
+ self._previous_value = value + self._previous_value
+
+ return NumberDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=self._start_time_unix_nano,
+ time_unix_nano=collection_start_nano,
+ value=self._previous_value,
+ )
+
+ # This happens when the corresponding instrument for this
+ # aggregation is asynchronous.
+
+ if value is None:
+ # This happens when the corresponding instrument callback
+ # does not produce measurements.
+ return None
+
+ if (
+ collection_aggregation_temporality
+ is AggregationTemporality.DELTA
+ ):
+ result_value = value - self._previous_value
+
+ self._previous_value = value
+
+ previous_collection_start_nano = (
+ self._previous_collection_start_nano
+ )
+ self._previous_collection_start_nano = collection_start_nano
+
+ return NumberDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=previous_collection_start_nano,
+ time_unix_nano=collection_start_nano,
+ value=result_value,
+ )
+
+ return NumberDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=self._start_time_unix_nano,
+ time_unix_nano=collection_start_nano,
+ value=value,
+ )
+
+
+class _LastValueAggregation(_Aggregation[GaugePoint]):
+ def __init__(
+ self,
+ attributes: Attributes,
+ reservoir_builder: ExemplarReservoirBuilder,
+ ):
+ super().__init__(attributes, reservoir_builder)
+ self._value = None
+
+ def aggregate(
+ self, measurement: Measurement, should_sample_exemplar: bool = True
+ ):
+ with self._lock:
+ self._value = measurement.value
+
+ self._sample_exemplar(measurement, should_sample_exemplar)
+
+ def collect(
+ self,
+ collection_aggregation_temporality: AggregationTemporality,
+ collection_start_nano: int,
+ ) -> Optional[_DataPointVarT]:
+ """
+ Atomically return a point for the current value of the metric.
+ """
+ with self._lock:
+ if self._value is None:
+ return None
+ value = self._value
+ self._value = None
+
+ exemplars = self._collect_exemplars()
+
+ return NumberDataPoint(
+ attributes=self._attributes,
+ exemplars=exemplars,
+ start_time_unix_nano=None,
+ time_unix_nano=collection_start_nano,
+ value=value,
+ )
+
+
+_DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES: Sequence[float] = (
+ 0.0,
+ 5.0,
+ 10.0,
+ 25.0,
+ 50.0,
+ 75.0,
+ 100.0,
+ 250.0,
+ 500.0,
+ 750.0,
+ 1000.0,
+ 2500.0,
+ 5000.0,
+ 7500.0,
+ 10000.0,
+)
+
+
+class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
+ def __init__(
+ self,
+ attributes: Attributes,
+ instrument_aggregation_temporality: AggregationTemporality,
+ start_time_unix_nano: int,
+ reservoir_builder: ExemplarReservoirBuilder,
+ boundaries: Optional[Sequence[float]] = None,
+ record_min_max: bool = True,
+ ):
+ if boundaries is None:
+ boundaries = (
+ _DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES
+ )
+ super().__init__(
+ attributes,
+ reservoir_builder=partial(
+ reservoir_builder, boundaries=boundaries
+ ),
+ )
+
+ self._instrument_aggregation_temporality = (
+ instrument_aggregation_temporality
+ )
+ self._start_time_unix_nano = start_time_unix_nano
+ self._boundaries = tuple(boundaries)
+ self._record_min_max = record_min_max
+
+ self._value = None
+ self._min = inf
+ self._max = -inf
+ self._sum = 0
+
+ self._previous_value = None
+ self._previous_min = inf
+ self._previous_max = -inf
+ self._previous_sum = 0
+
+ self._previous_collection_start_nano = self._start_time_unix_nano
+
+ def _get_empty_bucket_counts(self) -> List[int]:
+ return [0] * (len(self._boundaries) + 1)
+
+ def aggregate(
+ self, measurement: Measurement, should_sample_exemplar: bool = True
+ ) -> None:
+ with self._lock:
+ if self._value is None:
+ self._value = self._get_empty_bucket_counts()
+
+ measurement_value = measurement.value
+
+ self._sum += measurement_value
+
+ if self._record_min_max:
+ self._min = min(self._min, measurement_value)
+ self._max = max(self._max, measurement_value)
+
+ self._value[bisect_left(self._boundaries, measurement_value)] += 1
+
+ self._sample_exemplar(measurement, should_sample_exemplar)
+
+ def collect(
+ self,
+ collection_aggregation_temporality: AggregationTemporality,
+ collection_start_nano: int,
+ ) -> Optional[_DataPointVarT]:
+ """
+ Atomically return a point for the current value of the metric.
+ """
+
+ with self._lock:
+ value = self._value
+ sum_ = self._sum
+ min_ = self._min
+ max_ = self._max
+
+ self._value = None
+ self._sum = 0
+ self._min = inf
+ self._max = -inf
+
+ if (
+ self._instrument_aggregation_temporality
+ is AggregationTemporality.DELTA
+ ):
+ # This happens when the corresponding instrument for this
+ # aggregation is synchronous.
+ if (
+ collection_aggregation_temporality
+ is AggregationTemporality.DELTA
+ ):
+ previous_collection_start_nano = (
+ self._previous_collection_start_nano
+ )
+ self._previous_collection_start_nano = (
+ collection_start_nano
+ )
+
+ if value is None:
+ return None
+
+ return HistogramDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=previous_collection_start_nano,
+ time_unix_nano=collection_start_nano,
+ count=sum(value),
+ sum=sum_,
+ bucket_counts=tuple(value),
+ explicit_bounds=self._boundaries,
+ min=min_,
+ max=max_,
+ )
+
+ if value is None:
+ value = self._get_empty_bucket_counts()
+
+ if self._previous_value is None:
+ self._previous_value = self._get_empty_bucket_counts()
+
+ self._previous_value = [
+ value_element + previous_value_element
+ for (
+ value_element,
+ previous_value_element,
+ ) in zip(value, self._previous_value)
+ ]
+ self._previous_min = min(min_, self._previous_min)
+ self._previous_max = max(max_, self._previous_max)
+ self._previous_sum = sum_ + self._previous_sum
+
+ return HistogramDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=self._start_time_unix_nano,
+ time_unix_nano=collection_start_nano,
+ count=sum(self._previous_value),
+ sum=self._previous_sum,
+ bucket_counts=tuple(self._previous_value),
+ explicit_bounds=self._boundaries,
+ min=self._previous_min,
+ max=self._previous_max,
+ )
+
+ return None
+
+
+# pylint: disable=protected-access
+class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
+ # _min_max_size and _max_max_size are the smallest and largest values
+ # the max_size parameter may have, respectively.
+
+ # _min_max_size is is the smallest reasonable value which is small enough
+ # to contain the entire normal floating point range at the minimum scale.
+ _min_max_size = 2
+
+ # _max_max_size is an arbitrary limit meant to limit accidental creation of
+ # giant exponential bucket histograms.
+ _max_max_size = 16384
+
+ def __init__(
+ self,
+ attributes: Attributes,
+ reservoir_builder: ExemplarReservoirBuilder,
+ instrument_aggregation_temporality: AggregationTemporality,
+ start_time_unix_nano: int,
+ # This is the default maximum number of buckets per positive or
+ # negative number range. The value 160 is specified by OpenTelemetry.
+ # See the derivation here:
+ # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation)
+ max_size: int = 160,
+ max_scale: int = 20,
+ ):
+ # max_size is the maximum capacity of the positive and negative
+ # buckets.
+ # _sum is the sum of all the values aggregated by this aggregator.
+ # _count is the count of all calls to aggregate.
+ # _zero_count is the count of all the calls to aggregate when the value
+ # to be aggregated is exactly 0.
+ # _min is the smallest value aggregated by this aggregator.
+ # _max is the smallest value aggregated by this aggregator.
+ # _positive holds the positive values.
+ # _negative holds the negative values by their absolute value.
+ if max_size < self._min_max_size:
+ raise ValueError(
+ f"Buckets max size {max_size} is smaller than "
+ "minimum max size {self._min_max_size}"
+ )
+
+ if max_size > self._max_max_size:
+ raise ValueError(
+ f"Buckets max size {max_size} is larger than "
+ "maximum max size {self._max_max_size}"
+ )
+ if max_scale > 20:
+ _logger.warning(
+ "max_scale is set to %s which is "
+ "larger than the recommended value of 20",
+ max_scale,
+ )
+
+ # This aggregation is analogous to _ExplicitBucketHistogramAggregation,
+ # the only difference is that with every call to aggregate, the size
+ # and amount of buckets can change (in
+ # _ExplicitBucketHistogramAggregation both size and amount of buckets
+ # remain constant once it is instantiated).
+
+ super().__init__(
+ attributes,
+ reservoir_builder=partial(
+ reservoir_builder, size=min(20, max_size)
+ ),
+ )
+
+ self._instrument_aggregation_temporality = (
+ instrument_aggregation_temporality
+ )
+ self._start_time_unix_nano = start_time_unix_nano
+ self._max_size = max_size
+ self._max_scale = max_scale
+
+ self._value_positive = None
+ self._value_negative = None
+ self._min = inf
+ self._max = -inf
+ self._sum = 0
+ self._count = 0
+ self._zero_count = 0
+ self._scale = None
+
+ self._previous_value_positive = None
+ self._previous_value_negative = None
+ self._previous_min = inf
+ self._previous_max = -inf
+ self._previous_sum = 0
+ self._previous_count = 0
+ self._previous_zero_count = 0
+ self._previous_scale = None
+
+ self._previous_collection_start_nano = self._start_time_unix_nano
+
+ self._mapping = self._new_mapping(self._max_scale)
+
+ def aggregate(
+ self, measurement: Measurement, should_sample_exemplar: bool = True
+ ) -> None:
+ # pylint: disable=too-many-branches,too-many-statements, too-many-locals
+
+ with self._lock:
+ if self._value_positive is None:
+ self._value_positive = Buckets()
+ if self._value_negative is None:
+ self._value_negative = Buckets()
+
+ measurement_value = measurement.value
+
+ self._sum += measurement_value
+
+ self._min = min(self._min, measurement_value)
+ self._max = max(self._max, measurement_value)
+
+ self._count += 1
+
+ if measurement_value == 0:
+ self._zero_count += 1
+
+ if self._count == self._zero_count:
+ self._scale = 0
+
+ return
+
+ if measurement_value > 0:
+ value = self._value_positive
+
+ else:
+ measurement_value = -measurement_value
+ value = self._value_negative
+
+ # The following code finds out if it is necessary to change the
+ # buckets to hold the incoming measurement_value, changes them if
+ # necessary. This process does not exist in
+ # _ExplicitBucketHistogram aggregation because the buckets there
+ # are constant in size and amount.
+ index = self._mapping.map_to_index(measurement_value)
+
+ is_rescaling_needed = False
+ low, high = 0, 0
+
+ if len(value) == 0:
+ value.index_start = index
+ value.index_end = index
+ value.index_base = index
+
+ elif (
+ index < value.index_start
+ and (value.index_end - index) >= self._max_size
+ ):
+ is_rescaling_needed = True
+ low = index
+ high = value.index_end
+
+ elif (
+ index > value.index_end
+ and (index - value.index_start) >= self._max_size
+ ):
+ is_rescaling_needed = True
+ low = value.index_start
+ high = index
+
+ if is_rescaling_needed:
+ scale_change = self._get_scale_change(low, high)
+ self._downscale(
+ scale_change,
+ self._value_positive,
+ self._value_negative,
+ )
+ self._mapping = self._new_mapping(
+ self._mapping.scale - scale_change
+ )
+
+ index = self._mapping.map_to_index(measurement_value)
+
+ self._scale = self._mapping.scale
+
+ if index < value.index_start:
+ span = value.index_end - index
+
+ if span >= len(value.counts):
+ value.grow(span + 1, self._max_size)
+
+ value.index_start = index
+
+ elif index > value.index_end:
+ span = index - value.index_start
+
+ if span >= len(value.counts):
+ value.grow(span + 1, self._max_size)
+
+ value.index_end = index
+
+ bucket_index = index - value.index_base
+
+ if bucket_index < 0:
+ bucket_index += len(value.counts)
+
+ # Now the buckets have been changed if needed and bucket_index will
+ # be used to increment the counter of the bucket that needs to be
+ # incremented.
+
+ # This is analogous to
+ # self._value[bisect_left(self._boundaries, measurement_value)] += 1
+ # in _ExplicitBucketHistogramAggregation.aggregate
+ value.increment_bucket(bucket_index)
+
+ self._sample_exemplar(measurement, should_sample_exemplar)
+
+ def collect(
+ self,
+ collection_aggregation_temporality: AggregationTemporality,
+ collection_start_nano: int,
+ ) -> Optional[_DataPointVarT]:
+ """
+ Atomically return a point for the current value of the metric.
+ """
+
+ # pylint: disable=too-many-statements, too-many-locals
+ with self._lock:
+ value_positive = self._value_positive
+ value_negative = self._value_negative
+ sum_ = self._sum
+ min_ = self._min
+ max_ = self._max
+ count = self._count
+ zero_count = self._zero_count
+ scale = self._scale
+
+ self._value_positive = None
+ self._value_negative = None
+ self._sum = 0
+ self._min = inf
+ self._max = -inf
+ self._count = 0
+ self._zero_count = 0
+ self._scale = None
+
+ if (
+ self._instrument_aggregation_temporality
+ is AggregationTemporality.DELTA
+ ):
+ # This happens when the corresponding instrument for this
+ # aggregation is synchronous.
+ if (
+ collection_aggregation_temporality
+ is AggregationTemporality.DELTA
+ ):
+ previous_collection_start_nano = (
+ self._previous_collection_start_nano
+ )
+ self._previous_collection_start_nano = (
+ collection_start_nano
+ )
+
+ if value_positive is None and value_negative is None:
+ return None
+
+ return ExponentialHistogramDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=previous_collection_start_nano,
+ time_unix_nano=collection_start_nano,
+ count=count,
+ sum=sum_,
+ scale=scale,
+ zero_count=zero_count,
+ positive=BucketsPoint(
+ offset=value_positive.offset,
+ bucket_counts=(value_positive.get_offset_counts()),
+ ),
+ negative=BucketsPoint(
+ offset=value_negative.offset,
+ bucket_counts=(value_negative.get_offset_counts()),
+ ),
+ # FIXME: Find the right value for flags
+ flags=0,
+ min=min_,
+ max=max_,
+ )
+
+ # Here collection_temporality is CUMULATIVE.
+ # instrument_temporality is always DELTA for the time being.
+ # Here we need to handle the case where:
+ # collect is called after at least one other call to collect
+ # (there is data in previous buckets, a call to merge is needed
+ # to handle possible differences in bucket sizes).
+ # collect is called without another call previous call to
+ # collect was made (there is no previous buckets, previous,
+ # empty buckets that are the same scale of the current buckets
+ # need to be made so that they can be cumulatively aggregated
+ # to the current buckets).
+
+ if (
+ value_positive is None
+ and self._previous_value_positive is None
+ ):
+ # This happens if collect is called for the first time
+ # and aggregate has not yet been called.
+ value_positive = Buckets()
+ self._previous_value_positive = value_positive.copy_empty()
+ if (
+ value_negative is None
+ and self._previous_value_negative is None
+ ):
+ value_negative = Buckets()
+ self._previous_value_negative = value_negative.copy_empty()
+ if scale is None and self._previous_scale is None:
+ scale = self._mapping.scale
+ self._previous_scale = scale
+
+ if (
+ value_positive is not None
+ and self._previous_value_positive is None
+ ):
+ # This happens when collect is called the very first time
+ # and aggregate has been called before.
+
+ # We need previous buckets to add them to the current ones.
+ # When collect is called for the first time, there are no
+ # previous buckets, so we need to create empty buckets to
+ # add them to the current ones. The addition of empty
+ # buckets to the current ones will result in the current
+ # ones unchanged.
+
+ # The way the previous buckets are generated here is
+ # different from the explicit bucket histogram where
+ # the size and amount of the buckets does not change once
+ # they are instantiated. Here, the size and amount of the
+ # buckets can change with every call to aggregate. In order
+ # to get empty buckets that can be added to the current
+ # ones resulting in the current ones unchanged we need to
+ # generate empty buckets that have the same size and amount
+ # as the current ones, this is what copy_empty does.
+ self._previous_value_positive = value_positive.copy_empty()
+ if (
+ value_negative is not None
+ and self._previous_value_negative is None
+ ):
+ self._previous_value_negative = value_negative.copy_empty()
+ if scale is not None and self._previous_scale is None:
+ self._previous_scale = scale
+
+ if (
+ value_positive is None
+ and self._previous_value_positive is not None
+ ):
+ value_positive = self._previous_value_positive.copy_empty()
+ if (
+ value_negative is None
+ and self._previous_value_negative is not None
+ ):
+ value_negative = self._previous_value_negative.copy_empty()
+ if scale is None and self._previous_scale is not None:
+ scale = self._previous_scale
+
+ min_scale = min(self._previous_scale, scale)
+
+ low_positive, high_positive = (
+ self._get_low_high_previous_current(
+ self._previous_value_positive,
+ value_positive,
+ scale,
+ min_scale,
+ )
+ )
+ low_negative, high_negative = (
+ self._get_low_high_previous_current(
+ self._previous_value_negative,
+ value_negative,
+ scale,
+ min_scale,
+ )
+ )
+
+ min_scale = min(
+ min_scale
+ - self._get_scale_change(low_positive, high_positive),
+ min_scale
+ - self._get_scale_change(low_negative, high_negative),
+ )
+
+ self._downscale(
+ self._previous_scale - min_scale,
+ self._previous_value_positive,
+ self._previous_value_negative,
+ )
+
+ # self._merge adds the values from value to
+ # self._previous_value, this is analogous to
+ # self._previous_value = [
+ # value_element + previous_value_element
+ # for (
+ # value_element,
+ # previous_value_element,
+ # ) in zip(value, self._previous_value)
+ # ]
+ # in _ExplicitBucketHistogramAggregation.collect.
+ self._merge(
+ self._previous_value_positive,
+ value_positive,
+ scale,
+ min_scale,
+ collection_aggregation_temporality,
+ )
+ self._merge(
+ self._previous_value_negative,
+ value_negative,
+ scale,
+ min_scale,
+ collection_aggregation_temporality,
+ )
+
+ self._previous_min = min(min_, self._previous_min)
+ self._previous_max = max(max_, self._previous_max)
+ self._previous_sum = sum_ + self._previous_sum
+ self._previous_count = count + self._previous_count
+ self._previous_zero_count = (
+ zero_count + self._previous_zero_count
+ )
+ self._previous_scale = min_scale
+
+ return ExponentialHistogramDataPoint(
+ attributes=self._attributes,
+ exemplars=self._collect_exemplars(),
+ start_time_unix_nano=self._start_time_unix_nano,
+ time_unix_nano=collection_start_nano,
+ count=self._previous_count,
+ sum=self._previous_sum,
+ scale=self._previous_scale,
+ zero_count=self._previous_zero_count,
+ positive=BucketsPoint(
+ offset=self._previous_value_positive.offset,
+ bucket_counts=(
+ self._previous_value_positive.get_offset_counts()
+ ),
+ ),
+ negative=BucketsPoint(
+ offset=self._previous_value_negative.offset,
+ bucket_counts=(
+ self._previous_value_negative.get_offset_counts()
+ ),
+ ),
+ # FIXME: Find the right value for flags
+ flags=0,
+ min=self._previous_min,
+ max=self._previous_max,
+ )
+
+ return None
+
+ def _get_low_high_previous_current(
+ self,
+ previous_point_buckets,
+ current_point_buckets,
+ current_scale,
+ min_scale,
+ ):
+ (previous_point_low, previous_point_high) = self._get_low_high(
+ previous_point_buckets, self._previous_scale, min_scale
+ )
+ (current_point_low, current_point_high) = self._get_low_high(
+ current_point_buckets, current_scale, min_scale
+ )
+
+ if current_point_low > current_point_high:
+ low = previous_point_low
+ high = previous_point_high
+
+ elif previous_point_low > previous_point_high:
+ low = current_point_low
+ high = current_point_high
+
+ else:
+ low = min(previous_point_low, current_point_low)
+ high = max(previous_point_high, current_point_high)
+
+ return low, high
+
+ @staticmethod
+ def _get_low_high(buckets, scale, min_scale):
+ if buckets.counts == [0]:
+ return 0, -1
+
+ shift = scale - min_scale
+
+ return buckets.index_start >> shift, buckets.index_end >> shift
+
+ @staticmethod
+ def _new_mapping(scale: int) -> Mapping:
+ if scale <= 0:
+ return ExponentMapping(scale)
+ return LogarithmMapping(scale)
+
+ def _get_scale_change(self, low, high):
+ change = 0
+
+ while high - low >= self._max_size:
+ high = high >> 1
+ low = low >> 1
+
+ change += 1
+
+ return change
+
+ @staticmethod
+ def _downscale(change: int, positive, negative):
+ if change == 0:
+ return
+
+ if change < 0:
+ # pylint: disable=broad-exception-raised
+ raise Exception("Invalid change of scale")
+
+ positive.downscale(change)
+ negative.downscale(change)
+
+ def _merge(
+ self,
+ previous_buckets: Buckets,
+ current_buckets: Buckets,
+ current_scale,
+ min_scale,
+ aggregation_temporality,
+ ):
+ current_change = current_scale - min_scale
+
+ for current_bucket_index, current_bucket in enumerate(
+ current_buckets.counts
+ ):
+ if current_bucket == 0:
+ continue
+
+ # Not considering the case where len(previous_buckets) == 0. This
+ # would not happen because self._previous_point is only assigned to
+ # an ExponentialHistogramDataPoint object if self._count != 0.
+
+ current_index = current_buckets.index_base + current_bucket_index
+ if current_index > current_buckets.index_end:
+ current_index -= len(current_buckets.counts)
+
+ index = current_index >> current_change
+
+ if index < previous_buckets.index_start:
+ span = previous_buckets.index_end - index
+
+ if span >= self._max_size:
+ # pylint: disable=broad-exception-raised
+ raise Exception("Incorrect merge scale")
+
+ if span >= len(previous_buckets.counts):
+ previous_buckets.grow(span + 1, self._max_size)
+
+ previous_buckets.index_start = index
+
+ if index > previous_buckets.index_end:
+ span = index - previous_buckets.index_start
+
+ if span >= self._max_size:
+ # pylint: disable=broad-exception-raised
+ raise Exception("Incorrect merge scale")
+
+ if span >= len(previous_buckets.counts):
+ previous_buckets.grow(span + 1, self._max_size)
+
+ previous_buckets.index_end = index
+
+ bucket_index = index - previous_buckets.index_base
+
+ if bucket_index < 0:
+ bucket_index += len(previous_buckets.counts)
+
+ if aggregation_temporality is AggregationTemporality.DELTA:
+ current_bucket = -current_bucket
+
+ previous_buckets.increment_bucket(
+ bucket_index, increment=current_bucket
+ )
+
+
+class Aggregation(ABC):
+ """
+ Base class for all aggregation types.
+ """
+
+ @abstractmethod
+ def _create_aggregation(
+ self,
+ instrument: Instrument,
+ attributes: Attributes,
+ reservoir_factory: Callable[
+ [Type[_Aggregation]], ExemplarReservoirBuilder
+ ],
+ start_time_unix_nano: int,
+ ) -> _Aggregation:
+ """Creates an aggregation"""
+
+
+class DefaultAggregation(Aggregation):
+ """
+ The default aggregation to be used in a `View`.
+
+ This aggregation will create an actual aggregation depending on the
+ instrument type, as specified next:
+
+ ==================================================== ====================================
+ Instrument Aggregation
+ ==================================================== ====================================
+ `opentelemetry.sdk.metrics.Counter` `SumAggregation`
+ `opentelemetry.sdk.metrics.UpDownCounter` `SumAggregation`
+ `opentelemetry.sdk.metrics.ObservableCounter` `SumAggregation`
+ `opentelemetry.sdk.metrics.ObservableUpDownCounter` `SumAggregation`
+ `opentelemetry.sdk.metrics.Histogram` `ExplicitBucketHistogramAggregation`
+ `opentelemetry.sdk.metrics.ObservableGauge` `LastValueAggregation`
+ ==================================================== ====================================
+ """
+
+ def _create_aggregation(
+ self,
+ instrument: Instrument,
+ attributes: Attributes,
+ reservoir_factory: Callable[
+ [Type[_Aggregation]], ExemplarReservoirBuilder
+ ],
+ start_time_unix_nano: int,
+ ) -> _Aggregation:
+ # pylint: disable=too-many-return-statements
+ if isinstance(instrument, Counter):
+ return _SumAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(_SumAggregation),
+ instrument_is_monotonic=True,
+ instrument_aggregation_temporality=(
+ AggregationTemporality.DELTA
+ ),
+ start_time_unix_nano=start_time_unix_nano,
+ )
+ if isinstance(instrument, UpDownCounter):
+ return _SumAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(_SumAggregation),
+ instrument_is_monotonic=False,
+ instrument_aggregation_temporality=(
+ AggregationTemporality.DELTA
+ ),
+ start_time_unix_nano=start_time_unix_nano,
+ )
+
+ if isinstance(instrument, ObservableCounter):
+ return _SumAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(_SumAggregation),
+ instrument_is_monotonic=True,
+ instrument_aggregation_temporality=(
+ AggregationTemporality.CUMULATIVE
+ ),
+ start_time_unix_nano=start_time_unix_nano,
+ )
+
+ if isinstance(instrument, ObservableUpDownCounter):
+ return _SumAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(_SumAggregation),
+ instrument_is_monotonic=False,
+ instrument_aggregation_temporality=(
+ AggregationTemporality.CUMULATIVE
+ ),
+ start_time_unix_nano=start_time_unix_nano,
+ )
+
+ if isinstance(instrument, Histogram):
+ boundaries = instrument._advisory.explicit_bucket_boundaries
+ return _ExplicitBucketHistogramAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(
+ _ExplicitBucketHistogramAggregation
+ ),
+ instrument_aggregation_temporality=(
+ AggregationTemporality.DELTA
+ ),
+ boundaries=boundaries,
+ start_time_unix_nano=start_time_unix_nano,
+ )
+
+ if isinstance(instrument, ObservableGauge):
+ return _LastValueAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(_LastValueAggregation),
+ )
+
+ if isinstance(instrument, _Gauge):
+ return _LastValueAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(_LastValueAggregation),
+ )
+
+ # pylint: disable=broad-exception-raised
+ raise Exception(f"Invalid instrument type {type(instrument)} found")
+
+
+class ExponentialBucketHistogramAggregation(Aggregation):
+ def __init__(
+ self,
+ max_size: int = 160,
+ max_scale: int = 20,
+ ):
+ self._max_size = max_size
+ self._max_scale = max_scale
+
+ def _create_aggregation(
+ self,
+ instrument: Instrument,
+ attributes: Attributes,
+ reservoir_factory: Callable[
+ [Type[_Aggregation]], ExemplarReservoirBuilder
+ ],
+ start_time_unix_nano: int,
+ ) -> _Aggregation:
+ instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
+ if isinstance(instrument, Synchronous):
+ instrument_aggregation_temporality = AggregationTemporality.DELTA
+ elif isinstance(instrument, Asynchronous):
+ instrument_aggregation_temporality = (
+ AggregationTemporality.CUMULATIVE
+ )
+
+ return _ExponentialBucketHistogramAggregation(
+ attributes,
+ reservoir_factory(_ExponentialBucketHistogramAggregation),
+ instrument_aggregation_temporality,
+ start_time_unix_nano,
+ max_size=self._max_size,
+ max_scale=self._max_scale,
+ )
+
+
+class ExplicitBucketHistogramAggregation(Aggregation):
+ """This aggregation informs the SDK to collect:
+
+ - Count of Measurement values falling within explicit bucket boundaries.
+ - Arithmetic sum of Measurement values in population. This SHOULD NOT be collected when used with instruments that record negative measurements, e.g. UpDownCounter or ObservableGauge.
+ - Min (optional) Measurement value in population.
+ - Max (optional) Measurement value in population.
+
+
+ Args:
+ boundaries: Array of increasing values representing explicit bucket boundary values.
+ record_min_max: Whether to record min and max.
+ """
+
+ def __init__(
+ self,
+ boundaries: Optional[Sequence[float]] = None,
+ record_min_max: bool = True,
+ ) -> None:
+ self._boundaries = boundaries
+ self._record_min_max = record_min_max
+
+ def _create_aggregation(
+ self,
+ instrument: Instrument,
+ attributes: Attributes,
+ reservoir_factory: Callable[
+ [Type[_Aggregation]], ExemplarReservoirBuilder
+ ],
+ start_time_unix_nano: int,
+ ) -> _Aggregation:
+ instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
+ if isinstance(instrument, Synchronous):
+ instrument_aggregation_temporality = AggregationTemporality.DELTA
+ elif isinstance(instrument, Asynchronous):
+ instrument_aggregation_temporality = (
+ AggregationTemporality.CUMULATIVE
+ )
+
+ if self._boundaries is None:
+ self._boundaries = (
+ instrument._advisory.explicit_bucket_boundaries
+ or _DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES
+ )
+
+ return _ExplicitBucketHistogramAggregation(
+ attributes,
+ instrument_aggregation_temporality,
+ start_time_unix_nano,
+ reservoir_factory(_ExplicitBucketHistogramAggregation),
+ self._boundaries,
+ self._record_min_max,
+ )
+
+
+class SumAggregation(Aggregation):
+ """This aggregation informs the SDK to collect:
+
+ - The arithmetic sum of Measurement values.
+ """
+
+ def _create_aggregation(
+ self,
+ instrument: Instrument,
+ attributes: Attributes,
+ reservoir_factory: Callable[
+ [Type[_Aggregation]], ExemplarReservoirBuilder
+ ],
+ start_time_unix_nano: int,
+ ) -> _Aggregation:
+ instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
+ if isinstance(instrument, Synchronous):
+ instrument_aggregation_temporality = AggregationTemporality.DELTA
+ elif isinstance(instrument, Asynchronous):
+ instrument_aggregation_temporality = (
+ AggregationTemporality.CUMULATIVE
+ )
+
+ return _SumAggregation(
+ attributes,
+ isinstance(instrument, (Counter, ObservableCounter)),
+ instrument_aggregation_temporality,
+ start_time_unix_nano,
+ reservoir_factory(_SumAggregation),
+ )
+
+
+class LastValueAggregation(Aggregation):
+ """
+ This aggregation informs the SDK to collect:
+
+ - The last Measurement.
+ - The timestamp of the last Measurement.
+ """
+
+ def _create_aggregation(
+ self,
+ instrument: Instrument,
+ attributes: Attributes,
+ reservoir_factory: Callable[
+ [Type[_Aggregation]], ExemplarReservoirBuilder
+ ],
+ start_time_unix_nano: int,
+ ) -> _Aggregation:
+ return _LastValueAggregation(
+ attributes,
+ reservoir_builder=reservoir_factory(_LastValueAggregation),
+ )
+
+
+class DropAggregation(Aggregation):
+ """Using this aggregation will make all measurements be ignored."""
+
+ def _create_aggregation(
+ self,
+ instrument: Instrument,
+ attributes: Attributes,
+ reservoir_factory: Callable[
+ [Type[_Aggregation]], ExemplarReservoirBuilder
+ ],
+ start_time_unix_nano: int,
+ ) -> _Aggregation:
+ return _DropAggregation(
+ attributes, reservoir_factory(_DropAggregation)
+ )