diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export')
2 files changed, 513 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py new file mode 100644 index 00000000..434dc745 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -0,0 +1,462 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import abc +import collections +import enum +import logging +import os +import sys +import threading +from os import environ, linesep +from time import time_ns +from typing import IO, Callable, Deque, List, Optional, Sequence + +from opentelemetry.context import ( + _SUPPRESS_INSTRUMENTATION_KEY, + attach, + detach, + set_value, +) +from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor +from opentelemetry.sdk.environment_variables import ( + OTEL_BLRP_EXPORT_TIMEOUT, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + OTEL_BLRP_MAX_QUEUE_SIZE, + OTEL_BLRP_SCHEDULE_DELAY, +) +from opentelemetry.util._once import Once + +_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000 +_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512 +_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000 +_DEFAULT_MAX_QUEUE_SIZE = 2048 +_ENV_VAR_INT_VALUE_ERROR_MESSAGE = ( + "Unable to parse value for %s as integer. Defaulting to %s." +) + +_logger = logging.getLogger(__name__) + + +class LogExportResult(enum.Enum): + SUCCESS = 0 + FAILURE = 1 + + +class LogExporter(abc.ABC): + """Interface for exporting logs. + + Interface to be implemented by services that want to export logs received + in their own format. + + To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a + log processor. + """ + + @abc.abstractmethod + def export(self, batch: Sequence[LogData]): + """Exports a batch of logs. + + Args: + batch: The list of `LogData` objects to be exported + + Returns: + The result of the export + """ + + @abc.abstractmethod + def shutdown(self): + """Shuts down the exporter. + + Called when the SDK is shut down. + """ + + +class ConsoleLogExporter(LogExporter): + """Implementation of :class:`LogExporter` that prints log records to the + console. + + This class can be used for diagnostic purposes. It prints the exported + log records to the console STDOUT. + """ + + def __init__( + self, + out: IO = sys.stdout, + formatter: Callable[[LogRecord], str] = lambda record: record.to_json() + + linesep, + ): + self.out = out + self.formatter = formatter + + def export(self, batch: Sequence[LogData]): + for data in batch: + self.out.write(self.formatter(data.log_record)) + self.out.flush() + return LogExportResult.SUCCESS + + def shutdown(self): + pass + + +class SimpleLogRecordProcessor(LogRecordProcessor): + """This is an implementation of LogRecordProcessor which passes + received logs in the export-friendly LogData representation to the + configured LogExporter, as soon as they are emitted. + """ + + def __init__(self, exporter: LogExporter): + self._exporter = exporter + self._shutdown = False + + def emit(self, log_data: LogData): + if self._shutdown: + _logger.warning("Processor is already shutdown, ignoring call") + return + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + self._exporter.export((log_data,)) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + detach(token) + + def shutdown(self): + self._shutdown = True + self._exporter.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=no-self-use + return True + + +class _FlushRequest: + __slots__ = ["event", "num_log_records"] + + def __init__(self): + self.event = threading.Event() + self.num_log_records = 0 + + +_BSP_RESET_ONCE = Once() + + +class BatchLogRecordProcessor(LogRecordProcessor): + """This is an implementation of LogRecordProcessor which creates batches of + received logs in the export-friendly LogData representation and + send to the configured LogExporter, as soon as they are emitted. + + `BatchLogRecordProcessor` is configurable with the following environment + variables which correspond to constructor parameters: + + - :envvar:`OTEL_BLRP_SCHEDULE_DELAY` + - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` + - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` + - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` + """ + + _queue: Deque[LogData] + _flush_request: _FlushRequest | None + _log_records: List[LogData | None] + + def __init__( + self, + exporter: LogExporter, + schedule_delay_millis: float | None = None, + max_export_batch_size: int | None = None, + export_timeout_millis: float | None = None, + max_queue_size: int | None = None, + ): + if max_queue_size is None: + max_queue_size = BatchLogRecordProcessor._default_max_queue_size() + + if schedule_delay_millis is None: + schedule_delay_millis = ( + BatchLogRecordProcessor._default_schedule_delay_millis() + ) + + if max_export_batch_size is None: + max_export_batch_size = ( + BatchLogRecordProcessor._default_max_export_batch_size() + ) + + if export_timeout_millis is None: + export_timeout_millis = ( + BatchLogRecordProcessor._default_export_timeout_millis() + ) + + BatchLogRecordProcessor._validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ) + + self._exporter = exporter + self._max_queue_size = max_queue_size + self._schedule_delay_millis = schedule_delay_millis + self._max_export_batch_size = max_export_batch_size + self._export_timeout_millis = export_timeout_millis + self._queue = collections.deque([], max_queue_size) + self._worker_thread = threading.Thread( + name="OtelBatchLogRecordProcessor", + target=self.worker, + daemon=True, + ) + self._condition = threading.Condition(threading.Lock()) + self._shutdown = False + self._flush_request = None + self._log_records = [None] * self._max_export_batch_size + self._worker_thread.start() + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + self._pid = os.getpid() + + def _at_fork_reinit(self): + self._condition = threading.Condition(threading.Lock()) + self._queue.clear() + self._worker_thread = threading.Thread( + name="OtelBatchLogRecordProcessor", + target=self.worker, + daemon=True, + ) + self._worker_thread.start() + self._pid = os.getpid() + + def worker(self): + timeout = self._schedule_delay_millis / 1e3 + flush_request: Optional[_FlushRequest] = None + while not self._shutdown: + with self._condition: + if self._shutdown: + # shutdown may have been called, avoid further processing + break + flush_request = self._get_and_unset_flush_request() + if ( + len(self._queue) < self._max_export_batch_size + and flush_request is None + ): + self._condition.wait(timeout) + + flush_request = self._get_and_unset_flush_request() + if not self._queue: + timeout = self._schedule_delay_millis / 1e3 + self._notify_flush_request_finished(flush_request) + flush_request = None + continue + if self._shutdown: + break + + start_ns = time_ns() + self._export(flush_request) + end_ns = time_ns() + # subtract the duration of this export call to the next timeout + timeout = self._schedule_delay_millis / 1e3 - ( + (end_ns - start_ns) / 1e9 + ) + + self._notify_flush_request_finished(flush_request) + flush_request = None + + # there might have been a new flush request while export was running + # and before the done flag switched to true + with self._condition: + shutdown_flush_request = self._get_and_unset_flush_request() + + # flush the remaining logs + self._drain_queue() + self._notify_flush_request_finished(flush_request) + self._notify_flush_request_finished(shutdown_flush_request) + + def _export(self, flush_request: Optional[_FlushRequest] = None): + """Exports logs considering the given flush_request. + + If flush_request is not None then logs are exported in batches + until the number of exported logs reached or exceeded the num of logs in + flush_request, otherwise exports at max max_export_batch_size logs. + """ + if flush_request is None: + self._export_batch() + return + + num_log_records = flush_request.num_log_records + while self._queue: + exported = self._export_batch() + num_log_records -= exported + + if num_log_records <= 0: + break + + def _export_batch(self) -> int: + """Exports at most max_export_batch_size logs and returns the number of + exported logs. + """ + idx = 0 + while idx < self._max_export_batch_size and self._queue: + record = self._queue.pop() + self._log_records[idx] = record + idx += 1 + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + self._exporter.export(self._log_records[:idx]) # type: ignore + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + detach(token) + + for index in range(idx): + self._log_records[index] = None + return idx + + def _drain_queue(self): + """Export all elements until queue is empty. + + Can only be called from the worker thread context because it invokes + `export` that is not thread safe. + """ + while self._queue: + self._export_batch() + + def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]: + flush_request = self._flush_request + self._flush_request = None + if flush_request is not None: + flush_request.num_log_records = len(self._queue) + return flush_request + + @staticmethod + def _notify_flush_request_finished( + flush_request: Optional[_FlushRequest] = None, + ): + if flush_request is not None: + flush_request.event.set() + + def _get_or_create_flush_request(self) -> _FlushRequest: + if self._flush_request is None: + self._flush_request = _FlushRequest() + return self._flush_request + + def emit(self, log_data: LogData) -> None: + """Adds the `LogData` to queue and notifies the waiting threads + when size of queue reaches max_export_batch_size. + """ + if self._shutdown: + return + if self._pid != os.getpid(): + _BSP_RESET_ONCE.do_once(self._at_fork_reinit) + + self._queue.appendleft(log_data) + if len(self._queue) >= self._max_export_batch_size: + with self._condition: + self._condition.notify() + + def shutdown(self): + self._shutdown = True + with self._condition: + self._condition.notify_all() + self._worker_thread.join() + self._exporter.shutdown() + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + if timeout_millis is None: + timeout_millis = self._export_timeout_millis + if self._shutdown: + return True + + with self._condition: + flush_request = self._get_or_create_flush_request() + self._condition.notify_all() + + ret = flush_request.event.wait(timeout_millis / 1e3) + if not ret: + _logger.warning("Timeout was exceeded in force_flush().") + return ret + + @staticmethod + def _default_max_queue_size(): + try: + return int( + environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_MAX_QUEUE_SIZE, + _DEFAULT_MAX_QUEUE_SIZE, + ) + return _DEFAULT_MAX_QUEUE_SIZE + + @staticmethod + def _default_schedule_delay_millis(): + try: + return int( + environ.get( + OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_SCHEDULE_DELAY, + _DEFAULT_SCHEDULE_DELAY_MILLIS, + ) + return _DEFAULT_SCHEDULE_DELAY_MILLIS + + @staticmethod + def _default_max_export_batch_size(): + try: + return int( + environ.get( + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + return _DEFAULT_MAX_EXPORT_BATCH_SIZE + + @staticmethod + def _default_export_timeout_millis(): + try: + return int( + environ.get( + OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_EXPORT_TIMEOUT, + _DEFAULT_EXPORT_TIMEOUT_MILLIS, + ) + return _DEFAULT_EXPORT_TIMEOUT_MILLIS + + @staticmethod + def _validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ): + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError( + "max_export_batch_size must be a positive integer." + ) + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less than or equal to max_queue_size." + ) diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py new file mode 100644 index 00000000..68cb6b73 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py @@ -0,0 +1,51 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import typing + +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs.export import LogExporter, LogExportResult + + +class InMemoryLogExporter(LogExporter): + """Implementation of :class:`.LogExporter` that stores logs in memory. + + This class can be used for testing purposes. It stores the exported logs + in a list in memory that can be retrieved using the + :func:`.get_finished_logs` method. + """ + + def __init__(self): + self._logs = [] + self._lock = threading.Lock() + self._stopped = False + + def clear(self) -> None: + with self._lock: + self._logs.clear() + + def get_finished_logs(self) -> typing.Tuple[LogData, ...]: + with self._lock: + return tuple(self._logs) + + def export(self, batch: typing.Sequence[LogData]) -> LogExportResult: + if self._stopped: + return LogExportResult.FAILURE + with self._lock: + self._logs.extend(batch) + return LogExportResult.SUCCESS + + def shutdown(self) -> None: + self._stopped = True |