about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py582
1 files changed, 582 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py
new file mode 100644
index 00000000..faa0959f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py
@@ -0,0 +1,582 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import weakref
+from atexit import register, unregister
+from logging import getLogger
+from os import environ
+from threading import Lock
+from time import time_ns
+from typing import Optional, Sequence
+
+# This kind of import is needed to avoid Sphinx errors.
+import opentelemetry.sdk.metrics
+from opentelemetry.metrics import Counter as APICounter
+from opentelemetry.metrics import Histogram as APIHistogram
+from opentelemetry.metrics import Meter as APIMeter
+from opentelemetry.metrics import MeterProvider as APIMeterProvider
+from opentelemetry.metrics import NoOpMeter
+from opentelemetry.metrics import ObservableCounter as APIObservableCounter
+from opentelemetry.metrics import ObservableGauge as APIObservableGauge
+from opentelemetry.metrics import (
+    ObservableUpDownCounter as APIObservableUpDownCounter,
+)
+from opentelemetry.metrics import UpDownCounter as APIUpDownCounter
+from opentelemetry.metrics import _Gauge as APIGauge
+from opentelemetry.sdk.environment_variables import (
+    OTEL_METRICS_EXEMPLAR_FILTER,
+    OTEL_SDK_DISABLED,
+)
+from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
+from opentelemetry.sdk.metrics._internal.exemplar import (
+    AlwaysOffExemplarFilter,
+    AlwaysOnExemplarFilter,
+    ExemplarFilter,
+    TraceBasedExemplarFilter,
+)
+from opentelemetry.sdk.metrics._internal.instrument import (
+    _Counter,
+    _Gauge,
+    _Histogram,
+    _ObservableCounter,
+    _ObservableGauge,
+    _ObservableUpDownCounter,
+    _UpDownCounter,
+)
+from opentelemetry.sdk.metrics._internal.measurement_consumer import (
+    MeasurementConsumer,
+    SynchronousMeasurementConsumer,
+)
+from opentelemetry.sdk.metrics._internal.sdk_configuration import (
+    SdkConfiguration,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+from opentelemetry.util._once import Once
+from opentelemetry.util.types import (
+    Attributes,
+)
+
+_logger = getLogger(__name__)
+
+
+class Meter(APIMeter):
+    """See `opentelemetry.metrics.Meter`."""
+
+    def __init__(
+        self,
+        instrumentation_scope: InstrumentationScope,
+        measurement_consumer: MeasurementConsumer,
+    ):
+        super().__init__(
+            name=instrumentation_scope.name,
+            version=instrumentation_scope.version,
+            schema_url=instrumentation_scope.schema_url,
+        )
+        self._instrumentation_scope = instrumentation_scope
+        self._measurement_consumer = measurement_consumer
+        self._instrument_id_instrument = {}
+        self._instrument_id_instrument_lock = Lock()
+
+    def create_counter(self, name, unit="", description="") -> APICounter:
+        status = self._register_instrument(name, _Counter, unit, description)
+
+        if status.conflict:
+            # FIXME #2558 go through all views here and check if this
+            # instrument registration conflict can be fixed. If it can be, do
+            # not log the following warning.
+            self._log_instrument_registration_conflict(
+                name,
+                APICounter.__name__,
+                unit,
+                description,
+                status,
+            )
+        if status.already_registered:
+            with self._instrument_id_instrument_lock:
+                return self._instrument_id_instrument[status.instrument_id]
+
+        instrument = _Counter(
+            name,
+            self._instrumentation_scope,
+            self._measurement_consumer,
+            unit,
+            description,
+        )
+
+        with self._instrument_id_instrument_lock:
+            self._instrument_id_instrument[status.instrument_id] = instrument
+            return instrument
+
+    def create_up_down_counter(
+        self, name, unit="", description=""
+    ) -> APIUpDownCounter:
+        status = self._register_instrument(
+            name, _UpDownCounter, unit, description
+        )
+
+        if status.conflict:
+            # FIXME #2558 go through all views here and check if this
+            # instrument registration conflict can be fixed. If it can be, do
+            # not log the following warning.
+            self._log_instrument_registration_conflict(
+                name,
+                APIUpDownCounter.__name__,
+                unit,
+                description,
+                status,
+            )
+        if status.already_registered:
+            with self._instrument_id_instrument_lock:
+                return self._instrument_id_instrument[status.instrument_id]
+
+        instrument = _UpDownCounter(
+            name,
+            self._instrumentation_scope,
+            self._measurement_consumer,
+            unit,
+            description,
+        )
+
+        with self._instrument_id_instrument_lock:
+            self._instrument_id_instrument[status.instrument_id] = instrument
+            return instrument
+
+    def create_observable_counter(
+        self,
+        name,
+        callbacks=None,
+        unit="",
+        description="",
+    ) -> APIObservableCounter:
+        status = self._register_instrument(
+            name, _ObservableCounter, unit, description
+        )
+
+        if status.conflict:
+            # FIXME #2558 go through all views here and check if this
+            # instrument registration conflict can be fixed. If it can be, do
+            # not log the following warning.
+            self._log_instrument_registration_conflict(
+                name,
+                APIObservableCounter.__name__,
+                unit,
+                description,
+                status,
+            )
+        if status.already_registered:
+            with self._instrument_id_instrument_lock:
+                return self._instrument_id_instrument[status.instrument_id]
+
+        instrument = _ObservableCounter(
+            name,
+            self._instrumentation_scope,
+            self._measurement_consumer,
+            callbacks,
+            unit,
+            description,
+        )
+
+        self._measurement_consumer.register_asynchronous_instrument(instrument)
+
+        with self._instrument_id_instrument_lock:
+            self._instrument_id_instrument[status.instrument_id] = instrument
+            return instrument
+
+    def create_histogram(
+        self,
+        name: str,
+        unit: str = "",
+        description: str = "",
+        *,
+        explicit_bucket_boundaries_advisory: Optional[Sequence[float]] = None,
+    ) -> APIHistogram:
+        if explicit_bucket_boundaries_advisory is not None:
+            invalid_advisory = False
+            if isinstance(explicit_bucket_boundaries_advisory, Sequence):
+                try:
+                    invalid_advisory = not (
+                        all(
+                            isinstance(e, (float, int))
+                            for e in explicit_bucket_boundaries_advisory
+                        )
+                    )
+                except (KeyError, TypeError):
+                    invalid_advisory = True
+            else:
+                invalid_advisory = True
+
+            if invalid_advisory:
+                explicit_bucket_boundaries_advisory = None
+                _logger.warning(
+                    "explicit_bucket_boundaries_advisory must be a sequence of numbers"
+                )
+
+        status = self._register_instrument(
+            name,
+            _Histogram,
+            unit,
+            description,
+            explicit_bucket_boundaries_advisory,
+        )
+
+        if status.conflict:
+            # FIXME #2558 go through all views here and check if this
+            # instrument registration conflict can be fixed. If it can be, do
+            # not log the following warning.
+            self._log_instrument_registration_conflict(
+                name,
+                APIHistogram.__name__,
+                unit,
+                description,
+                status,
+            )
+        if status.already_registered:
+            with self._instrument_id_instrument_lock:
+                return self._instrument_id_instrument[status.instrument_id]
+
+        instrument = _Histogram(
+            name,
+            self._instrumentation_scope,
+            self._measurement_consumer,
+            unit,
+            description,
+            explicit_bucket_boundaries_advisory,
+        )
+        with self._instrument_id_instrument_lock:
+            self._instrument_id_instrument[status.instrument_id] = instrument
+            return instrument
+
+    def create_gauge(self, name, unit="", description="") -> APIGauge:
+        status = self._register_instrument(name, _Gauge, unit, description)
+
+        if status.conflict:
+            # FIXME #2558 go through all views here and check if this
+            # instrument registration conflict can be fixed. If it can be, do
+            # not log the following warning.
+            self._log_instrument_registration_conflict(
+                name,
+                APIGauge.__name__,
+                unit,
+                description,
+                status,
+            )
+        if status.already_registered:
+            with self._instrument_id_instrument_lock:
+                return self._instrument_id_instrument[status.instrument_id]
+
+        instrument = _Gauge(
+            name,
+            self._instrumentation_scope,
+            self._measurement_consumer,
+            unit,
+            description,
+        )
+
+        with self._instrument_id_instrument_lock:
+            self._instrument_id_instrument[status.instrument_id] = instrument
+            return instrument
+
+    def create_observable_gauge(
+        self, name, callbacks=None, unit="", description=""
+    ) -> APIObservableGauge:
+        status = self._register_instrument(
+            name, _ObservableGauge, unit, description
+        )
+
+        if status.conflict:
+            # FIXME #2558 go through all views here and check if this
+            # instrument registration conflict can be fixed. If it can be, do
+            # not log the following warning.
+            self._log_instrument_registration_conflict(
+                name,
+                APIObservableGauge.__name__,
+                unit,
+                description,
+                status,
+            )
+        if status.already_registered:
+            with self._instrument_id_instrument_lock:
+                return self._instrument_id_instrument[status.instrument_id]
+
+        instrument = _ObservableGauge(
+            name,
+            self._instrumentation_scope,
+            self._measurement_consumer,
+            callbacks,
+            unit,
+            description,
+        )
+
+        self._measurement_consumer.register_asynchronous_instrument(instrument)
+
+        with self._instrument_id_instrument_lock:
+            self._instrument_id_instrument[status.instrument_id] = instrument
+            return instrument
+
+    def create_observable_up_down_counter(
+        self, name, callbacks=None, unit="", description=""
+    ) -> APIObservableUpDownCounter:
+        status = self._register_instrument(
+            name, _ObservableUpDownCounter, unit, description
+        )
+
+        if status.conflict:
+            # FIXME #2558 go through all views here and check if this
+            # instrument registration conflict can be fixed. If it can be, do
+            # not log the following warning.
+            self._log_instrument_registration_conflict(
+                name,
+                APIObservableUpDownCounter.__name__,
+                unit,
+                description,
+                status,
+            )
+        if status.already_registered:
+            with self._instrument_id_instrument_lock:
+                return self._instrument_id_instrument[status.instrument_id]
+
+        instrument = _ObservableUpDownCounter(
+            name,
+            self._instrumentation_scope,
+            self._measurement_consumer,
+            callbacks,
+            unit,
+            description,
+        )
+
+        self._measurement_consumer.register_asynchronous_instrument(instrument)
+
+        with self._instrument_id_instrument_lock:
+            self._instrument_id_instrument[status.instrument_id] = instrument
+            return instrument
+
+
+def _get_exemplar_filter(exemplar_filter: str) -> ExemplarFilter:
+    if exemplar_filter == "trace_based":
+        return TraceBasedExemplarFilter()
+    if exemplar_filter == "always_on":
+        return AlwaysOnExemplarFilter()
+    if exemplar_filter == "always_off":
+        return AlwaysOffExemplarFilter()
+    msg = f"Unknown exemplar filter '{exemplar_filter}'."
+    raise ValueError(msg)
+
+
+class MeterProvider(APIMeterProvider):
+    r"""See `opentelemetry.metrics.MeterProvider`.
+
+    Args:
+        metric_readers: Register metric readers to collect metrics from the SDK
+            on demand. Each :class:`opentelemetry.sdk.metrics.export.MetricReader` is
+            completely independent and will collect separate streams of
+            metrics. TODO: reference ``PeriodicExportingMetricReader`` usage with push
+            exporters here.
+        resource: The resource representing what the metrics emitted from the SDK pertain to.
+        shutdown_on_exit: If true, registers an `atexit` handler to call
+            `MeterProvider.shutdown`
+        views: The views to configure the metric output the SDK
+
+    By default, instruments which do not match any :class:`opentelemetry.sdk.metrics.view.View` (or if no :class:`opentelemetry.sdk.metrics.view.View`\ s
+    are provided) will report metrics with the default aggregation for the
+    instrument's kind. To disable instruments by default, configure a match-all
+    :class:`opentelemetry.sdk.metrics.view.View` with `DropAggregation` and then create :class:`opentelemetry.sdk.metrics.view.View`\ s to re-enable
+    individual instruments:
+
+    .. code-block:: python
+        :caption: Disable default views
+
+        MeterProvider(
+            views=[
+                View(instrument_name="*", aggregation=DropAggregation()),
+                View(instrument_name="mycounter"),
+            ],
+            # ...
+        )
+    """
+
+    _all_metric_readers_lock = Lock()
+    _all_metric_readers = weakref.WeakSet()
+
+    def __init__(
+        self,
+        metric_readers: Sequence[
+            "opentelemetry.sdk.metrics.export.MetricReader"
+        ] = (),
+        resource: Optional[Resource] = None,
+        exemplar_filter: Optional[ExemplarFilter] = None,
+        shutdown_on_exit: bool = True,
+        views: Sequence["opentelemetry.sdk.metrics.view.View"] = (),
+    ):
+        self._lock = Lock()
+        self._meter_lock = Lock()
+        self._atexit_handler = None
+        if resource is None:
+            resource = Resource.create({})
+        self._sdk_config = SdkConfiguration(
+            exemplar_filter=(
+                exemplar_filter
+                or _get_exemplar_filter(
+                    environ.get(OTEL_METRICS_EXEMPLAR_FILTER, "trace_based")
+                )
+            ),
+            resource=resource,
+            metric_readers=metric_readers,
+            views=views,
+        )
+        self._measurement_consumer = SynchronousMeasurementConsumer(
+            sdk_config=self._sdk_config
+        )
+        disabled = environ.get(OTEL_SDK_DISABLED, "")
+        self._disabled = disabled.lower().strip() == "true"
+
+        if shutdown_on_exit:
+            self._atexit_handler = register(self.shutdown)
+
+        self._meters = {}
+        self._shutdown_once = Once()
+        self._shutdown = False
+
+        for metric_reader in self._sdk_config.metric_readers:
+            with self._all_metric_readers_lock:
+                if metric_reader in self._all_metric_readers:
+                    # pylint: disable=broad-exception-raised
+                    raise Exception(
+                        f"MetricReader {metric_reader} has been registered "
+                        "already in other MeterProvider instance"
+                    )
+
+                self._all_metric_readers.add(metric_reader)
+
+            metric_reader._set_collect_callback(
+                self._measurement_consumer.collect
+            )
+
+    def force_flush(self, timeout_millis: float = 10_000) -> bool:
+        deadline_ns = time_ns() + timeout_millis * 10**6
+
+        metric_reader_error = {}
+
+        for metric_reader in self._sdk_config.metric_readers:
+            current_ts = time_ns()
+            try:
+                if current_ts >= deadline_ns:
+                    raise MetricsTimeoutError(
+                        "Timed out while flushing metric readers"
+                    )
+                metric_reader.force_flush(
+                    timeout_millis=(deadline_ns - current_ts) / 10**6
+                )
+
+            # pylint: disable=broad-exception-caught
+            except Exception as error:
+                metric_reader_error[metric_reader] = error
+
+        if metric_reader_error:
+            metric_reader_error_string = "\n".join(
+                [
+                    f"{metric_reader.__class__.__name__}: {repr(error)}"
+                    for metric_reader, error in metric_reader_error.items()
+                ]
+            )
+
+            # pylint: disable=broad-exception-raised
+            raise Exception(
+                "MeterProvider.force_flush failed because the following "
+                "metric readers failed during collect:\n"
+                f"{metric_reader_error_string}"
+            )
+        return True
+
+    def shutdown(self, timeout_millis: float = 30_000):
+        deadline_ns = time_ns() + timeout_millis * 10**6
+
+        def _shutdown():
+            self._shutdown = True
+
+        did_shutdown = self._shutdown_once.do_once(_shutdown)
+
+        if not did_shutdown:
+            _logger.warning("shutdown can only be called once")
+            return
+
+        metric_reader_error = {}
+
+        for metric_reader in self._sdk_config.metric_readers:
+            current_ts = time_ns()
+            try:
+                if current_ts >= deadline_ns:
+                    # pylint: disable=broad-exception-raised
+                    raise Exception(
+                        "Didn't get to execute, deadline already exceeded"
+                    )
+                metric_reader.shutdown(
+                    timeout_millis=(deadline_ns - current_ts) / 10**6
+                )
+
+            # pylint: disable=broad-exception-caught
+            except Exception as error:
+                metric_reader_error[metric_reader] = error
+
+        if self._atexit_handler is not None:
+            unregister(self._atexit_handler)
+            self._atexit_handler = None
+
+        if metric_reader_error:
+            metric_reader_error_string = "\n".join(
+                [
+                    f"{metric_reader.__class__.__name__}: {repr(error)}"
+                    for metric_reader, error in metric_reader_error.items()
+                ]
+            )
+
+            # pylint: disable=broad-exception-raised
+            raise Exception(
+                (
+                    "MeterProvider.shutdown failed because the following "
+                    "metric readers failed during shutdown:\n"
+                    f"{metric_reader_error_string}"
+                )
+            )
+
+    def get_meter(
+        self,
+        name: str,
+        version: Optional[str] = None,
+        schema_url: Optional[str] = None,
+        attributes: Optional[Attributes] = None,
+    ) -> Meter:
+        if self._disabled:
+            return NoOpMeter(name, version=version, schema_url=schema_url)
+
+        if self._shutdown:
+            _logger.warning(
+                "A shutdown `MeterProvider` can not provide a `Meter`"
+            )
+            return NoOpMeter(name, version=version, schema_url=schema_url)
+
+        if not name:
+            _logger.warning("Meter name cannot be None or empty.")
+            return NoOpMeter(name, version=version, schema_url=schema_url)
+
+        info = InstrumentationScope(name, version, schema_url, attributes)
+        with self._meter_lock:
+            if not self._meters.get(info):
+                # FIXME #2558 pass SDKConfig object to meter so that the meter
+                # has access to views.
+                self._meters[info] = Meter(
+                    info,
+                    self._measurement_consumer,
+                )
+            return self._meters[info]