Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py | 1475 |
1 file changed, 1475 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py
new file mode 100644
index 00000000..8443d951
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py
@@ -0,0 +1,1475 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=too-many-lines
+
+from abc import ABC, abstractmethod
+from bisect import bisect_left
+from enum import IntEnum
+from functools import partial
+from logging import getLogger
+from math import inf
+from threading import Lock
+from typing import (
+    Callable,
+    Generic,
+    List,
+    Optional,
+    Sequence,
+    Type,
+    TypeVar,
+)
+
+from opentelemetry.metrics import (
+    Asynchronous,
+    Counter,
+    Histogram,
+    Instrument,
+    ObservableCounter,
+    ObservableGauge,
+    ObservableUpDownCounter,
+    Synchronous,
+    UpDownCounter,
+    _Gauge,
+)
+from opentelemetry.sdk.metrics._internal.exemplar import (
+    Exemplar,
+    ExemplarReservoirBuilder,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
+    Buckets,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import (
+    Mapping,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import (
+    ExponentMapping,
+)
+from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.logarithm_mapping import (
+    LogarithmMapping,
+)
+from opentelemetry.sdk.metrics._internal.measurement import Measurement
+from opentelemetry.sdk.metrics._internal.point import Buckets as BucketsPoint
+from opentelemetry.sdk.metrics._internal.point import (
+    ExponentialHistogramDataPoint,
+    HistogramDataPoint,
+    NumberDataPoint,
+    Sum,
+)
+from opentelemetry.sdk.metrics._internal.point import Gauge as GaugePoint
+from opentelemetry.sdk.metrics._internal.point import (
+    Histogram as HistogramPoint,
+)
+from opentelemetry.util.types import Attributes
+
+_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint)
+
+_logger = getLogger(__name__)
+
+
+class AggregationTemporality(IntEnum):
+    """
+    The temporality to use when aggregating data.
+
+    Can be one of the following values:
+    """
+
+    UNSPECIFIED = 0
+    DELTA = 1
+    CUMULATIVE = 2
+
+
+class _Aggregation(ABC, Generic[_DataPointVarT]):
+    def __init__(
+        self,
+        attributes: Attributes,
+        reservoir_builder: ExemplarReservoirBuilder,
+    ):
+        self._lock = Lock()
+        self._attributes = attributes
+        self._reservoir = reservoir_builder()
+        self._previous_point = None
+
+    @abstractmethod
+    def aggregate(
+        self, measurement: Measurement, should_sample_exemplar: bool = True
+    ) -> None:
+        """Aggregate a measurement.
+
+        Args:
+            measurement: Measurement to aggregate
+            should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not.
+ """ + + @abstractmethod + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + pass + + def _collect_exemplars(self) -> Sequence[Exemplar]: + """Returns the collected exemplars. + + Returns: + The exemplars collected by the reservoir + """ + return self._reservoir.collect(self._attributes) + + def _sample_exemplar( + self, measurement: Measurement, should_sample_exemplar: bool + ) -> None: + """Offer the measurement to the exemplar reservoir for sampling. + + It should be called within the each :ref:`aggregate` call. + + Args: + measurement: The new measurement + should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not. + """ + if should_sample_exemplar: + self._reservoir.offer( + measurement.value, + measurement.time_unix_nano, + measurement.attributes, + measurement.context, + ) + + +class _DropAggregation(_Aggregation): + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + pass + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + pass + + +class _SumAggregation(_Aggregation[Sum]): + def __init__( + self, + attributes: Attributes, + instrument_is_monotonic: bool, + instrument_aggregation_temporality: AggregationTemporality, + start_time_unix_nano: int, + reservoir_builder: ExemplarReservoirBuilder, + ): + super().__init__(attributes, reservoir_builder) + + self._start_time_unix_nano = start_time_unix_nano + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) + self._instrument_is_monotonic = instrument_is_monotonic + + self._value = None + + self._previous_collection_start_nano = self._start_time_unix_nano + self._previous_value = 0 + + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + with self._lock: + if self._value is None: + self._value = 0 + + self._value = self._value + measurement.value + + self._sample_exemplar(measurement, should_sample_exemplar) + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[NumberDataPoint]: + """ + Atomically return a point for the current value of the metric and + reset the aggregation value. + + Synchronous instruments have a method which is called directly with + increments for a given quantity: + + For example, an instrument that counts the amount of passengers in + every vehicle that crosses a certain point in a highway: + + synchronous_instrument.add(2) + collect(...) # 2 passengers are counted + synchronous_instrument.add(3) + collect(...) # 3 passengers are counted + synchronous_instrument.add(1) + collect(...) # 1 passenger is counted + + In this case the instrument aggregation temporality is DELTA because + every value represents an increment to the count, + + Asynchronous instruments have a callback which returns the total value + of a given quantity: + + For example, an instrument that measures the amount of bytes written to + a certain hard drive: + + callback() -> 1352 + collect(...) # 1352 bytes have been written so far + callback() -> 2324 + collect(...) # 2324 bytes have been written so far + callback() -> 4542 + collect(...) # 4542 bytes have been written so far + + In this case the instrument aggregation temporality is CUMULATIVE + because every value represents the total of the measurement. 
+ + There is also the collection aggregation temporality, which is passed + to this method. The collection aggregation temporality defines the + nature of the returned value by this aggregation. + + When the collection aggregation temporality matches the + instrument aggregation temporality, then this method returns the + current value directly: + + synchronous_instrument.add(2) + collect(DELTA) -> 2 + synchronous_instrument.add(3) + collect(DELTA) -> 3 + synchronous_instrument.add(1) + collect(DELTA) -> 1 + + callback() -> 1352 + collect(CUMULATIVE) -> 1352 + callback() -> 2324 + collect(CUMULATIVE) -> 2324 + callback() -> 4542 + collect(CUMULATIVE) -> 4542 + + When the collection aggregation temporality does not match the + instrument aggregation temporality, then a conversion is made. For this + purpose, this aggregation keeps a private attribute, + self._previous_value. + + When the instrument is synchronous: + + self._previous_value is the sum of every previously + collected (delta) value. In this case, the returned (cumulative) value + will be: + + self._previous_value + value + + synchronous_instrument.add(2) + collect(CUMULATIVE) -> 2 + synchronous_instrument.add(3) + collect(CUMULATIVE) -> 5 + synchronous_instrument.add(1) + collect(CUMULATIVE) -> 6 + + Also, as a diagram: + + time -> + + self._previous_value + |-------------| + + value (delta) + |----| + + returned value (cumulative) + |------------------| + + When the instrument is asynchronous: + + self._previous_value is the value of the previously + collected (cumulative) value. In this case, the returned (delta) value + will be: + + value - self._previous_value + + callback() -> 1352 + collect(DELTA) -> 1352 + callback() -> 2324 + collect(DELTA) -> 972 + callback() -> 4542 + collect(DELTA) -> 2218 + + Also, as a diagram: + + time -> + + self._previous_value + |-------------| + + value (cumulative) + |------------------| + + returned value (delta) + |----| + """ + + with self._lock: + value = self._value + self._value = None + + if ( + self._instrument_aggregation_temporality + is AggregationTemporality.DELTA + ): + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) + + if value is None: + return None + + return NumberDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + value=value, + ) + + if value is None: + value = 0 + + self._previous_value = value + self._previous_value + + return NumberDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + value=self._previous_value, + ) + + # This happens when the corresponding instrument for this + # aggregation is asynchronous. + + if value is None: + # This happens when the corresponding instrument callback + # does not produce measurements. 
+                return None
+
+            if (
+                collection_aggregation_temporality
+                is AggregationTemporality.DELTA
+            ):
+                result_value = value - self._previous_value
+
+                self._previous_value = value
+
+                previous_collection_start_nano = (
+                    self._previous_collection_start_nano
+                )
+                self._previous_collection_start_nano = collection_start_nano
+
+                return NumberDataPoint(
+                    attributes=self._attributes,
+                    exemplars=self._collect_exemplars(),
+                    start_time_unix_nano=previous_collection_start_nano,
+                    time_unix_nano=collection_start_nano,
+                    value=result_value,
+                )
+
+            return NumberDataPoint(
+                attributes=self._attributes,
+                exemplars=self._collect_exemplars(),
+                start_time_unix_nano=self._start_time_unix_nano,
+                time_unix_nano=collection_start_nano,
+                value=value,
+            )
+
+
+class _LastValueAggregation(_Aggregation[GaugePoint]):
+    def __init__(
+        self,
+        attributes: Attributes,
+        reservoir_builder: ExemplarReservoirBuilder,
+    ):
+        super().__init__(attributes, reservoir_builder)
+        self._value = None
+
+    def aggregate(
+        self, measurement: Measurement, should_sample_exemplar: bool = True
+    ):
+        with self._lock:
+            self._value = measurement.value
+
+        self._sample_exemplar(measurement, should_sample_exemplar)
+
+    def collect(
+        self,
+        collection_aggregation_temporality: AggregationTemporality,
+        collection_start_nano: int,
+    ) -> Optional[_DataPointVarT]:
+        """
+        Atomically return a point for the current value of the metric.
+        """
+        with self._lock:
+            if self._value is None:
+                return None
+            value = self._value
+            self._value = None
+
+        exemplars = self._collect_exemplars()
+
+        return NumberDataPoint(
+            attributes=self._attributes,
+            exemplars=exemplars,
+            start_time_unix_nano=None,
+            time_unix_nano=collection_start_nano,
+            value=value,
+        )
+
+
+_DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES: Sequence[float] = (
+    0.0,
+    5.0,
+    10.0,
+    25.0,
+    50.0,
+    75.0,
+    100.0,
+    250.0,
+    500.0,
+    750.0,
+    1000.0,
+    2500.0,
+    5000.0,
+    7500.0,
+    10000.0,
+)
+
+
+class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
+    def __init__(
+        self,
+        attributes: Attributes,
+        instrument_aggregation_temporality: AggregationTemporality,
+        start_time_unix_nano: int,
+        reservoir_builder: ExemplarReservoirBuilder,
+        boundaries: Optional[Sequence[float]] = None,
+        record_min_max: bool = True,
+    ):
+        if boundaries is None:
+            boundaries = (
+                _DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES
+            )
+        super().__init__(
+            attributes,
+            reservoir_builder=partial(
+                reservoir_builder, boundaries=boundaries
+            ),
+        )
+
+        self._instrument_aggregation_temporality = (
+            instrument_aggregation_temporality
+        )
+        self._start_time_unix_nano = start_time_unix_nano
+        self._boundaries = tuple(boundaries)
+        self._record_min_max = record_min_max
+
+        self._value = None
+        self._min = inf
+        self._max = -inf
+        self._sum = 0
+
+        self._previous_value = None
+        self._previous_min = inf
+        self._previous_max = -inf
+        self._previous_sum = 0
+
+        self._previous_collection_start_nano = self._start_time_unix_nano
+
+    def _get_empty_bucket_counts(self) -> List[int]:
+        return [0] * (len(self._boundaries) + 1)
+
+    def aggregate(
+        self, measurement: Measurement, should_sample_exemplar: bool = True
+    ) -> None:
+        with self._lock:
+            if self._value is None:
+                self._value = self._get_empty_bucket_counts()
+
+            measurement_value = measurement.value
+
+            self._sum += measurement_value
+
+            if self._record_min_max:
+                self._min = min(self._min, measurement_value)
+                self._max = max(self._max, measurement_value)
+
+            self._value[bisect_left(self._boundaries, measurement_value)] += 1
+
+        self._sample_exemplar(measurement, should_sample_exemplar)
+
+    def collect(
+        self,
+        collection_aggregation_temporality: AggregationTemporality,
+        collection_start_nano: int,
+    ) -> Optional[_DataPointVarT]:
+        """
+        Atomically return a point for the current value of the metric.
+        """
+
+        with self._lock:
+            value = self._value
+            sum_ = self._sum
+            min_ = self._min
+            max_ = self._max
+
+            self._value = None
+            self._sum = 0
+            self._min = inf
+            self._max = -inf
+
+            if (
+                self._instrument_aggregation_temporality
+                is AggregationTemporality.DELTA
+            ):
+                # This happens when the corresponding instrument for this
+                # aggregation is synchronous.
+                if (
+                    collection_aggregation_temporality
+                    is AggregationTemporality.DELTA
+                ):
+                    previous_collection_start_nano = (
+                        self._previous_collection_start_nano
+                    )
+                    self._previous_collection_start_nano = (
+                        collection_start_nano
+                    )
+
+                    if value is None:
+                        return None
+
+                    return HistogramDataPoint(
+                        attributes=self._attributes,
+                        exemplars=self._collect_exemplars(),
+                        start_time_unix_nano=previous_collection_start_nano,
+                        time_unix_nano=collection_start_nano,
+                        count=sum(value),
+                        sum=sum_,
+                        bucket_counts=tuple(value),
+                        explicit_bounds=self._boundaries,
+                        min=min_,
+                        max=max_,
+                    )
+
+                if value is None:
+                    value = self._get_empty_bucket_counts()
+
+                if self._previous_value is None:
+                    self._previous_value = self._get_empty_bucket_counts()
+
+                self._previous_value = [
+                    value_element + previous_value_element
+                    for (
+                        value_element,
+                        previous_value_element,
+                    ) in zip(value, self._previous_value)
+                ]
+                self._previous_min = min(min_, self._previous_min)
+                self._previous_max = max(max_, self._previous_max)
+                self._previous_sum = sum_ + self._previous_sum
+
+                return HistogramDataPoint(
+                    attributes=self._attributes,
+                    exemplars=self._collect_exemplars(),
+                    start_time_unix_nano=self._start_time_unix_nano,
+                    time_unix_nano=collection_start_nano,
+                    count=sum(self._previous_value),
+                    sum=self._previous_sum,
+                    bucket_counts=tuple(self._previous_value),
+                    explicit_bounds=self._boundaries,
+                    min=self._previous_min,
+                    max=self._previous_max,
+                )
+
+            return None
+
+
+# pylint: disable=protected-access
+class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
+    # _min_max_size and _max_max_size are the smallest and largest values
+    # the max_size parameter may have, respectively.
+
+    # _min_max_size is the smallest reasonable value which is small enough
+    # to contain the entire normal floating point range at the minimum scale.
+    _min_max_size = 2
+
+    # _max_max_size is an arbitrary cap meant to prevent accidental creation
+    # of giant exponential bucket histograms.
+    _max_max_size = 16384
+
+    def __init__(
+        self,
+        attributes: Attributes,
+        reservoir_builder: ExemplarReservoirBuilder,
+        instrument_aggregation_temporality: AggregationTemporality,
+        start_time_unix_nano: int,
+        # This is the default maximum number of buckets per positive or
+        # negative number range. The value 160 is specified by OpenTelemetry.
+        # See the derivation here:
+        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation
+        max_size: int = 160,
+        max_scale: int = 20,
+    ):
+        # max_size is the maximum capacity of the positive and negative
+        # buckets.
+        # _sum is the sum of all the values aggregated by this aggregator.
+        # _count is the count of all calls to aggregate.
+        # _zero_count is the count of all the calls to aggregate when the value
+        # to be aggregated is exactly 0.
+        # _min is the smallest value aggregated by this aggregator.
+        # _max is the largest value aggregated by this aggregator.
+        # _positive holds the positive values.
+        # _negative holds the negative values by their absolute value.
+        if max_size < self._min_max_size:
+            raise ValueError(
+                f"Buckets max size {max_size} is smaller than "
+                f"minimum max size {self._min_max_size}"
+            )
+
+        if max_size > self._max_max_size:
+            raise ValueError(
+                f"Buckets max size {max_size} is larger than "
+                f"maximum max size {self._max_max_size}"
+            )
+        if max_scale > 20:
+            _logger.warning(
+                "max_scale is set to %s which is "
+                "larger than the recommended value of 20",
+                max_scale,
+            )
+
+        # This aggregation is analogous to _ExplicitBucketHistogramAggregation;
+        # the only difference is that with every call to aggregate, the size
+        # and number of buckets can change (in
+        # _ExplicitBucketHistogramAggregation both the size and number of
+        # buckets remain constant once it is instantiated).
+
+        super().__init__(
+            attributes,
+            reservoir_builder=partial(
+                reservoir_builder, size=min(20, max_size)
+            ),
+        )
+
+        self._instrument_aggregation_temporality = (
+            instrument_aggregation_temporality
+        )
+        self._start_time_unix_nano = start_time_unix_nano
+        self._max_size = max_size
+        self._max_scale = max_scale
+
+        self._value_positive = None
+        self._value_negative = None
+        self._min = inf
+        self._max = -inf
+        self._sum = 0
+        self._count = 0
+        self._zero_count = 0
+        self._scale = None
+
+        self._previous_value_positive = None
+        self._previous_value_negative = None
+        self._previous_min = inf
+        self._previous_max = -inf
+        self._previous_sum = 0
+        self._previous_count = 0
+        self._previous_zero_count = 0
+        self._previous_scale = None
+
+        self._previous_collection_start_nano = self._start_time_unix_nano
+
+        self._mapping = self._new_mapping(self._max_scale)
+
+    def aggregate(
+        self, measurement: Measurement, should_sample_exemplar: bool = True
+    ) -> None:
+        # pylint: disable=too-many-branches,too-many-statements, too-many-locals
+
+        with self._lock:
+            if self._value_positive is None:
+                self._value_positive = Buckets()
+            if self._value_negative is None:
+                self._value_negative = Buckets()
+
+            measurement_value = measurement.value
+
+            self._sum += measurement_value
+
+            self._min = min(self._min, measurement_value)
+            self._max = max(self._max, measurement_value)
+
+            self._count += 1
+
+            if measurement_value == 0:
+                self._zero_count += 1
+
+                if self._count == self._zero_count:
+                    self._scale = 0
+
+                return
+
+            if measurement_value > 0:
+                value = self._value_positive
+
+            else:
+                measurement_value = -measurement_value
+                value = self._value_negative
+
+            # The following code finds out if it is necessary to change the
+            # buckets to hold the incoming measurement_value, and changes
+            # them if necessary. This process does not exist in
+            # _ExplicitBucketHistogramAggregation because the buckets there
+            # are constant in size and number.
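+            # For example, with max_size 4 and buckets currently covering
+            # indexes 10..13, a measurement that maps to index 2 would
+            # require a span of 13 - 2 = 11 indexes, which is >= max_size,
+            # so the scale is reduced (every index is halved per unit of
+            # scale reduction) until the span fits in max_size buckets.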
+ index = self._mapping.map_to_index(measurement_value) + + is_rescaling_needed = False + low, high = 0, 0 + + if len(value) == 0: + value.index_start = index + value.index_end = index + value.index_base = index + + elif ( + index < value.index_start + and (value.index_end - index) >= self._max_size + ): + is_rescaling_needed = True + low = index + high = value.index_end + + elif ( + index > value.index_end + and (index - value.index_start) >= self._max_size + ): + is_rescaling_needed = True + low = value.index_start + high = index + + if is_rescaling_needed: + scale_change = self._get_scale_change(low, high) + self._downscale( + scale_change, + self._value_positive, + self._value_negative, + ) + self._mapping = self._new_mapping( + self._mapping.scale - scale_change + ) + + index = self._mapping.map_to_index(measurement_value) + + self._scale = self._mapping.scale + + if index < value.index_start: + span = value.index_end - index + + if span >= len(value.counts): + value.grow(span + 1, self._max_size) + + value.index_start = index + + elif index > value.index_end: + span = index - value.index_start + + if span >= len(value.counts): + value.grow(span + 1, self._max_size) + + value.index_end = index + + bucket_index = index - value.index_base + + if bucket_index < 0: + bucket_index += len(value.counts) + + # Now the buckets have been changed if needed and bucket_index will + # be used to increment the counter of the bucket that needs to be + # incremented. + + # This is analogous to + # self._value[bisect_left(self._boundaries, measurement_value)] += 1 + # in _ExplicitBucketHistogramAggregation.aggregate + value.increment_bucket(bucket_index) + + self._sample_exemplar(measurement, should_sample_exemplar) + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + """ + Atomically return a point for the current value of the metric. + """ + + # pylint: disable=too-many-statements, too-many-locals + with self._lock: + value_positive = self._value_positive + value_negative = self._value_negative + sum_ = self._sum + min_ = self._min + max_ = self._max + count = self._count + zero_count = self._zero_count + scale = self._scale + + self._value_positive = None + self._value_negative = None + self._sum = 0 + self._min = inf + self._max = -inf + self._count = 0 + self._zero_count = 0 + self._scale = None + + if ( + self._instrument_aggregation_temporality + is AggregationTemporality.DELTA + ): + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) + + if value_positive is None and value_negative is None: + return None + + return ExponentialHistogramDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + count=count, + sum=sum_, + scale=scale, + zero_count=zero_count, + positive=BucketsPoint( + offset=value_positive.offset, + bucket_counts=(value_positive.get_offset_counts()), + ), + negative=BucketsPoint( + offset=value_negative.offset, + bucket_counts=(value_negative.get_offset_counts()), + ), + # FIXME: Find the right value for flags + flags=0, + min=min_, + max=max_, + ) + + # Here collection_temporality is CUMULATIVE. 
+            # The instrument aggregation temporality is always DELTA for the
+            # time being. Two cases need to be handled here:
+            #
+            # 1. collect is called after at least one previous call to
+            #    collect (there is data in the previous buckets, and a call
+            #    to merge is needed to handle possible differences in bucket
+            #    sizes).
+            # 2. collect is called without any previous call to collect
+            #    (there are no previous buckets, so previous, empty buckets
+            #    at the same scale as the current buckets need to be created
+            #    so that they can be cumulatively aggregated with the
+            #    current buckets).
+
+            if (
+                value_positive is None
+                and self._previous_value_positive is None
+            ):
+                # This happens if collect is called for the first time
+                # and aggregate has not yet been called.
+                value_positive = Buckets()
+                self._previous_value_positive = value_positive.copy_empty()
+            if (
+                value_negative is None
+                and self._previous_value_negative is None
+            ):
+                value_negative = Buckets()
+                self._previous_value_negative = value_negative.copy_empty()
+            if scale is None and self._previous_scale is None:
+                scale = self._mapping.scale
+                self._previous_scale = scale
+
+            if (
+                value_positive is not None
+                and self._previous_value_positive is None
+            ):
+                # This happens when collect is called for the very first
+                # time and aggregate has been called before.
+
+                # We need previous buckets to add to the current ones. When
+                # collect is called for the first time, there are no
+                # previous buckets, so we create empty buckets to add to the
+                # current ones. Adding empty buckets to the current ones
+                # leaves the current ones unchanged.
+
+                # The way the previous buckets are generated here is
+                # different from the explicit bucket histogram, where the
+                # size and number of the buckets do not change once they are
+                # instantiated. Here, the size and number of the buckets can
+                # change with every call to aggregate. In order to get empty
+                # buckets that can be added to the current ones while
+                # leaving the current ones unchanged, we need to generate
+                # empty buckets that have the same size and number as the
+                # current ones; this is what copy_empty does.
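+                # For example, if the current positive buckets hold counts
+                # [3, 0, 1] over indexes 5..7, copy_empty() produces buckets
+                # over the same index range with every count zeroed, so the
+                # merge below leaves the current counts unchanged.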
+ self._previous_value_positive = value_positive.copy_empty() + if ( + value_negative is not None + and self._previous_value_negative is None + ): + self._previous_value_negative = value_negative.copy_empty() + if scale is not None and self._previous_scale is None: + self._previous_scale = scale + + if ( + value_positive is None + and self._previous_value_positive is not None + ): + value_positive = self._previous_value_positive.copy_empty() + if ( + value_negative is None + and self._previous_value_negative is not None + ): + value_negative = self._previous_value_negative.copy_empty() + if scale is None and self._previous_scale is not None: + scale = self._previous_scale + + min_scale = min(self._previous_scale, scale) + + low_positive, high_positive = ( + self._get_low_high_previous_current( + self._previous_value_positive, + value_positive, + scale, + min_scale, + ) + ) + low_negative, high_negative = ( + self._get_low_high_previous_current( + self._previous_value_negative, + value_negative, + scale, + min_scale, + ) + ) + + min_scale = min( + min_scale + - self._get_scale_change(low_positive, high_positive), + min_scale + - self._get_scale_change(low_negative, high_negative), + ) + + self._downscale( + self._previous_scale - min_scale, + self._previous_value_positive, + self._previous_value_negative, + ) + + # self._merge adds the values from value to + # self._previous_value, this is analogous to + # self._previous_value = [ + # value_element + previous_value_element + # for ( + # value_element, + # previous_value_element, + # ) in zip(value, self._previous_value) + # ] + # in _ExplicitBucketHistogramAggregation.collect. + self._merge( + self._previous_value_positive, + value_positive, + scale, + min_scale, + collection_aggregation_temporality, + ) + self._merge( + self._previous_value_negative, + value_negative, + scale, + min_scale, + collection_aggregation_temporality, + ) + + self._previous_min = min(min_, self._previous_min) + self._previous_max = max(max_, self._previous_max) + self._previous_sum = sum_ + self._previous_sum + self._previous_count = count + self._previous_count + self._previous_zero_count = ( + zero_count + self._previous_zero_count + ) + self._previous_scale = min_scale + + return ExponentialHistogramDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=self._previous_count, + sum=self._previous_sum, + scale=self._previous_scale, + zero_count=self._previous_zero_count, + positive=BucketsPoint( + offset=self._previous_value_positive.offset, + bucket_counts=( + self._previous_value_positive.get_offset_counts() + ), + ), + negative=BucketsPoint( + offset=self._previous_value_negative.offset, + bucket_counts=( + self._previous_value_negative.get_offset_counts() + ), + ), + # FIXME: Find the right value for flags + flags=0, + min=self._previous_min, + max=self._previous_max, + ) + + return None + + def _get_low_high_previous_current( + self, + previous_point_buckets, + current_point_buckets, + current_scale, + min_scale, + ): + (previous_point_low, previous_point_high) = self._get_low_high( + previous_point_buckets, self._previous_scale, min_scale + ) + (current_point_low, current_point_high) = self._get_low_high( + current_point_buckets, current_scale, min_scale + ) + + if current_point_low > current_point_high: + low = previous_point_low + high = previous_point_high + + elif previous_point_low > previous_point_high: + low = current_point_low 
+ high = current_point_high + + else: + low = min(previous_point_low, current_point_low) + high = max(previous_point_high, current_point_high) + + return low, high + + @staticmethod + def _get_low_high(buckets, scale, min_scale): + if buckets.counts == [0]: + return 0, -1 + + shift = scale - min_scale + + return buckets.index_start >> shift, buckets.index_end >> shift + + @staticmethod + def _new_mapping(scale: int) -> Mapping: + if scale <= 0: + return ExponentMapping(scale) + return LogarithmMapping(scale) + + def _get_scale_change(self, low, high): + change = 0 + + while high - low >= self._max_size: + high = high >> 1 + low = low >> 1 + + change += 1 + + return change + + @staticmethod + def _downscale(change: int, positive, negative): + if change == 0: + return + + if change < 0: + # pylint: disable=broad-exception-raised + raise Exception("Invalid change of scale") + + positive.downscale(change) + negative.downscale(change) + + def _merge( + self, + previous_buckets: Buckets, + current_buckets: Buckets, + current_scale, + min_scale, + aggregation_temporality, + ): + current_change = current_scale - min_scale + + for current_bucket_index, current_bucket in enumerate( + current_buckets.counts + ): + if current_bucket == 0: + continue + + # Not considering the case where len(previous_buckets) == 0. This + # would not happen because self._previous_point is only assigned to + # an ExponentialHistogramDataPoint object if self._count != 0. + + current_index = current_buckets.index_base + current_bucket_index + if current_index > current_buckets.index_end: + current_index -= len(current_buckets.counts) + + index = current_index >> current_change + + if index < previous_buckets.index_start: + span = previous_buckets.index_end - index + + if span >= self._max_size: + # pylint: disable=broad-exception-raised + raise Exception("Incorrect merge scale") + + if span >= len(previous_buckets.counts): + previous_buckets.grow(span + 1, self._max_size) + + previous_buckets.index_start = index + + if index > previous_buckets.index_end: + span = index - previous_buckets.index_start + + if span >= self._max_size: + # pylint: disable=broad-exception-raised + raise Exception("Incorrect merge scale") + + if span >= len(previous_buckets.counts): + previous_buckets.grow(span + 1, self._max_size) + + previous_buckets.index_end = index + + bucket_index = index - previous_buckets.index_base + + if bucket_index < 0: + bucket_index += len(previous_buckets.counts) + + if aggregation_temporality is AggregationTemporality.DELTA: + current_bucket = -current_bucket + + previous_buckets.increment_bucket( + bucket_index, increment=current_bucket + ) + + +class Aggregation(ABC): + """ + Base class for all aggregation types. + """ + + @abstractmethod + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + """Creates an aggregation""" + + +class DefaultAggregation(Aggregation): + """ + The default aggregation to be used in a `View`. 
+ + This aggregation will create an actual aggregation depending on the + instrument type, as specified next: + + ==================================================== ==================================== + Instrument Aggregation + ==================================================== ==================================== + `opentelemetry.sdk.metrics.Counter` `SumAggregation` + `opentelemetry.sdk.metrics.UpDownCounter` `SumAggregation` + `opentelemetry.sdk.metrics.ObservableCounter` `SumAggregation` + `opentelemetry.sdk.metrics.ObservableUpDownCounter` `SumAggregation` + `opentelemetry.sdk.metrics.Histogram` `ExplicitBucketHistogramAggregation` + `opentelemetry.sdk.metrics.ObservableGauge` `LastValueAggregation` + ==================================================== ==================================== + """ + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + # pylint: disable=too-many-return-statements + if isinstance(instrument, Counter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=True, + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), + start_time_unix_nano=start_time_unix_nano, + ) + if isinstance(instrument, UpDownCounter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=False, + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, ObservableCounter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=True, + instrument_aggregation_temporality=( + AggregationTemporality.CUMULATIVE + ), + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, ObservableUpDownCounter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=False, + instrument_aggregation_temporality=( + AggregationTemporality.CUMULATIVE + ), + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, Histogram): + boundaries = instrument._advisory.explicit_bucket_boundaries + return _ExplicitBucketHistogramAggregation( + attributes, + reservoir_builder=reservoir_factory( + _ExplicitBucketHistogramAggregation + ), + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), + boundaries=boundaries, + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, ObservableGauge): + return _LastValueAggregation( + attributes, + reservoir_builder=reservoir_factory(_LastValueAggregation), + ) + + if isinstance(instrument, _Gauge): + return _LastValueAggregation( + attributes, + reservoir_builder=reservoir_factory(_LastValueAggregation), + ) + + # pylint: disable=broad-exception-raised + raise Exception(f"Invalid instrument type {type(instrument)} found") + + +class ExponentialBucketHistogramAggregation(Aggregation): + def __init__( + self, + max_size: int = 160, + max_scale: int = 20, + ): + self._max_size = max_size + self._max_scale = max_scale + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + 
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + + return _ExponentialBucketHistogramAggregation( + attributes, + reservoir_factory(_ExponentialBucketHistogramAggregation), + instrument_aggregation_temporality, + start_time_unix_nano, + max_size=self._max_size, + max_scale=self._max_scale, + ) + + +class ExplicitBucketHistogramAggregation(Aggregation): + """This aggregation informs the SDK to collect: + + - Count of Measurement values falling within explicit bucket boundaries. + - Arithmetic sum of Measurement values in population. This SHOULD NOT be collected when used with instruments that record negative measurements, e.g. UpDownCounter or ObservableGauge. + - Min (optional) Measurement value in population. + - Max (optional) Measurement value in population. + + + Args: + boundaries: Array of increasing values representing explicit bucket boundary values. + record_min_max: Whether to record min and max. + """ + + def __init__( + self, + boundaries: Optional[Sequence[float]] = None, + record_min_max: bool = True, + ) -> None: + self._boundaries = boundaries + self._record_min_max = record_min_max + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + + if self._boundaries is None: + self._boundaries = ( + instrument._advisory.explicit_bucket_boundaries + or _DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES + ) + + return _ExplicitBucketHistogramAggregation( + attributes, + instrument_aggregation_temporality, + start_time_unix_nano, + reservoir_factory(_ExplicitBucketHistogramAggregation), + self._boundaries, + self._record_min_max, + ) + + +class SumAggregation(Aggregation): + """This aggregation informs the SDK to collect: + + - The arithmetic sum of Measurement values. + """ + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + + return _SumAggregation( + attributes, + isinstance(instrument, (Counter, ObservableCounter)), + instrument_aggregation_temporality, + start_time_unix_nano, + reservoir_factory(_SumAggregation), + ) + + +class LastValueAggregation(Aggregation): + """ + This aggregation informs the SDK to collect: + + - The last Measurement. + - The timestamp of the last Measurement. 
+ """ + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + return _LastValueAggregation( + attributes, + reservoir_builder=reservoir_factory(_LastValueAggregation), + ) + + +class DropAggregation(Aggregation): + """Using this aggregation will make all measurements be ignored.""" + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + return _DropAggregation( + attributes, reservoir_factory(_DropAggregation) + ) |