author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
---|---|---
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal')
3 files changed, 1225 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py
new file mode 100644
index 00000000..302ca1ed
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py
@@ -0,0 +1,712 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import atexit
+import concurrent.futures
+import json
+import logging
+import threading
+import traceback
+import warnings
+from os import environ
+from threading import Lock
+from time import time_ns
+from typing import Any, Callable, Tuple, Union  # noqa
+
+from opentelemetry._logs import Logger as APILogger
+from opentelemetry._logs import LoggerProvider as APILoggerProvider
+from opentelemetry._logs import LogRecord as APILogRecord
+from opentelemetry._logs import (
+    NoOpLogger,
+    SeverityNumber,
+    get_logger,
+    get_logger_provider,
+    std_to_otel,
+)
+from opentelemetry.attributes import BoundedAttributes
+from opentelemetry.sdk.environment_variables import (
+    OTEL_ATTRIBUTE_COUNT_LIMIT,
+    OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+    OTEL_SDK_DISABLED,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util import ns_to_iso_str
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+from opentelemetry.semconv.trace import SpanAttributes
+from opentelemetry.trace import (
+    format_span_id,
+    format_trace_id,
+    get_current_span,
+)
+from opentelemetry.trace.span import TraceFlags
+from opentelemetry.util.types import AnyValue, Attributes
+
+_logger = logging.getLogger(__name__)
+
+_DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT = 128
+_ENV_VALUE_UNSET = ""
+
+
+class LogDroppedAttributesWarning(UserWarning):
+    """Custom warning to indicate dropped log attributes due to limits.
+
+    This class is used to filter and handle these specific warnings separately
+    from other warnings, ensuring that they are only shown once without
+    interfering with default user warnings.
+    """
+
+
+warnings.simplefilter("once", LogDroppedAttributesWarning)
+
+
+class LogLimits:
+    """This class is based on a SpanLimits class in the Tracing module.
+
+    This class represents the limits that should be enforced on recorded data
+    such as events, links, attributes etc.
+
+    This class does not enforce any limits itself. It only provides a way to
+    read limits from env, default values and from user provided arguments.
+
+    All limit arguments must be either a non-negative integer, ``None`` or
+    ``LogLimits.UNSET``.
+
+    - All limit arguments are optional.
+    - If a limit argument is not set, the class will try to read its value
+      from the corresponding environment variable.
+    - If the environment variable is not set, the default value, if any,
+      will be used.
+
+    Limit precedence:
+
+    - If a model specific limit is set, it will be used.
+    - Else if the corresponding global limit is set, it will be used.
+    - Else if the model specific limit has a default value, the default
+      value will be used.
+    - Else if the global limit has a default value, the default value will
+      be used.
+
+    Args:
+        max_attributes: Maximum number of attributes that can be added to a
+            span, event, and link.
+            Environment variable: ``OTEL_ATTRIBUTE_COUNT_LIMIT``
+            Default: {_DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT}
+        max_attribute_length: Maximum length an attribute value can have.
+            Values longer than the specified length will be truncated.
+    """
+
+    UNSET = -1
+
+    def __init__(
+        self,
+        max_attributes: int | None = None,
+        max_attribute_length: int | None = None,
+    ):
+        # attribute count
+        global_max_attributes = self._from_env_if_absent(
+            max_attributes, OTEL_ATTRIBUTE_COUNT_LIMIT
+        )
+        self.max_attributes = (
+            global_max_attributes
+            if global_max_attributes is not None
+            else _DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT
+        )
+
+        # attribute length
+        self.max_attribute_length = self._from_env_if_absent(
+            max_attribute_length,
+            OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+        )
+
+    def __repr__(self):
+        return f"{type(self).__name__}(max_attributes={self.max_attributes}, max_attribute_length={self.max_attribute_length})"
+
+    @classmethod
+    def _from_env_if_absent(
+        cls, value: int | None, env_var: str, default: int | None = None
+    ) -> int | None:
+        if value == cls.UNSET:
+            return None
+
+        err_msg = "{} must be a non-negative integer but got {}"
+
+        # if no value is provided for the limit, try to load it from env
+        if value is None:
+            # return default value if env var is not set
+            if env_var not in environ:
+                return default
+
+            str_value = environ.get(env_var, "").strip().lower()
+            if str_value == _ENV_VALUE_UNSET:
+                return None
+
+            try:
+                value = int(str_value)
+            except ValueError:
+                raise ValueError(err_msg.format(env_var, str_value))
+
+        if value < 0:
+            raise ValueError(err_msg.format(env_var, value))
+        return value
+
+
+_UnsetLogLimits = LogLimits(
+    max_attributes=LogLimits.UNSET,
+    max_attribute_length=LogLimits.UNSET,
+)
+
+
+class LogRecord(APILogRecord):
+    """A LogRecord instance represents an event being logged.
+
+    LogRecord instances are created and emitted via `Logger`
+    every time something is logged. They contain all the information
+    pertinent to the event being logged.
+ """ + + def __init__( + self, + timestamp: int | None = None, + observed_timestamp: int | None = None, + trace_id: int | None = None, + span_id: int | None = None, + trace_flags: TraceFlags | None = None, + severity_text: str | None = None, + severity_number: SeverityNumber | None = None, + body: AnyValue | None = None, + resource: Resource | None = None, + attributes: Attributes | None = None, + limits: LogLimits | None = _UnsetLogLimits, + ): + super().__init__( + **{ + "timestamp": timestamp, + "observed_timestamp": observed_timestamp, + "trace_id": trace_id, + "span_id": span_id, + "trace_flags": trace_flags, + "severity_text": severity_text, + "severity_number": severity_number, + "body": body, + "attributes": BoundedAttributes( + maxlen=limits.max_attributes, + attributes=attributes if bool(attributes) else None, + immutable=False, + max_value_len=limits.max_attribute_length, + ), + } + ) + self.resource = ( + resource if isinstance(resource, Resource) else Resource.create({}) + ) + if self.dropped_attributes > 0: + warnings.warn( + "Log record attributes were dropped due to limits", + LogDroppedAttributesWarning, + stacklevel=2, + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, LogRecord): + return NotImplemented + return self.__dict__ == other.__dict__ + + def to_json(self, indent: int | None = 4) -> str: + return json.dumps( + { + "body": self.body, + "severity_number": self.severity_number.value + if self.severity_number is not None + else None, + "severity_text": self.severity_text, + "attributes": ( + dict(self.attributes) if bool(self.attributes) else None + ), + "dropped_attributes": self.dropped_attributes, + "timestamp": ns_to_iso_str(self.timestamp), + "observed_timestamp": ns_to_iso_str(self.observed_timestamp), + "trace_id": ( + f"0x{format_trace_id(self.trace_id)}" + if self.trace_id is not None + else "" + ), + "span_id": ( + f"0x{format_span_id(self.span_id)}" + if self.span_id is not None + else "" + ), + "trace_flags": self.trace_flags, + "resource": json.loads(self.resource.to_json()), + }, + indent=indent, + ) + + @property + def dropped_attributes(self) -> int: + if self.attributes: + return self.attributes.dropped + return 0 + + +class LogData: + """Readable LogRecord data plus associated InstrumentationLibrary.""" + + def __init__( + self, + log_record: LogRecord, + instrumentation_scope: InstrumentationScope, + ): + self.log_record = log_record + self.instrumentation_scope = instrumentation_scope + + +class LogRecordProcessor(abc.ABC): + """Interface to hook the log record emitting action. + + Log processors can be registered directly using + :func:`LoggerProvider.add_log_record_processor` and they are invoked + in the same order as they were registered. + """ + + @abc.abstractmethod + def emit(self, log_data: LogData): + """Emits the `LogData`""" + + @abc.abstractmethod + def shutdown(self): + """Called when a :class:`opentelemetry.sdk._logs.Logger` is shutdown""" + + @abc.abstractmethod + def force_flush(self, timeout_millis: int = 30000): + """Export all the received logs to the configured Exporter that have not yet + been exported. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + False if the timeout is exceeded, True otherwise. 
+ """ + + +# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved +# pylint:disable=no-member +class SynchronousMultiLogRecordProcessor(LogRecordProcessor): + """Implementation of class:`LogRecordProcessor` that forwards all received + events to a list of log processors sequentially. + + The underlying log processors are called in sequential order as they were + added. + """ + + def __init__(self): + # use a tuple to avoid race conditions when adding a new log and + # iterating through it on "emit". + self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...] + self._lock = threading.Lock() + + def add_log_record_processor( + self, log_record_processor: LogRecordProcessor + ) -> None: + """Adds a Logprocessor to the list of log processors handled by this instance""" + with self._lock: + self._log_record_processors += (log_record_processor,) + + def emit(self, log_data: LogData) -> None: + for lp in self._log_record_processors: + lp.emit(log_data) + + def shutdown(self) -> None: + """Shutdown the log processors one by one""" + for lp in self._log_record_processors: + lp.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors one by one + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. If the first n log processors exceeded the timeout + then remaining log processors will not be flushed. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + deadline_ns = time_ns() + timeout_millis * 1000000 + for lp in self._log_record_processors: + current_ts = time_ns() + if current_ts >= deadline_ns: + return False + + if not lp.force_flush((deadline_ns - current_ts) // 1000000): + return False + + return True + + +class ConcurrentMultiLogRecordProcessor(LogRecordProcessor): + """Implementation of :class:`LogRecordProcessor` that forwards all received + events to a list of log processors in parallel. + + Calls to the underlying log processors are forwarded in parallel by + submitting them to a thread pool executor and waiting until each log + processor finished its work. + + Args: + max_workers: The number of threads managed by the thread pool executor + and thus defining how many log processors can work in parallel. + """ + + def __init__(self, max_workers: int = 2): + # use a tuple to avoid race conditions when adding a new log and + # iterating through it on "emit". + self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...] + self._lock = threading.Lock() + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) + + def add_log_record_processor( + self, log_record_processor: LogRecordProcessor + ): + with self._lock: + self._log_record_processors += (log_record_processor,) + + def _submit_and_wait( + self, + func: Callable[[LogRecordProcessor], Callable[..., None]], + *args: Any, + **kwargs: Any, + ): + futures = [] + for lp in self._log_record_processors: + future = self._executor.submit(func(lp), *args, **kwargs) + futures.append(future) + for future in futures: + future.result() + + def emit(self, log_data: LogData): + self._submit_and_wait(lambda lp: lp.emit, log_data) + + def shutdown(self): + self._submit_and_wait(lambda lp: lp.shutdown) + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors in parallel. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. 
+
+        Returns:
+            True if all the log processors flush the logs within the timeout,
+            False otherwise.
+        """
+        futures = []
+        for lp in self._log_record_processors:
+            future = self._executor.submit(lp.force_flush, timeout_millis)
+            futures.append(future)
+
+        done_futures, not_done_futures = concurrent.futures.wait(
+            futures, timeout_millis / 1e3
+        )
+
+        if not_done_futures:
+            return False
+
+        for future in done_futures:
+            if not future.result():
+                return False
+
+        return True
+
+
+# skip natural LogRecord attributes
+# http://docs.python.org/library/logging.html#logrecord-attributes
+_RESERVED_ATTRS = frozenset(
+    (
+        "asctime",
+        "args",
+        "created",
+        "exc_info",
+        "exc_text",
+        "filename",
+        "funcName",
+        "getMessage",
+        "message",
+        "levelname",
+        "levelno",
+        "lineno",
+        "module",
+        "msecs",
+        "msg",
+        "name",
+        "pathname",
+        "process",
+        "processName",
+        "relativeCreated",
+        "stack_info",
+        "thread",
+        "threadName",
+        "taskName",
+    )
+)
+
+
+class LoggingHandler(logging.Handler):
+    """A handler class which writes logging records, in OTLP format, to
+    a network destination or file. Supports signals from the `logging` module.
+    https://docs.python.org/3/library/logging.html
+    """
+
+    def __init__(
+        self,
+        level=logging.NOTSET,
+        logger_provider=None,
+    ) -> None:
+        super().__init__(level=level)
+        self._logger_provider = logger_provider or get_logger_provider()
+
+    @staticmethod
+    def _get_attributes(record: logging.LogRecord) -> Attributes:
+        attributes = {
+            k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS
+        }
+
+        # Add standard code attributes for logs.
+        attributes[SpanAttributes.CODE_FILEPATH] = record.pathname
+        attributes[SpanAttributes.CODE_FUNCTION] = record.funcName
+        attributes[SpanAttributes.CODE_LINENO] = record.lineno
+
+        if record.exc_info:
+            exctype, value, tb = record.exc_info
+            if exctype is not None:
+                attributes[SpanAttributes.EXCEPTION_TYPE] = exctype.__name__
+            if value is not None and value.args:
+                attributes[SpanAttributes.EXCEPTION_MESSAGE] = str(
+                    value.args[0]
+                )
+            if tb is not None:
+                # https://github.com/open-telemetry/opentelemetry-specification/blob/9fa7c656b26647b27e485a6af7e38dc716eba98a/specification/trace/semantic_conventions/exceptions.md#stacktrace-representation
+                attributes[SpanAttributes.EXCEPTION_STACKTRACE] = "".join(
+                    traceback.format_exception(*record.exc_info)
+                )
+        return attributes
+
+    def _translate(self, record: logging.LogRecord) -> LogRecord:
+        timestamp = int(record.created * 1e9)
+        observed_timestamp = time_ns()
+        span_context = get_current_span().get_span_context()
+        attributes = self._get_attributes(record)
+        severity_number = std_to_otel(record.levelno)
+        if self.formatter:
+            body = self.format(record)
+        else:
+            # `record.getMessage()` uses `record.msg` as a template to format
+            # `record.args` into. There is a special case in `record.getMessage()`
+            # where it will only attempt formatting if args are provided,
+            # otherwise, it just stringifies `record.msg`.
+            #
+            # Since the OTLP body field has a type of 'any' and the logging module
+            # is sometimes used in such a way that objects incorrectly end up
+            # set as record.msg, in those cases we would like to bypass
+            # `record.getMessage()` completely and set the body to the object
+            # itself instead of its string representation.
+            # For more background, see: https://github.com/open-telemetry/opentelemetry-python/pull/4216
+            if not record.args and not isinstance(record.msg, str):
+                # no args are provided so it's *mostly* safe to use the message template as the body
+                body = record.msg
+            else:
+                body = record.getMessage()
+
+        # related to https://github.com/open-telemetry/opentelemetry-python/issues/3548
+        # Severity Text = WARN as defined in https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#displaying-severity.
+        level_name = (
+            "WARN" if record.levelname == "WARNING" else record.levelname
+        )
+
+        logger = get_logger(record.name, logger_provider=self._logger_provider)
+        return LogRecord(
+            timestamp=timestamp,
+            observed_timestamp=observed_timestamp,
+            trace_id=span_context.trace_id,
+            span_id=span_context.span_id,
+            trace_flags=span_context.trace_flags,
+            severity_text=level_name,
+            severity_number=severity_number,
+            body=body,
+            resource=logger.resource,
+            attributes=attributes,
+        )
+
+    def emit(self, record: logging.LogRecord) -> None:
+        """
+        Emit a record. Skip emitting if logger is NoOp.
+
+        The record is translated to OTel format, and then sent across the pipeline.
+        """
+        logger = get_logger(record.name, logger_provider=self._logger_provider)
+        if not isinstance(logger, NoOpLogger):
+            logger.emit(self._translate(record))
+
+    def flush(self) -> None:
+        """
+        Flushes the logging output. Skip flushing if logger_provider has no force_flush method.
+        """
+        if hasattr(self._logger_provider, "force_flush") and callable(
+            self._logger_provider.force_flush
+        ):
+            self._logger_provider.force_flush()
+
+
+class Logger(APILogger):
+    def __init__(
+        self,
+        resource: Resource,
+        multi_log_record_processor: Union[
+            SynchronousMultiLogRecordProcessor,
+            ConcurrentMultiLogRecordProcessor,
+        ],
+        instrumentation_scope: InstrumentationScope,
+    ):
+        super().__init__(
+            instrumentation_scope.name,
+            instrumentation_scope.version,
+            instrumentation_scope.schema_url,
+            instrumentation_scope.attributes,
+        )
+        self._resource = resource
+        self._multi_log_record_processor = multi_log_record_processor
+        self._instrumentation_scope = instrumentation_scope
+
+    @property
+    def resource(self):
+        return self._resource
+
+    def emit(self, record: LogRecord):
+        """Emits the :class:`LogData` by associating :class:`LogRecord`
+        and instrumentation info.
+ """ + log_data = LogData(record, self._instrumentation_scope) + self._multi_log_record_processor.emit(log_data) + + +class LoggerProvider(APILoggerProvider): + def __init__( + self, + resource: Resource | None = None, + shutdown_on_exit: bool = True, + multi_log_record_processor: SynchronousMultiLogRecordProcessor + | ConcurrentMultiLogRecordProcessor + | None = None, + ): + if resource is None: + self._resource = Resource.create({}) + else: + self._resource = resource + self._multi_log_record_processor = ( + multi_log_record_processor or SynchronousMultiLogRecordProcessor() + ) + disabled = environ.get(OTEL_SDK_DISABLED, "") + self._disabled = disabled.lower().strip() == "true" + self._at_exit_handler = None + if shutdown_on_exit: + self._at_exit_handler = atexit.register(self.shutdown) + self._logger_cache = {} + self._logger_cache_lock = Lock() + + @property + def resource(self): + return self._resource + + def _get_logger_no_cache( + self, + name: str, + version: str | None = None, + schema_url: str | None = None, + attributes: Attributes | None = None, + ) -> Logger: + return Logger( + self._resource, + self._multi_log_record_processor, + InstrumentationScope( + name, + version, + schema_url, + attributes, + ), + ) + + def _get_logger_cached( + self, + name: str, + version: str | None = None, + schema_url: str | None = None, + ) -> Logger: + with self._logger_cache_lock: + key = (name, version, schema_url) + if key in self._logger_cache: + return self._logger_cache[key] + + self._logger_cache[key] = self._get_logger_no_cache( + name, version, schema_url + ) + return self._logger_cache[key] + + def get_logger( + self, + name: str, + version: str | None = None, + schema_url: str | None = None, + attributes: Attributes | None = None, + ) -> Logger: + if self._disabled: + return NoOpLogger( + name, + version=version, + schema_url=schema_url, + attributes=attributes, + ) + if attributes is None: + return self._get_logger_cached(name, version, schema_url) + return self._get_logger_no_cache(name, version, schema_url, attributes) + + def add_log_record_processor( + self, log_record_processor: LogRecordProcessor + ): + """Registers a new :class:`LogRecordProcessor` for this `LoggerProvider` instance. + + The log processors are invoked in the same order they are registered. + """ + self._multi_log_record_processor.add_log_record_processor( + log_record_processor + ) + + def shutdown(self): + """Shuts down the log processors.""" + self._multi_log_record_processor.shutdown() + if self._at_exit_handler is not None: + atexit.unregister(self._at_exit_handler) + self._at_exit_handler = None + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + return self._multi_log_record_processor.force_flush(timeout_millis) diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py new file mode 100644 index 00000000..434dc745 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -0,0 +1,462 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import collections
+import enum
+import logging
+import os
+import sys
+import threading
+from os import environ, linesep
+from time import time_ns
+from typing import IO, Callable, Deque, List, Optional, Sequence
+
+from opentelemetry.context import (
+    _SUPPRESS_INSTRUMENTATION_KEY,
+    attach,
+    detach,
+    set_value,
+)
+from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
+from opentelemetry.sdk.environment_variables import (
+    OTEL_BLRP_EXPORT_TIMEOUT,
+    OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+    OTEL_BLRP_MAX_QUEUE_SIZE,
+    OTEL_BLRP_SCHEDULE_DELAY,
+)
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+    "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+_logger = logging.getLogger(__name__)
+
+
+class LogExportResult(enum.Enum):
+    SUCCESS = 0
+    FAILURE = 1
+
+
+class LogExporter(abc.ABC):
+    """Interface for exporting logs.
+
+    Interface to be implemented by services that want to export logs received
+    in their own format.
+
+    To export data this MUST be registered to the :class:`opentelemetry.sdk._logs.Logger` using a
+    log processor.
+    """
+
+    @abc.abstractmethod
+    def export(self, batch: Sequence[LogData]):
+        """Exports a batch of logs.
+
+        Args:
+            batch: The list of `LogData` objects to be exported
+
+        Returns:
+            The result of the export
+        """
+
+    @abc.abstractmethod
+    def shutdown(self):
+        """Shuts down the exporter.
+
+        Called when the SDK is shut down.
+        """
+
+
+class ConsoleLogExporter(LogExporter):
+    """Implementation of :class:`LogExporter` that prints log records to the
+    console.
+
+    This class can be used for diagnostic purposes. It prints the exported
+    log records to the console STDOUT.
+    """
+
+    def __init__(
+        self,
+        out: IO = sys.stdout,
+        formatter: Callable[[LogRecord], str] = lambda record: record.to_json()
+        + linesep,
+    ):
+        self.out = out
+        self.formatter = formatter
+
+    def export(self, batch: Sequence[LogData]):
+        for data in batch:
+            self.out.write(self.formatter(data.log_record))
+        self.out.flush()
+        return LogExportResult.SUCCESS
+
+    def shutdown(self):
+        pass
+
+
+class SimpleLogRecordProcessor(LogRecordProcessor):
+    """This is an implementation of LogRecordProcessor which passes
+    received logs in the export-friendly LogData representation to the
+    configured LogExporter, as soon as they are emitted.
+ """ + + def __init__(self, exporter: LogExporter): + self._exporter = exporter + self._shutdown = False + + def emit(self, log_data: LogData): + if self._shutdown: + _logger.warning("Processor is already shutdown, ignoring call") + return + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + self._exporter.export((log_data,)) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + detach(token) + + def shutdown(self): + self._shutdown = True + self._exporter.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=no-self-use + return True + + +class _FlushRequest: + __slots__ = ["event", "num_log_records"] + + def __init__(self): + self.event = threading.Event() + self.num_log_records = 0 + + +_BSP_RESET_ONCE = Once() + + +class BatchLogRecordProcessor(LogRecordProcessor): + """This is an implementation of LogRecordProcessor which creates batches of + received logs in the export-friendly LogData representation and + send to the configured LogExporter, as soon as they are emitted. + + `BatchLogRecordProcessor` is configurable with the following environment + variables which correspond to constructor parameters: + + - :envvar:`OTEL_BLRP_SCHEDULE_DELAY` + - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` + - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` + - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` + """ + + _queue: Deque[LogData] + _flush_request: _FlushRequest | None + _log_records: List[LogData | None] + + def __init__( + self, + exporter: LogExporter, + schedule_delay_millis: float | None = None, + max_export_batch_size: int | None = None, + export_timeout_millis: float | None = None, + max_queue_size: int | None = None, + ): + if max_queue_size is None: + max_queue_size = BatchLogRecordProcessor._default_max_queue_size() + + if schedule_delay_millis is None: + schedule_delay_millis = ( + BatchLogRecordProcessor._default_schedule_delay_millis() + ) + + if max_export_batch_size is None: + max_export_batch_size = ( + BatchLogRecordProcessor._default_max_export_batch_size() + ) + + if export_timeout_millis is None: + export_timeout_millis = ( + BatchLogRecordProcessor._default_export_timeout_millis() + ) + + BatchLogRecordProcessor._validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ) + + self._exporter = exporter + self._max_queue_size = max_queue_size + self._schedule_delay_millis = schedule_delay_millis + self._max_export_batch_size = max_export_batch_size + self._export_timeout_millis = export_timeout_millis + self._queue = collections.deque([], max_queue_size) + self._worker_thread = threading.Thread( + name="OtelBatchLogRecordProcessor", + target=self.worker, + daemon=True, + ) + self._condition = threading.Condition(threading.Lock()) + self._shutdown = False + self._flush_request = None + self._log_records = [None] * self._max_export_batch_size + self._worker_thread.start() + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + self._pid = os.getpid() + + def _at_fork_reinit(self): + self._condition = threading.Condition(threading.Lock()) + self._queue.clear() + self._worker_thread = threading.Thread( + name="OtelBatchLogRecordProcessor", + target=self.worker, + daemon=True, + ) + self._worker_thread.start() + self._pid = os.getpid() + + def worker(self): + timeout = self._schedule_delay_millis / 1e3 + flush_request: Optional[_FlushRequest] = None + while not self._shutdown: + 
+            with self._condition:
+                if self._shutdown:
+                    # shutdown may have been called, avoid further processing
+                    break
+                flush_request = self._get_and_unset_flush_request()
+                if (
+                    len(self._queue) < self._max_export_batch_size
+                    and flush_request is None
+                ):
+                    self._condition.wait(timeout)
+
+                    flush_request = self._get_and_unset_flush_request()
+                    if not self._queue:
+                        timeout = self._schedule_delay_millis / 1e3
+                        self._notify_flush_request_finished(flush_request)
+                        flush_request = None
+                        continue
+                    if self._shutdown:
+                        break
+
+            start_ns = time_ns()
+            self._export(flush_request)
+            end_ns = time_ns()
+            # subtract the duration of this export call from the next timeout
+            timeout = self._schedule_delay_millis / 1e3 - (
+                (end_ns - start_ns) / 1e9
+            )
+
+            self._notify_flush_request_finished(flush_request)
+            flush_request = None
+
+        # there might have been a new flush request while export was running
+        # and before the done flag switched to true
+        with self._condition:
+            shutdown_flush_request = self._get_and_unset_flush_request()
+
+        # flush the remaining logs
+        self._drain_queue()
+        self._notify_flush_request_finished(flush_request)
+        self._notify_flush_request_finished(shutdown_flush_request)
+
+    def _export(self, flush_request: Optional[_FlushRequest] = None):
+        """Exports logs considering the given flush_request.
+
+        If flush_request is not None, logs are exported in batches until the
+        number of exported logs reaches or exceeds the number of logs in
+        flush_request; otherwise it exports at most max_export_batch_size
+        logs.
+        """
+        if flush_request is None:
+            self._export_batch()
+            return
+
+        num_log_records = flush_request.num_log_records
+        while self._queue:
+            exported = self._export_batch()
+            num_log_records -= exported
+
+            if num_log_records <= 0:
+                break
+
+    def _export_batch(self) -> int:
+        """Exports at most max_export_batch_size logs and returns the number of
+        exported logs.
+        """
+        idx = 0
+        while idx < self._max_export_batch_size and self._queue:
+            record = self._queue.pop()
+            self._log_records[idx] = record
+            idx += 1
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            self._exporter.export(self._log_records[:idx])  # type: ignore
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.exception("Exception while exporting logs.")
+        detach(token)
+
+        for index in range(idx):
+            self._log_records[index] = None
+        return idx
+
+    def _drain_queue(self):
+        """Export all elements until queue is empty.
+
+        Can only be called from the worker thread context because it invokes
+        `export` that is not thread safe.
+        """
+        while self._queue:
+            self._export_batch()
+
+    def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
+        flush_request = self._flush_request
+        self._flush_request = None
+        if flush_request is not None:
+            flush_request.num_log_records = len(self._queue)
+        return flush_request
+
+    @staticmethod
+    def _notify_flush_request_finished(
+        flush_request: Optional[_FlushRequest] = None,
+    ):
+        if flush_request is not None:
+            flush_request.event.set()
+
+    def _get_or_create_flush_request(self) -> _FlushRequest:
+        if self._flush_request is None:
+            self._flush_request = _FlushRequest()
+        return self._flush_request
+
+    def emit(self, log_data: LogData) -> None:
+        """Adds the `LogData` to queue and notifies the waiting threads
+        when size of queue reaches max_export_batch_size.
+ """ + if self._shutdown: + return + if self._pid != os.getpid(): + _BSP_RESET_ONCE.do_once(self._at_fork_reinit) + + self._queue.appendleft(log_data) + if len(self._queue) >= self._max_export_batch_size: + with self._condition: + self._condition.notify() + + def shutdown(self): + self._shutdown = True + with self._condition: + self._condition.notify_all() + self._worker_thread.join() + self._exporter.shutdown() + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + if timeout_millis is None: + timeout_millis = self._export_timeout_millis + if self._shutdown: + return True + + with self._condition: + flush_request = self._get_or_create_flush_request() + self._condition.notify_all() + + ret = flush_request.event.wait(timeout_millis / 1e3) + if not ret: + _logger.warning("Timeout was exceeded in force_flush().") + return ret + + @staticmethod + def _default_max_queue_size(): + try: + return int( + environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_MAX_QUEUE_SIZE, + _DEFAULT_MAX_QUEUE_SIZE, + ) + return _DEFAULT_MAX_QUEUE_SIZE + + @staticmethod + def _default_schedule_delay_millis(): + try: + return int( + environ.get( + OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_SCHEDULE_DELAY, + _DEFAULT_SCHEDULE_DELAY_MILLIS, + ) + return _DEFAULT_SCHEDULE_DELAY_MILLIS + + @staticmethod + def _default_max_export_batch_size(): + try: + return int( + environ.get( + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + return _DEFAULT_MAX_EXPORT_BATCH_SIZE + + @staticmethod + def _default_export_timeout_millis(): + try: + return int( + environ.get( + OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_EXPORT_TIMEOUT, + _DEFAULT_EXPORT_TIMEOUT_MILLIS, + ) + return _DEFAULT_EXPORT_TIMEOUT_MILLIS + + @staticmethod + def _validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ): + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError( + "max_export_batch_size must be a positive integer." + ) + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less than or equal to max_queue_size." + ) diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py new file mode 100644 index 00000000..68cb6b73 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py @@ -0,0 +1,51 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
+
+
+class InMemoryLogExporter(LogExporter):
+    """Implementation of :class:`.LogExporter` that stores logs in memory.
+
+    This class can be used for testing purposes. It stores the exported logs
+    in a list in memory that can be retrieved using the
+    :func:`.get_finished_logs` method.
+    """
+
+    def __init__(self):
+        self._logs = []
+        self._lock = threading.Lock()
+        self._stopped = False
+
+    def clear(self) -> None:
+        with self._lock:
+            self._logs.clear()
+
+    def get_finished_logs(self) -> typing.Tuple[LogData, ...]:
+        with self._lock:
+            return tuple(self._logs)
+
+    def export(self, batch: typing.Sequence[LogData]) -> LogExportResult:
+        if self._stopped:
+            return LogExportResult.FAILURE
+        with self._lock:
+            self._logs.extend(batch)
+        return LogExportResult.SUCCESS
+
+    def shutdown(self) -> None:
+        self._stopped = True
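
Taken together, the first file in this diff is the bridge from stdlib `logging` into the OTel log pipeline. A minimal wiring sketch, assuming the conventional re-exports of these internal classes from `opentelemetry.sdk._logs` and `opentelemetry.sdk._logs.export`, with an illustrative `service.name` value:

```python
import logging

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)
from opentelemetry.sdk.resources import Resource

# The provider owns the resource and a synchronous multi-processor by default.
provider = LoggerProvider(
    resource=Resource.create({"service.name": "demo-service"})  # illustrative
)
# The batch processor queues records and exports on a background thread.
provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))

# Bridge stdlib logging into the pipeline.
logging.getLogger().addHandler(
    LoggingHandler(level=logging.INFO, logger_provider=provider)
)
logging.getLogger(__name__).warning("payment failed for order %s", 42)

# Drains the queue and unregisters the atexit hook.
provider.shutdown()
```

The batch processor keeps `emit()` cheap (an `appendleft` plus an occasional condition notify); `ConsoleLogExporter` is meant for diagnostics only.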
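The `LogLimits`/`BoundedAttributes` interaction is easiest to see in isolation. A small sketch, assuming `LogLimits` and `LogRecord` are re-exported from `opentelemetry.sdk._logs` (the limit values are arbitrary):

```python
from opentelemetry.sdk._logs import LogLimits, LogRecord

# Allow a single attribute; anything beyond it is counted as dropped,
# and a LogDroppedAttributesWarning is emitted (filtered to "once").
limits = LogLimits(max_attributes=1, max_attribute_length=16)

record = LogRecord(
    body="demo",
    attributes={"first": "kept", "second": "over the limit"},
    limits=limits,
)
print(record.dropped_attributes)  # -> 1
```

Setting `OTEL_ATTRIBUTE_COUNT_LIMIT` or `OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT` instead of passing arguments exercises the `_from_env_if_absent` precedence described in the docstring.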
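`InMemoryLogExporter` pairs naturally with `SimpleLogRecordProcessor` in tests, since records are exported synchronously on emit. A sketch under the same re-export assumption; the logger name and message are placeholders:

```python
import logging

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    InMemoryLogExporter,
    SimpleLogRecordProcessor,
)

exporter = InMemoryLogExporter()
provider = LoggerProvider()
provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))

handler = LoggingHandler(level=logging.DEBUG, logger_provider=provider)
test_logger = logging.getLogger("test")
test_logger.addHandler(handler)
test_logger.error("something failed: %s", "oops")

# SimpleLogRecordProcessor exported the record inline, so it is
# already visible here.
(finished,) = exporter.get_finished_logs()
assert finished.log_record.body == "something failed: oops"
provider.shutdown()
```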