about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py36
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py712
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py462
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py51
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py35
5 files changed, 1296 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py
new file mode 100644
index 00000000..0254c135
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py
@@ -0,0 +1,36 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from opentelemetry.sdk._logs._internal import (
+    LogData,
+    LogDroppedAttributesWarning,
+    Logger,
+    LoggerProvider,
+    LoggingHandler,
+    LogLimits,
+    LogRecord,
+    LogRecordProcessor,
+)
+
+__all__ = [
+    "LogData",
+    "Logger",
+    "LoggerProvider",
+    "LoggingHandler",
+    "LogLimits",
+    "LogRecord",
+    "LogRecordProcessor",
+    "LogDroppedAttributesWarning",
+]
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py
new file mode 100644
index 00000000..302ca1ed
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py
@@ -0,0 +1,712 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import atexit
+import concurrent.futures
+import json
+import logging
+import threading
+import traceback
+import warnings
+from os import environ
+from threading import Lock
+from time import time_ns
+from typing import Any, Callable, Tuple, Union  # noqa
+
+from opentelemetry._logs import Logger as APILogger
+from opentelemetry._logs import LoggerProvider as APILoggerProvider
+from opentelemetry._logs import LogRecord as APILogRecord
+from opentelemetry._logs import (
+    NoOpLogger,
+    SeverityNumber,
+    get_logger,
+    get_logger_provider,
+    std_to_otel,
+)
+from opentelemetry.attributes import BoundedAttributes
+from opentelemetry.sdk.environment_variables import (
+    OTEL_ATTRIBUTE_COUNT_LIMIT,
+    OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+    OTEL_SDK_DISABLED,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util import ns_to_iso_str
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+from opentelemetry.semconv.trace import SpanAttributes
+from opentelemetry.trace import (
+    format_span_id,
+    format_trace_id,
+    get_current_span,
+)
+from opentelemetry.trace.span import TraceFlags
+from opentelemetry.util.types import AnyValue, Attributes
+
+_logger = logging.getLogger(__name__)
+
+_DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT = 128
+_ENV_VALUE_UNSET = ""
+
+
+class LogDroppedAttributesWarning(UserWarning):
+    """Custom warning to indicate dropped log attributes due to limits.
+
+    This class is used to filter and handle these specific warnings separately
+    from other warnings, ensuring that they are only shown once without
+    interfering with default user warnings.
+    """
+
+
+warnings.simplefilter("once", LogDroppedAttributesWarning)
+
+
+class LogLimits:
+    """This class is based on a SpanLimits class in the Tracing module.
+
+    This class represents the limits that should be enforced on recorded data such as events, links, attributes etc.
+
+    This class does not enforce any limits itself. It only provides a way to read limits from env,
+    default values and from user provided arguments.
+
+    All limit arguments must be either a non-negative integer, ``None`` or ``LogLimits.UNSET``.
+
+    - All limit arguments are optional.
+    - If a limit argument is not set, the class will try to read its value from the corresponding
+      environment variable.
+    - If the environment variable is not set, the default value, if any, will be used.
+
+    Limit precedence:
+
+    - If a model specific limit is set, it will be used.
+    - Else if the corresponding global limit is set, it will be used.
+    - Else if the model specific limit has a default value, the default value will be used.
+    - Else if the global limit has a default value, the default value will be used.
+
+    Args:
+        max_attributes: Maximum number of attributes that can be added to a span, event, and link.
+            Environment variable: ``OTEL_ATTRIBUTE_COUNT_LIMIT``
+            Default: {_DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT}
+        max_attribute_length: Maximum length an attribute value can have. Values longer than
+            the specified length will be truncated.
+    """
+
+    UNSET = -1
+
+    def __init__(
+        self,
+        max_attributes: int | None = None,
+        max_attribute_length: int | None = None,
+    ):
+        # attribute count
+        global_max_attributes = self._from_env_if_absent(
+            max_attributes, OTEL_ATTRIBUTE_COUNT_LIMIT
+        )
+        self.max_attributes = (
+            global_max_attributes
+            if global_max_attributes is not None
+            else _DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT
+        )
+
+        # attribute length
+        self.max_attribute_length = self._from_env_if_absent(
+            max_attribute_length,
+            OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+        )
+
+    def __repr__(self):
+        return f"{type(self).__name__}(max_attributes={self.max_attributes}, max_attribute_length={self.max_attribute_length})"
+
+    @classmethod
+    def _from_env_if_absent(
+        cls, value: int | None, env_var: str, default: int | None = None
+    ) -> int | None:
+        if value == cls.UNSET:
+            return None
+
+        err_msg = "{} must be a non-negative integer but got {}"
+
+        # if no value is provided for the limit, try to load it from env
+        if value is None:
+            # return default value if env var is not set
+            if env_var not in environ:
+                return default
+
+            str_value = environ.get(env_var, "").strip().lower()
+            if str_value == _ENV_VALUE_UNSET:
+                return None
+
+            try:
+                value = int(str_value)
+            except ValueError:
+                raise ValueError(err_msg.format(env_var, str_value))
+
+        if value < 0:
+            raise ValueError(err_msg.format(env_var, value))
+        return value
+
+
+_UnsetLogLimits = LogLimits(
+    max_attributes=LogLimits.UNSET,
+    max_attribute_length=LogLimits.UNSET,
+)
+
+
+class LogRecord(APILogRecord):
+    """A LogRecord instance represents an event being logged.
+
+    LogRecord instances are created and emitted via `Logger`
+    every time something is logged. They contain all the information
+    pertinent to the event being logged.
+    """
+
+    def __init__(
+        self,
+        timestamp: int | None = None,
+        observed_timestamp: int | None = None,
+        trace_id: int | None = None,
+        span_id: int | None = None,
+        trace_flags: TraceFlags | None = None,
+        severity_text: str | None = None,
+        severity_number: SeverityNumber | None = None,
+        body: AnyValue | None = None,
+        resource: Resource | None = None,
+        attributes: Attributes | None = None,
+        limits: LogLimits | None = _UnsetLogLimits,
+    ):
+        super().__init__(
+            **{
+                "timestamp": timestamp,
+                "observed_timestamp": observed_timestamp,
+                "trace_id": trace_id,
+                "span_id": span_id,
+                "trace_flags": trace_flags,
+                "severity_text": severity_text,
+                "severity_number": severity_number,
+                "body": body,
+                "attributes": BoundedAttributes(
+                    maxlen=limits.max_attributes,
+                    attributes=attributes if bool(attributes) else None,
+                    immutable=False,
+                    max_value_len=limits.max_attribute_length,
+                ),
+            }
+        )
+        self.resource = (
+            resource if isinstance(resource, Resource) else Resource.create({})
+        )
+        if self.dropped_attributes > 0:
+            warnings.warn(
+                "Log record attributes were dropped due to limits",
+                LogDroppedAttributesWarning,
+                stacklevel=2,
+            )
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, LogRecord):
+            return NotImplemented
+        return self.__dict__ == other.__dict__
+
+    def to_json(self, indent: int | None = 4) -> str:
+        return json.dumps(
+            {
+                "body": self.body,
+                "severity_number": self.severity_number.value
+                if self.severity_number is not None
+                else None,
+                "severity_text": self.severity_text,
+                "attributes": (
+                    dict(self.attributes) if bool(self.attributes) else None
+                ),
+                "dropped_attributes": self.dropped_attributes,
+                "timestamp": ns_to_iso_str(self.timestamp),
+                "observed_timestamp": ns_to_iso_str(self.observed_timestamp),
+                "trace_id": (
+                    f"0x{format_trace_id(self.trace_id)}"
+                    if self.trace_id is not None
+                    else ""
+                ),
+                "span_id": (
+                    f"0x{format_span_id(self.span_id)}"
+                    if self.span_id is not None
+                    else ""
+                ),
+                "trace_flags": self.trace_flags,
+                "resource": json.loads(self.resource.to_json()),
+            },
+            indent=indent,
+        )
+
+    @property
+    def dropped_attributes(self) -> int:
+        if self.attributes:
+            return self.attributes.dropped
+        return 0
+
+
+class LogData:
+    """Readable LogRecord data plus associated InstrumentationLibrary."""
+
+    def __init__(
+        self,
+        log_record: LogRecord,
+        instrumentation_scope: InstrumentationScope,
+    ):
+        self.log_record = log_record
+        self.instrumentation_scope = instrumentation_scope
+
+
+class LogRecordProcessor(abc.ABC):
+    """Interface to hook the log record emitting action.
+
+    Log processors can be registered directly using
+    :func:`LoggerProvider.add_log_record_processor` and they are invoked
+    in the same order as they were registered.
+    """
+
+    @abc.abstractmethod
+    def emit(self, log_data: LogData):
+        """Emits the `LogData`"""
+
+    @abc.abstractmethod
+    def shutdown(self):
+        """Called when a :class:`opentelemetry.sdk._logs.Logger` is shutdown"""
+
+    @abc.abstractmethod
+    def force_flush(self, timeout_millis: int = 30000):
+        """Export all the received logs to the configured Exporter that have not yet
+        been exported.
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for logs to be
+                exported.
+
+        Returns:
+            False if the timeout is exceeded, True otherwise.
+        """
+
+
+# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
+# pylint:disable=no-member
+class SynchronousMultiLogRecordProcessor(LogRecordProcessor):
+    """Implementation of class:`LogRecordProcessor` that forwards all received
+    events to a list of log processors sequentially.
+
+    The underlying log processors are called in sequential order as they were
+    added.
+    """
+
+    def __init__(self):
+        # use a tuple to avoid race conditions when adding a new log and
+        # iterating through it on "emit".
+        self._log_record_processors = ()  # type: Tuple[LogRecordProcessor, ...]
+        self._lock = threading.Lock()
+
+    def add_log_record_processor(
+        self, log_record_processor: LogRecordProcessor
+    ) -> None:
+        """Adds a Logprocessor to the list of log processors handled by this instance"""
+        with self._lock:
+            self._log_record_processors += (log_record_processor,)
+
+    def emit(self, log_data: LogData) -> None:
+        for lp in self._log_record_processors:
+            lp.emit(log_data)
+
+    def shutdown(self) -> None:
+        """Shutdown the log processors one by one"""
+        for lp in self._log_record_processors:
+            lp.shutdown()
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Force flush the log processors one by one
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for logs to be
+                exported. If the first n log processors exceeded the timeout
+                then remaining log processors will not be flushed.
+
+        Returns:
+            True if all the log processors flushes the logs within timeout,
+            False otherwise.
+        """
+        deadline_ns = time_ns() + timeout_millis * 1000000
+        for lp in self._log_record_processors:
+            current_ts = time_ns()
+            if current_ts >= deadline_ns:
+                return False
+
+            if not lp.force_flush((deadline_ns - current_ts) // 1000000):
+                return False
+
+        return True
+
+
+class ConcurrentMultiLogRecordProcessor(LogRecordProcessor):
+    """Implementation of :class:`LogRecordProcessor` that forwards all received
+    events to a list of log processors in parallel.
+
+    Calls to the underlying log processors are forwarded in parallel by
+    submitting them to a thread pool executor and waiting until each log
+    processor finished its work.
+
+    Args:
+        max_workers: The number of threads managed by the thread pool executor
+            and thus defining how many log processors can work in parallel.
+    """
+
+    def __init__(self, max_workers: int = 2):
+        # use a tuple to avoid race conditions when adding a new log and
+        # iterating through it on "emit".
+        self._log_record_processors = ()  # type: Tuple[LogRecordProcessor, ...]
+        self._lock = threading.Lock()
+        self._executor = concurrent.futures.ThreadPoolExecutor(
+            max_workers=max_workers
+        )
+
+    def add_log_record_processor(
+        self, log_record_processor: LogRecordProcessor
+    ):
+        with self._lock:
+            self._log_record_processors += (log_record_processor,)
+
+    def _submit_and_wait(
+        self,
+        func: Callable[[LogRecordProcessor], Callable[..., None]],
+        *args: Any,
+        **kwargs: Any,
+    ):
+        futures = []
+        for lp in self._log_record_processors:
+            future = self._executor.submit(func(lp), *args, **kwargs)
+            futures.append(future)
+        for future in futures:
+            future.result()
+
+    def emit(self, log_data: LogData):
+        self._submit_and_wait(lambda lp: lp.emit, log_data)
+
+    def shutdown(self):
+        self._submit_and_wait(lambda lp: lp.shutdown)
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Force flush the log processors in parallel.
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for logs to be
+                exported.
+
+        Returns:
+            True if all the log processors flushes the logs within timeout,
+            False otherwise.
+        """
+        futures = []
+        for lp in self._log_record_processors:
+            future = self._executor.submit(lp.force_flush, timeout_millis)
+            futures.append(future)
+
+        done_futures, not_done_futures = concurrent.futures.wait(
+            futures, timeout_millis / 1e3
+        )
+
+        if not_done_futures:
+            return False
+
+        for future in done_futures:
+            if not future.result():
+                return False
+
+        return True
+
+
+# skip natural LogRecord attributes
+# http://docs.python.org/library/logging.html#logrecord-attributes
+_RESERVED_ATTRS = frozenset(
+    (
+        "asctime",
+        "args",
+        "created",
+        "exc_info",
+        "exc_text",
+        "filename",
+        "funcName",
+        "getMessage",
+        "message",
+        "levelname",
+        "levelno",
+        "lineno",
+        "module",
+        "msecs",
+        "msg",
+        "name",
+        "pathname",
+        "process",
+        "processName",
+        "relativeCreated",
+        "stack_info",
+        "thread",
+        "threadName",
+        "taskName",
+    )
+)
+
+
+class LoggingHandler(logging.Handler):
+    """A handler class which writes logging records, in OTLP format, to
+    a network destination or file. Supports signals from the `logging` module.
+    https://docs.python.org/3/library/logging.html
+    """
+
+    def __init__(
+        self,
+        level=logging.NOTSET,
+        logger_provider=None,
+    ) -> None:
+        super().__init__(level=level)
+        self._logger_provider = logger_provider or get_logger_provider()
+
+    @staticmethod
+    def _get_attributes(record: logging.LogRecord) -> Attributes:
+        attributes = {
+            k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS
+        }
+
+        # Add standard code attributes for logs.
+        attributes[SpanAttributes.CODE_FILEPATH] = record.pathname
+        attributes[SpanAttributes.CODE_FUNCTION] = record.funcName
+        attributes[SpanAttributes.CODE_LINENO] = record.lineno
+
+        if record.exc_info:
+            exctype, value, tb = record.exc_info
+            if exctype is not None:
+                attributes[SpanAttributes.EXCEPTION_TYPE] = exctype.__name__
+            if value is not None and value.args:
+                attributes[SpanAttributes.EXCEPTION_MESSAGE] = str(
+                    value.args[0]
+                )
+            if tb is not None:
+                # https://github.com/open-telemetry/opentelemetry-specification/blob/9fa7c656b26647b27e485a6af7e38dc716eba98a/specification/trace/semantic_conventions/exceptions.md#stacktrace-representation
+                attributes[SpanAttributes.EXCEPTION_STACKTRACE] = "".join(
+                    traceback.format_exception(*record.exc_info)
+                )
+        return attributes
+
+    def _translate(self, record: logging.LogRecord) -> LogRecord:
+        timestamp = int(record.created * 1e9)
+        observered_timestamp = time_ns()
+        span_context = get_current_span().get_span_context()
+        attributes = self._get_attributes(record)
+        severity_number = std_to_otel(record.levelno)
+        if self.formatter:
+            body = self.format(record)
+        else:
+            # `record.getMessage()` uses `record.msg` as a template to format
+            # `record.args` into. There is a special case in `record.getMessage()`
+            # where it will only attempt formatting if args are provided,
+            # otherwise, it just stringifies `record.msg`.
+            #
+            # Since the OTLP body field has a type of 'any' and the logging module
+            # is sometimes used in such a way that objects incorrectly end up
+            # set as record.msg, in those cases we would like to bypass
+            # `record.getMessage()` completely and set the body to the object
+            # itself instead of its string representation.
+            # For more background, see: https://github.com/open-telemetry/opentelemetry-python/pull/4216
+            if not record.args and not isinstance(record.msg, str):
+                # no args are provided so it's *mostly* safe to use the message template as the body
+                body = record.msg
+            else:
+                body = record.getMessage()
+
+        # related to https://github.com/open-telemetry/opentelemetry-python/issues/3548
+        # Severity Text = WARN as defined in https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#displaying-severity.
+        level_name = (
+            "WARN" if record.levelname == "WARNING" else record.levelname
+        )
+
+        logger = get_logger(record.name, logger_provider=self._logger_provider)
+        return LogRecord(
+            timestamp=timestamp,
+            observed_timestamp=observered_timestamp,
+            trace_id=span_context.trace_id,
+            span_id=span_context.span_id,
+            trace_flags=span_context.trace_flags,
+            severity_text=level_name,
+            severity_number=severity_number,
+            body=body,
+            resource=logger.resource,
+            attributes=attributes,
+        )
+
+    def emit(self, record: logging.LogRecord) -> None:
+        """
+        Emit a record. Skip emitting if logger is NoOp.
+
+        The record is translated to OTel format, and then sent across the pipeline.
+        """
+        logger = get_logger(record.name, logger_provider=self._logger_provider)
+        if not isinstance(logger, NoOpLogger):
+            logger.emit(self._translate(record))
+
+    def flush(self) -> None:
+        """
+        Flushes the logging output. Skip flushing if logging_provider has no force_flush method.
+        """
+        if hasattr(self._logger_provider, "force_flush") and callable(
+            self._logger_provider.force_flush
+        ):
+            self._logger_provider.force_flush()
+
+
+class Logger(APILogger):
+    def __init__(
+        self,
+        resource: Resource,
+        multi_log_record_processor: Union[
+            SynchronousMultiLogRecordProcessor,
+            ConcurrentMultiLogRecordProcessor,
+        ],
+        instrumentation_scope: InstrumentationScope,
+    ):
+        super().__init__(
+            instrumentation_scope.name,
+            instrumentation_scope.version,
+            instrumentation_scope.schema_url,
+            instrumentation_scope.attributes,
+        )
+        self._resource = resource
+        self._multi_log_record_processor = multi_log_record_processor
+        self._instrumentation_scope = instrumentation_scope
+
+    @property
+    def resource(self):
+        return self._resource
+
+    def emit(self, record: LogRecord):
+        """Emits the :class:`LogData` by associating :class:`LogRecord`
+        and instrumentation info.
+        """
+        log_data = LogData(record, self._instrumentation_scope)
+        self._multi_log_record_processor.emit(log_data)
+
+
+class LoggerProvider(APILoggerProvider):
+    def __init__(
+        self,
+        resource: Resource | None = None,
+        shutdown_on_exit: bool = True,
+        multi_log_record_processor: SynchronousMultiLogRecordProcessor
+        | ConcurrentMultiLogRecordProcessor
+        | None = None,
+    ):
+        if resource is None:
+            self._resource = Resource.create({})
+        else:
+            self._resource = resource
+        self._multi_log_record_processor = (
+            multi_log_record_processor or SynchronousMultiLogRecordProcessor()
+        )
+        disabled = environ.get(OTEL_SDK_DISABLED, "")
+        self._disabled = disabled.lower().strip() == "true"
+        self._at_exit_handler = None
+        if shutdown_on_exit:
+            self._at_exit_handler = atexit.register(self.shutdown)
+        self._logger_cache = {}
+        self._logger_cache_lock = Lock()
+
+    @property
+    def resource(self):
+        return self._resource
+
+    def _get_logger_no_cache(
+        self,
+        name: str,
+        version: str | None = None,
+        schema_url: str | None = None,
+        attributes: Attributes | None = None,
+    ) -> Logger:
+        return Logger(
+            self._resource,
+            self._multi_log_record_processor,
+            InstrumentationScope(
+                name,
+                version,
+                schema_url,
+                attributes,
+            ),
+        )
+
+    def _get_logger_cached(
+        self,
+        name: str,
+        version: str | None = None,
+        schema_url: str | None = None,
+    ) -> Logger:
+        with self._logger_cache_lock:
+            key = (name, version, schema_url)
+            if key in self._logger_cache:
+                return self._logger_cache[key]
+
+            self._logger_cache[key] = self._get_logger_no_cache(
+                name, version, schema_url
+            )
+            return self._logger_cache[key]
+
+    def get_logger(
+        self,
+        name: str,
+        version: str | None = None,
+        schema_url: str | None = None,
+        attributes: Attributes | None = None,
+    ) -> Logger:
+        if self._disabled:
+            return NoOpLogger(
+                name,
+                version=version,
+                schema_url=schema_url,
+                attributes=attributes,
+            )
+        if attributes is None:
+            return self._get_logger_cached(name, version, schema_url)
+        return self._get_logger_no_cache(name, version, schema_url, attributes)
+
+    def add_log_record_processor(
+        self, log_record_processor: LogRecordProcessor
+    ):
+        """Registers a new :class:`LogRecordProcessor` for this `LoggerProvider` instance.
+
+        The log processors are invoked in the same order they are registered.
+        """
+        self._multi_log_record_processor.add_log_record_processor(
+            log_record_processor
+        )
+
+    def shutdown(self):
+        """Shuts down the log processors."""
+        self._multi_log_record_processor.shutdown()
+        if self._at_exit_handler is not None:
+            atexit.unregister(self._at_exit_handler)
+            self._at_exit_handler = None
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Force flush the log processors.
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for logs to be
+                exported.
+
+        Returns:
+            True if all the log processors flushes the logs within timeout,
+            False otherwise.
+        """
+        return self._multi_log_record_processor.force_flush(timeout_millis)
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
new file mode 100644
index 00000000..434dc745
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
@@ -0,0 +1,462 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import collections
+import enum
+import logging
+import os
+import sys
+import threading
+from os import environ, linesep
+from time import time_ns
+from typing import IO, Callable, Deque, List, Optional, Sequence
+
+from opentelemetry.context import (
+    _SUPPRESS_INSTRUMENTATION_KEY,
+    attach,
+    detach,
+    set_value,
+)
+from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
+from opentelemetry.sdk.environment_variables import (
+    OTEL_BLRP_EXPORT_TIMEOUT,
+    OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+    OTEL_BLRP_MAX_QUEUE_SIZE,
+    OTEL_BLRP_SCHEDULE_DELAY,
+)
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+    "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+_logger = logging.getLogger(__name__)
+
+
+class LogExportResult(enum.Enum):
+    SUCCESS = 0
+    FAILURE = 1
+
+
+class LogExporter(abc.ABC):
+    """Interface for exporting logs.
+
+    Interface to be implemented by services that want to export logs received
+    in their own format.
+
+    To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a
+    log processor.
+    """
+
+    @abc.abstractmethod
+    def export(self, batch: Sequence[LogData]):
+        """Exports a batch of logs.
+
+        Args:
+            batch: The list of `LogData` objects to be exported
+
+        Returns:
+            The result of the export
+        """
+
+    @abc.abstractmethod
+    def shutdown(self):
+        """Shuts down the exporter.
+
+        Called when the SDK is shut down.
+        """
+
+
+class ConsoleLogExporter(LogExporter):
+    """Implementation of :class:`LogExporter` that prints log records to the
+    console.
+
+    This class can be used for diagnostic purposes. It prints the exported
+    log records to the console STDOUT.
+    """
+
+    def __init__(
+        self,
+        out: IO = sys.stdout,
+        formatter: Callable[[LogRecord], str] = lambda record: record.to_json()
+        + linesep,
+    ):
+        self.out = out
+        self.formatter = formatter
+
+    def export(self, batch: Sequence[LogData]):
+        for data in batch:
+            self.out.write(self.formatter(data.log_record))
+        self.out.flush()
+        return LogExportResult.SUCCESS
+
+    def shutdown(self):
+        pass
+
+
+class SimpleLogRecordProcessor(LogRecordProcessor):
+    """This is an implementation of LogRecordProcessor which passes
+    received logs in the export-friendly LogData representation to the
+    configured LogExporter, as soon as they are emitted.
+    """
+
+    def __init__(self, exporter: LogExporter):
+        self._exporter = exporter
+        self._shutdown = False
+
+    def emit(self, log_data: LogData):
+        if self._shutdown:
+            _logger.warning("Processor is already shutdown, ignoring call")
+            return
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            self._exporter.export((log_data,))
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.exception("Exception while exporting logs.")
+        detach(token)
+
+    def shutdown(self):
+        self._shutdown = True
+        self._exporter.shutdown()
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:  # pylint: disable=no-self-use
+        return True
+
+
+class _FlushRequest:
+    __slots__ = ["event", "num_log_records"]
+
+    def __init__(self):
+        self.event = threading.Event()
+        self.num_log_records = 0
+
+
+_BSP_RESET_ONCE = Once()
+
+
+class BatchLogRecordProcessor(LogRecordProcessor):
+    """This is an implementation of LogRecordProcessor which creates batches of
+    received logs in the export-friendly LogData representation and
+    send to the configured LogExporter, as soon as they are emitted.
+
+    `BatchLogRecordProcessor` is configurable with the following environment
+    variables which correspond to constructor parameters:
+
+    - :envvar:`OTEL_BLRP_SCHEDULE_DELAY`
+    - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
+    - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
+    - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
+    """
+
+    _queue: Deque[LogData]
+    _flush_request: _FlushRequest | None
+    _log_records: List[LogData | None]
+
+    def __init__(
+        self,
+        exporter: LogExporter,
+        schedule_delay_millis: float | None = None,
+        max_export_batch_size: int | None = None,
+        export_timeout_millis: float | None = None,
+        max_queue_size: int | None = None,
+    ):
+        if max_queue_size is None:
+            max_queue_size = BatchLogRecordProcessor._default_max_queue_size()
+
+        if schedule_delay_millis is None:
+            schedule_delay_millis = (
+                BatchLogRecordProcessor._default_schedule_delay_millis()
+            )
+
+        if max_export_batch_size is None:
+            max_export_batch_size = (
+                BatchLogRecordProcessor._default_max_export_batch_size()
+            )
+
+        if export_timeout_millis is None:
+            export_timeout_millis = (
+                BatchLogRecordProcessor._default_export_timeout_millis()
+            )
+
+        BatchLogRecordProcessor._validate_arguments(
+            max_queue_size, schedule_delay_millis, max_export_batch_size
+        )
+
+        self._exporter = exporter
+        self._max_queue_size = max_queue_size
+        self._schedule_delay_millis = schedule_delay_millis
+        self._max_export_batch_size = max_export_batch_size
+        self._export_timeout_millis = export_timeout_millis
+        self._queue = collections.deque([], max_queue_size)
+        self._worker_thread = threading.Thread(
+            name="OtelBatchLogRecordProcessor",
+            target=self.worker,
+            daemon=True,
+        )
+        self._condition = threading.Condition(threading.Lock())
+        self._shutdown = False
+        self._flush_request = None
+        self._log_records = [None] * self._max_export_batch_size
+        self._worker_thread.start()
+        if hasattr(os, "register_at_fork"):
+            os.register_at_fork(after_in_child=self._at_fork_reinit)  # pylint: disable=protected-access
+        self._pid = os.getpid()
+
+    def _at_fork_reinit(self):
+        self._condition = threading.Condition(threading.Lock())
+        self._queue.clear()
+        self._worker_thread = threading.Thread(
+            name="OtelBatchLogRecordProcessor",
+            target=self.worker,
+            daemon=True,
+        )
+        self._worker_thread.start()
+        self._pid = os.getpid()
+
+    def worker(self):
+        timeout = self._schedule_delay_millis / 1e3
+        flush_request: Optional[_FlushRequest] = None
+        while not self._shutdown:
+            with self._condition:
+                if self._shutdown:
+                    # shutdown may have been called, avoid further processing
+                    break
+                flush_request = self._get_and_unset_flush_request()
+                if (
+                    len(self._queue) < self._max_export_batch_size
+                    and flush_request is None
+                ):
+                    self._condition.wait(timeout)
+
+                    flush_request = self._get_and_unset_flush_request()
+                    if not self._queue:
+                        timeout = self._schedule_delay_millis / 1e3
+                        self._notify_flush_request_finished(flush_request)
+                        flush_request = None
+                        continue
+                    if self._shutdown:
+                        break
+
+            start_ns = time_ns()
+            self._export(flush_request)
+            end_ns = time_ns()
+            # subtract the duration of this export call to the next timeout
+            timeout = self._schedule_delay_millis / 1e3 - (
+                (end_ns - start_ns) / 1e9
+            )
+
+            self._notify_flush_request_finished(flush_request)
+            flush_request = None
+
+        # there might have been a new flush request while export was running
+        # and before the done flag switched to true
+        with self._condition:
+            shutdown_flush_request = self._get_and_unset_flush_request()
+
+        # flush the remaining logs
+        self._drain_queue()
+        self._notify_flush_request_finished(flush_request)
+        self._notify_flush_request_finished(shutdown_flush_request)
+
+    def _export(self, flush_request: Optional[_FlushRequest] = None):
+        """Exports logs considering the given flush_request.
+
+        If flush_request is not None then logs are exported in batches
+        until the number of exported logs reached or exceeded the num of logs in
+        flush_request, otherwise exports at max max_export_batch_size logs.
+        """
+        if flush_request is None:
+            self._export_batch()
+            return
+
+        num_log_records = flush_request.num_log_records
+        while self._queue:
+            exported = self._export_batch()
+            num_log_records -= exported
+
+            if num_log_records <= 0:
+                break
+
+    def _export_batch(self) -> int:
+        """Exports at most max_export_batch_size logs and returns the number of
+        exported logs.
+        """
+        idx = 0
+        while idx < self._max_export_batch_size and self._queue:
+            record = self._queue.pop()
+            self._log_records[idx] = record
+            idx += 1
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            self._exporter.export(self._log_records[:idx])  # type: ignore
+        except Exception:  # pylint: disable=broad-exception-caught
+            _logger.exception("Exception while exporting logs.")
+        detach(token)
+
+        for index in range(idx):
+            self._log_records[index] = None
+        return idx
+
+    def _drain_queue(self):
+        """Export all elements until queue is empty.
+
+        Can only be called from the worker thread context because it invokes
+        `export` that is not thread safe.
+        """
+        while self._queue:
+            self._export_batch()
+
+    def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
+        flush_request = self._flush_request
+        self._flush_request = None
+        if flush_request is not None:
+            flush_request.num_log_records = len(self._queue)
+        return flush_request
+
+    @staticmethod
+    def _notify_flush_request_finished(
+        flush_request: Optional[_FlushRequest] = None,
+    ):
+        if flush_request is not None:
+            flush_request.event.set()
+
+    def _get_or_create_flush_request(self) -> _FlushRequest:
+        if self._flush_request is None:
+            self._flush_request = _FlushRequest()
+        return self._flush_request
+
+    def emit(self, log_data: LogData) -> None:
+        """Adds the `LogData` to queue and notifies the waiting threads
+        when size of queue reaches max_export_batch_size.
+        """
+        if self._shutdown:
+            return
+        if self._pid != os.getpid():
+            _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
+
+        self._queue.appendleft(log_data)
+        if len(self._queue) >= self._max_export_batch_size:
+            with self._condition:
+                self._condition.notify()
+
+    def shutdown(self):
+        self._shutdown = True
+        with self._condition:
+            self._condition.notify_all()
+        self._worker_thread.join()
+        self._exporter.shutdown()
+
+    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
+        if timeout_millis is None:
+            timeout_millis = self._export_timeout_millis
+        if self._shutdown:
+            return True
+
+        with self._condition:
+            flush_request = self._get_or_create_flush_request()
+            self._condition.notify_all()
+
+        ret = flush_request.event.wait(timeout_millis / 1e3)
+        if not ret:
+            _logger.warning("Timeout was exceeded in force_flush().")
+        return ret
+
+    @staticmethod
+    def _default_max_queue_size():
+        try:
+            return int(
+                environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_MAX_QUEUE_SIZE,
+                _DEFAULT_MAX_QUEUE_SIZE,
+            )
+            return _DEFAULT_MAX_QUEUE_SIZE
+
+    @staticmethod
+    def _default_schedule_delay_millis():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
+                )
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_SCHEDULE_DELAY,
+                _DEFAULT_SCHEDULE_DELAY_MILLIS,
+            )
+            return _DEFAULT_SCHEDULE_DELAY_MILLIS
+
+    @staticmethod
+    def _default_max_export_batch_size():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+                    _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+                )
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+                _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+            )
+            return _DEFAULT_MAX_EXPORT_BATCH_SIZE
+
+    @staticmethod
+    def _default_export_timeout_millis():
+        try:
+            return int(
+                environ.get(
+                    OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
+                )
+            )
+        except ValueError:
+            _logger.exception(
+                _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+                OTEL_BLRP_EXPORT_TIMEOUT,
+                _DEFAULT_EXPORT_TIMEOUT_MILLIS,
+            )
+            return _DEFAULT_EXPORT_TIMEOUT_MILLIS
+
+    @staticmethod
+    def _validate_arguments(
+        max_queue_size, schedule_delay_millis, max_export_batch_size
+    ):
+        if max_queue_size <= 0:
+            raise ValueError("max_queue_size must be a positive integer.")
+
+        if schedule_delay_millis <= 0:
+            raise ValueError("schedule_delay_millis must be positive.")
+
+        if max_export_batch_size <= 0:
+            raise ValueError(
+                "max_export_batch_size must be a positive integer."
+            )
+
+        if max_export_batch_size > max_queue_size:
+            raise ValueError(
+                "max_export_batch_size must be less than or equal to max_queue_size."
+            )
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
new file mode 100644
index 00000000..68cb6b73
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
@@ -0,0 +1,51 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
+
+
+class InMemoryLogExporter(LogExporter):
+    """Implementation of :class:`.LogExporter` that stores logs in memory.
+
+    This class can be used for testing purposes. It stores the exported logs
+    in a list in memory that can be retrieved using the
+    :func:`.get_finished_logs` method.
+    """
+
+    def __init__(self):
+        self._logs = []
+        self._lock = threading.Lock()
+        self._stopped = False
+
+    def clear(self) -> None:
+        with self._lock:
+            self._logs.clear()
+
+    def get_finished_logs(self) -> typing.Tuple[LogData, ...]:
+        with self._lock:
+            return tuple(self._logs)
+
+    def export(self, batch: typing.Sequence[LogData]) -> LogExportResult:
+        if self._stopped:
+            return LogExportResult.FAILURE
+        with self._lock:
+            self._logs.extend(batch)
+        return LogExportResult.SUCCESS
+
+    def shutdown(self) -> None:
+        self._stopped = True
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py
new file mode 100644
index 00000000..37a9eca7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py
@@ -0,0 +1,35 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from opentelemetry.sdk._logs._internal.export import (
+    BatchLogRecordProcessor,
+    ConsoleLogExporter,
+    LogExporter,
+    LogExportResult,
+    SimpleLogRecordProcessor,
+)
+
+# The point module is not in the export directory to avoid a circular import.
+from opentelemetry.sdk._logs._internal.export.in_memory_log_exporter import (
+    InMemoryLogExporter,
+)
+
+__all__ = [
+    "BatchLogRecordProcessor",
+    "ConsoleLogExporter",
+    "LogExporter",
+    "LogExportResult",
+    "SimpleLogRecordProcessor",
+    "InMemoryLogExporter",
+]