diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py | 582 |
1 files changed, 582 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py new file mode 100644 index 00000000..faa0959f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py @@ -0,0 +1,582 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import weakref +from atexit import register, unregister +from logging import getLogger +from os import environ +from threading import Lock +from time import time_ns +from typing import Optional, Sequence + +# This kind of import is needed to avoid Sphinx errors. +import opentelemetry.sdk.metrics +from opentelemetry.metrics import Counter as APICounter +from opentelemetry.metrics import Histogram as APIHistogram +from opentelemetry.metrics import Meter as APIMeter +from opentelemetry.metrics import MeterProvider as APIMeterProvider +from opentelemetry.metrics import NoOpMeter +from opentelemetry.metrics import ObservableCounter as APIObservableCounter +from opentelemetry.metrics import ObservableGauge as APIObservableGauge +from opentelemetry.metrics import ( + ObservableUpDownCounter as APIObservableUpDownCounter, +) +from opentelemetry.metrics import UpDownCounter as APIUpDownCounter +from opentelemetry.metrics import _Gauge as APIGauge +from opentelemetry.sdk.environment_variables import ( + OTEL_METRICS_EXEMPLAR_FILTER, + OTEL_SDK_DISABLED, +) +from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError +from opentelemetry.sdk.metrics._internal.exemplar import ( + AlwaysOffExemplarFilter, + AlwaysOnExemplarFilter, + ExemplarFilter, + TraceBasedExemplarFilter, +) +from opentelemetry.sdk.metrics._internal.instrument import ( + _Counter, + _Gauge, + _Histogram, + _ObservableCounter, + _ObservableGauge, + _ObservableUpDownCounter, + _UpDownCounter, +) +from opentelemetry.sdk.metrics._internal.measurement_consumer import ( + MeasurementConsumer, + SynchronousMeasurementConsumer, +) +from opentelemetry.sdk.metrics._internal.sdk_configuration import ( + SdkConfiguration, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.util._once import Once +from opentelemetry.util.types import ( + Attributes, +) + +_logger = getLogger(__name__) + + +class Meter(APIMeter): + """See `opentelemetry.metrics.Meter`.""" + + def __init__( + self, + instrumentation_scope: InstrumentationScope, + measurement_consumer: MeasurementConsumer, + ): + super().__init__( + name=instrumentation_scope.name, + version=instrumentation_scope.version, + schema_url=instrumentation_scope.schema_url, + ) + self._instrumentation_scope = instrumentation_scope + self._measurement_consumer = measurement_consumer + self._instrument_id_instrument = {} + self._instrument_id_instrument_lock = Lock() + + def create_counter(self, name, unit="", description="") -> APICounter: + status = self._register_instrument(name, _Counter, unit, description) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APICounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _Counter( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_up_down_counter( + self, name, unit="", description="" + ) -> APIUpDownCounter: + status = self._register_instrument( + name, _UpDownCounter, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIUpDownCounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _UpDownCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_observable_counter( + self, + name, + callbacks=None, + unit="", + description="", + ) -> APIObservableCounter: + status = self._register_instrument( + name, _ObservableCounter, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIObservableCounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _ObservableCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + + self._measurement_consumer.register_asynchronous_instrument(instrument) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_histogram( + self, + name: str, + unit: str = "", + description: str = "", + *, + explicit_bucket_boundaries_advisory: Optional[Sequence[float]] = None, + ) -> APIHistogram: + if explicit_bucket_boundaries_advisory is not None: + invalid_advisory = False + if isinstance(explicit_bucket_boundaries_advisory, Sequence): + try: + invalid_advisory = not ( + all( + isinstance(e, (float, int)) + for e in explicit_bucket_boundaries_advisory + ) + ) + except (KeyError, TypeError): + invalid_advisory = True + else: + invalid_advisory = True + + if invalid_advisory: + explicit_bucket_boundaries_advisory = None + _logger.warning( + "explicit_bucket_boundaries_advisory must be a sequence of numbers" + ) + + status = self._register_instrument( + name, + _Histogram, + unit, + description, + explicit_bucket_boundaries_advisory, + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIHistogram.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _Histogram( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + explicit_bucket_boundaries_advisory, + ) + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_gauge(self, name, unit="", description="") -> APIGauge: + status = self._register_instrument(name, _Gauge, unit, description) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIGauge.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _Gauge( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_observable_gauge( + self, name, callbacks=None, unit="", description="" + ) -> APIObservableGauge: + status = self._register_instrument( + name, _ObservableGauge, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIObservableGauge.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _ObservableGauge( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + + self._measurement_consumer.register_asynchronous_instrument(instrument) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + def create_observable_up_down_counter( + self, name, callbacks=None, unit="", description="" + ) -> APIObservableUpDownCounter: + status = self._register_instrument( + name, _ObservableUpDownCounter, unit, description + ) + + if status.conflict: + # FIXME #2558 go through all views here and check if this + # instrument registration conflict can be fixed. If it can be, do + # not log the following warning. + self._log_instrument_registration_conflict( + name, + APIObservableUpDownCounter.__name__, + unit, + description, + status, + ) + if status.already_registered: + with self._instrument_id_instrument_lock: + return self._instrument_id_instrument[status.instrument_id] + + instrument = _ObservableUpDownCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + + self._measurement_consumer.register_asynchronous_instrument(instrument) + + with self._instrument_id_instrument_lock: + self._instrument_id_instrument[status.instrument_id] = instrument + return instrument + + +def _get_exemplar_filter(exemplar_filter: str) -> ExemplarFilter: + if exemplar_filter == "trace_based": + return TraceBasedExemplarFilter() + if exemplar_filter == "always_on": + return AlwaysOnExemplarFilter() + if exemplar_filter == "always_off": + return AlwaysOffExemplarFilter() + msg = f"Unknown exemplar filter '{exemplar_filter}'." + raise ValueError(msg) + + +class MeterProvider(APIMeterProvider): + r"""See `opentelemetry.metrics.MeterProvider`. + + Args: + metric_readers: Register metric readers to collect metrics from the SDK + on demand. Each :class:`opentelemetry.sdk.metrics.export.MetricReader` is + completely independent and will collect separate streams of + metrics. TODO: reference ``PeriodicExportingMetricReader`` usage with push + exporters here. + resource: The resource representing what the metrics emitted from the SDK pertain to. + shutdown_on_exit: If true, registers an `atexit` handler to call + `MeterProvider.shutdown` + views: The views to configure the metric output the SDK + + By default, instruments which do not match any :class:`opentelemetry.sdk.metrics.view.View` (or if no :class:`opentelemetry.sdk.metrics.view.View`\ s + are provided) will report metrics with the default aggregation for the + instrument's kind. To disable instruments by default, configure a match-all + :class:`opentelemetry.sdk.metrics.view.View` with `DropAggregation` and then create :class:`opentelemetry.sdk.metrics.view.View`\ s to re-enable + individual instruments: + + .. code-block:: python + :caption: Disable default views + + MeterProvider( + views=[ + View(instrument_name="*", aggregation=DropAggregation()), + View(instrument_name="mycounter"), + ], + # ... + ) + """ + + _all_metric_readers_lock = Lock() + _all_metric_readers = weakref.WeakSet() + + def __init__( + self, + metric_readers: Sequence[ + "opentelemetry.sdk.metrics.export.MetricReader" + ] = (), + resource: Optional[Resource] = None, + exemplar_filter: Optional[ExemplarFilter] = None, + shutdown_on_exit: bool = True, + views: Sequence["opentelemetry.sdk.metrics.view.View"] = (), + ): + self._lock = Lock() + self._meter_lock = Lock() + self._atexit_handler = None + if resource is None: + resource = Resource.create({}) + self._sdk_config = SdkConfiguration( + exemplar_filter=( + exemplar_filter + or _get_exemplar_filter( + environ.get(OTEL_METRICS_EXEMPLAR_FILTER, "trace_based") + ) + ), + resource=resource, + metric_readers=metric_readers, + views=views, + ) + self._measurement_consumer = SynchronousMeasurementConsumer( + sdk_config=self._sdk_config + ) + disabled = environ.get(OTEL_SDK_DISABLED, "") + self._disabled = disabled.lower().strip() == "true" + + if shutdown_on_exit: + self._atexit_handler = register(self.shutdown) + + self._meters = {} + self._shutdown_once = Once() + self._shutdown = False + + for metric_reader in self._sdk_config.metric_readers: + with self._all_metric_readers_lock: + if metric_reader in self._all_metric_readers: + # pylint: disable=broad-exception-raised + raise Exception( + f"MetricReader {metric_reader} has been registered " + "already in other MeterProvider instance" + ) + + self._all_metric_readers.add(metric_reader) + + metric_reader._set_collect_callback( + self._measurement_consumer.collect + ) + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + deadline_ns = time_ns() + timeout_millis * 10**6 + + metric_reader_error = {} + + for metric_reader in self._sdk_config.metric_readers: + current_ts = time_ns() + try: + if current_ts >= deadline_ns: + raise MetricsTimeoutError( + "Timed out while flushing metric readers" + ) + metric_reader.force_flush( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) + + # pylint: disable=broad-exception-caught + except Exception as error: + metric_reader_error[metric_reader] = error + + if metric_reader_error: + metric_reader_error_string = "\n".join( + [ + f"{metric_reader.__class__.__name__}: {repr(error)}" + for metric_reader, error in metric_reader_error.items() + ] + ) + + # pylint: disable=broad-exception-raised + raise Exception( + "MeterProvider.force_flush failed because the following " + "metric readers failed during collect:\n" + f"{metric_reader_error_string}" + ) + return True + + def shutdown(self, timeout_millis: float = 30_000): + deadline_ns = time_ns() + timeout_millis * 10**6 + + def _shutdown(): + self._shutdown = True + + did_shutdown = self._shutdown_once.do_once(_shutdown) + + if not did_shutdown: + _logger.warning("shutdown can only be called once") + return + + metric_reader_error = {} + + for metric_reader in self._sdk_config.metric_readers: + current_ts = time_ns() + try: + if current_ts >= deadline_ns: + # pylint: disable=broad-exception-raised + raise Exception( + "Didn't get to execute, deadline already exceeded" + ) + metric_reader.shutdown( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) + + # pylint: disable=broad-exception-caught + except Exception as error: + metric_reader_error[metric_reader] = error + + if self._atexit_handler is not None: + unregister(self._atexit_handler) + self._atexit_handler = None + + if metric_reader_error: + metric_reader_error_string = "\n".join( + [ + f"{metric_reader.__class__.__name__}: {repr(error)}" + for metric_reader, error in metric_reader_error.items() + ] + ) + + # pylint: disable=broad-exception-raised + raise Exception( + ( + "MeterProvider.shutdown failed because the following " + "metric readers failed during shutdown:\n" + f"{metric_reader_error_string}" + ) + ) + + def get_meter( + self, + name: str, + version: Optional[str] = None, + schema_url: Optional[str] = None, + attributes: Optional[Attributes] = None, + ) -> Meter: + if self._disabled: + return NoOpMeter(name, version=version, schema_url=schema_url) + + if self._shutdown: + _logger.warning( + "A shutdown `MeterProvider` can not provide a `Meter`" + ) + return NoOpMeter(name, version=version, schema_url=schema_url) + + if not name: + _logger.warning("Meter name cannot be None or empty.") + return NoOpMeter(name, version=version, schema_url=schema_url) + + info = InstrumentationScope(name, version, schema_url, attributes) + with self._meter_lock: + if not self._meters.get(info): + # FIXME #2558 pass SDKConfig object to meter so that the meter + # has access to views. + self._meters[info] = Meter( + info, + self._measurement_consumer, + ) + return self._meters[info] |