about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py517
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py61
2 files changed, 578 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py
new file mode 100644
index 00000000..47d1769a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py
@@ -0,0 +1,517 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import collections
+import logging
+import os
+import sys
+import threading
+import typing
+from enum import Enum
+from os import environ, linesep
+from time import time_ns
+
+from opentelemetry.context import (
+    _SUPPRESS_INSTRUMENTATION_KEY,
+    Context,
+    attach,
+    detach,
+    set_value,
+)
+from opentelemetry.sdk.environment_variables import (
+    OTEL_BSP_EXPORT_TIMEOUT,
+    OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+    OTEL_BSP_MAX_QUEUE_SIZE,
+    OTEL_BSP_SCHEDULE_DELAY,
+)
+from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+    "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+logger = logging.getLogger(__name__)
+
+
+class SpanExportResult(Enum):
+    SUCCESS = 0
+    FAILURE = 1
+
+
+class SpanExporter:
+    """Interface for exporting spans.
+
+    Interface to be implemented by services that want to export spans recorded
+    in their own format.
+
+    To export data this MUST be registered to the :class`opentelemetry.sdk.trace.Tracer` using a
+    `SimpleSpanProcessor` or a `BatchSpanProcessor`.
+    """
+
+    def export(
+        self, spans: typing.Sequence[ReadableSpan]
+    ) -> "SpanExportResult":
+        """Exports a batch of telemetry data.
+
+        Args:
+            spans: The list of `opentelemetry.trace.Span` objects to be exported
+
+        Returns:
+            The result of the export
+        """
+
+    def shutdown(self) -> None:
+        """Shuts down the exporter.
+
+        Called when the SDK is shut down.
+        """
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Hint to ensure that the export of any spans the exporter has received
+        prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably
+        before returning from this method.
+        """
+
+
+class SimpleSpanProcessor(SpanProcessor):
+    """Simple SpanProcessor implementation.
+
+    SimpleSpanProcessor is an implementation of `SpanProcessor` that
+    passes ended spans directly to the configured `SpanExporter`.
+    """
+
+    def __init__(self, span_exporter: SpanExporter):
+        self.span_exporter = span_exporter
+
+    def on_start(
+        self, span: Span, parent_context: typing.Optional[Context] = None
+    ) -> None:
+        pass
+
+    def on_end(self, span: ReadableSpan) -> None:
+        if not span.context.trace_flags.sampled:
+            return
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            self.span_exporter.export((span,))
+        # pylint: disable=broad-exception-caught
+        except Exception:
+            logger.exception("Exception while exporting Span.")
+        detach(token)
+
+    def shutdown(self) -> None:
+        self.span_exporter.shutdown()
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        # pylint: disable=unused-argument
+        return True
+
+
+class _FlushRequest:
+    """Represents a request for the BatchSpanProcessor to flush spans."""
+
+    __slots__ = ["event", "num_spans"]
+
+    def __init__(self):
+        self.event = threading.Event()
+        self.num_spans = 0
+
+
+_BSP_RESET_ONCE = Once()
+
+
+class BatchSpanProcessor(SpanProcessor):
+    """Batch span processor implementation.
+
+    `BatchSpanProcessor` is an implementation of `SpanProcessor` that
+    batches ended spans and pushes them to the configured `SpanExporter`.
+
+    `BatchSpanProcessor` is configurable with the following environment
+    variables which correspond to constructor parameters:
+
+    - :envvar:`OTEL_BSP_SCHEDULE_DELAY`
+    - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
+    - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
+    - :envvar:`OTEL_BSP_EXPORT_TIMEOUT`
+    """
+
+    def __init__(
+        self,
+        span_exporter: SpanExporter,
+        max_queue_size: int | None = None,
+        schedule_delay_millis: float | None = None,
+        max_export_batch_size: int | None = None,
+        export_timeout_millis: float | None = None,
+    ):
+        if max_queue_size is None:
+            max_queue_size = BatchSpanProcessor._default_max_queue_size()
+
+        if schedule_delay_millis is None:
+            schedule_delay_millis = (
+                BatchSpanProcessor._default_schedule_delay_millis()
+            )
+
+        if max_export_batch_size is None:
+            max_export_batch_size = (
+                BatchSpanProcessor._default_max_export_batch_size()
+            )
+
+        if export_timeout_millis is None:
+            export_timeout_millis = (
+                BatchSpanProcessor._default_export_timeout_millis()
+            )
+
+        BatchSpanProcessor._validate_arguments(
+            max_queue_size, schedule_delay_millis, max_export_batch_size
+        )
+
+        self.span_exporter = span_exporter
+        self.queue = collections.deque([], max_queue_size)  # type: typing.Deque[Span]
+        self.worker_thread = threading.Thread(
+            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+        )
+        self.condition = threading.Condition(threading.Lock())
+        self._flush_request = None  # type: typing.Optional[_FlushRequest]
+        self.schedule_delay_millis = schedule_delay_millis
+        self.max_export_batch_size = max_export_batch_size
+        self.max_queue_size = max_queue_size
+        self.export_timeout_millis = export_timeout_millis
+        self.done = False
+        # flag that indicates that spans are being dropped
+        self._spans_dropped = False
+        # precallocated list to send spans to exporter
+        self.spans_list = [None] * self.max_export_batch_size  # type: typing.List[typing.Optional[Span]]
+        self.worker_thread.start()
+        if hasattr(os, "register_at_fork"):
+            os.register_at_fork(after_in_child=self._at_fork_reinit)  # pylint: disable=protected-access
+        self._pid = os.getpid()
+
+    def on_start(
+        self, span: Span, parent_context: Context | None = None
+    ) -> None:
+        pass
+
+    def on_end(self, span: ReadableSpan) -> None:
+        if self.done:
+            logger.warning("Already shutdown, dropping span.")
+            return
+        if not span.context.trace_flags.sampled:
+            return
+        if self._pid != os.getpid():
+            _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
+
+        if len(self.queue) == self.max_queue_size:
+            if not self._spans_dropped:
+                logger.warning("Queue is full, likely spans will be dropped.")
+                self._spans_dropped = True
+
+        self.queue.appendleft(span)
+
+        if len(self.queue) >= self.max_export_batch_size:
+            with self.condition:
+                self.condition.notify()
+
+    def _at_fork_reinit(self):
+        self.condition = threading.Condition(threading.Lock())
+        self.queue.clear()
+
+        # worker_thread is local to a process, only the thread that issued fork continues
+        # to exist. A new worker thread must be started in child process.
+        self.worker_thread = threading.Thread(
+            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+        )
+        self.worker_thread.start()
+        self._pid = os.getpid()
+
+    def worker(self):
+        timeout = self.schedule_delay_millis / 1e3
+        flush_request = None  # type: typing.Optional[_FlushRequest]
+        while not self.done:
+            with self.condition:
+                if self.done:
+                    # done flag may have changed, avoid waiting
+                    break
+                flush_request = self._get_and_unset_flush_request()
+                if (
+                    len(self.queue) < self.max_export_batch_size
+                    and flush_request is None
+                ):
+                    self.condition.wait(timeout)
+                    flush_request = self._get_and_unset_flush_request()
+                    if not self.queue:
+                        # spurious notification, let's wait again, reset timeout
+                        timeout = self.schedule_delay_millis / 1e3
+                        self._notify_flush_request_finished(flush_request)
+                        flush_request = None
+                        continue
+                    if self.done:
+                        # missing spans will be sent when calling flush
+                        break
+
+            # subtract the duration of this export call to the next timeout
+            start = time_ns()
+            self._export(flush_request)
+            end = time_ns()
+            duration = (end - start) / 1e9
+            timeout = self.schedule_delay_millis / 1e3 - duration
+
+            self._notify_flush_request_finished(flush_request)
+            flush_request = None
+
+        # there might have been a new flush request while export was running
+        # and before the done flag switched to true
+        with self.condition:
+            shutdown_flush_request = self._get_and_unset_flush_request()
+
+        # be sure that all spans are sent
+        self._drain_queue()
+        self._notify_flush_request_finished(flush_request)
+        self._notify_flush_request_finished(shutdown_flush_request)
+
+    def _get_and_unset_flush_request(
+        self,
+    ) -> typing.Optional[_FlushRequest]:
+        """Returns the current flush request and makes it invisible to the
+        worker thread for subsequent calls.
+        """
+        flush_request = self._flush_request
+        self._flush_request = None
+        if flush_request is not None:
+            flush_request.num_spans = len(self.queue)
+        return flush_request
+
+    @staticmethod
+    def _notify_flush_request_finished(
+        flush_request: typing.Optional[_FlushRequest],
+    ):
+        """Notifies the flush initiator(s) waiting on the given request/event
+        that the flush operation was finished.
+        """
+        if flush_request is not None:
+            flush_request.event.set()
+
+    def _get_or_create_flush_request(self) -> _FlushRequest:
+        """Either returns the current active flush event or creates a new one.
+
+        The flush event will be visible and read by the worker thread before an
+        export operation starts. Callers of a flush operation may wait on the
+        returned event to be notified when the flush/export operation was
+        finished.
+
+        This method is not thread-safe, i.e. callers need to take care about
+        synchronization/locking.
+        """
+        if self._flush_request is None:
+            self._flush_request = _FlushRequest()
+        return self._flush_request
+
+    def _export(self, flush_request: typing.Optional[_FlushRequest]):
+        """Exports spans considering the given flush_request.
+
+        In case of a given flush_requests spans are exported in batches until
+        the number of exported spans reached or exceeded the number of spans in
+        the flush request.
+        In no flush_request was given at most max_export_batch_size spans are
+        exported.
+        """
+        if not flush_request:
+            self._export_batch()
+            return
+
+        num_spans = flush_request.num_spans
+        while self.queue:
+            num_exported = self._export_batch()
+            num_spans -= num_exported
+
+            if num_spans <= 0:
+                break
+
+    def _export_batch(self) -> int:
+        """Exports at most max_export_batch_size spans and returns the number of
+        exported spans.
+        """
+        idx = 0
+        # currently only a single thread acts as consumer, so queue.pop() will
+        # not raise an exception
+        while idx < self.max_export_batch_size and self.queue:
+            self.spans_list[idx] = self.queue.pop()
+            idx += 1
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            # Ignore type b/c the Optional[None]+slicing is too "clever"
+            # for mypy
+            self.span_exporter.export(self.spans_list[:idx])  # type: ignore
+        except Exception:  # pylint: disable=broad-exception-caught
+            logger.exception("Exception while exporting Span batch.")
+        detach(token)
+
+        # clean up list
+        for index in range(idx):
+            self.spans_list[index] = None
+        return idx
+
+    def _drain_queue(self):
+        """Export all elements until queue is empty.
+
+        Can only be called from the worker thread context because it invokes
+        `export` that is not thread safe.
+        """
+        while self.queue:
+            self._export_batch()
+
+    def force_flush(self, timeout_millis: int | None = None) -> bool:
+        if timeout_millis is None:
+            timeout_millis = self.export_timeout_millis
+
+        if self.done:
+            logger.warning("Already shutdown, ignoring call to force_flush().")
+            return True
+
+        with self.condition:
+            flush_request = self._get_or_create_flush_request()
+            # signal the worker thread to flush and wait for it to finish
+            self.condition.notify_all()
+
+        # wait for token to be processed
+        ret = flush_request.event.wait(timeout_millis / 1e3)
+        if not ret:
+            logger.warning("Timeout was exceeded in force_flush().")
+        return ret
+
+    def shutdown(self) -> None:
+        # signal the worker thread to finish and then wait for it
+        self.done = True
+        with self.condition:
+            self.condition.notify_all()
+        self.worker_thread.join()
+        self.span_exporter.shutdown()
+
+    @staticmethod
+    def _default_max_queue_size():
+        try:
+            return int(
+                environ.get(OTEL_BSP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
+            )
+        except ValueError:
+            logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BSP_MAX_QUEUE_SIZE,
+                _DEFAULT_MAX_QUEUE_SIZE,
+            )
+            return _DEFAULT_MAX_QUEUE_SIZE
+
+    @staticmethod
+    def _default_schedule_delay_millis():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BSP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
+                )
+            )
+        except ValueError:
+            logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BSP_SCHEDULE_DELAY,
+                _DEFAULT_SCHEDULE_DELAY_MILLIS,
+            )
+            return _DEFAULT_SCHEDULE_DELAY_MILLIS
+
+    @staticmethod
+    def _default_max_export_batch_size():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+                    _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+                )
+            )
+        except ValueError:
+            logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+                _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+            )
+            return _DEFAULT_MAX_EXPORT_BATCH_SIZE
+
+    @staticmethod
+    def _default_export_timeout_millis():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BSP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
+                )
+            )
+        except ValueError:
+            logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BSP_EXPORT_TIMEOUT,
+                _DEFAULT_EXPORT_TIMEOUT_MILLIS,
+            )
+            return _DEFAULT_EXPORT_TIMEOUT_MILLIS
+
+    @staticmethod
+    def _validate_arguments(
+        max_queue_size, schedule_delay_millis, max_export_batch_size
+    ):
+        if max_queue_size <= 0:
+            raise ValueError("max_queue_size must be a positive integer.")
+
+        if schedule_delay_millis <= 0:
+            raise ValueError("schedule_delay_millis must be positive.")
+
+        if max_export_batch_size <= 0:
+            raise ValueError(
+                "max_export_batch_size must be a positive integer."
+            )
+
+        if max_export_batch_size > max_queue_size:
+            raise ValueError(
+                "max_export_batch_size must be less than or equal to max_queue_size."
+            )
+
+
+class ConsoleSpanExporter(SpanExporter):
+    """Implementation of :class:`SpanExporter` that prints spans to the
+    console.
+
+    This class can be used for diagnostic purposes. It prints the exported
+    spans to the console STDOUT.
+    """
+
+    def __init__(
+        self,
+        service_name: str | None = None,
+        out: typing.IO = sys.stdout,
+        formatter: typing.Callable[
+            [ReadableSpan], str
+        ] = lambda span: span.to_json() + linesep,
+    ):
+        self.out = out
+        self.formatter = formatter
+        self.service_name = service_name
+
+    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
+        for span in spans:
+            self.out.write(self.formatter(span))
+        self.out.flush()
+        return SpanExportResult.SUCCESS
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        return True
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py
new file mode 100644
index 00000000..c28ecfd2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py
@@ -0,0 +1,61 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+
+
+class InMemorySpanExporter(SpanExporter):
+    """Implementation of :class:`.SpanExporter` that stores spans in memory.
+
+    This class can be used for testing purposes. It stores the exported spans
+    in a list in memory that can be retrieved using the
+    :func:`.get_finished_spans` method.
+    """
+
+    def __init__(self) -> None:
+        self._finished_spans: typing.List[ReadableSpan] = []
+        self._stopped = False
+        self._lock = threading.Lock()
+
+    def clear(self) -> None:
+        """Clear list of collected spans."""
+        with self._lock:
+            self._finished_spans.clear()
+
+    def get_finished_spans(self) -> typing.Tuple[ReadableSpan, ...]:
+        """Get list of collected spans."""
+        with self._lock:
+            return tuple(self._finished_spans)
+
+    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
+        """Stores a list of spans in memory."""
+        if self._stopped:
+            return SpanExportResult.FAILURE
+        with self._lock:
+            self._finished_spans.extend(spans)
+        return SpanExportResult.SUCCESS
+
+    def shutdown(self) -> None:
+        """Shut downs the exporter.
+
+        Calls to export after the exporter has been shut down will fail.
+        """
+        self._stopped = True
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        return True