about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py334
1 files changed, 334 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py
new file mode 100644
index 00000000..b01578f4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py
@@ -0,0 +1,334 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=too-many-ancestors, unused-import
+from __future__ import annotations
+
+from logging import getLogger
+from time import time_ns
+from typing import Generator, Iterable, List, Sequence, Union
+
+# This kind of import is needed to avoid Sphinx errors.
+import opentelemetry.sdk.metrics
+from opentelemetry.context import Context, get_current
+from opentelemetry.metrics import CallbackT
+from opentelemetry.metrics import Counter as APICounter
+from opentelemetry.metrics import Histogram as APIHistogram
+from opentelemetry.metrics import ObservableCounter as APIObservableCounter
+from opentelemetry.metrics import ObservableGauge as APIObservableGauge
+from opentelemetry.metrics import (
+    ObservableUpDownCounter as APIObservableUpDownCounter,
+)
+from opentelemetry.metrics import UpDownCounter as APIUpDownCounter
+from opentelemetry.metrics import _Gauge as APIGauge
+from opentelemetry.metrics._internal.instrument import (
+    CallbackOptions,
+    _MetricsHistogramAdvisory,
+)
+from opentelemetry.sdk.metrics._internal.measurement import Measurement
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+
+_logger = getLogger(__name__)
+
+
+_ERROR_MESSAGE = (
+    "Expected ASCII string of maximum length 63 characters but got {}"
+)
+
+
+class _Synchronous:
+    def __init__(
+        self,
+        name: str,
+        instrumentation_scope: InstrumentationScope,
+        measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer",
+        unit: str = "",
+        description: str = "",
+    ):
+        # pylint: disable=no-member
+        result = self._check_name_unit_description(name, unit, description)
+
+        if result["name"] is None:
+            # pylint: disable=broad-exception-raised
+            raise Exception(_ERROR_MESSAGE.format(name))
+
+        if result["unit"] is None:
+            # pylint: disable=broad-exception-raised
+            raise Exception(_ERROR_MESSAGE.format(unit))
+
+        name = result["name"]
+        unit = result["unit"]
+        description = result["description"]
+
+        self.name = name.lower()
+        self.unit = unit
+        self.description = description
+        self.instrumentation_scope = instrumentation_scope
+        self._measurement_consumer = measurement_consumer
+        super().__init__(name, unit=unit, description=description)
+
+
+class _Asynchronous:
+    def __init__(
+        self,
+        name: str,
+        instrumentation_scope: InstrumentationScope,
+        measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer",
+        callbacks: Iterable[CallbackT] | None = None,
+        unit: str = "",
+        description: str = "",
+    ):
+        # pylint: disable=no-member
+        result = self._check_name_unit_description(name, unit, description)
+
+        if result["name"] is None:
+            # pylint: disable=broad-exception-raised
+            raise Exception(_ERROR_MESSAGE.format(name))
+
+        if result["unit"] is None:
+            # pylint: disable=broad-exception-raised
+            raise Exception(_ERROR_MESSAGE.format(unit))
+
+        name = result["name"]
+        unit = result["unit"]
+        description = result["description"]
+
+        self.name = name.lower()
+        self.unit = unit
+        self.description = description
+        self.instrumentation_scope = instrumentation_scope
+        self._measurement_consumer = measurement_consumer
+        super().__init__(name, callbacks, unit=unit, description=description)
+
+        self._callbacks: List[CallbackT] = []
+
+        if callbacks is not None:
+            for callback in callbacks:
+                if isinstance(callback, Generator):
+                    # advance generator to it's first yield
+                    next(callback)
+
+                    def inner(
+                        options: CallbackOptions,
+                        callback=callback,
+                    ) -> Iterable[Measurement]:
+                        try:
+                            return callback.send(options)
+                        except StopIteration:
+                            return []
+
+                    self._callbacks.append(inner)
+                else:
+                    self._callbacks.append(callback)
+
+    def callback(
+        self, callback_options: CallbackOptions
+    ) -> Iterable[Measurement]:
+        for callback in self._callbacks:
+            try:
+                for api_measurement in callback(callback_options):
+                    yield Measurement(
+                        api_measurement.value,
+                        time_unix_nano=time_ns(),
+                        instrument=self,
+                        context=api_measurement.context or get_current(),
+                        attributes=api_measurement.attributes,
+                    )
+            except Exception:  # pylint: disable=broad-exception-caught
+                _logger.exception(
+                    "Callback failed for instrument %s.", self.name
+                )
+
+
+class Counter(_Synchronous, APICounter):
+    def __new__(cls, *args, **kwargs):
+        if cls is Counter:
+            raise TypeError("Counter must be instantiated via a meter.")
+        return super().__new__(cls)
+
+    def add(
+        self,
+        amount: Union[int, float],
+        attributes: dict[str, str] | None = None,
+        context: Context | None = None,
+    ):
+        if amount < 0:
+            _logger.warning(
+                "Add amount must be non-negative on Counter %s.", self.name
+            )
+            return
+        time_unix_nano = time_ns()
+        self._measurement_consumer.consume_measurement(
+            Measurement(
+                amount,
+                time_unix_nano,
+                self,
+                context or get_current(),
+                attributes,
+            )
+        )
+
+
+class UpDownCounter(_Synchronous, APIUpDownCounter):
+    def __new__(cls, *args, **kwargs):
+        if cls is UpDownCounter:
+            raise TypeError("UpDownCounter must be instantiated via a meter.")
+        return super().__new__(cls)
+
+    def add(
+        self,
+        amount: Union[int, float],
+        attributes: dict[str, str] | None = None,
+        context: Context | None = None,
+    ):
+        time_unix_nano = time_ns()
+        self._measurement_consumer.consume_measurement(
+            Measurement(
+                amount,
+                time_unix_nano,
+                self,
+                context or get_current(),
+                attributes,
+            )
+        )
+
+
+class ObservableCounter(_Asynchronous, APIObservableCounter):
+    def __new__(cls, *args, **kwargs):
+        if cls is ObservableCounter:
+            raise TypeError(
+                "ObservableCounter must be instantiated via a meter."
+            )
+        return super().__new__(cls)
+
+
+class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter):
+    def __new__(cls, *args, **kwargs):
+        if cls is ObservableUpDownCounter:
+            raise TypeError(
+                "ObservableUpDownCounter must be instantiated via a meter."
+            )
+        return super().__new__(cls)
+
+
+class Histogram(_Synchronous, APIHistogram):
+    def __init__(
+        self,
+        name: str,
+        instrumentation_scope: InstrumentationScope,
+        measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer",
+        unit: str = "",
+        description: str = "",
+        explicit_bucket_boundaries_advisory: Sequence[float] | None = None,
+    ):
+        super().__init__(
+            name,
+            unit=unit,
+            description=description,
+            instrumentation_scope=instrumentation_scope,
+            measurement_consumer=measurement_consumer,
+        )
+        self._advisory = _MetricsHistogramAdvisory(
+            explicit_bucket_boundaries=explicit_bucket_boundaries_advisory
+        )
+
+    def __new__(cls, *args, **kwargs):
+        if cls is Histogram:
+            raise TypeError("Histogram must be instantiated via a meter.")
+        return super().__new__(cls)
+
+    def record(
+        self,
+        amount: Union[int, float],
+        attributes: dict[str, str] | None = None,
+        context: Context | None = None,
+    ):
+        if amount < 0:
+            _logger.warning(
+                "Record amount must be non-negative on Histogram %s.",
+                self.name,
+            )
+            return
+        time_unix_nano = time_ns()
+        self._measurement_consumer.consume_measurement(
+            Measurement(
+                amount,
+                time_unix_nano,
+                self,
+                context or get_current(),
+                attributes,
+            )
+        )
+
+
+class Gauge(_Synchronous, APIGauge):
+    def __new__(cls, *args, **kwargs):
+        if cls is Gauge:
+            raise TypeError("Gauge must be instantiated via a meter.")
+        return super().__new__(cls)
+
+    def set(
+        self,
+        amount: Union[int, float],
+        attributes: dict[str, str] | None = None,
+        context: Context | None = None,
+    ):
+        time_unix_nano = time_ns()
+        self._measurement_consumer.consume_measurement(
+            Measurement(
+                amount,
+                time_unix_nano,
+                self,
+                context or get_current(),
+                attributes,
+            )
+        )
+
+
+class ObservableGauge(_Asynchronous, APIObservableGauge):
+    def __new__(cls, *args, **kwargs):
+        if cls is ObservableGauge:
+            raise TypeError(
+                "ObservableGauge must be instantiated via a meter."
+            )
+        return super().__new__(cls)
+
+
+# Below classes exist to prevent the direct instantiation
+class _Counter(Counter):
+    pass
+
+
+class _UpDownCounter(UpDownCounter):
+    pass
+
+
+class _ObservableCounter(ObservableCounter):
+    pass
+
+
+class _ObservableUpDownCounter(ObservableUpDownCounter):
+    pass
+
+
+class _Histogram(Histogram):
+    pass
+
+
+class _Gauge(Gauge):
+    pass
+
+
+class _ObservableGauge(ObservableGauge):
+    pass