aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py462
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py51
2 files changed, 513 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
new file mode 100644
index 00000000..434dc745
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
@@ -0,0 +1,462 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import collections
+import enum
+import logging
+import os
+import sys
+import threading
+from os import environ, linesep
+from time import time_ns
+from typing import IO, Callable, Deque, List, Optional, Sequence
+
+from opentelemetry.context import (
+ _SUPPRESS_INSTRUMENTATION_KEY,
+ attach,
+ detach,
+ set_value,
+)
+from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
+from opentelemetry.sdk.environment_variables import (
+ OTEL_BLRP_EXPORT_TIMEOUT,
+ OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+ OTEL_BLRP_MAX_QUEUE_SIZE,
+ OTEL_BLRP_SCHEDULE_DELAY,
+)
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+ "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+_logger = logging.getLogger(__name__)
+
+
+class LogExportResult(enum.Enum):
+ SUCCESS = 0
+ FAILURE = 1
+
+
+class LogExporter(abc.ABC):
+ """Interface for exporting logs.
+
+ Interface to be implemented by services that want to export logs received
+ in their own format.
+
+ To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a
+ log processor.
+ """
+
+ @abc.abstractmethod
+ def export(self, batch: Sequence[LogData]):
+ """Exports a batch of logs.
+
+ Args:
+ batch: The list of `LogData` objects to be exported
+
+ Returns:
+ The result of the export
+ """
+
+ @abc.abstractmethod
+ def shutdown(self):
+ """Shuts down the exporter.
+
+ Called when the SDK is shut down.
+ """
+
+
+class ConsoleLogExporter(LogExporter):
+ """Implementation of :class:`LogExporter` that prints log records to the
+ console.
+
+ This class can be used for diagnostic purposes. It prints the exported
+ log records to the console STDOUT.
+ """
+
+ def __init__(
+ self,
+ out: IO = sys.stdout,
+ formatter: Callable[[LogRecord], str] = lambda record: record.to_json()
+ + linesep,
+ ):
+ self.out = out
+ self.formatter = formatter
+
+ def export(self, batch: Sequence[LogData]):
+ for data in batch:
+ self.out.write(self.formatter(data.log_record))
+ self.out.flush()
+ return LogExportResult.SUCCESS
+
+ def shutdown(self):
+ pass
+
+
+class SimpleLogRecordProcessor(LogRecordProcessor):
+ """This is an implementation of LogRecordProcessor which passes
+ received logs in the export-friendly LogData representation to the
+ configured LogExporter, as soon as they are emitted.
+ """
+
+ def __init__(self, exporter: LogExporter):
+ self._exporter = exporter
+ self._shutdown = False
+
+ def emit(self, log_data: LogData):
+ if self._shutdown:
+ _logger.warning("Processor is already shutdown, ignoring call")
+ return
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ self._exporter.export((log_data,))
+ except Exception: # pylint: disable=broad-exception-caught
+ _logger.exception("Exception while exporting logs.")
+ detach(token)
+
+ def shutdown(self):
+ self._shutdown = True
+ self._exporter.shutdown()
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=no-self-use
+ return True
+
+
+class _FlushRequest:
+ __slots__ = ["event", "num_log_records"]
+
+ def __init__(self):
+ self.event = threading.Event()
+ self.num_log_records = 0
+
+
+_BSP_RESET_ONCE = Once()
+
+
+class BatchLogRecordProcessor(LogRecordProcessor):
+ """This is an implementation of LogRecordProcessor which creates batches of
+ received logs in the export-friendly LogData representation and
+ send to the configured LogExporter, as soon as they are emitted.
+
+ `BatchLogRecordProcessor` is configurable with the following environment
+ variables which correspond to constructor parameters:
+
+ - :envvar:`OTEL_BLRP_SCHEDULE_DELAY`
+ - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
+ - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
+ - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
+ """
+
+ _queue: Deque[LogData]
+ _flush_request: _FlushRequest | None
+ _log_records: List[LogData | None]
+
+ def __init__(
+ self,
+ exporter: LogExporter,
+ schedule_delay_millis: float | None = None,
+ max_export_batch_size: int | None = None,
+ export_timeout_millis: float | None = None,
+ max_queue_size: int | None = None,
+ ):
+ if max_queue_size is None:
+ max_queue_size = BatchLogRecordProcessor._default_max_queue_size()
+
+ if schedule_delay_millis is None:
+ schedule_delay_millis = (
+ BatchLogRecordProcessor._default_schedule_delay_millis()
+ )
+
+ if max_export_batch_size is None:
+ max_export_batch_size = (
+ BatchLogRecordProcessor._default_max_export_batch_size()
+ )
+
+ if export_timeout_millis is None:
+ export_timeout_millis = (
+ BatchLogRecordProcessor._default_export_timeout_millis()
+ )
+
+ BatchLogRecordProcessor._validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ )
+
+ self._exporter = exporter
+ self._max_queue_size = max_queue_size
+ self._schedule_delay_millis = schedule_delay_millis
+ self._max_export_batch_size = max_export_batch_size
+ self._export_timeout_millis = export_timeout_millis
+ self._queue = collections.deque([], max_queue_size)
+ self._worker_thread = threading.Thread(
+ name="OtelBatchLogRecordProcessor",
+ target=self.worker,
+ daemon=True,
+ )
+ self._condition = threading.Condition(threading.Lock())
+ self._shutdown = False
+ self._flush_request = None
+ self._log_records = [None] * self._max_export_batch_size
+ self._worker_thread.start()
+ if hasattr(os, "register_at_fork"):
+ os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
+ self._pid = os.getpid()
+
+ def _at_fork_reinit(self):
+ self._condition = threading.Condition(threading.Lock())
+ self._queue.clear()
+ self._worker_thread = threading.Thread(
+ name="OtelBatchLogRecordProcessor",
+ target=self.worker,
+ daemon=True,
+ )
+ self._worker_thread.start()
+ self._pid = os.getpid()
+
+ def worker(self):
+ timeout = self._schedule_delay_millis / 1e3
+ flush_request: Optional[_FlushRequest] = None
+ while not self._shutdown:
+ with self._condition:
+ if self._shutdown:
+ # shutdown may have been called, avoid further processing
+ break
+ flush_request = self._get_and_unset_flush_request()
+ if (
+ len(self._queue) < self._max_export_batch_size
+ and flush_request is None
+ ):
+ self._condition.wait(timeout)
+
+ flush_request = self._get_and_unset_flush_request()
+ if not self._queue:
+ timeout = self._schedule_delay_millis / 1e3
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+ continue
+ if self._shutdown:
+ break
+
+ start_ns = time_ns()
+ self._export(flush_request)
+ end_ns = time_ns()
+ # subtract the duration of this export call to the next timeout
+ timeout = self._schedule_delay_millis / 1e3 - (
+ (end_ns - start_ns) / 1e9
+ )
+
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+
+ # there might have been a new flush request while export was running
+ # and before the done flag switched to true
+ with self._condition:
+ shutdown_flush_request = self._get_and_unset_flush_request()
+
+ # flush the remaining logs
+ self._drain_queue()
+ self._notify_flush_request_finished(flush_request)
+ self._notify_flush_request_finished(shutdown_flush_request)
+
+ def _export(self, flush_request: Optional[_FlushRequest] = None):
+ """Exports logs considering the given flush_request.
+
+ If flush_request is not None then logs are exported in batches
+ until the number of exported logs reached or exceeded the num of logs in
+ flush_request, otherwise exports at max max_export_batch_size logs.
+ """
+ if flush_request is None:
+ self._export_batch()
+ return
+
+ num_log_records = flush_request.num_log_records
+ while self._queue:
+ exported = self._export_batch()
+ num_log_records -= exported
+
+ if num_log_records <= 0:
+ break
+
+ def _export_batch(self) -> int:
+ """Exports at most max_export_batch_size logs and returns the number of
+ exported logs.
+ """
+ idx = 0
+ while idx < self._max_export_batch_size and self._queue:
+ record = self._queue.pop()
+ self._log_records[idx] = record
+ idx += 1
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ self._exporter.export(self._log_records[:idx]) # type: ignore
+ except Exception: # pylint: disable=broad-exception-caught
+ _logger.exception("Exception while exporting logs.")
+ detach(token)
+
+ for index in range(idx):
+ self._log_records[index] = None
+ return idx
+
+ def _drain_queue(self):
+ """Export all elements until queue is empty.
+
+ Can only be called from the worker thread context because it invokes
+ `export` that is not thread safe.
+ """
+ while self._queue:
+ self._export_batch()
+
+ def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
+ flush_request = self._flush_request
+ self._flush_request = None
+ if flush_request is not None:
+ flush_request.num_log_records = len(self._queue)
+ return flush_request
+
+ @staticmethod
+ def _notify_flush_request_finished(
+ flush_request: Optional[_FlushRequest] = None,
+ ):
+ if flush_request is not None:
+ flush_request.event.set()
+
+ def _get_or_create_flush_request(self) -> _FlushRequest:
+ if self._flush_request is None:
+ self._flush_request = _FlushRequest()
+ return self._flush_request
+
+ def emit(self, log_data: LogData) -> None:
+ """Adds the `LogData` to queue and notifies the waiting threads
+ when size of queue reaches max_export_batch_size.
+ """
+ if self._shutdown:
+ return
+ if self._pid != os.getpid():
+ _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
+
+ self._queue.appendleft(log_data)
+ if len(self._queue) >= self._max_export_batch_size:
+ with self._condition:
+ self._condition.notify()
+
+ def shutdown(self):
+ self._shutdown = True
+ with self._condition:
+ self._condition.notify_all()
+ self._worker_thread.join()
+ self._exporter.shutdown()
+
+ def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
+ if timeout_millis is None:
+ timeout_millis = self._export_timeout_millis
+ if self._shutdown:
+ return True
+
+ with self._condition:
+ flush_request = self._get_or_create_flush_request()
+ self._condition.notify_all()
+
+ ret = flush_request.event.wait(timeout_millis / 1e3)
+ if not ret:
+ _logger.warning("Timeout was exceeded in force_flush().")
+ return ret
+
+ @staticmethod
+ def _default_max_queue_size():
+ try:
+ return int(
+ environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_MAX_QUEUE_SIZE,
+ _DEFAULT_MAX_QUEUE_SIZE,
+ )
+ return _DEFAULT_MAX_QUEUE_SIZE
+
+ @staticmethod
+ def _default_schedule_delay_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
+ )
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_SCHEDULE_DELAY,
+ _DEFAULT_SCHEDULE_DELAY_MILLIS,
+ )
+ return _DEFAULT_SCHEDULE_DELAY_MILLIS
+
+ @staticmethod
+ def _default_max_export_batch_size():
+ try:
+ return int(
+ environ.get(
+ OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ return _DEFAULT_MAX_EXPORT_BATCH_SIZE
+
+ @staticmethod
+ def _default_export_timeout_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
+ )
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_EXPORT_TIMEOUT,
+ _DEFAULT_EXPORT_TIMEOUT_MILLIS,
+ )
+ return _DEFAULT_EXPORT_TIMEOUT_MILLIS
+
+ @staticmethod
+ def _validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ ):
+ if max_queue_size <= 0:
+ raise ValueError("max_queue_size must be a positive integer.")
+
+ if schedule_delay_millis <= 0:
+ raise ValueError("schedule_delay_millis must be positive.")
+
+ if max_export_batch_size <= 0:
+ raise ValueError(
+ "max_export_batch_size must be a positive integer."
+ )
+
+ if max_export_batch_size > max_queue_size:
+ raise ValueError(
+ "max_export_batch_size must be less than or equal to max_queue_size."
+ )
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
new file mode 100644
index 00000000..68cb6b73
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
@@ -0,0 +1,51 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
+
+
+class InMemoryLogExporter(LogExporter):
+ """Implementation of :class:`.LogExporter` that stores logs in memory.
+
+ This class can be used for testing purposes. It stores the exported logs
+ in a list in memory that can be retrieved using the
+ :func:`.get_finished_logs` method.
+ """
+
+ def __init__(self):
+ self._logs = []
+ self._lock = threading.Lock()
+ self._stopped = False
+
+ def clear(self) -> None:
+ with self._lock:
+ self._logs.clear()
+
+ def get_finished_logs(self) -> typing.Tuple[LogData, ...]:
+ with self._lock:
+ return tuple(self._logs)
+
+ def export(self, batch: typing.Sequence[LogData]) -> LogExportResult:
+ if self._stopped:
+ return LogExportResult.FAILURE
+ with self._lock:
+ self._logs.extend(batch)
+ return LogExportResult.SUCCESS
+
+ def shutdown(self) -> None:
+ self._stopped = True