author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two version of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export')
-rw-r--r--  .venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py                 462
-rw-r--r--  .venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py    51
2 files changed, 513 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
new file mode 100644
index 00000000..434dc745
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
@@ -0,0 +1,462 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import collections
+import enum
+import logging
+import os
+import sys
+import threading
+from os import environ, linesep
+from time import time_ns
+from typing import IO, Callable, Deque, List, Optional, Sequence
+
+from opentelemetry.context import (
+    _SUPPRESS_INSTRUMENTATION_KEY,
+    attach,
+    detach,
+    set_value,
+)
+from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
+from opentelemetry.sdk.environment_variables import (
+    OTEL_BLRP_EXPORT_TIMEOUT,
+    OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+    OTEL_BLRP_MAX_QUEUE_SIZE,
+    OTEL_BLRP_SCHEDULE_DELAY,
+)
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+    "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+_logger = logging.getLogger(__name__)
+
+
+class LogExportResult(enum.Enum):
+    SUCCESS = 0
+    FAILURE = 1
+
+
+class LogExporter(abc.ABC):
+    """Interface for exporting logs.
+
+    Interface to be implemented by services that want to export logs received
+    in their own format.
+
+    To export data this MUST be registered to the :class:`opentelemetry.sdk._logs.Logger` using a
+    log processor.
+    """
+
+    @abc.abstractmethod
+    def export(self, batch: Sequence[LogData]):
+        """Exports a batch of logs.
+
+        Args:
+            batch: The list of `LogData` objects to be exported
+
+        Returns:
+            The result of the export
+        """
+
+    @abc.abstractmethod
+    def shutdown(self):
+        """Shuts down the exporter.
+
+        Called when the SDK is shut down.
+        """
+
+
+class ConsoleLogExporter(LogExporter):
+    """Implementation of :class:`LogExporter` that prints log records to the
+    console.
+
+    This class can be used for diagnostic purposes. It prints the exported
+    log records to the console (``sys.stdout`` by default).
+    """
+
+    def __init__(
+        self,
+        out: IO = sys.stdout,
+        formatter: Callable[[LogRecord], str] = lambda record: record.to_json()
+        + linesep,
+    ):
+        self.out = out
+        self.formatter = formatter
+
+    def export(self, batch: Sequence[LogData]):
+        for data in batch:
+            self.out.write(self.formatter(data.log_record))
+        self.out.flush()
+        return LogExportResult.SUCCESS
+
+    def shutdown(self):
+        pass
+
+
+class SimpleLogRecordProcessor(LogRecordProcessor):
+    """This is an implementation of LogRecordProcessor which passes
+    received logs in the export-friendly LogData representation to the
+    configured LogExporter as soon as they are emitted.
+    """
+
+    def __init__(self, exporter: LogExporter):
+        self._exporter = exporter
+        self._shutdown = False
+
+    def emit(self, log_data: LogData):
+        if self._shutdown:
+            _logger.warning("Processor is already shutdown, ignoring call")
+            return
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            self._exporter.export((log_data,))
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.exception("Exception while exporting logs.")
+        detach(token)
+
+    def shutdown(self):
+        self._shutdown = True
+        self._exporter.shutdown()
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:  # pylint: disable=no-self-use
+        return True
+
+
+class _FlushRequest:
+    __slots__ = ["event", "num_log_records"]
+
+    def __init__(self):
+        self.event = threading.Event()
+        self.num_log_records = 0
+
+
+_BSP_RESET_ONCE = Once()
+
+
+class BatchLogRecordProcessor(LogRecordProcessor):
+    """This is an implementation of LogRecordProcessor which creates batches of
+    received logs in the export-friendly LogData representation and
+    send to the configured LogExporter, as soon as they are emitted.
+
+    `BatchLogRecordProcessor` is configurable with the following environment
+    variables which correspond to constructor parameters:
+
+    - :envvar:`OTEL_BLRP_SCHEDULE_DELAY`
+    - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
+    - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
+    - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
+    """
+
+    _queue: Deque[LogData]
+    _flush_request: _FlushRequest | None
+    _log_records: List[LogData | None]
+
+    def __init__(
+        self,
+        exporter: LogExporter,
+        schedule_delay_millis: float | None = None,
+        max_export_batch_size: int | None = None,
+        export_timeout_millis: float | None = None,
+        max_queue_size: int | None = None,
+    ):
+        if max_queue_size is None:
+            max_queue_size = BatchLogRecordProcessor._default_max_queue_size()
+
+        if schedule_delay_millis is None:
+            schedule_delay_millis = (
+                BatchLogRecordProcessor._default_schedule_delay_millis()
+            )
+
+        if max_export_batch_size is None:
+            max_export_batch_size = (
+                BatchLogRecordProcessor._default_max_export_batch_size()
+            )
+
+        if export_timeout_millis is None:
+            export_timeout_millis = (
+                BatchLogRecordProcessor._default_export_timeout_millis()
+            )
+
+        BatchLogRecordProcessor._validate_arguments(
+            max_queue_size, schedule_delay_millis, max_export_batch_size
+        )
+
+        self._exporter = exporter
+        self._max_queue_size = max_queue_size
+        self._schedule_delay_millis = schedule_delay_millis
+        self._max_export_batch_size = max_export_batch_size
+        self._export_timeout_millis = export_timeout_millis
+        self._queue = collections.deque([], max_queue_size)
+        self._worker_thread = threading.Thread(
+            name="OtelBatchLogRecordProcessor",
+            target=self.worker,
+            daemon=True,
+        )
+        self._condition = threading.Condition(threading.Lock())
+        self._shutdown = False
+        self._flush_request = None
+        self._log_records = [None] * self._max_export_batch_size
+        self._worker_thread.start()
+        if hasattr(os, "register_at_fork"):
+            os.register_at_fork(after_in_child=self._at_fork_reinit)  # pylint: disable=protected-access
+        self._pid = os.getpid()
+
+    def _at_fork_reinit(self):
+        self._condition = threading.Condition(threading.Lock())
+        self._queue.clear()
+        self._worker_thread = threading.Thread(
+            name="OtelBatchLogRecordProcessor",
+            target=self.worker,
+            daemon=True,
+        )
+        self._worker_thread.start()
+        self._pid = os.getpid()
+
+    def worker(self):
+        timeout = self._schedule_delay_millis / 1e3
+        flush_request: Optional[_FlushRequest] = None
+        while not self._shutdown:
+            with self._condition:
+                if self._shutdown:
+                    # shutdown may have been called, avoid further processing
+                    break
+                flush_request = self._get_and_unset_flush_request()
+                if (
+                    len(self._queue) < self._max_export_batch_size
+                    and flush_request is None
+                ):
+                    self._condition.wait(timeout)
+
+                    flush_request = self._get_and_unset_flush_request()
+                    if not self._queue:
+                        timeout = self._schedule_delay_millis / 1e3
+                        self._notify_flush_request_finished(flush_request)
+                        flush_request = None
+                        continue
+                    if self._shutdown:
+                        break
+
+            start_ns = time_ns()
+            self._export(flush_request)
+            end_ns = time_ns()
+            # subtract the duration of this export call from the next timeout
+            timeout = self._schedule_delay_millis / 1e3 - (
+                (end_ns - start_ns) / 1e9
+            )
+
+            self._notify_flush_request_finished(flush_request)
+            flush_request = None
+
+        # there might have been a new flush request while export was running
+        # and before the shutdown flag switched to true
+        with self._condition:
+            shutdown_flush_request = self._get_and_unset_flush_request()
+
+        # flush the remaining logs
+        self._drain_queue()
+        self._notify_flush_request_finished(flush_request)
+        self._notify_flush_request_finished(shutdown_flush_request)
+
+    def _export(self, flush_request: Optional[_FlushRequest] = None):
+        """Exports logs considering the given flush_request.
+
+        If flush_request is not None, logs are exported in batches until the
+        number of exported logs reaches or exceeds the number of logs in
+        flush_request; otherwise at most max_export_batch_size logs are exported.
+        """
+        if flush_request is None:
+            self._export_batch()
+            return
+
+        num_log_records = flush_request.num_log_records
+        while self._queue:
+            exported = self._export_batch()
+            num_log_records -= exported
+
+            if num_log_records <= 0:
+                break
+
+    def _export_batch(self) -> int:
+        """Exports at most max_export_batch_size logs and returns the number of
+        exported logs.
+        """
+        idx = 0
+        while idx < self._max_export_batch_size and self._queue:
+            record = self._queue.pop()
+            self._log_records[idx] = record
+            idx += 1
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            self._exporter.export(self._log_records[:idx])  # type: ignore
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.exception("Exception while exporting logs.")
+        detach(token)
+
+        for index in range(idx):
+            self._log_records[index] = None
+        return idx
+
+    def _drain_queue(self):
+        """Export all elements until queue is empty.
+
+        Can only be called from the worker thread context because it invokes
+        `export`, which is not thread safe.
+        """
+        while self._queue:
+            self._export_batch()
+
+    def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
+        flush_request = self._flush_request
+        self._flush_request = None
+        if flush_request is not None:
+            flush_request.num_log_records = len(self._queue)
+        return flush_request
+
+    @staticmethod
+    def _notify_flush_request_finished(
+        flush_request: Optional[_FlushRequest] = None,
+    ):
+        if flush_request is not None:
+            flush_request.event.set()
+
+    def _get_or_create_flush_request(self) -> _FlushRequest:
+        if self._flush_request is None:
+            self._flush_request = _FlushRequest()
+        return self._flush_request
+
+    def emit(self, log_data: LogData) -> None:
+        """Adds the `LogData` to queue and notifies the waiting threads
+        when size of queue reaches max_export_batch_size.
+        """
+        if self._shutdown:
+            return
+        if self._pid != os.getpid():
+            _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
+
+        self._queue.appendleft(log_data)
+        if len(self._queue) >= self._max_export_batch_size:
+            with self._condition:
+                self._condition.notify()
+
+    def shutdown(self):
+        self._shutdown = True
+        with self._condition:
+            self._condition.notify_all()
+        self._worker_thread.join()
+        self._exporter.shutdown()
+
+    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
+        if timeout_millis is None:
+            timeout_millis = self._export_timeout_millis
+        if self._shutdown:
+            return True
+
+        with self._condition:
+            flush_request = self._get_or_create_flush_request()
+            self._condition.notify_all()
+
+        ret = flush_request.event.wait(timeout_millis / 1e3)
+        if not ret:
+            _logger.warning("Timeout was exceeded in force_flush().")
+        return ret
+
+    @staticmethod
+    def _default_max_queue_size():
+        try:
+            return int(
+                environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_MAX_QUEUE_SIZE,
+                _DEFAULT_MAX_QUEUE_SIZE,
+            )
+            return _DEFAULT_MAX_QUEUE_SIZE
+
+    @staticmethod
+    def _default_schedule_delay_millis():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
+                )
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_SCHEDULE_DELAY,
+                _DEFAULT_SCHEDULE_DELAY_MILLIS,
+            )
+            return _DEFAULT_SCHEDULE_DELAY_MILLIS
+
+    @staticmethod
+    def _default_max_export_batch_size():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+                    _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+                )
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+                _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+            )
+            return _DEFAULT_MAX_EXPORT_BATCH_SIZE
+
+    @staticmethod
+    def _default_export_timeout_millis():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
+                )
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_EXPORT_TIMEOUT,
+                _DEFAULT_EXPORT_TIMEOUT_MILLIS,
+            )
+            return _DEFAULT_EXPORT_TIMEOUT_MILLIS
+
+    @staticmethod
+    def _validate_arguments(
+        max_queue_size, schedule_delay_millis, max_export_batch_size
+    ):
+        if max_queue_size <= 0:
+            raise ValueError("max_queue_size must be a positive integer.")
+
+        if schedule_delay_millis <= 0:
+            raise ValueError("schedule_delay_millis must be positive.")
+
+        if max_export_batch_size <= 0:
+            raise ValueError(
+                "max_export_batch_size must be a positive integer."
+            )
+
+        if max_export_batch_size > max_queue_size:
+            raise ValueError(
+                "max_export_batch_size must be less than or equal to max_queue_size."
+            )
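
For orientation (this note and sketch are not part of the committed file): the classes added above are normally wired into the SDK's provisional logging pipeline roughly as follows. The module paths mirror the imports visible in this diff, but because the _logs API is still experimental the exact import locations may differ between SDK releases.

    import logging

    from opentelemetry._logs import set_logger_provider
    from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
    from opentelemetry.sdk._logs.export import (
        BatchLogRecordProcessor,
        ConsoleLogExporter,
    )

    provider = LoggerProvider()
    set_logger_provider(provider)

    # BatchLogRecordProcessor buffers records on a daemon worker thread; unless the
    # constructor arguments override them, it reads OTEL_BLRP_SCHEDULE_DELAY,
    # OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE and
    # OTEL_BLRP_EXPORT_TIMEOUT from the environment.
    provider.add_log_record_processor(
        BatchLogRecordProcessor(ConsoleLogExporter())
    )

    # Bridge the stdlib logging module into the OpenTelemetry log pipeline.
    logging.getLogger().addHandler(LoggingHandler(logger_provider=provider))
    logging.getLogger(__name__).warning("hello from the OTel log pipeline")

    # shutdown() drains the batch processor's queue and shuts down the exporter.
    provider.shutdown()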
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
new file mode 100644
index 00000000..68cb6b73
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
@@ -0,0 +1,51 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
+
+
+class InMemoryLogExporter(LogExporter):
+    """Implementation of :class:`.LogExporter` that stores logs in memory.
+
+    This class can be used for testing purposes. It stores the exported logs
+    in a list in memory that can be retrieved using the
+    :func:`.get_finished_logs` method.
+    """
+
+    def __init__(self):
+        self._logs = []
+        self._lock = threading.Lock()
+        self._stopped = False
+
+    def clear(self) -> None:
+        with self._lock:
+            self._logs.clear()
+
+    def get_finished_logs(self) -> typing.Tuple[LogData, ...]:
+        with self._lock:
+            return tuple(self._logs)
+
+    def export(self, batch: typing.Sequence[LogData]) -> LogExportResult:
+        if self._stopped:
+            return LogExportResult.FAILURE
+        with self._lock:
+            self._logs.extend(batch)
+        return LogExportResult.SUCCESS
+
+    def shutdown(self) -> None:
+        self._stopped = True
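
As with the sketch above, the following is illustrative only and not part of the commit: InMemoryLogExporter is intended for tests, where the finished records can be inspected directly after driving logs through a processor.

    import logging

    from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
    from opentelemetry.sdk._logs.export import (
        InMemoryLogExporter,
        SimpleLogRecordProcessor,
    )

    exporter = InMemoryLogExporter()
    provider = LoggerProvider()
    provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))

    logger = logging.getLogger("test")
    logger.addHandler(LoggingHandler(logger_provider=provider))
    logger.error("boom")

    # Each exported item is a LogData wrapper around the underlying LogRecord.
    (log_data,) = exporter.get_finished_logs()
    assert log_data.log_record.body == "boom"

    exporter.clear()     # reset captured records between test cases
    provider.shutdown()  # marks the exporter stopped; later exports return FAILURE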