aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py517
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py61
2 files changed, 578 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py
new file mode 100644
index 00000000..47d1769a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py
@@ -0,0 +1,517 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import collections
+import logging
+import os
+import sys
+import threading
+import typing
+from enum import Enum
+from os import environ, linesep
+from time import time_ns
+
+from opentelemetry.context import (
+ _SUPPRESS_INSTRUMENTATION_KEY,
+ Context,
+ attach,
+ detach,
+ set_value,
+)
+from opentelemetry.sdk.environment_variables import (
+ OTEL_BSP_EXPORT_TIMEOUT,
+ OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+ OTEL_BSP_MAX_QUEUE_SIZE,
+ OTEL_BSP_SCHEDULE_DELAY,
+)
+from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+ "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+logger = logging.getLogger(__name__)
+
+
+class SpanExportResult(Enum):
+ SUCCESS = 0
+ FAILURE = 1
+
+
+class SpanExporter:
+ """Interface for exporting spans.
+
+ Interface to be implemented by services that want to export spans recorded
+ in their own format.
+
+ To export data this MUST be registered to the :class`opentelemetry.sdk.trace.Tracer` using a
+ `SimpleSpanProcessor` or a `BatchSpanProcessor`.
+ """
+
+ def export(
+ self, spans: typing.Sequence[ReadableSpan]
+ ) -> "SpanExportResult":
+ """Exports a batch of telemetry data.
+
+ Args:
+ spans: The list of `opentelemetry.trace.Span` objects to be exported
+
+ Returns:
+ The result of the export
+ """
+
+ def shutdown(self) -> None:
+ """Shuts down the exporter.
+
+ Called when the SDK is shut down.
+ """
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Hint to ensure that the export of any spans the exporter has received
+ prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably
+ before returning from this method.
+ """
+
+
+class SimpleSpanProcessor(SpanProcessor):
+ """Simple SpanProcessor implementation.
+
+ SimpleSpanProcessor is an implementation of `SpanProcessor` that
+ passes ended spans directly to the configured `SpanExporter`.
+ """
+
+ def __init__(self, span_exporter: SpanExporter):
+ self.span_exporter = span_exporter
+
+ def on_start(
+ self, span: Span, parent_context: typing.Optional[Context] = None
+ ) -> None:
+ pass
+
+ def on_end(self, span: ReadableSpan) -> None:
+ if not span.context.trace_flags.sampled:
+ return
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ self.span_exporter.export((span,))
+ # pylint: disable=broad-exception-caught
+ except Exception:
+ logger.exception("Exception while exporting Span.")
+ detach(token)
+
+ def shutdown(self) -> None:
+ self.span_exporter.shutdown()
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ # pylint: disable=unused-argument
+ return True
+
+
+class _FlushRequest:
+ """Represents a request for the BatchSpanProcessor to flush spans."""
+
+ __slots__ = ["event", "num_spans"]
+
+ def __init__(self):
+ self.event = threading.Event()
+ self.num_spans = 0
+
+
+_BSP_RESET_ONCE = Once()
+
+
+class BatchSpanProcessor(SpanProcessor):
+ """Batch span processor implementation.
+
+ `BatchSpanProcessor` is an implementation of `SpanProcessor` that
+ batches ended spans and pushes them to the configured `SpanExporter`.
+
+ `BatchSpanProcessor` is configurable with the following environment
+ variables which correspond to constructor parameters:
+
+ - :envvar:`OTEL_BSP_SCHEDULE_DELAY`
+ - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
+ - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
+ - :envvar:`OTEL_BSP_EXPORT_TIMEOUT`
+ """
+
+ def __init__(
+ self,
+ span_exporter: SpanExporter,
+ max_queue_size: int | None = None,
+ schedule_delay_millis: float | None = None,
+ max_export_batch_size: int | None = None,
+ export_timeout_millis: float | None = None,
+ ):
+ if max_queue_size is None:
+ max_queue_size = BatchSpanProcessor._default_max_queue_size()
+
+ if schedule_delay_millis is None:
+ schedule_delay_millis = (
+ BatchSpanProcessor._default_schedule_delay_millis()
+ )
+
+ if max_export_batch_size is None:
+ max_export_batch_size = (
+ BatchSpanProcessor._default_max_export_batch_size()
+ )
+
+ if export_timeout_millis is None:
+ export_timeout_millis = (
+ BatchSpanProcessor._default_export_timeout_millis()
+ )
+
+ BatchSpanProcessor._validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ )
+
+ self.span_exporter = span_exporter
+ self.queue = collections.deque([], max_queue_size) # type: typing.Deque[Span]
+ self.worker_thread = threading.Thread(
+ name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+ )
+ self.condition = threading.Condition(threading.Lock())
+ self._flush_request = None # type: typing.Optional[_FlushRequest]
+ self.schedule_delay_millis = schedule_delay_millis
+ self.max_export_batch_size = max_export_batch_size
+ self.max_queue_size = max_queue_size
+ self.export_timeout_millis = export_timeout_millis
+ self.done = False
+ # flag that indicates that spans are being dropped
+ self._spans_dropped = False
+ # precallocated list to send spans to exporter
+ self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
+ self.worker_thread.start()
+ if hasattr(os, "register_at_fork"):
+ os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
+ self._pid = os.getpid()
+
+ def on_start(
+ self, span: Span, parent_context: Context | None = None
+ ) -> None:
+ pass
+
+ def on_end(self, span: ReadableSpan) -> None:
+ if self.done:
+ logger.warning("Already shutdown, dropping span.")
+ return
+ if not span.context.trace_flags.sampled:
+ return
+ if self._pid != os.getpid():
+ _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
+
+ if len(self.queue) == self.max_queue_size:
+ if not self._spans_dropped:
+ logger.warning("Queue is full, likely spans will be dropped.")
+ self._spans_dropped = True
+
+ self.queue.appendleft(span)
+
+ if len(self.queue) >= self.max_export_batch_size:
+ with self.condition:
+ self.condition.notify()
+
+ def _at_fork_reinit(self):
+ self.condition = threading.Condition(threading.Lock())
+ self.queue.clear()
+
+ # worker_thread is local to a process, only the thread that issued fork continues
+ # to exist. A new worker thread must be started in child process.
+ self.worker_thread = threading.Thread(
+ name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+ )
+ self.worker_thread.start()
+ self._pid = os.getpid()
+
+ def worker(self):
+ timeout = self.schedule_delay_millis / 1e3
+ flush_request = None # type: typing.Optional[_FlushRequest]
+ while not self.done:
+ with self.condition:
+ if self.done:
+ # done flag may have changed, avoid waiting
+ break
+ flush_request = self._get_and_unset_flush_request()
+ if (
+ len(self.queue) < self.max_export_batch_size
+ and flush_request is None
+ ):
+ self.condition.wait(timeout)
+ flush_request = self._get_and_unset_flush_request()
+ if not self.queue:
+ # spurious notification, let's wait again, reset timeout
+ timeout = self.schedule_delay_millis / 1e3
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+ continue
+ if self.done:
+ # missing spans will be sent when calling flush
+ break
+
+ # subtract the duration of this export call to the next timeout
+ start = time_ns()
+ self._export(flush_request)
+ end = time_ns()
+ duration = (end - start) / 1e9
+ timeout = self.schedule_delay_millis / 1e3 - duration
+
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+
+ # there might have been a new flush request while export was running
+ # and before the done flag switched to true
+ with self.condition:
+ shutdown_flush_request = self._get_and_unset_flush_request()
+
+ # be sure that all spans are sent
+ self._drain_queue()
+ self._notify_flush_request_finished(flush_request)
+ self._notify_flush_request_finished(shutdown_flush_request)
+
+ def _get_and_unset_flush_request(
+ self,
+ ) -> typing.Optional[_FlushRequest]:
+ """Returns the current flush request and makes it invisible to the
+ worker thread for subsequent calls.
+ """
+ flush_request = self._flush_request
+ self._flush_request = None
+ if flush_request is not None:
+ flush_request.num_spans = len(self.queue)
+ return flush_request
+
+ @staticmethod
+ def _notify_flush_request_finished(
+ flush_request: typing.Optional[_FlushRequest],
+ ):
+ """Notifies the flush initiator(s) waiting on the given request/event
+ that the flush operation was finished.
+ """
+ if flush_request is not None:
+ flush_request.event.set()
+
+ def _get_or_create_flush_request(self) -> _FlushRequest:
+ """Either returns the current active flush event or creates a new one.
+
+ The flush event will be visible and read by the worker thread before an
+ export operation starts. Callers of a flush operation may wait on the
+ returned event to be notified when the flush/export operation was
+ finished.
+
+ This method is not thread-safe, i.e. callers need to take care about
+ synchronization/locking.
+ """
+ if self._flush_request is None:
+ self._flush_request = _FlushRequest()
+ return self._flush_request
+
+ def _export(self, flush_request: typing.Optional[_FlushRequest]):
+ """Exports spans considering the given flush_request.
+
+ In case of a given flush_requests spans are exported in batches until
+ the number of exported spans reached or exceeded the number of spans in
+ the flush request.
+ In no flush_request was given at most max_export_batch_size spans are
+ exported.
+ """
+ if not flush_request:
+ self._export_batch()
+ return
+
+ num_spans = flush_request.num_spans
+ while self.queue:
+ num_exported = self._export_batch()
+ num_spans -= num_exported
+
+ if num_spans <= 0:
+ break
+
+ def _export_batch(self) -> int:
+ """Exports at most max_export_batch_size spans and returns the number of
+ exported spans.
+ """
+ idx = 0
+ # currently only a single thread acts as consumer, so queue.pop() will
+ # not raise an exception
+ while idx < self.max_export_batch_size and self.queue:
+ self.spans_list[idx] = self.queue.pop()
+ idx += 1
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ # Ignore type b/c the Optional[None]+slicing is too "clever"
+ # for mypy
+ self.span_exporter.export(self.spans_list[:idx]) # type: ignore
+ except Exception: # pylint: disable=broad-exception-caught
+ logger.exception("Exception while exporting Span batch.")
+ detach(token)
+
+ # clean up list
+ for index in range(idx):
+ self.spans_list[index] = None
+ return idx
+
+ def _drain_queue(self):
+ """Export all elements until queue is empty.
+
+ Can only be called from the worker thread context because it invokes
+ `export` that is not thread safe.
+ """
+ while self.queue:
+ self._export_batch()
+
+ def force_flush(self, timeout_millis: int | None = None) -> bool:
+ if timeout_millis is None:
+ timeout_millis = self.export_timeout_millis
+
+ if self.done:
+ logger.warning("Already shutdown, ignoring call to force_flush().")
+ return True
+
+ with self.condition:
+ flush_request = self._get_or_create_flush_request()
+ # signal the worker thread to flush and wait for it to finish
+ self.condition.notify_all()
+
+ # wait for token to be processed
+ ret = flush_request.event.wait(timeout_millis / 1e3)
+ if not ret:
+ logger.warning("Timeout was exceeded in force_flush().")
+ return ret
+
+ def shutdown(self) -> None:
+ # signal the worker thread to finish and then wait for it
+ self.done = True
+ with self.condition:
+ self.condition.notify_all()
+ self.worker_thread.join()
+ self.span_exporter.shutdown()
+
+ @staticmethod
+ def _default_max_queue_size():
+ try:
+ return int(
+ environ.get(OTEL_BSP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_MAX_QUEUE_SIZE,
+ _DEFAULT_MAX_QUEUE_SIZE,
+ )
+ return _DEFAULT_MAX_QUEUE_SIZE
+
+ @staticmethod
+ def _default_schedule_delay_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BSP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
+ )
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_SCHEDULE_DELAY,
+ _DEFAULT_SCHEDULE_DELAY_MILLIS,
+ )
+ return _DEFAULT_SCHEDULE_DELAY_MILLIS
+
+ @staticmethod
+ def _default_max_export_batch_size():
+ try:
+ return int(
+ environ.get(
+ OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ return _DEFAULT_MAX_EXPORT_BATCH_SIZE
+
+ @staticmethod
+ def _default_export_timeout_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BSP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
+ )
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_EXPORT_TIMEOUT,
+ _DEFAULT_EXPORT_TIMEOUT_MILLIS,
+ )
+ return _DEFAULT_EXPORT_TIMEOUT_MILLIS
+
+ @staticmethod
+ def _validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ ):
+ if max_queue_size <= 0:
+ raise ValueError("max_queue_size must be a positive integer.")
+
+ if schedule_delay_millis <= 0:
+ raise ValueError("schedule_delay_millis must be positive.")
+
+ if max_export_batch_size <= 0:
+ raise ValueError(
+ "max_export_batch_size must be a positive integer."
+ )
+
+ if max_export_batch_size > max_queue_size:
+ raise ValueError(
+ "max_export_batch_size must be less than or equal to max_queue_size."
+ )
+
+
+class ConsoleSpanExporter(SpanExporter):
+ """Implementation of :class:`SpanExporter` that prints spans to the
+ console.
+
+ This class can be used for diagnostic purposes. It prints the exported
+ spans to the console STDOUT.
+ """
+
+ def __init__(
+ self,
+ service_name: str | None = None,
+ out: typing.IO = sys.stdout,
+ formatter: typing.Callable[
+ [ReadableSpan], str
+ ] = lambda span: span.to_json() + linesep,
+ ):
+ self.out = out
+ self.formatter = formatter
+ self.service_name = service_name
+
+ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
+ for span in spans:
+ self.out.write(self.formatter(span))
+ self.out.flush()
+ return SpanExportResult.SUCCESS
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ return True
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py
new file mode 100644
index 00000000..c28ecfd2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py
@@ -0,0 +1,61 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+
+
+class InMemorySpanExporter(SpanExporter):
+ """Implementation of :class:`.SpanExporter` that stores spans in memory.
+
+ This class can be used for testing purposes. It stores the exported spans
+ in a list in memory that can be retrieved using the
+ :func:`.get_finished_spans` method.
+ """
+
+ def __init__(self) -> None:
+ self._finished_spans: typing.List[ReadableSpan] = []
+ self._stopped = False
+ self._lock = threading.Lock()
+
+ def clear(self) -> None:
+ """Clear list of collected spans."""
+ with self._lock:
+ self._finished_spans.clear()
+
+ def get_finished_spans(self) -> typing.Tuple[ReadableSpan, ...]:
+ """Get list of collected spans."""
+ with self._lock:
+ return tuple(self._finished_spans)
+
+ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
+ """Stores a list of spans in memory."""
+ if self._stopped:
+ return SpanExportResult.FAILURE
+ with self._lock:
+ self._finished_spans.extend(spans)
+ return SpanExportResult.SUCCESS
+
+ def shutdown(self) -> None:
+ """Shut downs the exporter.
+
+ Calls to export after the exporter has been shut down will fail.
+ """
+ self._stopped = True
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ return True