diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics')
27 files changed, 5742 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/__init__.py new file mode 100644 index 00000000..b022f129 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/__init__.py @@ -0,0 +1,57 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from opentelemetry.sdk.metrics._internal import Meter, MeterProvider +from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError +from opentelemetry.sdk.metrics._internal.exemplar import ( + AlignedHistogramBucketExemplarReservoir, + AlwaysOffExemplarFilter, + AlwaysOnExemplarFilter, + Exemplar, + ExemplarFilter, + ExemplarReservoir, + SimpleFixedSizeExemplarReservoir, + TraceBasedExemplarFilter, +) +from opentelemetry.sdk.metrics._internal.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk.metrics._internal.instrument import Gauge as _Gauge + +__all__ = [ + "AlignedHistogramBucketExemplarReservoir", + "AlwaysOnExemplarFilter", + "AlwaysOffExemplarFilter", + "Exemplar", + "ExemplarFilter", + "ExemplarReservoir", + "Meter", + "MeterProvider", + "MetricsTimeoutError", + "Counter", + "Histogram", + "_Gauge", + "ObservableCounter", + "ObservableGauge", + "ObservableUpDownCounter", + "SimpleFixedSizeExemplarReservoir", + "UpDownCounter", + "TraceBasedExemplarFilter", +] diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py new file mode 100644 index 00000000..faa0959f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py @@ -0,0 +1,582 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import weakref +from atexit import register, unregister +from logging import getLogger +from os import environ +from threading import Lock +from time import time_ns +from typing import Optional, Sequence + +# This kind of import is needed to avoid Sphinx errors. +import opentelemetry.sdk.metrics +from opentelemetry.metrics import Counter as APICounter +from opentelemetry.metrics import Histogram as APIHistogram +from opentelemetry.metrics import Meter as APIMeter +from opentelemetry.metrics import MeterProvider as APIMeterProvider +from opentelemetry.metrics import NoOpMeter +from opentelemetry.metrics import ObservableCounter as APIObservableCounter +from opentelemetry.metrics import ObservableGauge as APIObservableGauge +from opentelemetry.metrics import ( + ObservableUpDownCounter as APIObservableUpDownCounter, +) +from opentelemetry.metrics import UpDownCounter as APIUpDownCounter +from opentelemetry.metrics import _Gauge as APIGauge +from opentelemetry.sdk.environment_variables import ( + OTEL_METRICS_EXEMPLAR_FILTER, + OTEL_SDK_DISABLED, +) +from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError +from opentelemetry.sdk.metrics._internal.exemplar import ( + AlwaysOffExemplarFilter, + AlwaysOnExemplarFilter, + ExemplarFilter, + TraceBasedExemplarFilter, +) +from opentelemetry.sdk.metrics._internal.instrument import ( + _Counter, + _Gauge, + _Histogram, + _ObservableCounter, + _ObservableGauge, + _ObservableUpDownCounter, + _UpDownCounter, +) +from opentelemetry.sdk.metrics._internal.measurement_consumer import ( + MeasurementConsumer, + SynchronousMeasurementConsumer, +) +from opentelemetry.sdk.metrics._internal.sdk_configuration import ( + SdkConfiguration, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.util._once import Once +from opentelemetry.util.types import ( + Attributes, +) + +_logger = getLogger(__name__) + + +class Meter(APIMeter): + """See `opentelemetry.metrics.Meter`.""" + + def __init__( + self, + instrumentation_scope: InstrumentationScope, + measurement_consumer: MeasurementConsumer, + ): + super().__init__( + name=instrumentation_scope.name, + version=instrumentation_scope.version, + schema_url=instrumentation_scope.schema_url, + ) + self._instrumentation_scope = instrumentation_scope + self._measurement_consumer = measurement_consumer + self._instrument_id_instrument = {} + self._instrument_id_instrument_lock = Lock() + + def create_counter(self, name, unit="", description="") -> APICounter: + status = self._register_instrument(name, _Counter, unit, description) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APICounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _Counter( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_up_down_counter( + self, name, unit="", description="" + ) -> APIUpDownCounter: + status = self._register_instrument( + name, _UpDownCounter, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIUpDownCounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _UpDownCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_observable_counter( + self, + name, + callbacks=None, + unit="", + description="", + ) -> APIObservableCounter: + status = self._register_instrument( + name, _ObservableCounter, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIObservableCounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _ObservableCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + + self._measurement_consumer.register_asynchronous_instrument(instrument) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_histogram( + self, + name: str, + unit: str = "", + description: str = "", + *, + explicit_bucket_boundaries_advisory: Optional[Sequence[float]] = None, + ) -> APIHistogram: + if explicit_bucket_boundaries_advisory is not None: + invalid_advisory = False + if isinstance(explicit_bucket_boundaries_advisory, Sequence): + try: + invalid_advisory = not ( + all( + isinstance(e, (float, int)) + for e in explicit_bucket_boundaries_advisory + ) + ) + except (KeyError, TypeError): + invalid_advisory = True + else: + invalid_advisory = True + + if invalid_advisory: + explicit_bucket_boundaries_advisory = None + _logger.warning( + "explicit_bucket_boundaries_advisory must be a sequence of numbers" + ) + + status = self._register_instrument( + name, + _Histogram, + unit, + description, + explicit_bucket_boundaries_advisory, + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIHistogram.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _Histogram( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + explicit_bucket_boundaries_advisory, + ) + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_gauge(self, name, unit="", description="") -> APIGauge: + status = self._register_instrument(name, _Gauge, unit, description) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIGauge.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _Gauge( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_observable_gauge( + self, name, callbacks=None, unit="", description="" + ) -> APIObservableGauge: + status = self._register_instrument( + name, _ObservableGauge, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIObservableGauge.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _ObservableGauge( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + + self._measurement_consumer.register_asynchronous_instrument(instrument) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_observable_up_down_counter( + self, name, callbacks=None, unit="", description="" + ) -> APIObservableUpDownCounter: + status = self._register_instrument( + name, _ObservableUpDownCounter, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIObservableUpDownCounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _ObservableUpDownCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + + self._measurement_consumer.register_asynchronous_instrument(instrument) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + +def _get_exemplar_filter(exemplar_filter: str) -> ExemplarFilter: + if exemplar_filter == "trace_based": + return TraceBasedExemplarFilter() + if exemplar_filter == "always_on": + return AlwaysOnExemplarFilter() + if exemplar_filter == "always_off": + return AlwaysOffExemplarFilter() + msg = f"Unknown exemplar filter '{exemplar_filter}'." + raise ValueError(msg) + + +class MeterProvider(APIMeterProvider): + r"""See `opentelemetry.metrics.MeterProvider`. + + Args: + metric_readers: Register metric readers to collect metrics from the SDK + on demand. Each :class:`opentelemetry.sdk.metrics.export.MetricReader` is + completely independent and will collect separate streams of + metrics. TODO: reference ``PeriodicExportingMetricReader`` usage with push + exporters here. + resource: The resource representing what the metrics emitted from the SDK pertain to. + shutdown_on_exit: If true, registers an `atexit` handler to call + `MeterProvider.shutdown` + views: The views to configure the metric output the SDK + + By default, instruments which do not match any :class:`opentelemetry.sdk.metrics.view.View` (or if no :class:`opentelemetry.sdk.metrics.view.View`\ s + are provided) will report metrics with the default aggregation for the + instrument's kind. To disable instruments by default, configure a match-all + :class:`opentelemetry.sdk.metrics.view.View` with `DropAggregation` and then create :class:`opentelemetry.sdk.metrics.view.View`\ s to re-enable + individual instruments: + + .. code-block:: python + :caption: Disable default views + + MeterProvider( + views=[ + View(instrument_name="*", aggregation=DropAggregation()), + View(instrument_name="mycounter"), + ], + # ... + ) + """ + + _all_metric_readers_lock = Lock() + _all_metric_readers = weakref.WeakSet() + + def __init__( + self, + metric_readers: Sequence[ + "opentelemetry.sdk.metrics.export.MetricReader" + ] = (), + resource: Optional[Resource] = None, + exemplar_filter: Optional[ExemplarFilter] = None, + shutdown_on_exit: bool = True, + views: Sequence["opentelemetry.sdk.metrics.view.View"] = (), + ): + self._lock = Lock() + self._meter_lock = Lock() + self._atexit_handler = None + if resource is None: + resource = Resource.create({}) + self._sdk_config = SdkConfiguration( + exemplar_filter=( + exemplar_filter + or _get_exemplar_filter( + environ.get(OTEL_METRICS_EXEMPLAR_FILTER, "trace_based") + ) + ), + resource=resource, + metric_readers=metric_readers, + views=views, + ) + self._measurement_consumer = SynchronousMeasurementConsumer( + sdk_config=self._sdk_config + ) + disabled = environ.get(OTEL_SDK_DISABLED, "") + self._disabled = disabled.lower().strip() == "true" + + if shutdown_on_exit: + self._atexit_handler = register(self.shutdown) + + self._meters = {} + self._shutdown_once = Once() + self._shutdown = False + + for metric_reader in self._sdk_config.metric_readers: + with self._all_metric_readers_lock: + if metric_reader in self._all_metric_readers: + # pylint: disable=broad-exception-raised + raise Exception( + f"MetricReader {metric_reader} has been registered " + "already in other MeterProvider instance" + ) + + self._all_metric_readers.add(metric_reader) + + metric_reader._set_collect_callback( + self._measurement_consumer.collect + ) + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + deadline_ns = time_ns() + timeout_millis * 10**6 + + metric_reader_error = {} + + for metric_reader in self._sdk_config.metric_readers: + current_ts = time_ns() + try: + if current_ts >= deadline_ns: + raise MetricsTimeoutError( + "Timed out while flushing metric readers" + ) + metric_reader.force_flush( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) + + # pylint: disable=broad-exception-caught + except Exception as error: + metric_reader_error[metric_reader] = error + + if metric_reader_error: + metric_reader_error_string = "\n".join( + [ + f"{metric_reader.__class__.__name__}: {repr(error)}" + for metric_reader, error in metric_reader_error.items() + ] + ) + + # pylint: disable=broad-exception-raised + raise Exception( + "MeterProvider.force_flush failed because the following " + "metric readers failed during collect:\n" + f"{metric_reader_error_string}" + ) + return True + + def shutdown(self, timeout_millis: float = 30_000): + deadline_ns = time_ns() + timeout_millis * 10**6 + + def _shutdown(): + self._shutdown = True + + did_shutdown = self._shutdown_once.do_once(_shutdown) + + if not did_shutdown: + _logger.warning("shutdown can only be called once") + return + + metric_reader_error = {} + + for metric_reader in self._sdk_config.metric_readers: + current_ts = time_ns() + try: + if current_ts >= deadline_ns: + # pylint: disable=broad-exception-raised + raise Exception( + "Didn't get to execute, deadline already exceeded" + ) + metric_reader.shutdown( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) + + # pylint: disable=broad-exception-caught + except Exception as error: + metric_reader_error[metric_reader] = error + + if self._atexit_handler is not None: + unregister(self._atexit_handler) + self._atexit_handler = None + + if metric_reader_error: + metric_reader_error_string = "\n".join( + [ + f"{metric_reader.__class__.__name__}: {repr(error)}" + for metric_reader, error in metric_reader_error.items() + ] + ) + + # pylint: disable=broad-exception-raised + raise Exception( + ( + "MeterProvider.shutdown failed because the following " + "metric readers failed during shutdown:\n" + f"{metric_reader_error_string}" + ) + ) + + def get_meter( + self, + name: str, + version: Optional[str] = None, + schema_url: Optional[str] = None, + attributes: Optional[Attributes] = None, + ) -> Meter: + if self._disabled: + return NoOpMeter(name, version=version, schema_url=schema_url) + + if self._shutdown: + _logger.warning( + "A shutdown `MeterProvider` can not provide a `Meter`" + ) + return NoOpMeter(name, version=version, schema_url=schema_url) + + if not name: + _logger.warning("Meter name cannot be None or empty.") + return NoOpMeter(name, version=version, schema_url=schema_url) + + info = InstrumentationScope(name, version, schema_url, attributes) + with self._meter_lock: + if not self._meters.get(info): + # FIXME #2558 pass SDKConfig object to meter so that the meter + # has access to views. + self._meters[info] = Meter( + info, + self._measurement_consumer, + ) + return self._meters[info] diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py new file mode 100644 index 00000000..be81d70e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py @@ -0,0 +1,153 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from logging import getLogger +from threading import Lock +from time import time_ns +from typing import Dict, List, Optional, Sequence + +from opentelemetry.metrics import Instrument +from opentelemetry.sdk.metrics._internal.aggregation import ( + Aggregation, + DefaultAggregation, + _Aggregation, + _SumAggregation, +) +from opentelemetry.sdk.metrics._internal.export import AggregationTemporality +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.point import DataPointT +from opentelemetry.sdk.metrics._internal.view import View + +_logger = getLogger(__name__) + + +class _ViewInstrumentMatch: + def __init__( + self, + view: View, + instrument: Instrument, + instrument_class_aggregation: Dict[type, Aggregation], + ): + self._view = view + self._instrument = instrument + self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} + self._lock = Lock() + self._instrument_class_aggregation = instrument_class_aggregation + self._name = self._view._name or self._instrument.name + self._description = ( + self._view._description or self._instrument.description + ) + if not isinstance(self._view._aggregation, DefaultAggregation): + self._aggregation = self._view._aggregation._create_aggregation( + self._instrument, + None, + self._view._exemplar_reservoir_factory, + 0, + ) + else: + self._aggregation = self._instrument_class_aggregation[ + self._instrument.__class__ + ]._create_aggregation( + self._instrument, + None, + self._view._exemplar_reservoir_factory, + 0, + ) + + def conflicts(self, other: "_ViewInstrumentMatch") -> bool: + # pylint: disable=protected-access + + result = ( + self._name == other._name + and self._instrument.unit == other._instrument.unit + # The aggregation class is being used here instead of data point + # type since they are functionally equivalent. + and self._aggregation.__class__ == other._aggregation.__class__ + ) + if isinstance(self._aggregation, _SumAggregation): + result = ( + result + and self._aggregation._instrument_is_monotonic + == other._aggregation._instrument_is_monotonic + and self._aggregation._instrument_aggregation_temporality + == other._aggregation._instrument_aggregation_temporality + ) + + return result + + # pylint: disable=protected-access + def consume_measurement( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + if self._view._attribute_keys is not None: + attributes = {} + + for key, value in (measurement.attributes or {}).items(): + if key in self._view._attribute_keys: + attributes[key] = value + elif measurement.attributes is not None: + attributes = measurement.attributes + else: + attributes = {} + + aggr_key = frozenset(attributes.items()) + + if aggr_key not in self._attributes_aggregation: + with self._lock: + if aggr_key not in self._attributes_aggregation: + if not isinstance( + self._view._aggregation, DefaultAggregation + ): + aggregation = ( + self._view._aggregation._create_aggregation( + self._instrument, + attributes, + self._view._exemplar_reservoir_factory, + time_ns(), + ) + ) + else: + aggregation = self._instrument_class_aggregation[ + self._instrument.__class__ + ]._create_aggregation( + self._instrument, + attributes, + self._view._exemplar_reservoir_factory, + time_ns(), + ) + self._attributes_aggregation[aggr_key] = aggregation + + self._attributes_aggregation[aggr_key].aggregate( + measurement, should_sample_exemplar + ) + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nanos: int, + ) -> Optional[Sequence[DataPointT]]: + data_points: List[DataPointT] = [] + with self._lock: + for aggregation in self._attributes_aggregation.values(): + data_point = aggregation.collect( + collection_aggregation_temporality, collection_start_nanos + ) + if data_point is not None: + data_points.append(data_point) + + # Returning here None instead of an empty list because the caller + # does not consume a sequence and to be consistent with the rest of + # collect methods that also return None. + return data_points or None diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py new file mode 100644 index 00000000..8443d951 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -0,0 +1,1475 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=too-many-lines + +from abc import ABC, abstractmethod +from bisect import bisect_left +from enum import IntEnum +from functools import partial +from logging import getLogger +from math import inf +from threading import Lock +from typing import ( + Callable, + Generic, + List, + Optional, + Sequence, + Type, + TypeVar, +) + +from opentelemetry.metrics import ( + Asynchronous, + Counter, + Histogram, + Instrument, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + Synchronous, + UpDownCounter, + _Gauge, +) +from opentelemetry.sdk.metrics._internal.exemplar import ( + Exemplar, + ExemplarReservoirBuilder, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import ( + Buckets, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( + Mapping, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import ( + ExponentMapping, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.logarithm_mapping import ( + LogarithmMapping, +) +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.point import Buckets as BucketsPoint +from opentelemetry.sdk.metrics._internal.point import ( + ExponentialHistogramDataPoint, + HistogramDataPoint, + NumberDataPoint, + Sum, +) +from opentelemetry.sdk.metrics._internal.point import Gauge as GaugePoint +from opentelemetry.sdk.metrics._internal.point import ( + Histogram as HistogramPoint, +) +from opentelemetry.util.types import Attributes + +_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint) + +_logger = getLogger(__name__) + + +class AggregationTemporality(IntEnum): + """ + The temporality to use when aggregating data. + + Can be one of the following values: + """ + + UNSPECIFIED = 0 + DELTA = 1 + CUMULATIVE = 2 + + +class _Aggregation(ABC, Generic[_DataPointVarT]): + def __init__( + self, + attributes: Attributes, + reservoir_builder: ExemplarReservoirBuilder, + ): + self._lock = Lock() + self._attributes = attributes + self._reservoir = reservoir_builder() + self._previous_point = None + + @abstractmethod + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + """Aggregate a measurement. + + Args: + measurement: Measurement to aggregate + should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not. + """ + + @abstractmethod + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + pass + + def _collect_exemplars(self) -> Sequence[Exemplar]: + """Returns the collected exemplars. + + Returns: + The exemplars collected by the reservoir + """ + return self._reservoir.collect(self._attributes) + + def _sample_exemplar( + self, measurement: Measurement, should_sample_exemplar: bool + ) -> None: + """Offer the measurement to the exemplar reservoir for sampling. + + It should be called within the each :ref:`aggregate` call. + + Args: + measurement: The new measurement + should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not. + """ + if should_sample_exemplar: + self._reservoir.offer( + measurement.value, + measurement.time_unix_nano, + measurement.attributes, + measurement.context, + ) + + +class _DropAggregation(_Aggregation): + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + pass + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + pass + + +class _SumAggregation(_Aggregation[Sum]): + def __init__( + self, + attributes: Attributes, + instrument_is_monotonic: bool, + instrument_aggregation_temporality: AggregationTemporality, + start_time_unix_nano: int, + reservoir_builder: ExemplarReservoirBuilder, + ): + super().__init__(attributes, reservoir_builder) + + self._start_time_unix_nano = start_time_unix_nano + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) + self._instrument_is_monotonic = instrument_is_monotonic + + self._value = None + + self._previous_collection_start_nano = self._start_time_unix_nano + self._previous_value = 0 + + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + with self._lock: + if self._value is None: + self._value = 0 + + self._value = self._value + measurement.value + + self._sample_exemplar(measurement, should_sample_exemplar) + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[NumberDataPoint]: + """ + Atomically return a point for the current value of the metric and + reset the aggregation value. + + Synchronous instruments have a method which is called directly with + increments for a given quantity: + + For example, an instrument that counts the amount of passengers in + every vehicle that crosses a certain point in a highway: + + synchronous_instrument.add(2) + collect(...) # 2 passengers are counted + synchronous_instrument.add(3) + collect(...) # 3 passengers are counted + synchronous_instrument.add(1) + collect(...) # 1 passenger is counted + + In this case the instrument aggregation temporality is DELTA because + every value represents an increment to the count, + + Asynchronous instruments have a callback which returns the total value + of a given quantity: + + For example, an instrument that measures the amount of bytes written to + a certain hard drive: + + callback() -> 1352 + collect(...) # 1352 bytes have been written so far + callback() -> 2324 + collect(...) # 2324 bytes have been written so far + callback() -> 4542 + collect(...) # 4542 bytes have been written so far + + In this case the instrument aggregation temporality is CUMULATIVE + because every value represents the total of the measurement. + + There is also the collection aggregation temporality, which is passed + to this method. The collection aggregation temporality defines the + nature of the returned value by this aggregation. + + When the collection aggregation temporality matches the + instrument aggregation temporality, then this method returns the + current value directly: + + synchronous_instrument.add(2) + collect(DELTA) -> 2 + synchronous_instrument.add(3) + collect(DELTA) -> 3 + synchronous_instrument.add(1) + collect(DELTA) -> 1 + + callback() -> 1352 + collect(CUMULATIVE) -> 1352 + callback() -> 2324 + collect(CUMULATIVE) -> 2324 + callback() -> 4542 + collect(CUMULATIVE) -> 4542 + + When the collection aggregation temporality does not match the + instrument aggregation temporality, then a conversion is made. For this + purpose, this aggregation keeps a private attribute, + self._previous_value. + + When the instrument is synchronous: + + self._previous_value is the sum of every previously + collected (delta) value. In this case, the returned (cumulative) value + will be: + + self._previous_value + value + + synchronous_instrument.add(2) + collect(CUMULATIVE) -> 2 + synchronous_instrument.add(3) + collect(CUMULATIVE) -> 5 + synchronous_instrument.add(1) + collect(CUMULATIVE) -> 6 + + Also, as a diagram: + + time -> + + self._previous_value + |-------------| + + value (delta) + |----| + + returned value (cumulative) + |------------------| + + When the instrument is asynchronous: + + self._previous_value is the value of the previously + collected (cumulative) value. In this case, the returned (delta) value + will be: + + value - self._previous_value + + callback() -> 1352 + collect(DELTA) -> 1352 + callback() -> 2324 + collect(DELTA) -> 972 + callback() -> 4542 + collect(DELTA) -> 2218 + + Also, as a diagram: + + time -> + + self._previous_value + |-------------| + + value (cumulative) + |------------------| + + returned value (delta) + |----| + """ + + with self._lock: + value = self._value + self._value = None + + if ( + self._instrument_aggregation_temporality + is AggregationTemporality.DELTA + ): + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) + + if value is None: + return None + + return NumberDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + value=value, + ) + + if value is None: + value = 0 + + self._previous_value = value + self._previous_value + + return NumberDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + value=self._previous_value, + ) + + # This happens when the corresponding instrument for this + # aggregation is asynchronous. + + if value is None: + # This happens when the corresponding instrument callback + # does not produce measurements. + return None + + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + result_value = value - self._previous_value + + self._previous_value = value + + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = collection_start_nano + + return NumberDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + value=result_value, + ) + + return NumberDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + value=value, + ) + + +class _LastValueAggregation(_Aggregation[GaugePoint]): + def __init__( + self, + attributes: Attributes, + reservoir_builder: ExemplarReservoirBuilder, + ): + super().__init__(attributes, reservoir_builder) + self._value = None + + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ): + with self._lock: + self._value = measurement.value + + self._sample_exemplar(measurement, should_sample_exemplar) + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + """ + Atomically return a point for the current value of the metric. + """ + with self._lock: + if self._value is None: + return None + value = self._value + self._value = None + + exemplars = self._collect_exemplars() + + return NumberDataPoint( + attributes=self._attributes, + exemplars=exemplars, + start_time_unix_nano=None, + time_unix_nano=collection_start_nano, + value=value, + ) + + +_DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES: Sequence[float] = ( + 0.0, + 5.0, + 10.0, + 25.0, + 50.0, + 75.0, + 100.0, + 250.0, + 500.0, + 750.0, + 1000.0, + 2500.0, + 5000.0, + 7500.0, + 10000.0, +) + + +class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]): + def __init__( + self, + attributes: Attributes, + instrument_aggregation_temporality: AggregationTemporality, + start_time_unix_nano: int, + reservoir_builder: ExemplarReservoirBuilder, + boundaries: Optional[Sequence[float]] = None, + record_min_max: bool = True, + ): + if boundaries is None: + boundaries = ( + _DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES + ) + super().__init__( + attributes, + reservoir_builder=partial( + reservoir_builder, boundaries=boundaries + ), + ) + + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) + self._start_time_unix_nano = start_time_unix_nano + self._boundaries = tuple(boundaries) + self._record_min_max = record_min_max + + self._value = None + self._min = inf + self._max = -inf + self._sum = 0 + + self._previous_value = None + self._previous_min = inf + self._previous_max = -inf + self._previous_sum = 0 + + self._previous_collection_start_nano = self._start_time_unix_nano + + def _get_empty_bucket_counts(self) -> List[int]: + return [0] * (len(self._boundaries) + 1) + + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + with self._lock: + if self._value is None: + self._value = self._get_empty_bucket_counts() + + measurement_value = measurement.value + + self._sum += measurement_value + + if self._record_min_max: + self._min = min(self._min, measurement_value) + self._max = max(self._max, measurement_value) + + self._value[bisect_left(self._boundaries, measurement_value)] += 1 + + self._sample_exemplar(measurement, should_sample_exemplar) + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + """ + Atomically return a point for the current value of the metric. + """ + + with self._lock: + value = self._value + sum_ = self._sum + min_ = self._min + max_ = self._max + + self._value = None + self._sum = 0 + self._min = inf + self._max = -inf + + if ( + self._instrument_aggregation_temporality + is AggregationTemporality.DELTA + ): + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) + + if value is None: + return None + + return HistogramDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + count=sum(value), + sum=sum_, + bucket_counts=tuple(value), + explicit_bounds=self._boundaries, + min=min_, + max=max_, + ) + + if value is None: + value = self._get_empty_bucket_counts() + + if self._previous_value is None: + self._previous_value = self._get_empty_bucket_counts() + + self._previous_value = [ + value_element + previous_value_element + for ( + value_element, + previous_value_element, + ) in zip(value, self._previous_value) + ] + self._previous_min = min(min_, self._previous_min) + self._previous_max = max(max_, self._previous_max) + self._previous_sum = sum_ + self._previous_sum + + return HistogramDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=sum(self._previous_value), + sum=self._previous_sum, + bucket_counts=tuple(self._previous_value), + explicit_bounds=self._boundaries, + min=self._previous_min, + max=self._previous_max, + ) + + return None + + +# pylint: disable=protected-access +class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]): + # _min_max_size and _max_max_size are the smallest and largest values + # the max_size parameter may have, respectively. + + # _min_max_size is is the smallest reasonable value which is small enough + # to contain the entire normal floating point range at the minimum scale. + _min_max_size = 2 + + # _max_max_size is an arbitrary limit meant to limit accidental creation of + # giant exponential bucket histograms. + _max_max_size = 16384 + + def __init__( + self, + attributes: Attributes, + reservoir_builder: ExemplarReservoirBuilder, + instrument_aggregation_temporality: AggregationTemporality, + start_time_unix_nano: int, + # This is the default maximum number of buckets per positive or + # negative number range. The value 160 is specified by OpenTelemetry. + # See the derivation here: + # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation) + max_size: int = 160, + max_scale: int = 20, + ): + # max_size is the maximum capacity of the positive and negative + # buckets. + # _sum is the sum of all the values aggregated by this aggregator. + # _count is the count of all calls to aggregate. + # _zero_count is the count of all the calls to aggregate when the value + # to be aggregated is exactly 0. + # _min is the smallest value aggregated by this aggregator. + # _max is the smallest value aggregated by this aggregator. + # _positive holds the positive values. + # _negative holds the negative values by their absolute value. + if max_size < self._min_max_size: + raise ValueError( + f"Buckets max size {max_size} is smaller than " + "minimum max size {self._min_max_size}" + ) + + if max_size > self._max_max_size: + raise ValueError( + f"Buckets max size {max_size} is larger than " + "maximum max size {self._max_max_size}" + ) + if max_scale > 20: + _logger.warning( + "max_scale is set to %s which is " + "larger than the recommended value of 20", + max_scale, + ) + + # This aggregation is analogous to _ExplicitBucketHistogramAggregation, + # the only difference is that with every call to aggregate, the size + # and amount of buckets can change (in + # _ExplicitBucketHistogramAggregation both size and amount of buckets + # remain constant once it is instantiated). + + super().__init__( + attributes, + reservoir_builder=partial( + reservoir_builder, size=min(20, max_size) + ), + ) + + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) + self._start_time_unix_nano = start_time_unix_nano + self._max_size = max_size + self._max_scale = max_scale + + self._value_positive = None + self._value_negative = None + self._min = inf + self._max = -inf + self._sum = 0 + self._count = 0 + self._zero_count = 0 + self._scale = None + + self._previous_value_positive = None + self._previous_value_negative = None + self._previous_min = inf + self._previous_max = -inf + self._previous_sum = 0 + self._previous_count = 0 + self._previous_zero_count = 0 + self._previous_scale = None + + self._previous_collection_start_nano = self._start_time_unix_nano + + self._mapping = self._new_mapping(self._max_scale) + + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + # pylint: disable=too-many-branches,too-many-statements, too-many-locals + + with self._lock: + if self._value_positive is None: + self._value_positive = Buckets() + if self._value_negative is None: + self._value_negative = Buckets() + + measurement_value = measurement.value + + self._sum += measurement_value + + self._min = min(self._min, measurement_value) + self._max = max(self._max, measurement_value) + + self._count += 1 + + if measurement_value == 0: + self._zero_count += 1 + + if self._count == self._zero_count: + self._scale = 0 + + return + + if measurement_value > 0: + value = self._value_positive + + else: + measurement_value = -measurement_value + value = self._value_negative + + # The following code finds out if it is necessary to change the + # buckets to hold the incoming measurement_value, changes them if + # necessary. This process does not exist in + # _ExplicitBucketHistogram aggregation because the buckets there + # are constant in size and amount. + index = self._mapping.map_to_index(measurement_value) + + is_rescaling_needed = False + low, high = 0, 0 + + if len(value) == 0: + value.index_start = index + value.index_end = index + value.index_base = index + + elif ( + index < value.index_start + and (value.index_end - index) >= self._max_size + ): + is_rescaling_needed = True + low = index + high = value.index_end + + elif ( + index > value.index_end + and (index - value.index_start) >= self._max_size + ): + is_rescaling_needed = True + low = value.index_start + high = index + + if is_rescaling_needed: + scale_change = self._get_scale_change(low, high) + self._downscale( + scale_change, + self._value_positive, + self._value_negative, + ) + self._mapping = self._new_mapping( + self._mapping.scale - scale_change + ) + + index = self._mapping.map_to_index(measurement_value) + + self._scale = self._mapping.scale + + if index < value.index_start: + span = value.index_end - index + + if span >= len(value.counts): + value.grow(span + 1, self._max_size) + + value.index_start = index + + elif index > value.index_end: + span = index - value.index_start + + if span >= len(value.counts): + value.grow(span + 1, self._max_size) + + value.index_end = index + + bucket_index = index - value.index_base + + if bucket_index < 0: + bucket_index += len(value.counts) + + # Now the buckets have been changed if needed and bucket_index will + # be used to increment the counter of the bucket that needs to be + # incremented. + + # This is analogous to + # self._value[bisect_left(self._boundaries, measurement_value)] += 1 + # in _ExplicitBucketHistogramAggregation.aggregate + value.increment_bucket(bucket_index) + + self._sample_exemplar(measurement, should_sample_exemplar) + + def collect( + self, + collection_aggregation_temporality: AggregationTemporality, + collection_start_nano: int, + ) -> Optional[_DataPointVarT]: + """ + Atomically return a point for the current value of the metric. + """ + + # pylint: disable=too-many-statements, too-many-locals + with self._lock: + value_positive = self._value_positive + value_negative = self._value_negative + sum_ = self._sum + min_ = self._min + max_ = self._max + count = self._count + zero_count = self._zero_count + scale = self._scale + + self._value_positive = None + self._value_negative = None + self._sum = 0 + self._min = inf + self._max = -inf + self._count = 0 + self._zero_count = 0 + self._scale = None + + if ( + self._instrument_aggregation_temporality + is AggregationTemporality.DELTA + ): + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) + + if value_positive is None and value_negative is None: + return None + + return ExponentialHistogramDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + count=count, + sum=sum_, + scale=scale, + zero_count=zero_count, + positive=BucketsPoint( + offset=value_positive.offset, + bucket_counts=(value_positive.get_offset_counts()), + ), + negative=BucketsPoint( + offset=value_negative.offset, + bucket_counts=(value_negative.get_offset_counts()), + ), + # FIXME: Find the right value for flags + flags=0, + min=min_, + max=max_, + ) + + # Here collection_temporality is CUMULATIVE. + # instrument_temporality is always DELTA for the time being. + # Here we need to handle the case where: + # collect is called after at least one other call to collect + # (there is data in previous buckets, a call to merge is needed + # to handle possible differences in bucket sizes). + # collect is called without another call previous call to + # collect was made (there is no previous buckets, previous, + # empty buckets that are the same scale of the current buckets + # need to be made so that they can be cumulatively aggregated + # to the current buckets). + + if ( + value_positive is None + and self._previous_value_positive is None + ): + # This happens if collect is called for the first time + # and aggregate has not yet been called. + value_positive = Buckets() + self._previous_value_positive = value_positive.copy_empty() + if ( + value_negative is None + and self._previous_value_negative is None + ): + value_negative = Buckets() + self._previous_value_negative = value_negative.copy_empty() + if scale is None and self._previous_scale is None: + scale = self._mapping.scale + self._previous_scale = scale + + if ( + value_positive is not None + and self._previous_value_positive is None + ): + # This happens when collect is called the very first time + # and aggregate has been called before. + + # We need previous buckets to add them to the current ones. + # When collect is called for the first time, there are no + # previous buckets, so we need to create empty buckets to + # add them to the current ones. The addition of empty + # buckets to the current ones will result in the current + # ones unchanged. + + # The way the previous buckets are generated here is + # different from the explicit bucket histogram where + # the size and amount of the buckets does not change once + # they are instantiated. Here, the size and amount of the + # buckets can change with every call to aggregate. In order + # to get empty buckets that can be added to the current + # ones resulting in the current ones unchanged we need to + # generate empty buckets that have the same size and amount + # as the current ones, this is what copy_empty does. + self._previous_value_positive = value_positive.copy_empty() + if ( + value_negative is not None + and self._previous_value_negative is None + ): + self._previous_value_negative = value_negative.copy_empty() + if scale is not None and self._previous_scale is None: + self._previous_scale = scale + + if ( + value_positive is None + and self._previous_value_positive is not None + ): + value_positive = self._previous_value_positive.copy_empty() + if ( + value_negative is None + and self._previous_value_negative is not None + ): + value_negative = self._previous_value_negative.copy_empty() + if scale is None and self._previous_scale is not None: + scale = self._previous_scale + + min_scale = min(self._previous_scale, scale) + + low_positive, high_positive = ( + self._get_low_high_previous_current( + self._previous_value_positive, + value_positive, + scale, + min_scale, + ) + ) + low_negative, high_negative = ( + self._get_low_high_previous_current( + self._previous_value_negative, + value_negative, + scale, + min_scale, + ) + ) + + min_scale = min( + min_scale + - self._get_scale_change(low_positive, high_positive), + min_scale + - self._get_scale_change(low_negative, high_negative), + ) + + self._downscale( + self._previous_scale - min_scale, + self._previous_value_positive, + self._previous_value_negative, + ) + + # self._merge adds the values from value to + # self._previous_value, this is analogous to + # self._previous_value = [ + # value_element + previous_value_element + # for ( + # value_element, + # previous_value_element, + # ) in zip(value, self._previous_value) + # ] + # in _ExplicitBucketHistogramAggregation.collect. + self._merge( + self._previous_value_positive, + value_positive, + scale, + min_scale, + collection_aggregation_temporality, + ) + self._merge( + self._previous_value_negative, + value_negative, + scale, + min_scale, + collection_aggregation_temporality, + ) + + self._previous_min = min(min_, self._previous_min) + self._previous_max = max(max_, self._previous_max) + self._previous_sum = sum_ + self._previous_sum + self._previous_count = count + self._previous_count + self._previous_zero_count = ( + zero_count + self._previous_zero_count + ) + self._previous_scale = min_scale + + return ExponentialHistogramDataPoint( + attributes=self._attributes, + exemplars=self._collect_exemplars(), + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=self._previous_count, + sum=self._previous_sum, + scale=self._previous_scale, + zero_count=self._previous_zero_count, + positive=BucketsPoint( + offset=self._previous_value_positive.offset, + bucket_counts=( + self._previous_value_positive.get_offset_counts() + ), + ), + negative=BucketsPoint( + offset=self._previous_value_negative.offset, + bucket_counts=( + self._previous_value_negative.get_offset_counts() + ), + ), + # FIXME: Find the right value for flags + flags=0, + min=self._previous_min, + max=self._previous_max, + ) + + return None + + def _get_low_high_previous_current( + self, + previous_point_buckets, + current_point_buckets, + current_scale, + min_scale, + ): + (previous_point_low, previous_point_high) = self._get_low_high( + previous_point_buckets, self._previous_scale, min_scale + ) + (current_point_low, current_point_high) = self._get_low_high( + current_point_buckets, current_scale, min_scale + ) + + if current_point_low > current_point_high: + low = previous_point_low + high = previous_point_high + + elif previous_point_low > previous_point_high: + low = current_point_low + high = current_point_high + + else: + low = min(previous_point_low, current_point_low) + high = max(previous_point_high, current_point_high) + + return low, high + + @staticmethod + def _get_low_high(buckets, scale, min_scale): + if buckets.counts == [0]: + return 0, -1 + + shift = scale - min_scale + + return buckets.index_start >> shift, buckets.index_end >> shift + + @staticmethod + def _new_mapping(scale: int) -> Mapping: + if scale <= 0: + return ExponentMapping(scale) + return LogarithmMapping(scale) + + def _get_scale_change(self, low, high): + change = 0 + + while high - low >= self._max_size: + high = high >> 1 + low = low >> 1 + + change += 1 + + return change + + @staticmethod + def _downscale(change: int, positive, negative): + if change == 0: + return + + if change < 0: + # pylint: disable=broad-exception-raised + raise Exception("Invalid change of scale") + + positive.downscale(change) + negative.downscale(change) + + def _merge( + self, + previous_buckets: Buckets, + current_buckets: Buckets, + current_scale, + min_scale, + aggregation_temporality, + ): + current_change = current_scale - min_scale + + for current_bucket_index, current_bucket in enumerate( + current_buckets.counts + ): + if current_bucket == 0: + continue + + # Not considering the case where len(previous_buckets) == 0. This + # would not happen because self._previous_point is only assigned to + # an ExponentialHistogramDataPoint object if self._count != 0. + + current_index = current_buckets.index_base + current_bucket_index + if current_index > current_buckets.index_end: + current_index -= len(current_buckets.counts) + + index = current_index >> current_change + + if index < previous_buckets.index_start: + span = previous_buckets.index_end - index + + if span >= self._max_size: + # pylint: disable=broad-exception-raised + raise Exception("Incorrect merge scale") + + if span >= len(previous_buckets.counts): + previous_buckets.grow(span + 1, self._max_size) + + previous_buckets.index_start = index + + if index > previous_buckets.index_end: + span = index - previous_buckets.index_start + + if span >= self._max_size: + # pylint: disable=broad-exception-raised + raise Exception("Incorrect merge scale") + + if span >= len(previous_buckets.counts): + previous_buckets.grow(span + 1, self._max_size) + + previous_buckets.index_end = index + + bucket_index = index - previous_buckets.index_base + + if bucket_index < 0: + bucket_index += len(previous_buckets.counts) + + if aggregation_temporality is AggregationTemporality.DELTA: + current_bucket = -current_bucket + + previous_buckets.increment_bucket( + bucket_index, increment=current_bucket + ) + + +class Aggregation(ABC): + """ + Base class for all aggregation types. + """ + + @abstractmethod + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + """Creates an aggregation""" + + +class DefaultAggregation(Aggregation): + """ + The default aggregation to be used in a `View`. + + This aggregation will create an actual aggregation depending on the + instrument type, as specified next: + + ==================================================== ==================================== + Instrument Aggregation + ==================================================== ==================================== + `opentelemetry.sdk.metrics.Counter` `SumAggregation` + `opentelemetry.sdk.metrics.UpDownCounter` `SumAggregation` + `opentelemetry.sdk.metrics.ObservableCounter` `SumAggregation` + `opentelemetry.sdk.metrics.ObservableUpDownCounter` `SumAggregation` + `opentelemetry.sdk.metrics.Histogram` `ExplicitBucketHistogramAggregation` + `opentelemetry.sdk.metrics.ObservableGauge` `LastValueAggregation` + ==================================================== ==================================== + """ + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + # pylint: disable=too-many-return-statements + if isinstance(instrument, Counter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=True, + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), + start_time_unix_nano=start_time_unix_nano, + ) + if isinstance(instrument, UpDownCounter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=False, + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, ObservableCounter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=True, + instrument_aggregation_temporality=( + AggregationTemporality.CUMULATIVE + ), + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, ObservableUpDownCounter): + return _SumAggregation( + attributes, + reservoir_builder=reservoir_factory(_SumAggregation), + instrument_is_monotonic=False, + instrument_aggregation_temporality=( + AggregationTemporality.CUMULATIVE + ), + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, Histogram): + boundaries = instrument._advisory.explicit_bucket_boundaries + return _ExplicitBucketHistogramAggregation( + attributes, + reservoir_builder=reservoir_factory( + _ExplicitBucketHistogramAggregation + ), + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), + boundaries=boundaries, + start_time_unix_nano=start_time_unix_nano, + ) + + if isinstance(instrument, ObservableGauge): + return _LastValueAggregation( + attributes, + reservoir_builder=reservoir_factory(_LastValueAggregation), + ) + + if isinstance(instrument, _Gauge): + return _LastValueAggregation( + attributes, + reservoir_builder=reservoir_factory(_LastValueAggregation), + ) + + # pylint: disable=broad-exception-raised + raise Exception(f"Invalid instrument type {type(instrument)} found") + + +class ExponentialBucketHistogramAggregation(Aggregation): + def __init__( + self, + max_size: int = 160, + max_scale: int = 20, + ): + self._max_size = max_size + self._max_scale = max_scale + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + + return _ExponentialBucketHistogramAggregation( + attributes, + reservoir_factory(_ExponentialBucketHistogramAggregation), + instrument_aggregation_temporality, + start_time_unix_nano, + max_size=self._max_size, + max_scale=self._max_scale, + ) + + +class ExplicitBucketHistogramAggregation(Aggregation): + """This aggregation informs the SDK to collect: + + - Count of Measurement values falling within explicit bucket boundaries. + - Arithmetic sum of Measurement values in population. This SHOULD NOT be collected when used with instruments that record negative measurements, e.g. UpDownCounter or ObservableGauge. + - Min (optional) Measurement value in population. + - Max (optional) Measurement value in population. + + + Args: + boundaries: Array of increasing values representing explicit bucket boundary values. + record_min_max: Whether to record min and max. + """ + + def __init__( + self, + boundaries: Optional[Sequence[float]] = None, + record_min_max: bool = True, + ) -> None: + self._boundaries = boundaries + self._record_min_max = record_min_max + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + + if self._boundaries is None: + self._boundaries = ( + instrument._advisory.explicit_bucket_boundaries + or _DEFAULT_EXPLICIT_BUCKET_HISTOGRAM_AGGREGATION_BOUNDARIES + ) + + return _ExplicitBucketHistogramAggregation( + attributes, + instrument_aggregation_temporality, + start_time_unix_nano, + reservoir_factory(_ExplicitBucketHistogramAggregation), + self._boundaries, + self._record_min_max, + ) + + +class SumAggregation(Aggregation): + """This aggregation informs the SDK to collect: + + - The arithmetic sum of Measurement values. + """ + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + + return _SumAggregation( + attributes, + isinstance(instrument, (Counter, ObservableCounter)), + instrument_aggregation_temporality, + start_time_unix_nano, + reservoir_factory(_SumAggregation), + ) + + +class LastValueAggregation(Aggregation): + """ + This aggregation informs the SDK to collect: + + - The last Measurement. + - The timestamp of the last Measurement. + """ + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + return _LastValueAggregation( + attributes, + reservoir_builder=reservoir_factory(_LastValueAggregation), + ) + + +class DropAggregation(Aggregation): + """Using this aggregation will make all measurements be ignored.""" + + def _create_aggregation( + self, + instrument: Instrument, + attributes: Attributes, + reservoir_factory: Callable[ + [Type[_Aggregation]], ExemplarReservoirBuilder + ], + start_time_unix_nano: int, + ) -> _Aggregation: + return _DropAggregation( + attributes, reservoir_factory(_DropAggregation) + ) diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exceptions.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exceptions.py new file mode 100644 index 00000000..0f8c3a75 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exceptions.py @@ -0,0 +1,17 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class MetricsTimeoutError(Exception): + """Raised when a metrics function times out""" diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py new file mode 100644 index 00000000..ee93dd18 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py @@ -0,0 +1,39 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .exemplar import Exemplar +from .exemplar_filter import ( + AlwaysOffExemplarFilter, + AlwaysOnExemplarFilter, + ExemplarFilter, + TraceBasedExemplarFilter, +) +from .exemplar_reservoir import ( + AlignedHistogramBucketExemplarReservoir, + ExemplarReservoir, + ExemplarReservoirBuilder, + SimpleFixedSizeExemplarReservoir, +) + +__all__ = [ + "Exemplar", + "ExemplarFilter", + "AlwaysOffExemplarFilter", + "AlwaysOnExemplarFilter", + "TraceBasedExemplarFilter", + "AlignedHistogramBucketExemplarReservoir", + "ExemplarReservoir", + "ExemplarReservoirBuilder", + "SimpleFixedSizeExemplarReservoir", +] diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py new file mode 100644 index 00000000..95582e16 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar.py @@ -0,0 +1,50 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses +from typing import Optional, Union + +from opentelemetry.util.types import Attributes + + +@dataclasses.dataclass(frozen=True) +class Exemplar: + """A representation of an exemplar, which is a sample input measurement. + + Exemplars also hold information about the environment when the measurement + was recorded, for example the span and trace ID of the active span when the + exemplar was recorded. + + Attributes + trace_id: (optional) The trace associated with a recording + span_id: (optional) The span associated with a recording + time_unix_nano: The time of the observation + value: The recorded value + filtered_attributes: A set of filtered attributes which provide additional insight into the Context when the observation was made. + + References: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#exemplars + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar + """ + + # TODO Fix doc - if using valid Google `Attributes:` key, the attributes are duplicated + # one will come from napoleon extension and the other from autodoc extension. This + # will raise an sphinx error of duplicated object description + # See https://github.com/sphinx-doc/sphinx/issues/8664 + + filtered_attributes: Attributes + value: Union[int, float] + time_unix_nano: int + span_id: Optional[int] = None + trace_id: Optional[int] = None diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py new file mode 100644 index 00000000..8961d101 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py @@ -0,0 +1,134 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Union + +from opentelemetry import trace +from opentelemetry.context import Context +from opentelemetry.trace.span import INVALID_SPAN +from opentelemetry.util.types import Attributes + + +class ExemplarFilter(ABC): + """``ExemplarFilter`` determines which measurements are eligible for becoming an + ``Exemplar``. + + Exemplar filters are used to filter measurements before attempting to store them + in a reservoir. + + Reference: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarfilter + """ + + @abstractmethod + def should_sample( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> bool: + """Returns whether or not a reservoir should attempt to filter a measurement. + + Args: + value: The value of the measurement + timestamp: A timestamp that best represents when the measurement was taken + attributes: The complete set of measurement attributes + context: The Context of the measurement + """ + raise NotImplementedError( + "ExemplarFilter.should_sample is not implemented" + ) + + +class AlwaysOnExemplarFilter(ExemplarFilter): + """An ExemplarFilter which makes all measurements eligible for being an Exemplar. + + Reference: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alwayson + """ + + def should_sample( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> bool: + """Returns whether or not a reservoir should attempt to filter a measurement. + + Args: + value: The value of the measurement + timestamp: A timestamp that best represents when the measurement was taken + attributes: The complete set of measurement attributes + context: The Context of the measurement + """ + return True + + +class AlwaysOffExemplarFilter(ExemplarFilter): + """An ExemplarFilter which makes no measurements eligible for being an Exemplar. + + Using this ExemplarFilter is as good as disabling Exemplar feature. + + Reference: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alwaysoff + """ + + def should_sample( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> bool: + """Returns whether or not a reservoir should attempt to filter a measurement. + + Args: + value: The value of the measurement + timestamp: A timestamp that best represents when the measurement was taken + attributes: The complete set of measurement attributes + context: The Context of the measurement + """ + return False + + +class TraceBasedExemplarFilter(ExemplarFilter): + """An ExemplarFilter which makes those measurements eligible for being an Exemplar, + which are recorded in the context of a sampled parent span. + + Reference: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#tracebased + """ + + def should_sample( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> bool: + """Returns whether or not a reservoir should attempt to filter a measurement. + + Args: + value: The value of the measurement + timestamp: A timestamp that best represents when the measurement was taken + attributes: The complete set of measurement attributes + context: The Context of the measurement + """ + span = trace.get_current_span(context) + if span == INVALID_SPAN: + return False + return span.get_span_context().trace_flags.sampled diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py new file mode 100644 index 00000000..22d1ee9f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py @@ -0,0 +1,332 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from collections import defaultdict +from random import randrange +from typing import ( + Any, + Callable, + Dict, + List, + Mapping, + Optional, + Sequence, + Union, +) + +from opentelemetry import trace +from opentelemetry.context import Context +from opentelemetry.trace.span import INVALID_SPAN +from opentelemetry.util.types import Attributes + +from .exemplar import Exemplar + + +class ExemplarReservoir(ABC): + """ExemplarReservoir provide a method to offer measurements to the reservoir + and another to collect accumulated Exemplars. + + Note: + The constructor MUST accept ``**kwargs`` that may be set from aggregation + parameters. + + Reference: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarreservoir + """ + + @abstractmethod + def offer( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> None: + """Offers a measurement to be sampled. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + """ + raise NotImplementedError("ExemplarReservoir.offer is not implemented") + + @abstractmethod + def collect(self, point_attributes: Attributes) -> List[Exemplar]: + """Returns accumulated Exemplars and also resets the reservoir for the next + sampling period + + Args: + point_attributes: The attributes associated with metric point. + + Returns: + a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned + exemplars contain the attributes that were filtered out by the aggregator, + but recorded alongside the original measurement. + """ + raise NotImplementedError( + "ExemplarReservoir.collect is not implemented" + ) + + +class ExemplarBucket: + def __init__(self) -> None: + self.__value: Union[int, float] = 0 + self.__attributes: Attributes = None + self.__time_unix_nano: int = 0 + self.__span_id: Optional[int] = None + self.__trace_id: Optional[int] = None + self.__offered: bool = False + + def offer( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> None: + """Offers a measurement to be sampled. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + """ + self.__value = value + self.__time_unix_nano = time_unix_nano + self.__attributes = attributes + span = trace.get_current_span(context) + if span != INVALID_SPAN: + span_context = span.get_span_context() + self.__span_id = span_context.span_id + self.__trace_id = span_context.trace_id + + self.__offered = True + + def collect(self, point_attributes: Attributes) -> Optional[Exemplar]: + """May return an Exemplar and resets the bucket for the next sampling period.""" + if not self.__offered: + return None + + # filters out attributes from the measurement that are already included in the metric data point + # See the specification for more details: + # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar + filtered_attributes = ( + { + k: v + for k, v in self.__attributes.items() + if k not in point_attributes + } + if self.__attributes + else None + ) + + exemplar = Exemplar( + filtered_attributes, + self.__value, + self.__time_unix_nano, + self.__span_id, + self.__trace_id, + ) + self.__reset() + return exemplar + + def __reset(self) -> None: + """Reset the bucket state after a collection cycle.""" + self.__value = 0 + self.__attributes = {} + self.__time_unix_nano = 0 + self.__span_id = None + self.__trace_id = None + self.__offered = False + + +class BucketIndexError(ValueError): + """An exception raised when the bucket index cannot be found.""" + + +class FixedSizeExemplarReservoirABC(ExemplarReservoir): + """Abstract class for a reservoir with fixed size.""" + + def __init__(self, size: int, **kwargs) -> None: + super().__init__(**kwargs) + self._size: int = size + self._reservoir_storage: Mapping[int, ExemplarBucket] = defaultdict( + ExemplarBucket + ) + + def collect(self, point_attributes: Attributes) -> List[Exemplar]: + """Returns accumulated Exemplars and also resets the reservoir for the next + sampling period + + Args: + point_attributes: The attributes associated with metric point. + + Returns: + a list of ``opentelemetry.sdk.metrics._internal.exemplar.exemplar.Exemplar`` s. Returned + exemplars contain the attributes that were filtered out by the aggregator, + but recorded alongside the original measurement. + """ + exemplars = [ + e + for e in ( + bucket.collect(point_attributes) + for _, bucket in sorted(self._reservoir_storage.items()) + ) + if e is not None + ] + self._reset() + return exemplars + + def offer( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> None: + """Offers a measurement to be sampled. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + """ + try: + index = self._find_bucket_index( + value, time_unix_nano, attributes, context + ) + + self._reservoir_storage[index].offer( + value, time_unix_nano, attributes, context + ) + except BucketIndexError: + # Ignore invalid bucket index + pass + + @abstractmethod + def _find_bucket_index( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> int: + """Determines the bucket index for the given measurement. + + It should be implemented by subclasses based on specific strategies. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + + Returns: + The bucket index + + Raises: + BucketIndexError: If no bucket index can be found. + """ + + def _reset(self) -> None: + """Reset the reservoir by resetting any stateful logic after a collection cycle.""" + + +class SimpleFixedSizeExemplarReservoir(FixedSizeExemplarReservoirABC): + """This reservoir uses an uniformly-weighted sampling algorithm based on the number + of samples the reservoir has seen so far to determine if the offered measurements + should be sampled. + + Reference: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir + """ + + def __init__(self, size: int = 1, **kwargs) -> None: + super().__init__(size, **kwargs) + self._measurements_seen: int = 0 + + def _reset(self) -> None: + super()._reset() + self._measurements_seen = 0 + + def _find_bucket_index( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> int: + self._measurements_seen += 1 + if self._measurements_seen < self._size: + return self._measurements_seen - 1 + + index = randrange(0, self._measurements_seen) + if index < self._size: + return index + + raise BucketIndexError("Unable to find the bucket index.") + + +class AlignedHistogramBucketExemplarReservoir(FixedSizeExemplarReservoirABC): + """This Exemplar reservoir takes a configuration parameter that is the + configuration of a Histogram. This implementation keeps the last seen measurement + that falls within a histogram bucket. + + Reference: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alignedhistogrambucketexemplarreservoir + """ + + def __init__(self, boundaries: Sequence[float], **kwargs) -> None: + super().__init__(len(boundaries) + 1, **kwargs) + self._boundaries: Sequence[float] = boundaries + + def offer( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> None: + """Offers a measurement to be sampled.""" + index = self._find_bucket_index( + value, time_unix_nano, attributes, context + ) + self._reservoir_storage[index].offer( + value, time_unix_nano, attributes, context + ) + + def _find_bucket_index( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> int: + for index, boundary in enumerate(self._boundaries): + if value <= boundary: + return index + return len(self._boundaries) + + +ExemplarReservoirBuilder = Callable[[Dict[str, Any]], ExemplarReservoir] +ExemplarReservoirBuilder.__doc__ = """ExemplarReservoir builder. + +It may receive the Aggregation parameters it is bounded to; e.g. +the _ExplicitBucketHistogramAggregation will provide the boundaries. +""" diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/__init__.py diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py new file mode 100644 index 00000000..e8a93326 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py @@ -0,0 +1,190 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from math import ceil, log2 + + +class Buckets: + # No method of this class is protected by locks because instances of this + # class are only used in methods that are protected by locks themselves. + + def __init__(self): + self._counts = [0] + + # The term index refers to the number of the exponential histogram bucket + # used to determine its boundaries. The lower boundary of a bucket is + # determined by base ** index and the upper boundary of a bucket is + # determined by base ** (index + 1). index values are signedto account + # for values less than or equal to 1. + + # self._index_* will all have values equal to a certain index that is + # determined by the corresponding mapping _map_to_index function and + # the value of the index depends on the value passed to _map_to_index. + + # Index of the 0th position in self._counts: self._counts[0] is the + # count in the bucket with index self.__index_base. + self.__index_base = 0 + + # self.__index_start is the smallest index value represented in + # self._counts. + self.__index_start = 0 + + # self.__index_start is the largest index value represented in + # self._counts. + self.__index_end = 0 + + @property + def index_start(self) -> int: + return self.__index_start + + @index_start.setter + def index_start(self, value: int) -> None: + self.__index_start = value + + @property + def index_end(self) -> int: + return self.__index_end + + @index_end.setter + def index_end(self, value: int) -> None: + self.__index_end = value + + @property + def index_base(self) -> int: + return self.__index_base + + @index_base.setter + def index_base(self, value: int) -> None: + self.__index_base = value + + @property + def counts(self): + return self._counts + + def get_offset_counts(self): + bias = self.__index_base - self.__index_start + return self._counts[-bias:] + self._counts[:-bias] + + def grow(self, needed: int, max_size: int) -> None: + size = len(self._counts) + bias = self.__index_base - self.__index_start + old_positive_limit = size - bias + + # 2 ** ceil(log2(needed)) finds the smallest power of two that is larger + # or equal than needed: + # 2 ** ceil(log2(1)) == 1 + # 2 ** ceil(log2(2)) == 2 + # 2 ** ceil(log2(3)) == 4 + # 2 ** ceil(log2(4)) == 4 + # 2 ** ceil(log2(5)) == 8 + # 2 ** ceil(log2(6)) == 8 + # 2 ** ceil(log2(7)) == 8 + # 2 ** ceil(log2(8)) == 8 + new_size = min(2 ** ceil(log2(needed)), max_size) + + new_positive_limit = new_size - bias + + tmp = [0] * new_size + tmp[new_positive_limit:] = self._counts[old_positive_limit:] + tmp[0:old_positive_limit] = self._counts[0:old_positive_limit] + self._counts = tmp + + @property + def offset(self) -> int: + return self.__index_start + + def __len__(self) -> int: + if len(self._counts) == 0: + return 0 + + if self.__index_end == self.__index_start and self[0] == 0: + return 0 + + return self.__index_end - self.__index_start + 1 + + def __getitem__(self, key: int) -> int: + bias = self.__index_base - self.__index_start + + if key < bias: + key += len(self._counts) + + key -= bias + + return self._counts[key] + + def downscale(self, amount: int) -> None: + """ + Rotates, then collapses 2 ** amount to 1 buckets. + """ + + bias = self.__index_base - self.__index_start + + if bias != 0: + self.__index_base = self.__index_start + + # [0, 1, 2, 3, 4] Original backing array + + self._counts = self._counts[::-1] + # [4, 3, 2, 1, 0] + + self._counts = ( + self._counts[:bias][::-1] + self._counts[bias:][::-1] + ) + # [3, 4, 0, 1, 2] This is a rotation of the backing array. + + size = 1 + self.__index_end - self.__index_start + each = 1 << amount + inpos = 0 + outpos = 0 + + pos = self.__index_start + + while pos <= self.__index_end: + mod = pos % each + if mod < 0: + mod += each + + index = mod + + while index < each and inpos < size: + if outpos != inpos: + self._counts[outpos] += self._counts[inpos] + self._counts[inpos] = 0 + + inpos += 1 + pos += 1 + index += 1 + + outpos += 1 + + self.__index_start >>= amount + self.__index_end >>= amount + self.__index_base = self.__index_start + + def increment_bucket(self, bucket_index: int, increment: int = 1) -> None: + self._counts[bucket_index] += increment + + def copy_empty(self) -> "Buckets": + copy = Buckets() + + # pylint: disable=no-member + # pylint: disable=protected-access + # pylint: disable=attribute-defined-outside-init + # pylint: disable=invalid-name + copy._Buckets__index_base = self._Buckets__index_base + copy._Buckets__index_start = self._Buckets__index_start + copy._Buckets__index_end = self._Buckets__index_end + copy._counts = [0 for _ in self._counts] + + return copy diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/__init__.py new file mode 100644 index 00000000..387b1d14 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/__init__.py @@ -0,0 +1,98 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod + + +class Mapping(ABC): + """ + Parent class for `LogarithmMapping` and `ExponentialMapping`. + """ + + # pylint: disable=no-member + def __new__(cls, scale: int): + with cls._mappings_lock: + # cls._mappings and cls._mappings_lock are implemented in each of + # the child classes as a dictionary and a lock, respectively. They + # are not instantiated here because that would lead to both child + # classes having the same instance of cls._mappings and + # cls._mappings_lock. + if scale not in cls._mappings: + cls._mappings[scale] = super().__new__(cls) + cls._mappings[scale]._init(scale) + + return cls._mappings[scale] + + @abstractmethod + def _init(self, scale: int) -> None: + # pylint: disable=attribute-defined-outside-init + + if scale > self._get_max_scale(): + # pylint: disable=broad-exception-raised + raise Exception(f"scale is larger than {self._max_scale}") + + if scale < self._get_min_scale(): + # pylint: disable=broad-exception-raised + raise Exception(f"scale is smaller than {self._min_scale}") + + # The size of the exponential histogram buckets is determined by a + # parameter known as scale, larger values of scale will produce smaller + # buckets. Bucket boundaries of the exponential histogram are located + # at integer powers of the base, where: + # + # base = 2 ** (2 ** (-scale)) + # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#all-scales-use-the-logarithm-function + self._scale = scale + + @abstractmethod + def _get_min_scale(self) -> int: + """ + Return the smallest possible value for the mapping scale + """ + + @abstractmethod + def _get_max_scale(self) -> int: + """ + Return the largest possible value for the mapping scale + """ + + @abstractmethod + def map_to_index(self, value: float) -> int: + """ + Maps positive floating point values to indexes corresponding to + `Mapping.scale`. Implementations are not expected to handle zeros, + +inf, NaN, or negative values. + """ + + @abstractmethod + def get_lower_boundary(self, index: int) -> float: + """ + Returns the lower boundary of a given bucket index. The index is + expected to map onto a range that is at least partially inside the + range of normal floating point values. If the corresponding + bucket's upper boundary is less than or equal to 2 ** -1022, + :class:`~opentelemetry.sdk.metrics.MappingUnderflowError` + will be raised. If the corresponding bucket's lower boundary is greater + than ``sys.float_info.max``, + :class:`~opentelemetry.sdk.metrics.MappingOverflowError` + will be raised. + """ + + @property + def scale(self) -> int: + """ + Returns the parameter that controls the resolution of this mapping. + See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#exponential-scale + """ + return self._scale diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/errors.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/errors.py new file mode 100644 index 00000000..477ed6f0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/errors.py @@ -0,0 +1,26 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class MappingUnderflowError(Exception): + """ + Raised when computing the lower boundary of an index that maps into a + denormal floating point value. + """ + + +class MappingOverflowError(Exception): + """ + Raised when computing the lower boundary of an index that maps into +inf. + """ diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py new file mode 100644 index 00000000..297bb7a4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/exponent_mapping.py @@ -0,0 +1,141 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from math import ldexp +from threading import Lock + +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( + Mapping, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.errors import ( + MappingOverflowError, + MappingUnderflowError, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.ieee_754 import ( + MANTISSA_WIDTH, + MAX_NORMAL_EXPONENT, + MIN_NORMAL_EXPONENT, + MIN_NORMAL_VALUE, + get_ieee_754_exponent, + get_ieee_754_mantissa, +) + + +class ExponentMapping(Mapping): + # Reference implementation here: + # https://github.com/open-telemetry/opentelemetry-go/blob/0e6f9c29c10d6078e8131418e1d1d166c7195d61/sdk/metric/aggregator/exponential/mapping/exponent/exponent.go + + _mappings = {} + _mappings_lock = Lock() + + _min_scale = -10 + _max_scale = 0 + + def _get_min_scale(self): + # _min_scale defines the point at which the exponential mapping + # function becomes useless for 64-bit floats. With scale -10, ignoring + # subnormal values, bucket indices range from -1 to 1. + return -10 + + def _get_max_scale(self): + # _max_scale is the largest scale supported by exponential mapping. Use + # a logarithm mapping for larger scales. + return 0 + + def _init(self, scale: int): + # pylint: disable=attribute-defined-outside-init + + super()._init(scale) + + # self._min_normal_lower_boundary_index is the largest index such that + # base ** index < MIN_NORMAL_VALUE and + # base ** (index + 1) >= MIN_NORMAL_VALUE. An exponential histogram + # bucket with this index covers the range + # (base ** index, base (index + 1)], including MIN_NORMAL_VALUE. This + # is the smallest valid index that contains at least one normal value. + index = MIN_NORMAL_EXPONENT >> -self._scale + + if -self._scale < 2: + # For scales -1 and 0, the maximum value 2 ** -1022 is a + # power-of-two multiple, meaning base ** index == MIN_NORMAL_VALUE. + # Subtracting 1 so that base ** (index + 1) == MIN_NORMAL_VALUE. + index -= 1 + + self._min_normal_lower_boundary_index = index + + # self._max_normal_lower_boundary_index is the index such that + # base**index equals the greatest representable lower boundary. An + # exponential histogram bucket with this index covers the range + # ((2 ** 1024) / base, 2 ** 1024], which includes opentelemetry.sdk. + # metrics._internal.exponential_histogram.ieee_754.MAX_NORMAL_VALUE. + # This bucket is incomplete, since the upper boundary cannot be + # represented. One greater than this index corresponds with the bucket + # containing values > 2 ** 1024. + self._max_normal_lower_boundary_index = ( + MAX_NORMAL_EXPONENT >> -self._scale + ) + + def map_to_index(self, value: float) -> int: + if value < MIN_NORMAL_VALUE: + return self._min_normal_lower_boundary_index + + exponent = get_ieee_754_exponent(value) + + # Positive integers are represented in binary as having an infinite + # amount of leading zeroes, for example 2 is represented as ...00010. + + # A negative integer -x is represented in binary as the complement of + # (x - 1). For example, -4 is represented as the complement of 4 - 1 + # == 3. 3 is represented as ...00011. Its compliment is ...11100, the + # binary representation of -4. + + # get_ieee_754_mantissa(value) gets the positive integer made up + # from the rightmost MANTISSA_WIDTH bits (the mantissa) of the IEEE + # 754 representation of value. If value is an exact power of 2, all + # these MANTISSA_WIDTH bits would be all zeroes, and when 1 is + # subtracted the resulting value is -1. The binary representation of + # -1 is ...111, so when these bits are right shifted MANTISSA_WIDTH + # places, the resulting value for correction is -1. If value is not an + # exact power of 2, at least one of the rightmost MANTISSA_WIDTH + # bits would be 1 (even for values whose decimal part is 0, like 5.0 + # since the IEEE 754 of such number is too the product of a power of 2 + # (defined in the exponent part of the IEEE 754 representation) and the + # value defined in the mantissa). Having at least one of the rightmost + # MANTISSA_WIDTH bit being 1 means that get_ieee_754(value) will + # always be greater or equal to 1, and when 1 is subtracted, the + # result will be greater or equal to 0, whose representation in binary + # will be of at most MANTISSA_WIDTH ones that have an infinite + # amount of leading zeroes. When those MANTISSA_WIDTH bits are + # shifted to the right MANTISSA_WIDTH places, the resulting value + # will be 0. + + # In summary, correction will be -1 if value is a power of 2, 0 if not. + + # FIXME Document why we can assume value will not be 0, inf, or NaN. + correction = (get_ieee_754_mantissa(value) - 1) >> MANTISSA_WIDTH + + return (exponent + correction) >> -self._scale + + def get_lower_boundary(self, index: int) -> float: + if index < self._min_normal_lower_boundary_index: + raise MappingUnderflowError() + + if index > self._max_normal_lower_boundary_index: + raise MappingOverflowError() + + return ldexp(1, index << -self._scale) + + @property + def scale(self) -> int: + return self._scale diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/ieee_754.md b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/ieee_754.md new file mode 100644 index 00000000..0cf5c8c5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/ieee_754.md @@ -0,0 +1,175 @@ +# IEEE 754 Explained + +IEEE 754 is a standard that defines a way to represent certain mathematical +objects using binary numbers. + +## Binary Number Fields + +The binary numbers used in IEEE 754 can have different lengths, the length that +is interesting for the purposes of this project is 64 bits. These binary +numbers are made up of 3 contiguous fields of bits, from left to right: + +1. 1 sign bit +2. 11 exponent bits +3. 52 mantissa bits + +Depending on the values these fields have, the represented mathematical object +can be one of: + +* Floating point number +* Zero +* NaN +* Infinite + +## Floating Point Numbers + +IEEE 754 represents a floating point number $f$ using an exponential +notation with 4 components: $sign$, $mantissa$, $base$ and $exponent$: + +$$f = sign \times mantissa \times base ^ {exponent}$$ + +There are two possible representations of floating point numbers: +_normal_ and _denormal_, which have different valid values for +their $mantissa$ and $exponent$ fields. + +### Binary Representation + +$sign$, $mantissa$, and $exponent$ are represented in binary, the +representation of each component has certain details explained next. + +$base$ is always $2$ and it is not represented in binary. + +#### Sign + +$sign$ can have 2 values: + +1. $1$ if the `sign` bit is `0` +2. $-1$ if the `sign` bit is `1`. + +#### Mantissa + +##### Normal Floating Point Numbers + +$mantissa$ is a positive fractional number whose integer part is $1$, for example +$1.2345 \dots$. The `mantissa` bits represent only the fractional part and the +$mantissa$ value can be calculated as: + +$$mantissa = 1 + \sum_{i=1}^{52} b_{i} \times 2^{-i} = 1 + \frac{b_{1}}{2^{1}} + \frac{b_{2}}{2^{2}} + \dots + \frac{b_{51}}{2^{51}} + \frac{b_{52}}{2^{52}}$$ + +Where $b_{i}$ is: + +1. $0$ if the bit at the position `i - 1` is `0`. +2. $1$ if the bit at the position `i - 1` is `1`. + +##### Denormal Floating Point Numbers + +$mantissa$ is a positive fractional number whose integer part is $0$, for example +$0.12345 \dots$. The `mantissa` bits represent only the fractional part and the +$mantissa$ value can be calculated as: + +$$mantissa = \sum_{i=1}^{52} b_{i} \times 2^{-i} = \frac{b_{1}}{2^{1}} + \frac{b_{2}}{2^{2}} + \dots + \frac{b_{51}}{2^{51}} + \frac{b_{52}}{2^{52}}$$ + +Where $b_{i}$ is: + +1. $0$ if the bit at the position `i - 1` is `0`. +2. $1$ if the bit at the position `i - 1` is `1`. + +#### Exponent + +##### Normal Floating Point Numbers + +Only the following bit sequences are allowed: `00000000001` to `11111111110`. +That is, there must be at least one `0` and one `1` in the exponent bits. + +The actual value of the $exponent$ can be calculated as: + +$$exponent = v - bias$$ + +where $v$ is the value of the binary number in the exponent bits and $bias$ is $1023$. +Considering the restrictions above, the respective minimum and maximum values for the +exponent are: + +1. `00000000001` = $1$, $1 - 1023 = -1022$ +2. `11111111110` = $2046$, $2046 - 1023 = 1023$ + +So, $exponent$ is an integer in the range $\left[-1022, 1023\right]$. + + +##### Denormal Floating Point Numbers + +$exponent$ is always $-1022$. Nevertheless, it is always represented as `00000000000`. + +### Normal and Denormal Floating Point Numbers + +The smallest absolute value a normal floating point number can have is calculated +like this: + +$$1 \times 1.0\dots0 \times 2^{-1022} = 2.2250738585072014 \times 10^{-308}$$ + +Since normal floating point numbers always have a $1$ as the integer part of the +$mantissa$, then smaller values can be achieved by using the smallest possible exponent +( $-1022$ ) and a $0$ in the integer part of the $mantissa$, but significant digits are lost. + +The smallest absolute value a denormal floating point number can have is calculated +like this: + +$$1 \times 2^{-52} \times 2^{-1022} = 5 \times 10^{-324}$$ + +## Zero + +Zero is represented like this: + +* Sign bit: `X` +* Exponent bits: `00000000000` +* Mantissa bits: `0000000000000000000000000000000000000000000000000000` + +where `X` means `0` or `1`. + +## NaN + +There are 2 kinds of NaNs that are represented: + +1. QNaNs (Quiet NaNs): represent the result of indeterminate operations. +2. SNaNs (Signalling NaNs): represent the result of invalid operations. + +### QNaNs + +QNaNs are represented like this: + +* Sign bit: `X` +* Exponent bits: `11111111111` +* Mantissa bits: `1XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX` + +where `X` means `0` or `1`. + +### SNaNs + +SNaNs are represented like this: + +* Sign bit: `X` +* Exponent bits: `11111111111` +* Mantissa bits: `0XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX1` + +where `X` means `0` or `1`. + +## Infinite + +### Positive Infinite + +Positive infinite is represented like this: + +* Sign bit: `0` +* Exponent bits: `11111111111` +* Mantissa bits: `0000000000000000000000000000000000000000000000000000` + +where `X` means `0` or `1`. + +### Negative Infinite + +Negative infinite is represented like this: + +* Sign bit: `1` +* Exponent bits: `11111111111` +* Mantissa bits: `0000000000000000000000000000000000000000000000000000` + +where `X` means `0` or `1`. diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/ieee_754.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/ieee_754.py new file mode 100644 index 00000000..d4b7e861 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/ieee_754.py @@ -0,0 +1,117 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ctypes import c_double, c_uint64 +from sys import float_info + +# IEEE 754 64-bit floating point numbers use 11 bits for the exponent and 52 +# bits for the mantissa. +MANTISSA_WIDTH = 52 +EXPONENT_WIDTH = 11 + +# This mask is equivalent to 52 "1" bits (there are 13 hexadecimal 4-bit "f"s +# in the mantissa mask, 13 * 4 == 52) or 0xfffffffffffff in hexadecimal. +MANTISSA_MASK = (1 << MANTISSA_WIDTH) - 1 + +# There are 11 bits for the exponent, but the exponent values 0 (11 "0" +# bits) and 2047 (11 "1" bits) have special meanings so the exponent range is +# from 1 to 2046. To calculate the exponent value, 1023 (the bias) is +# subtracted from the exponent, so the exponent value range is from -1022 to +# +1023. +EXPONENT_BIAS = (2 ** (EXPONENT_WIDTH - 1)) - 1 + +# All the exponent mask bits are set to 1 for the 11 exponent bits. +EXPONENT_MASK = ((1 << EXPONENT_WIDTH) - 1) << MANTISSA_WIDTH + +# The sign mask has the first bit set to 1 and the rest to 0. +SIGN_MASK = 1 << (EXPONENT_WIDTH + MANTISSA_WIDTH) + +# For normal floating point numbers, the exponent can have a value in the +# range [-1022, 1023]. +MIN_NORMAL_EXPONENT = -EXPONENT_BIAS + 1 +MAX_NORMAL_EXPONENT = EXPONENT_BIAS + +# The smallest possible normal value is 2.2250738585072014e-308. +# This value is the result of using the smallest possible number in the +# mantissa, 1.0000000000000000000000000000000000000000000000000000 (52 "0"s in +# the fractional part) and a single "1" in the exponent. +# Finally 1 * (2 ** -1022) = 2.2250738585072014e-308. +MIN_NORMAL_VALUE = float_info.min + +# Greatest possible normal value (1.7976931348623157e+308) +# The binary representation of a float in scientific notation uses (for the +# mantissa) one bit for the integer part (which is implicit) and 52 bits for +# the fractional part. Consider a float binary 1.111. It is equal to 1 + 1/2 + +# 1/4 + 1/8. The greatest possible value in the 52-bit binary mantissa would be +# then 1.1111111111111111111111111111111111111111111111111111 (52 "1"s in the +# fractional part) whose decimal value is 1.9999999999999998. Finally, +# 1.9999999999999998 * (2 ** 1023) = 1.7976931348623157e+308. +MAX_NORMAL_VALUE = float_info.max + + +def get_ieee_754_exponent(value: float) -> int: + """ + Gets the exponent of the IEEE 754 representation of a float. + """ + + return ( + ( + # This step gives the integer that corresponds to the IEEE 754 + # representation of a float. For example, consider + # -MAX_NORMAL_VALUE for an example. We choose this value because + # of its binary representation which makes easy to understand the + # subsequent operations. + # + # c_uint64.from_buffer(c_double(-MAX_NORMAL_VALUE)).value == 18442240474082181119 + # bin(18442240474082181119) == '0b1111111111101111111111111111111111111111111111111111111111111111' + # + # The first bit of the previous binary number is the sign bit: 1 (1 means negative, 0 means positive) + # The next 11 bits are the exponent bits: 11111111110 + # The next 52 bits are the mantissa bits: 1111111111111111111111111111111111111111111111111111 + # + # This step isolates the exponent bits, turning every bit outside + # of the exponent field (sign and mantissa bits) to 0. + c_uint64.from_buffer(c_double(value)).value & EXPONENT_MASK + # For the example this means: + # 18442240474082181119 & EXPONENT_MASK == 9214364837600034816 + # bin(9214364837600034816) == '0b111111111100000000000000000000000000000000000000000000000000000' + # Notice that the previous binary representation does not include + # leading zeroes, so the sign bit is not included since it is a + # zero. + ) + # This step moves the exponent bits to the right, removing the + # mantissa bits that were set to 0 by the previous step. This + # leaves the IEEE 754 exponent value, ready for the next step. + >> MANTISSA_WIDTH + # For the example this means: + # 9214364837600034816 >> MANTISSA_WIDTH == 2046 + # bin(2046) == '0b11111111110' + # As shown above, these are the original 11 bits that correspond to the + # exponent. + # This step subtracts the exponent bias from the IEEE 754 value, + # leaving the actual exponent value. + ) - EXPONENT_BIAS + # For the example this means: + # 2046 - EXPONENT_BIAS == 1023 + # As mentioned in a comment above, the largest value for the exponent is + + +def get_ieee_754_mantissa(value: float) -> int: + return ( + c_uint64.from_buffer(c_double(value)).value + # This step isolates the mantissa bits. There is no need to do any + # bit shifting as the mantissa bits are already the rightmost field + # in an IEEE 754 representation. + & MANTISSA_MASK + ) diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py new file mode 100644 index 00000000..e73f3a81 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/exponential_histogram/mapping/logarithm_mapping.py @@ -0,0 +1,138 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from math import exp, floor, ldexp, log +from threading import Lock + +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( + Mapping, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.errors import ( + MappingOverflowError, + MappingUnderflowError, +) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.ieee_754 import ( + MAX_NORMAL_EXPONENT, + MIN_NORMAL_EXPONENT, + MIN_NORMAL_VALUE, + get_ieee_754_exponent, + get_ieee_754_mantissa, +) + + +class LogarithmMapping(Mapping): + # Reference implementation here: + # https://github.com/open-telemetry/opentelemetry-go/blob/0e6f9c29c10d6078e8131418e1d1d166c7195d61/sdk/metric/aggregator/exponential/mapping/logarithm/logarithm.go + + _mappings = {} + _mappings_lock = Lock() + + _min_scale = 1 + _max_scale = 20 + + def _get_min_scale(self): + # _min_scale ensures that ExponentMapping is used for zero and negative + # scale values. + return self._min_scale + + def _get_max_scale(self): + # FIXME The Go implementation uses a value of 20 here, find out the + # right value for this implementation, more information here: + # https://github.com/lightstep/otel-launcher-go/blob/c9ca8483be067a39ab306b09060446e7fda65f35/lightstep/sdk/metric/aggregator/histogram/structure/README.md#mapping-function + # https://github.com/open-telemetry/opentelemetry-go/blob/0e6f9c29c10d6078e8131418e1d1d166c7195d61/sdk/metric/aggregator/exponential/mapping/logarithm/logarithm.go#L32-L45 + return self._max_scale + + def _init(self, scale: int): + # pylint: disable=attribute-defined-outside-init + + super()._init(scale) + + # self._scale_factor is defined as a multiplier because multiplication + # is faster than division. self._scale_factor is defined as: + # index = log(value) * self._scale_factor + # Where: + # index = log(value) / log(base) + # index = log(value) / log(2 ** (2 ** -scale)) + # index = log(value) / ((2 ** -scale) * log(2)) + # index = log(value) * ((1 / log(2)) * (2 ** scale)) + # self._scale_factor = ((1 / log(2)) * (2 ** scale)) + # self._scale_factor = (1 /log(2)) * (2 ** scale) + # self._scale_factor = ldexp(1 / log(2), scale) + # This implementation was copied from a Java prototype. See: + # https://github.com/newrelic-experimental/newrelic-sketch-java/blob/1ce245713603d61ba3a4510f6df930a5479cd3f6/src/main/java/com/newrelic/nrsketch/indexer/LogIndexer.java + # for the equations used here. + self._scale_factor = ldexp(1 / log(2), scale) + + # self._min_normal_lower_boundary_index is the index such that + # base ** index == MIN_NORMAL_VALUE. An exponential histogram bucket + # with this index covers the range + # (MIN_NORMAL_VALUE, MIN_NORMAL_VALUE * base]. One less than this index + # corresponds with the bucket containing values <= MIN_NORMAL_VALUE. + self._min_normal_lower_boundary_index = ( + MIN_NORMAL_EXPONENT << self._scale + ) + + # self._max_normal_lower_boundary_index is the index such that + # base ** index equals the greatest representable lower boundary. An + # exponential histogram bucket with this index covers the range + # ((2 ** 1024) / base, 2 ** 1024], which includes opentelemetry.sdk. + # metrics._internal.exponential_histogram.ieee_754.MAX_NORMAL_VALUE. + # This bucket is incomplete, since the upper boundary cannot be + # represented. One greater than this index corresponds with the bucket + # containing values > 2 ** 1024. + self._max_normal_lower_boundary_index = ( + (MAX_NORMAL_EXPONENT + 1) << self._scale + ) - 1 + + def map_to_index(self, value: float) -> int: + """ + Maps positive floating point values to indexes corresponding to scale. + """ + + # value is subnormal + if value <= MIN_NORMAL_VALUE: + return self._min_normal_lower_boundary_index - 1 + + # value is an exact power of two. + if get_ieee_754_mantissa(value) == 0: + exponent = get_ieee_754_exponent(value) + return (exponent << self._scale) - 1 + + return min( + floor(log(value) * self._scale_factor), + self._max_normal_lower_boundary_index, + ) + + def get_lower_boundary(self, index: int) -> float: + if index >= self._max_normal_lower_boundary_index: + if index == self._max_normal_lower_boundary_index: + return 2 * exp( + (index - (1 << self._scale)) / self._scale_factor + ) + raise MappingOverflowError() + + if index <= self._min_normal_lower_boundary_index: + if index == self._min_normal_lower_boundary_index: + return MIN_NORMAL_VALUE + if index == self._min_normal_lower_boundary_index - 1: + return ( + exp((index + (1 << self._scale)) / self._scale_factor) / 2 + ) + raise MappingUnderflowError() + + return exp(index / self._scale_factor) + + @property + def scale(self) -> int: + return self._scale diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/export/__init__.py new file mode 100644 index 00000000..52c68334 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -0,0 +1,576 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import math +import os +import weakref +from abc import ABC, abstractmethod +from enum import Enum +from logging import getLogger +from os import environ, linesep +from sys import stdout +from threading import Event, Lock, RLock, Thread +from time import time_ns +from typing import IO, Callable, Iterable, Optional + +from typing_extensions import final + +# This kind of import is needed to avoid Sphinx errors. +import opentelemetry.sdk.metrics._internal +from opentelemetry.context import ( + _SUPPRESS_INSTRUMENTATION_KEY, + attach, + detach, + set_value, +) +from opentelemetry.sdk.environment_variables import ( + OTEL_METRIC_EXPORT_INTERVAL, + OTEL_METRIC_EXPORT_TIMEOUT, +) +from opentelemetry.sdk.metrics._internal.aggregation import ( + AggregationTemporality, + DefaultAggregation, +) +from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError +from opentelemetry.sdk.metrics._internal.instrument import ( + Counter, + Gauge, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, + _Counter, + _Gauge, + _Histogram, + _ObservableCounter, + _ObservableGauge, + _ObservableUpDownCounter, + _UpDownCounter, +) +from opentelemetry.sdk.metrics._internal.point import MetricsData +from opentelemetry.util._once import Once + +_logger = getLogger(__name__) + + +class MetricExportResult(Enum): + """Result of exporting a metric + + Can be any of the following values:""" + + SUCCESS = 0 + FAILURE = 1 + + +class MetricExporter(ABC): + """Interface for exporting metrics. + + Interface to be implemented by services that want to export metrics received + in their own format. + + Args: + preferred_temporality: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to + configure exporter level preferred temporality. See `opentelemetry.sdk.metrics.export.MetricReader` for + more details on what preferred temporality is. + preferred_aggregation: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to + configure exporter level preferred aggregation. See `opentelemetry.sdk.metrics.export.MetricReader` for + more details on what preferred aggregation is. + """ + + def __init__( + self, + preferred_temporality: dict[type, AggregationTemporality] + | None = None, + preferred_aggregation: dict[ + type, "opentelemetry.sdk.metrics.view.Aggregation" + ] + | None = None, + ) -> None: + self._preferred_temporality = preferred_temporality + self._preferred_aggregation = preferred_aggregation + + @abstractmethod + def export( + self, + metrics_data: MetricsData, + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + """Exports a batch of telemetry data. + + Args: + metrics: The list of `opentelemetry.sdk.metrics.export.Metric` objects to be exported + + Returns: + The result of the export + """ + + @abstractmethod + def force_flush(self, timeout_millis: float = 10_000) -> bool: + """ + Ensure that export of any metrics currently received by the exporter + are completed as soon as possible. + """ + + @abstractmethod + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + """Shuts down the exporter. + + Called when the SDK is shut down. + """ + + +class ConsoleMetricExporter(MetricExporter): + """Implementation of :class:`MetricExporter` that prints metrics to the + console. + + This class can be used for diagnostic purposes. It prints the exported + metrics to the console STDOUT. + """ + + def __init__( + self, + out: IO = stdout, + formatter: Callable[ + ["opentelemetry.sdk.metrics.export.MetricsData"], str + ] = lambda metrics_data: metrics_data.to_json() + linesep, + preferred_temporality: dict[type, AggregationTemporality] + | None = None, + preferred_aggregation: dict[ + type, "opentelemetry.sdk.metrics.view.Aggregation" + ] + | None = None, + ): + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) + self.out = out + self.formatter = formatter + + def export( + self, + metrics_data: MetricsData, + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + self.out.write(self.formatter(metrics_data)) + self.out.flush() + return MetricExportResult.SUCCESS + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + pass + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + return True + + +class MetricReader(ABC): + # pylint: disable=too-many-branches,broad-exception-raised + """ + Base class for all metric readers + + Args: + preferred_temporality: A mapping between instrument classes and + aggregation temporality. By default uses CUMULATIVE for all instrument + classes. This mapping will be used to define the default aggregation + temporality of every instrument class. If the user wants to make a + change in the default aggregation temporality of an instrument class, + it is enough to pass here a dictionary whose keys are the instrument + classes and the values are the corresponding desired aggregation + temporalities of the classes that the user wants to change, not all of + them. The classes not included in the passed dictionary will retain + their association to their default aggregation temporalities. + preferred_aggregation: A mapping between instrument classes and + aggregation instances. By default maps all instrument classes to an + instance of `DefaultAggregation`. This mapping will be used to + define the default aggregation of every instrument class. If the + user wants to make a change in the default aggregation of an + instrument class, it is enough to pass here a dictionary whose keys + are the instrument classes and the values are the corresponding + desired aggregation for the instrument classes that the user wants + to change, not necessarily all of them. The classes not included in + the passed dictionary will retain their association to their + default aggregations. The aggregation defined here will be + overridden by an aggregation defined by a view that is not + `DefaultAggregation`. + + .. document protected _receive_metrics which is a intended to be overridden by subclass + .. automethod:: _receive_metrics + """ + + def __init__( + self, + preferred_temporality: dict[type, AggregationTemporality] + | None = None, + preferred_aggregation: dict[ + type, "opentelemetry.sdk.metrics.view.Aggregation" + ] + | None = None, + ) -> None: + self._collect: Callable[ + [ + "opentelemetry.sdk.metrics.export.MetricReader", + AggregationTemporality, + ], + Iterable["opentelemetry.sdk.metrics.export.Metric"], + ] = None + + self._instrument_class_temporality = { + _Counter: AggregationTemporality.CUMULATIVE, + _UpDownCounter: AggregationTemporality.CUMULATIVE, + _Histogram: AggregationTemporality.CUMULATIVE, + _Gauge: AggregationTemporality.CUMULATIVE, + _ObservableCounter: AggregationTemporality.CUMULATIVE, + _ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + _ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + if preferred_temporality is not None: + for temporality in preferred_temporality.values(): + if temporality not in ( + AggregationTemporality.CUMULATIVE, + AggregationTemporality.DELTA, + ): + raise Exception( + f"Invalid temporality value found {temporality}" + ) + + if preferred_temporality is not None: + for typ, temporality in preferred_temporality.items(): + if typ is Counter: + self._instrument_class_temporality[_Counter] = temporality + elif typ is UpDownCounter: + self._instrument_class_temporality[_UpDownCounter] = ( + temporality + ) + elif typ is Histogram: + self._instrument_class_temporality[_Histogram] = ( + temporality + ) + elif typ is Gauge: + self._instrument_class_temporality[_Gauge] = temporality + elif typ is ObservableCounter: + self._instrument_class_temporality[_ObservableCounter] = ( + temporality + ) + elif typ is ObservableUpDownCounter: + self._instrument_class_temporality[ + _ObservableUpDownCounter + ] = temporality + elif typ is ObservableGauge: + self._instrument_class_temporality[_ObservableGauge] = ( + temporality + ) + else: + raise Exception(f"Invalid instrument class found {typ}") + + self._preferred_temporality = preferred_temporality + self._instrument_class_aggregation = { + _Counter: DefaultAggregation(), + _UpDownCounter: DefaultAggregation(), + _Histogram: DefaultAggregation(), + _Gauge: DefaultAggregation(), + _ObservableCounter: DefaultAggregation(), + _ObservableUpDownCounter: DefaultAggregation(), + _ObservableGauge: DefaultAggregation(), + } + + if preferred_aggregation is not None: + for typ, aggregation in preferred_aggregation.items(): + if typ is Counter: + self._instrument_class_aggregation[_Counter] = aggregation + elif typ is UpDownCounter: + self._instrument_class_aggregation[_UpDownCounter] = ( + aggregation + ) + elif typ is Histogram: + self._instrument_class_aggregation[_Histogram] = ( + aggregation + ) + elif typ is Gauge: + self._instrument_class_aggregation[_Gauge] = aggregation + elif typ is ObservableCounter: + self._instrument_class_aggregation[_ObservableCounter] = ( + aggregation + ) + elif typ is ObservableUpDownCounter: + self._instrument_class_aggregation[ + _ObservableUpDownCounter + ] = aggregation + elif typ is ObservableGauge: + self._instrument_class_aggregation[_ObservableGauge] = ( + aggregation + ) + else: + raise Exception(f"Invalid instrument class found {typ}") + + @final + def collect(self, timeout_millis: float = 10_000) -> None: + """Collects the metrics from the internal SDK state and + invokes the `_receive_metrics` with the collection. + + Args: + timeout_millis: Amount of time in milliseconds before this function + raises a timeout error. + + If any of the underlying ``collect`` methods called by this method + fails by any reason (including timeout) an exception will be raised + detailing the individual errors that caused this function to fail. + """ + if self._collect is None: + _logger.warning( + "Cannot call collect on a MetricReader until it is registered on a MeterProvider" + ) + return + + metrics = self._collect(self, timeout_millis=timeout_millis) + + if metrics is not None: + self._receive_metrics( + metrics, + timeout_millis=timeout_millis, + ) + + @final + def _set_collect_callback( + self, + func: Callable[ + [ + "opentelemetry.sdk.metrics.export.MetricReader", + AggregationTemporality, + ], + Iterable["opentelemetry.sdk.metrics.export.Metric"], + ], + ) -> None: + """This function is internal to the SDK. It should not be called or overridden by users""" + self._collect = func + + @abstractmethod + def _receive_metrics( + self, + metrics_data: "opentelemetry.sdk.metrics.export.MetricsData", + timeout_millis: float = 10_000, + **kwargs, + ) -> None: + """Called by `MetricReader.collect` when it receives a batch of metrics""" + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + self.collect(timeout_millis=timeout_millis) + return True + + @abstractmethod + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + """Shuts down the MetricReader. This method provides a way + for the MetricReader to do any cleanup required. A metric reader can + only be shutdown once, any subsequent calls are ignored and return + failure status. + + When a `MetricReader` is registered on a + :class:`~opentelemetry.sdk.metrics.MeterProvider`, + :meth:`~opentelemetry.sdk.metrics.MeterProvider.shutdown` will invoke this + automatically. + """ + + +class InMemoryMetricReader(MetricReader): + """Implementation of `MetricReader` that returns its metrics from :func:`get_metrics_data`. + + This is useful for e.g. unit tests. + """ + + def __init__( + self, + preferred_temporality: dict[type, AggregationTemporality] + | None = None, + preferred_aggregation: dict[ + type, "opentelemetry.sdk.metrics.view.Aggregation" + ] + | None = None, + ) -> None: + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) + self._lock = RLock() + self._metrics_data: "opentelemetry.sdk.metrics.export.MetricsData" = ( + None + ) + + def get_metrics_data( + self, + ) -> Optional["opentelemetry.sdk.metrics.export.MetricsData"]: + """Reads and returns current metrics from the SDK""" + with self._lock: + self.collect() + metrics_data = self._metrics_data + self._metrics_data = None + return metrics_data + + def _receive_metrics( + self, + metrics_data: "opentelemetry.sdk.metrics.export.MetricsData", + timeout_millis: float = 10_000, + **kwargs, + ) -> None: + with self._lock: + self._metrics_data = metrics_data + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + pass + + +class PeriodicExportingMetricReader(MetricReader): + """`PeriodicExportingMetricReader` is an implementation of `MetricReader` + that collects metrics based on a user-configurable time interval, and passes the + metrics to the configured exporter. If the time interval is set to `math.inf`, the + reader will not invoke periodic collection. + + The configured exporter's :py:meth:`~MetricExporter.export` method will not be called + concurrently. + """ + + def __init__( + self, + exporter: MetricExporter, + export_interval_millis: Optional[float] = None, + export_timeout_millis: Optional[float] = None, + ) -> None: + # PeriodicExportingMetricReader defers to exporter for configuration + super().__init__( + preferred_temporality=exporter._preferred_temporality, + preferred_aggregation=exporter._preferred_aggregation, + ) + + # This lock is held whenever calling self._exporter.export() to prevent concurrent + # execution of MetricExporter.export() + # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch + self._export_lock = Lock() + + self._exporter = exporter + if export_interval_millis is None: + try: + export_interval_millis = float( + environ.get(OTEL_METRIC_EXPORT_INTERVAL, 60000) + ) + except ValueError: + _logger.warning( + "Found invalid value for export interval, using default" + ) + export_interval_millis = 60000 + if export_timeout_millis is None: + try: + export_timeout_millis = float( + environ.get(OTEL_METRIC_EXPORT_TIMEOUT, 30000) + ) + except ValueError: + _logger.warning( + "Found invalid value for export timeout, using default" + ) + export_timeout_millis = 30000 + self._export_interval_millis = export_interval_millis + self._export_timeout_millis = export_timeout_millis + self._shutdown = False + self._shutdown_event = Event() + self._shutdown_once = Once() + self._daemon_thread = None + if ( + self._export_interval_millis > 0 + and self._export_interval_millis < math.inf + ): + self._daemon_thread = Thread( + name="OtelPeriodicExportingMetricReader", + target=self._ticker, + daemon=True, + ) + self._daemon_thread.start() + if hasattr(os, "register_at_fork"): + weak_at_fork = weakref.WeakMethod(self._at_fork_reinit) + + os.register_at_fork( + after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access + ) + elif self._export_interval_millis <= 0: + raise ValueError( + f"interval value {self._export_interval_millis} is invalid \ + and needs to be larger than zero." + ) + + def _at_fork_reinit(self): + self._daemon_thread = Thread( + name="OtelPeriodicExportingMetricReader", + target=self._ticker, + daemon=True, + ) + self._daemon_thread.start() + + def _ticker(self) -> None: + interval_secs = self._export_interval_millis / 1e3 + while not self._shutdown_event.wait(interval_secs): + try: + self.collect(timeout_millis=self._export_timeout_millis) + except MetricsTimeoutError: + _logger.warning( + "Metric collection timed out. Will try again after %s seconds", + interval_secs, + exc_info=True, + ) + # one last collection below before shutting down completely + try: + self.collect(timeout_millis=self._export_interval_millis) + except MetricsTimeoutError: + _logger.warning( + "Metric collection timed out.", + exc_info=True, + ) + + def _receive_metrics( + self, + metrics_data: MetricsData, + timeout_millis: float = 10_000, + **kwargs, + ) -> None: + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + # pylint: disable=broad-exception-caught,invalid-name + try: + with self._export_lock: + self._exporter.export( + metrics_data, timeout_millis=timeout_millis + ) + except Exception: + _logger.exception("Exception while exporting metrics") + detach(token) + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + deadline_ns = time_ns() + timeout_millis * 10**6 + + def _shutdown(): + self._shutdown = True + + did_set = self._shutdown_once.do_once(_shutdown) + if not did_set: + _logger.warning("Can't shutdown multiple times") + return + + self._shutdown_event.set() + if self._daemon_thread: + self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9) + self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6) + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + super().force_flush(timeout_millis=timeout_millis) + self._exporter.force_flush(timeout_millis=timeout_millis) + return True diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py new file mode 100644 index 00000000..b01578f4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py @@ -0,0 +1,334 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=too-many-ancestors, unused-import +from __future__ import annotations + +from logging import getLogger +from time import time_ns +from typing import Generator, Iterable, List, Sequence, Union + +# This kind of import is needed to avoid Sphinx errors. +import opentelemetry.sdk.metrics +from opentelemetry.context import Context, get_current +from opentelemetry.metrics import CallbackT +from opentelemetry.metrics import Counter as APICounter +from opentelemetry.metrics import Histogram as APIHistogram +from opentelemetry.metrics import ObservableCounter as APIObservableCounter +from opentelemetry.metrics import ObservableGauge as APIObservableGauge +from opentelemetry.metrics import ( + ObservableUpDownCounter as APIObservableUpDownCounter, +) +from opentelemetry.metrics import UpDownCounter as APIUpDownCounter +from opentelemetry.metrics import _Gauge as APIGauge +from opentelemetry.metrics._internal.instrument import ( + CallbackOptions, + _MetricsHistogramAdvisory, +) +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.util.instrumentation import InstrumentationScope + +_logger = getLogger(__name__) + + +_ERROR_MESSAGE = ( + "Expected ASCII string of maximum length 63 characters but got {}" +) + + +class _Synchronous: + def __init__( + self, + name: str, + instrumentation_scope: InstrumentationScope, + measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer", + unit: str = "", + description: str = "", + ): + # pylint: disable=no-member + result = self._check_name_unit_description(name, unit, description) + + if result["name"] is None: + # pylint: disable=broad-exception-raised + raise Exception(_ERROR_MESSAGE.format(name)) + + if result["unit"] is None: + # pylint: disable=broad-exception-raised + raise Exception(_ERROR_MESSAGE.format(unit)) + + name = result["name"] + unit = result["unit"] + description = result["description"] + + self.name = name.lower() + self.unit = unit + self.description = description + self.instrumentation_scope = instrumentation_scope + self._measurement_consumer = measurement_consumer + super().__init__(name, unit=unit, description=description) + + +class _Asynchronous: + def __init__( + self, + name: str, + instrumentation_scope: InstrumentationScope, + measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer", + callbacks: Iterable[CallbackT] | None = None, + unit: str = "", + description: str = "", + ): + # pylint: disable=no-member + result = self._check_name_unit_description(name, unit, description) + + if result["name"] is None: + # pylint: disable=broad-exception-raised + raise Exception(_ERROR_MESSAGE.format(name)) + + if result["unit"] is None: + # pylint: disable=broad-exception-raised + raise Exception(_ERROR_MESSAGE.format(unit)) + + name = result["name"] + unit = result["unit"] + description = result["description"] + + self.name = name.lower() + self.unit = unit + self.description = description + self.instrumentation_scope = instrumentation_scope + self._measurement_consumer = measurement_consumer + super().__init__(name, callbacks, unit=unit, description=description) + + self._callbacks: List[CallbackT] = [] + + if callbacks is not None: + for callback in callbacks: + if isinstance(callback, Generator): + # advance generator to it's first yield + next(callback) + + def inner( + options: CallbackOptions, + callback=callback, + ) -> Iterable[Measurement]: + try: + return callback.send(options) + except StopIteration: + return [] + + self._callbacks.append(inner) + else: + self._callbacks.append(callback) + + def callback( + self, callback_options: CallbackOptions + ) -> Iterable[Measurement]: + for callback in self._callbacks: + try: + for api_measurement in callback(callback_options): + yield Measurement( + api_measurement.value, + time_unix_nano=time_ns(), + instrument=self, + context=api_measurement.context or get_current(), + attributes=api_measurement.attributes, + ) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception( + "Callback failed for instrument %s.", self.name + ) + + +class Counter(_Synchronous, APICounter): + def __new__(cls, *args, **kwargs): + if cls is Counter: + raise TypeError("Counter must be instantiated via a meter.") + return super().__new__(cls) + + def add( + self, + amount: Union[int, float], + attributes: dict[str, str] | None = None, + context: Context | None = None, + ): + if amount < 0: + _logger.warning( + "Add amount must be non-negative on Counter %s.", self.name + ) + return + time_unix_nano = time_ns() + self._measurement_consumer.consume_measurement( + Measurement( + amount, + time_unix_nano, + self, + context or get_current(), + attributes, + ) + ) + + +class UpDownCounter(_Synchronous, APIUpDownCounter): + def __new__(cls, *args, **kwargs): + if cls is UpDownCounter: + raise TypeError("UpDownCounter must be instantiated via a meter.") + return super().__new__(cls) + + def add( + self, + amount: Union[int, float], + attributes: dict[str, str] | None = None, + context: Context | None = None, + ): + time_unix_nano = time_ns() + self._measurement_consumer.consume_measurement( + Measurement( + amount, + time_unix_nano, + self, + context or get_current(), + attributes, + ) + ) + + +class ObservableCounter(_Asynchronous, APIObservableCounter): + def __new__(cls, *args, **kwargs): + if cls is ObservableCounter: + raise TypeError( + "ObservableCounter must be instantiated via a meter." + ) + return super().__new__(cls) + + +class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter): + def __new__(cls, *args, **kwargs): + if cls is ObservableUpDownCounter: + raise TypeError( + "ObservableUpDownCounter must be instantiated via a meter." + ) + return super().__new__(cls) + + +class Histogram(_Synchronous, APIHistogram): + def __init__( + self, + name: str, + instrumentation_scope: InstrumentationScope, + measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer", + unit: str = "", + description: str = "", + explicit_bucket_boundaries_advisory: Sequence[float] | None = None, + ): + super().__init__( + name, + unit=unit, + description=description, + instrumentation_scope=instrumentation_scope, + measurement_consumer=measurement_consumer, + ) + self._advisory = _MetricsHistogramAdvisory( + explicit_bucket_boundaries=explicit_bucket_boundaries_advisory + ) + + def __new__(cls, *args, **kwargs): + if cls is Histogram: + raise TypeError("Histogram must be instantiated via a meter.") + return super().__new__(cls) + + def record( + self, + amount: Union[int, float], + attributes: dict[str, str] | None = None, + context: Context | None = None, + ): + if amount < 0: + _logger.warning( + "Record amount must be non-negative on Histogram %s.", + self.name, + ) + return + time_unix_nano = time_ns() + self._measurement_consumer.consume_measurement( + Measurement( + amount, + time_unix_nano, + self, + context or get_current(), + attributes, + ) + ) + + +class Gauge(_Synchronous, APIGauge): + def __new__(cls, *args, **kwargs): + if cls is Gauge: + raise TypeError("Gauge must be instantiated via a meter.") + return super().__new__(cls) + + def set( + self, + amount: Union[int, float], + attributes: dict[str, str] | None = None, + context: Context | None = None, + ): + time_unix_nano = time_ns() + self._measurement_consumer.consume_measurement( + Measurement( + amount, + time_unix_nano, + self, + context or get_current(), + attributes, + ) + ) + + +class ObservableGauge(_Asynchronous, APIObservableGauge): + def __new__(cls, *args, **kwargs): + if cls is ObservableGauge: + raise TypeError( + "ObservableGauge must be instantiated via a meter." + ) + return super().__new__(cls) + + +# Below classes exist to prevent the direct instantiation +class _Counter(Counter): + pass + + +class _UpDownCounter(UpDownCounter): + pass + + +class _ObservableCounter(ObservableCounter): + pass + + +class _ObservableUpDownCounter(ObservableUpDownCounter): + pass + + +class _Histogram(Histogram): + pass + + +class _Gauge(Gauge): + pass + + +class _ObservableGauge(ObservableGauge): + pass diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/measurement.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/measurement.py new file mode 100644 index 00000000..56619a83 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/measurement.py @@ -0,0 +1,45 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import Union + +from opentelemetry.context import Context +from opentelemetry.metrics import Instrument +from opentelemetry.util.types import Attributes + + +@dataclass(frozen=True) +class Measurement: + """ + Represents a data point reported via the metrics API to the SDK. + + Attributes + value: Measured value + time_unix_nano: The time the API call was made to record the Measurement + instrument: The instrument that produced this `Measurement`. + context: The active Context of the Measurement at API call time. + attributes: Measurement attributes + """ + + # TODO Fix doc - if using valid Google `Attributes:` key, the attributes are duplicated + # one will come from napoleon extension and the other from autodoc extension. This + # will raise an sphinx error of duplicated object description + # See https://github.com/sphinx-doc/sphinx/issues/8664 + + value: Union[int, float] + time_unix_nano: int + instrument: Instrument + context: Context + attributes: Attributes = None diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/measurement_consumer.py new file mode 100644 index 00000000..c6510330 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -0,0 +1,145 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-import + +from abc import ABC, abstractmethod +from threading import Lock +from time import time_ns +from typing import Iterable, List, Mapping, Optional + +# This kind of import is needed to avoid Sphinx errors. +import opentelemetry.sdk.metrics +import opentelemetry.sdk.metrics._internal.instrument +import opentelemetry.sdk.metrics._internal.sdk_configuration +from opentelemetry.metrics._internal.instrument import CallbackOptions +from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.metric_reader_storage import ( + MetricReaderStorage, +) +from opentelemetry.sdk.metrics._internal.point import Metric + + +class MeasurementConsumer(ABC): + @abstractmethod + def consume_measurement(self, measurement: Measurement) -> None: + pass + + @abstractmethod + def register_asynchronous_instrument( + self, + instrument: ( + "opentelemetry.sdk.metrics._internal.instrument_Asynchronous" + ), + ): + pass + + @abstractmethod + def collect( + self, + metric_reader: "opentelemetry.sdk.metrics.MetricReader", + timeout_millis: float = 10_000, + ) -> Optional[Iterable[Metric]]: + pass + + +class SynchronousMeasurementConsumer(MeasurementConsumer): + def __init__( + self, + sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration", + ) -> None: + self._lock = Lock() + self._sdk_config = sdk_config + # should never be mutated + self._reader_storages: Mapping[ + "opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage + ] = { + reader: MetricReaderStorage( + sdk_config, + reader._instrument_class_temporality, + reader._instrument_class_aggregation, + ) + for reader in sdk_config.metric_readers + } + self._async_instruments: List[ + "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" + ] = [] + + def consume_measurement(self, measurement: Measurement) -> None: + should_sample_exemplar = ( + self._sdk_config.exemplar_filter.should_sample( + measurement.value, + measurement.time_unix_nano, + measurement.attributes, + measurement.context, + ) + ) + for reader_storage in self._reader_storages.values(): + reader_storage.consume_measurement( + measurement, should_sample_exemplar + ) + + def register_asynchronous_instrument( + self, + instrument: ( + "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" + ), + ) -> None: + with self._lock: + self._async_instruments.append(instrument) + + def collect( + self, + metric_reader: "opentelemetry.sdk.metrics.MetricReader", + timeout_millis: float = 10_000, + ) -> Optional[Iterable[Metric]]: + with self._lock: + metric_reader_storage = self._reader_storages[metric_reader] + # for now, just use the defaults + callback_options = CallbackOptions() + deadline_ns = time_ns() + (timeout_millis * 1e6) + + default_timeout_ns = 10000 * 1e6 + + for async_instrument in self._async_instruments: + remaining_time = deadline_ns - time_ns() + + if remaining_time < default_timeout_ns: + callback_options = CallbackOptions( + timeout_millis=remaining_time / 1e6 + ) + + measurements = async_instrument.callback(callback_options) + if time_ns() >= deadline_ns: + raise MetricsTimeoutError( + "Timed out while executing callback" + ) + + for measurement in measurements: + should_sample_exemplar = ( + self._sdk_config.exemplar_filter.should_sample( + measurement.value, + measurement.time_unix_nano, + measurement.attributes, + measurement.context, + ) + ) + metric_reader_storage.consume_measurement( + measurement, should_sample_exemplar + ) + + result = self._reader_storages[metric_reader].collect() + + return result diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py new file mode 100644 index 00000000..f5121811 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -0,0 +1,315 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from logging import getLogger +from threading import RLock +from time import time_ns +from typing import Dict, List, Optional + +from opentelemetry.metrics import ( + Asynchronous, + Counter, + Instrument, + ObservableCounter, +) +from opentelemetry.sdk.metrics._internal._view_instrument_match import ( + _ViewInstrumentMatch, +) +from opentelemetry.sdk.metrics._internal.aggregation import ( + Aggregation, + ExplicitBucketHistogramAggregation, + _DropAggregation, + _ExplicitBucketHistogramAggregation, + _ExponentialBucketHistogramAggregation, + _LastValueAggregation, + _SumAggregation, +) +from opentelemetry.sdk.metrics._internal.export import AggregationTemporality +from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.point import ( + ExponentialHistogram, + Gauge, + Histogram, + Metric, + MetricsData, + ResourceMetrics, + ScopeMetrics, + Sum, +) +from opentelemetry.sdk.metrics._internal.sdk_configuration import ( + SdkConfiguration, +) +from opentelemetry.sdk.metrics._internal.view import View +from opentelemetry.sdk.util.instrumentation import InstrumentationScope + +_logger = getLogger(__name__) + +_DEFAULT_VIEW = View(instrument_name="") + + +class MetricReaderStorage: + """The SDK's storage for a given reader""" + + def __init__( + self, + sdk_config: SdkConfiguration, + instrument_class_temporality: Dict[type, AggregationTemporality], + instrument_class_aggregation: Dict[type, Aggregation], + ) -> None: + self._lock = RLock() + self._sdk_config = sdk_config + self._instrument_view_instrument_matches: Dict[ + Instrument, List[_ViewInstrumentMatch] + ] = {} + self._instrument_class_temporality = instrument_class_temporality + self._instrument_class_aggregation = instrument_class_aggregation + + def _get_or_init_view_instrument_match( + self, instrument: Instrument + ) -> List[_ViewInstrumentMatch]: + # Optimistically get the relevant views for the given instrument. Once set for a given + # instrument, the mapping will never change + + if instrument in self._instrument_view_instrument_matches: + return self._instrument_view_instrument_matches[instrument] + + with self._lock: + # double check if it was set before we held the lock + if instrument in self._instrument_view_instrument_matches: + return self._instrument_view_instrument_matches[instrument] + + # not present, hold the lock and add a new mapping + view_instrument_matches = [] + + self._handle_view_instrument_match( + instrument, view_instrument_matches + ) + + # if no view targeted the instrument, use the default + if not view_instrument_matches: + view_instrument_matches.append( + _ViewInstrumentMatch( + view=_DEFAULT_VIEW, + instrument=instrument, + instrument_class_aggregation=( + self._instrument_class_aggregation + ), + ) + ) + self._instrument_view_instrument_matches[instrument] = ( + view_instrument_matches + ) + + return view_instrument_matches + + def consume_measurement( + self, measurement: Measurement, should_sample_exemplar: bool = True + ) -> None: + for view_instrument_match in self._get_or_init_view_instrument_match( + measurement.instrument + ): + view_instrument_match.consume_measurement( + measurement, should_sample_exemplar + ) + + def collect(self) -> Optional[MetricsData]: + # Use a list instead of yielding to prevent a slow reader from holding + # SDK locks + + # While holding the lock, new _ViewInstrumentMatch can't be added from + # another thread (so we are sure we collect all existing view). + # However, instruments can still send measurements that will make it + # into the individual aggregations; collection will acquire those locks + # iteratively to keep locking as fine-grained as possible. One side + # effect is that end times can be slightly skewed among the metric + # streams produced by the SDK, but we still align the output timestamps + # for a single instrument. + + collection_start_nanos = time_ns() + + with self._lock: + instrumentation_scope_scope_metrics: Dict[ + InstrumentationScope, ScopeMetrics + ] = {} + + for ( + instrument, + view_instrument_matches, + ) in self._instrument_view_instrument_matches.items(): + aggregation_temporality = self._instrument_class_temporality[ + instrument.__class__ + ] + + metrics: List[Metric] = [] + + for view_instrument_match in view_instrument_matches: + data_points = view_instrument_match.collect( + aggregation_temporality, collection_start_nanos + ) + + if data_points is None: + continue + + if isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _SumAggregation, + ): + data = Sum( + aggregation_temporality=aggregation_temporality, + data_points=data_points, + is_monotonic=isinstance( + instrument, (Counter, ObservableCounter) + ), + ) + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _LastValueAggregation, + ): + data = Gauge(data_points=data_points) + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _ExplicitBucketHistogramAggregation, + ): + data = Histogram( + data_points=data_points, + aggregation_temporality=aggregation_temporality, + ) + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _DropAggregation, + ): + continue + + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _ExponentialBucketHistogramAggregation, + ): + data = ExponentialHistogram( + data_points=data_points, + aggregation_temporality=aggregation_temporality, + ) + + metrics.append( + Metric( + # pylint: disable=protected-access + # pylint: disable=possibly-used-before-assignment + name=view_instrument_match._name, + description=view_instrument_match._description, + unit=view_instrument_match._instrument.unit, + data=data, + ) + ) + + if metrics: + if instrument.instrumentation_scope not in ( + instrumentation_scope_scope_metrics + ): + instrumentation_scope_scope_metrics[ + instrument.instrumentation_scope + ] = ScopeMetrics( + scope=instrument.instrumentation_scope, + metrics=metrics, + schema_url=instrument.instrumentation_scope.schema_url, + ) + else: + instrumentation_scope_scope_metrics[ + instrument.instrumentation_scope + ].metrics.extend(metrics) + + if instrumentation_scope_scope_metrics: + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=self._sdk_config.resource, + scope_metrics=list( + instrumentation_scope_scope_metrics.values() + ), + schema_url=self._sdk_config.resource.schema_url, + ) + ] + ) + + return None + + def _handle_view_instrument_match( + self, + instrument: Instrument, + view_instrument_matches: List["_ViewInstrumentMatch"], + ) -> None: + for view in self._sdk_config.views: + # pylint: disable=protected-access + if not view._match(instrument): + continue + + if not self._check_view_instrument_compatibility(view, instrument): + continue + + new_view_instrument_match = _ViewInstrumentMatch( + view=view, + instrument=instrument, + instrument_class_aggregation=( + self._instrument_class_aggregation + ), + ) + + for ( + existing_view_instrument_matches + ) in self._instrument_view_instrument_matches.values(): + for ( + existing_view_instrument_match + ) in existing_view_instrument_matches: + if existing_view_instrument_match.conflicts( + new_view_instrument_match + ): + _logger.warning( + "Views %s and %s will cause conflicting " + "metrics identities", + existing_view_instrument_match._view, + new_view_instrument_match._view, + ) + + view_instrument_matches.append(new_view_instrument_match) + + @staticmethod + def _check_view_instrument_compatibility( + view: View, instrument: Instrument + ) -> bool: + """ + Checks if a view and an instrument are compatible. + + Returns `true` if they are compatible and a `_ViewInstrumentMatch` + object should be created, `false` otherwise. + """ + + result = True + + # pylint: disable=protected-access + if isinstance(instrument, Asynchronous) and isinstance( + view._aggregation, ExplicitBucketHistogramAggregation + ): + _logger.warning( + "View %s and instrument %s will produce " + "semantic errors when matched, the view " + "has not been applied.", + view, + instrument, + ) + result = False + + return result diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/point.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/point.py new file mode 100644 index 00000000..8c7e3469 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/point.py @@ -0,0 +1,277 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-import + +from dataclasses import asdict, dataclass, field +from json import dumps, loads +from typing import Optional, Sequence, Union + +# This kind of import is needed to avoid Sphinx errors. +import opentelemetry.sdk.metrics._internal +from opentelemetry.sdk.metrics._internal.exemplar import Exemplar +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.util.types import Attributes + + +@dataclass(frozen=True) +class NumberDataPoint: + """Single data point in a timeseries that describes the time-varying scalar + value of a metric. + """ + + attributes: Attributes + start_time_unix_nano: int + time_unix_nano: int + value: Union[int, float] + exemplars: Sequence[Exemplar] = field(default_factory=list) + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps(asdict(self), indent=indent) + + +@dataclass(frozen=True) +class HistogramDataPoint: + """Single data point in a timeseries that describes the time-varying scalar + value of a metric. + """ + + attributes: Attributes + start_time_unix_nano: int + time_unix_nano: int + count: int + sum: Union[int, float] + bucket_counts: Sequence[int] + explicit_bounds: Sequence[float] + min: float + max: float + exemplars: Sequence[Exemplar] = field(default_factory=list) + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps(asdict(self), indent=indent) + + +@dataclass(frozen=True) +class Buckets: + offset: int + bucket_counts: Sequence[int] + + +@dataclass(frozen=True) +class ExponentialHistogramDataPoint: + """Single data point in a timeseries whose boundaries are defined by an + exponential function. This timeseries describes the time-varying scalar + value of a metric. + """ + + attributes: Attributes + start_time_unix_nano: int + time_unix_nano: int + count: int + sum: Union[int, float] + scale: int + zero_count: int + positive: Buckets + negative: Buckets + flags: int + min: float + max: float + exemplars: Sequence[Exemplar] = field(default_factory=list) + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps(asdict(self), indent=indent) + + +@dataclass(frozen=True) +class ExponentialHistogram: + """Represents the type of a metric that is calculated by aggregating as an + ExponentialHistogram of all reported measurements over a time interval. + """ + + data_points: Sequence[ExponentialHistogramDataPoint] + aggregation_temporality: ( + "opentelemetry.sdk.metrics.export.AggregationTemporality" + ) + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "data_points": [ + loads(data_point.to_json(indent=indent)) + for data_point in self.data_points + ], + "aggregation_temporality": self.aggregation_temporality, + }, + indent=indent, + ) + + +@dataclass(frozen=True) +class Sum: + """Represents the type of a scalar metric that is calculated as a sum of + all reported measurements over a time interval.""" + + data_points: Sequence[NumberDataPoint] + aggregation_temporality: ( + "opentelemetry.sdk.metrics.export.AggregationTemporality" + ) + is_monotonic: bool + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "data_points": [ + loads(data_point.to_json(indent=indent)) + for data_point in self.data_points + ], + "aggregation_temporality": self.aggregation_temporality, + "is_monotonic": self.is_monotonic, + }, + indent=indent, + ) + + +@dataclass(frozen=True) +class Gauge: + """Represents the type of a scalar metric that always exports the current + value for every data point. It should be used for an unknown + aggregation.""" + + data_points: Sequence[NumberDataPoint] + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "data_points": [ + loads(data_point.to_json(indent=indent)) + for data_point in self.data_points + ], + }, + indent=indent, + ) + + +@dataclass(frozen=True) +class Histogram: + """Represents the type of a metric that is calculated by aggregating as a + histogram of all reported measurements over a time interval.""" + + data_points: Sequence[HistogramDataPoint] + aggregation_temporality: ( + "opentelemetry.sdk.metrics.export.AggregationTemporality" + ) + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "data_points": [ + loads(data_point.to_json(indent=indent)) + for data_point in self.data_points + ], + "aggregation_temporality": self.aggregation_temporality, + }, + indent=indent, + ) + + +# pylint: disable=invalid-name +DataT = Union[Sum, Gauge, Histogram, ExponentialHistogram] +DataPointT = Union[ + NumberDataPoint, HistogramDataPoint, ExponentialHistogramDataPoint +] + + +@dataclass(frozen=True) +class Metric: + """Represents a metric point in the OpenTelemetry data model to be + exported.""" + + name: str + description: Optional[str] + unit: Optional[str] + data: DataT + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "name": self.name, + "description": self.description or "", + "unit": self.unit or "", + "data": loads(self.data.to_json(indent=indent)), + }, + indent=indent, + ) + + +@dataclass(frozen=True) +class ScopeMetrics: + """A collection of Metrics produced by a scope""" + + scope: InstrumentationScope + metrics: Sequence[Metric] + schema_url: str + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "scope": loads(self.scope.to_json(indent=indent)), + "metrics": [ + loads(metric.to_json(indent=indent)) + for metric in self.metrics + ], + "schema_url": self.schema_url, + }, + indent=indent, + ) + + +@dataclass(frozen=True) +class ResourceMetrics: + """A collection of ScopeMetrics from a Resource""" + + resource: Resource + scope_metrics: Sequence[ScopeMetrics] + schema_url: str + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "resource": loads(self.resource.to_json(indent=indent)), + "scope_metrics": [ + loads(scope_metrics.to_json(indent=indent)) + for scope_metrics in self.scope_metrics + ], + "schema_url": self.schema_url, + }, + indent=indent, + ) + + +@dataclass(frozen=True) +class MetricsData: + """An array of ResourceMetrics""" + + resource_metrics: Sequence[ResourceMetrics] + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "resource_metrics": [ + loads(resource_metrics.to_json(indent=indent)) + for resource_metrics in self.resource_metrics + ] + }, + indent=indent, + ) diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/sdk_configuration.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/sdk_configuration.py new file mode 100644 index 00000000..3d88facb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/sdk_configuration.py @@ -0,0 +1,30 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-import + +from dataclasses import dataclass +from typing import Sequence + +# This kind of import is needed to avoid Sphinx errors. +import opentelemetry.sdk.metrics +import opentelemetry.sdk.resources + + +@dataclass +class SdkConfiguration: + exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter" + resource: "opentelemetry.sdk.resources.Resource" + metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"] + views: Sequence["opentelemetry.sdk.metrics.View"] diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/view.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/view.py new file mode 100644 index 00000000..b3fa029d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/view.py @@ -0,0 +1,195 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from fnmatch import fnmatch +from logging import getLogger +from typing import Callable, Optional, Set, Type + +from opentelemetry.metrics import Instrument +from opentelemetry.sdk.metrics._internal.aggregation import ( + Aggregation, + DefaultAggregation, + _Aggregation, + _ExplicitBucketHistogramAggregation, + _ExponentialBucketHistogramAggregation, +) +from opentelemetry.sdk.metrics._internal.exemplar import ( + AlignedHistogramBucketExemplarReservoir, + ExemplarReservoirBuilder, + SimpleFixedSizeExemplarReservoir, +) + +_logger = getLogger(__name__) + + +def _default_reservoir_factory( + aggregation_type: Type[_Aggregation], +) -> ExemplarReservoirBuilder: + """Default reservoir factory per aggregation.""" + if issubclass(aggregation_type, _ExplicitBucketHistogramAggregation): + return AlignedHistogramBucketExemplarReservoir + if issubclass(aggregation_type, _ExponentialBucketHistogramAggregation): + return SimpleFixedSizeExemplarReservoir + return SimpleFixedSizeExemplarReservoir + + +class View: + """ + A `View` configuration parameters can be used for the following + purposes: + + 1. Match instruments: When an instrument matches a view, measurements + received by that instrument will be processed. + 2. Customize metric streams: A metric stream is identified by a match + between a view and an instrument and a set of attributes. The metric + stream can be customized by certain attributes of the corresponding view. + + The attributes documented next serve one of the previous two purposes. + + Args: + instrument_type: This is an instrument matching attribute: the class the + instrument must be to match the view. + + instrument_name: This is an instrument matching attribute: the name the + instrument must have to match the view. Wild card characters are supported. Wild + card characters should not be used with this attribute if the view has also a + ``name`` defined. + + meter_name: This is an instrument matching attribute: the name the + instrument meter must have to match the view. + + meter_version: This is an instrument matching attribute: the version + the instrument meter must have to match the view. + + meter_schema_url: This is an instrument matching attribute: the schema + URL the instrument meter must have to match the view. + + name: This is a metric stream customizing attribute: the name of the + metric stream. If `None`, the name of the instrument will be used. + + description: This is a metric stream customizing attribute: the + description of the metric stream. If `None`, the description of the instrument will + be used. + + attribute_keys: This is a metric stream customizing attribute: this is + a set of attribute keys. If not `None` then only the measurement attributes that + are in ``attribute_keys`` will be used to identify the metric stream. + + aggregation: This is a metric stream customizing attribute: the + aggregation instance to use when data is aggregated for the + corresponding metrics stream. If `None` an instance of + `DefaultAggregation` will be used. + + exemplar_reservoir_factory: This is a metric stream customizing attribute: + the exemplar reservoir factory + + instrument_unit: This is an instrument matching attribute: the unit the + instrument must have to match the view. + + This class is not intended to be subclassed by the user. + """ + + _default_aggregation = DefaultAggregation() + + def __init__( + self, + instrument_type: Optional[Type[Instrument]] = None, + instrument_name: Optional[str] = None, + meter_name: Optional[str] = None, + meter_version: Optional[str] = None, + meter_schema_url: Optional[str] = None, + name: Optional[str] = None, + description: Optional[str] = None, + attribute_keys: Optional[Set[str]] = None, + aggregation: Optional[Aggregation] = None, + exemplar_reservoir_factory: Optional[ + Callable[[Type[_Aggregation]], ExemplarReservoirBuilder] + ] = None, + instrument_unit: Optional[str] = None, + ): + if ( + instrument_type + is instrument_name + is instrument_unit + is meter_name + is meter_version + is meter_schema_url + is None + ): + # pylint: disable=broad-exception-raised + raise Exception( + "Some instrument selection " + f"criteria must be provided for View {name}" + ) + + if ( + name is not None + and instrument_name is not None + and ("*" in instrument_name or "?" in instrument_name) + ): + # pylint: disable=broad-exception-raised + raise Exception( + f"View {name} declared with wildcard " + "characters in instrument_name" + ) + + # _name, _description, _aggregation, _exemplar_reservoir_factory and + # _attribute_keys will be accessed when instantiating a _ViewInstrumentMatch. + self._name = name + self._instrument_type = instrument_type + self._instrument_name = instrument_name + self._instrument_unit = instrument_unit + self._meter_name = meter_name + self._meter_version = meter_version + self._meter_schema_url = meter_schema_url + + self._description = description + self._attribute_keys = attribute_keys + self._aggregation = aggregation or self._default_aggregation + self._exemplar_reservoir_factory = ( + exemplar_reservoir_factory or _default_reservoir_factory + ) + + # pylint: disable=too-many-return-statements + # pylint: disable=too-many-branches + def _match(self, instrument: Instrument) -> bool: + if self._instrument_type is not None: + if not isinstance(instrument, self._instrument_type): + return False + + if self._instrument_name is not None: + if not fnmatch(instrument.name, self._instrument_name): + return False + + if self._instrument_unit is not None: + if not fnmatch(instrument.unit, self._instrument_unit): + return False + + if self._meter_name is not None: + if instrument.instrumentation_scope.name != self._meter_name: + return False + + if self._meter_version is not None: + if instrument.instrumentation_scope.version != self._meter_version: + return False + + if self._meter_schema_url is not None: + if ( + instrument.instrumentation_scope.schema_url + != self._meter_schema_url + ): + return False + + return True diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/export/__init__.py new file mode 100644 index 00000000..478237cd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/export/__init__.py @@ -0,0 +1,66 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from opentelemetry.sdk.metrics._internal.export import ( + AggregationTemporality, + ConsoleMetricExporter, + InMemoryMetricReader, + MetricExporter, + MetricExportResult, + MetricReader, + PeriodicExportingMetricReader, +) + +# The point module is not in the export directory to avoid a circular import. +from opentelemetry.sdk.metrics._internal.point import ( # noqa: F401 + Buckets, + DataPointT, + DataT, + ExponentialHistogram, + ExponentialHistogramDataPoint, + Gauge, + Histogram, + HistogramDataPoint, + Metric, + MetricsData, + NumberDataPoint, + ResourceMetrics, + ScopeMetrics, + Sum, +) + +__all__ = [ + "AggregationTemporality", + "Buckets", + "ConsoleMetricExporter", + "InMemoryMetricReader", + "MetricExporter", + "MetricExportResult", + "MetricReader", + "PeriodicExportingMetricReader", + "DataPointT", + "DataT", + "ExponentialHistogram", + "ExponentialHistogramDataPoint", + "Gauge", + "Histogram", + "HistogramDataPoint", + "Metric", + "MetricsData", + "NumberDataPoint", + "ResourceMetrics", + "ScopeMetrics", + "Sum", +] diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/view/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/view/__init__.py new file mode 100644 index 00000000..c07adf6c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/view/__init__.py @@ -0,0 +1,35 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry.sdk.metrics._internal.aggregation import ( + Aggregation, + DefaultAggregation, + DropAggregation, + ExplicitBucketHistogramAggregation, + ExponentialBucketHistogramAggregation, + LastValueAggregation, + SumAggregation, +) +from opentelemetry.sdk.metrics._internal.view import View + +__all__ = [ + "Aggregation", + "DefaultAggregation", + "DropAggregation", + "ExplicitBucketHistogramAggregation", + "ExponentialBucketHistogramAggregation", + "LastValueAggregation", + "SumAggregation", + "View", +] |