author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs')
-rw-r--r--  .venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py                                |  36
-rw-r--r--  .venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py                      | 712
-rw-r--r--  .venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py               | 462
-rw-r--r--  .venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py |  51
-rw-r--r--  .venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py                         |  35
5 files changed, 1296 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py
new file mode 100644
index 00000000..0254c135
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/__init__.py
@@ -0,0 +1,36 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from opentelemetry.sdk._logs._internal import (
+ LogData,
+ LogDroppedAttributesWarning,
+ Logger,
+ LoggerProvider,
+ LoggingHandler,
+ LogLimits,
+ LogRecord,
+ LogRecordProcessor,
+)
+
+__all__ = [
+ "LogData",
+ "Logger",
+ "LoggerProvider",
+ "LoggingHandler",
+ "LogLimits",
+ "LogRecord",
+ "LogRecordProcessor",
+ "LogDroppedAttributesWarning",
+]
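
The file above is the public surface of the (experimental, underscore-prefixed) `opentelemetry.sdk._logs` package. As a minimal sketch of how these names are typically wired together — assuming the `opentelemetry._logs` API package's `set_logger_provider` and the processor/exporter classes added later in this diff — a stdlib `logging` handler can be bridged into the SDK like this:

import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    ConsoleLogExporter,
    SimpleLogRecordProcessor,
)

# Create a provider and make it the global default for the API package.
provider = LoggerProvider()
set_logger_provider(provider)

# Every emitted record is synchronously printed as JSON to stdout.
provider.add_log_record_processor(
    SimpleLogRecordProcessor(ConsoleLogExporter())
)

# Route stdlib logging through the OpenTelemetry pipeline.
logging.getLogger().addHandler(LoggingHandler(logger_provider=provider))
logging.getLogger(__name__).warning("hello, OTel logs")

provider.shutdown()
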
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py
new file mode 100644
index 00000000..302ca1ed
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/__init__.py
@@ -0,0 +1,712 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import atexit
+import concurrent.futures
+import json
+import logging
+import threading
+import traceback
+import warnings
+from os import environ
+from threading import Lock
+from time import time_ns
+from typing import Any, Callable, Tuple, Union # noqa
+
+from opentelemetry._logs import Logger as APILogger
+from opentelemetry._logs import LoggerProvider as APILoggerProvider
+from opentelemetry._logs import LogRecord as APILogRecord
+from opentelemetry._logs import (
+ NoOpLogger,
+ SeverityNumber,
+ get_logger,
+ get_logger_provider,
+ std_to_otel,
+)
+from opentelemetry.attributes import BoundedAttributes
+from opentelemetry.sdk.environment_variables import (
+ OTEL_ATTRIBUTE_COUNT_LIMIT,
+ OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+ OTEL_SDK_DISABLED,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util import ns_to_iso_str
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+from opentelemetry.semconv.trace import SpanAttributes
+from opentelemetry.trace import (
+ format_span_id,
+ format_trace_id,
+ get_current_span,
+)
+from opentelemetry.trace.span import TraceFlags
+from opentelemetry.util.types import AnyValue, Attributes
+
+_logger = logging.getLogger(__name__)
+
+_DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT = 128
+_ENV_VALUE_UNSET = ""
+
+
+class LogDroppedAttributesWarning(UserWarning):
+ """Custom warning to indicate dropped log attributes due to limits.
+
+ This class is used to filter and handle these specific warnings separately
+ from other warnings, ensuring that they are only shown once without
+ interfering with default user warnings.
+ """
+
+
+warnings.simplefilter("once", LogDroppedAttributesWarning)
+
+
+class LogLimits:
+ """This class is based on a SpanLimits class in the Tracing module.
+
+ This class represents the limits that should be enforced on recorded data such as events, links, attributes etc.
+
+    This class does not enforce any limits itself. It only provides a way to read limits from
+    environment variables, default values, and user-provided arguments.
+
+ All limit arguments must be either a non-negative integer, ``None`` or ``LogLimits.UNSET``.
+
+ - All limit arguments are optional.
+ - If a limit argument is not set, the class will try to read its value from the corresponding
+ environment variable.
+ - If the environment variable is not set, the default value, if any, will be used.
+
+ Limit precedence:
+
+ - If a model specific limit is set, it will be used.
+ - Else if the corresponding global limit is set, it will be used.
+ - Else if the model specific limit has a default value, the default value will be used.
+ - Else if the global limit has a default value, the default value will be used.
+
+ Args:
+        max_attributes: Maximum number of attributes that can be added to a log record.
+            Environment variable: ``OTEL_ATTRIBUTE_COUNT_LIMIT``
+            Default: 128
+ max_attribute_length: Maximum length an attribute value can have. Values longer than
+ the specified length will be truncated.
+ """
+
+ UNSET = -1
+
+ def __init__(
+ self,
+ max_attributes: int | None = None,
+ max_attribute_length: int | None = None,
+ ):
+ # attribute count
+ global_max_attributes = self._from_env_if_absent(
+ max_attributes, OTEL_ATTRIBUTE_COUNT_LIMIT
+ )
+ self.max_attributes = (
+ global_max_attributes
+ if global_max_attributes is not None
+ else _DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT
+ )
+
+ # attribute length
+ self.max_attribute_length = self._from_env_if_absent(
+ max_attribute_length,
+ OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+ )
+
+ def __repr__(self):
+ return f"{type(self).__name__}(max_attributes={self.max_attributes}, max_attribute_length={self.max_attribute_length})"
+
+ @classmethod
+ def _from_env_if_absent(
+ cls, value: int | None, env_var: str, default: int | None = None
+ ) -> int | None:
+ if value == cls.UNSET:
+ return None
+
+ err_msg = "{} must be a non-negative integer but got {}"
+
+ # if no value is provided for the limit, try to load it from env
+ if value is None:
+ # return default value if env var is not set
+ if env_var not in environ:
+ return default
+
+ str_value = environ.get(env_var, "").strip().lower()
+ if str_value == _ENV_VALUE_UNSET:
+ return None
+
+ try:
+ value = int(str_value)
+ except ValueError:
+ raise ValueError(err_msg.format(env_var, str_value))
+
+ if value < 0:
+ raise ValueError(err_msg.format(env_var, value))
+ return value
+
+
+_UnsetLogLimits = LogLimits(
+ max_attributes=LogLimits.UNSET,
+ max_attribute_length=LogLimits.UNSET,
+)
+
+
+class LogRecord(APILogRecord):
+ """A LogRecord instance represents an event being logged.
+
+ LogRecord instances are created and emitted via `Logger`
+ every time something is logged. They contain all the information
+ pertinent to the event being logged.
+ """
+
+ def __init__(
+ self,
+ timestamp: int | None = None,
+ observed_timestamp: int | None = None,
+ trace_id: int | None = None,
+ span_id: int | None = None,
+ trace_flags: TraceFlags | None = None,
+ severity_text: str | None = None,
+ severity_number: SeverityNumber | None = None,
+ body: AnyValue | None = None,
+ resource: Resource | None = None,
+ attributes: Attributes | None = None,
+ limits: LogLimits | None = _UnsetLogLimits,
+ ):
+ super().__init__(
+ **{
+ "timestamp": timestamp,
+ "observed_timestamp": observed_timestamp,
+ "trace_id": trace_id,
+ "span_id": span_id,
+ "trace_flags": trace_flags,
+ "severity_text": severity_text,
+ "severity_number": severity_number,
+ "body": body,
+ "attributes": BoundedAttributes(
+ maxlen=limits.max_attributes,
+ attributes=attributes if bool(attributes) else None,
+ immutable=False,
+ max_value_len=limits.max_attribute_length,
+ ),
+ }
+ )
+ self.resource = (
+ resource if isinstance(resource, Resource) else Resource.create({})
+ )
+ if self.dropped_attributes > 0:
+ warnings.warn(
+ "Log record attributes were dropped due to limits",
+ LogDroppedAttributesWarning,
+ stacklevel=2,
+ )
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, LogRecord):
+ return NotImplemented
+ return self.__dict__ == other.__dict__
+
+ def to_json(self, indent: int | None = 4) -> str:
+ return json.dumps(
+ {
+ "body": self.body,
+ "severity_number": self.severity_number.value
+ if self.severity_number is not None
+ else None,
+ "severity_text": self.severity_text,
+ "attributes": (
+ dict(self.attributes) if bool(self.attributes) else None
+ ),
+ "dropped_attributes": self.dropped_attributes,
+ "timestamp": ns_to_iso_str(self.timestamp),
+ "observed_timestamp": ns_to_iso_str(self.observed_timestamp),
+ "trace_id": (
+ f"0x{format_trace_id(self.trace_id)}"
+ if self.trace_id is not None
+ else ""
+ ),
+ "span_id": (
+ f"0x{format_span_id(self.span_id)}"
+ if self.span_id is not None
+ else ""
+ ),
+ "trace_flags": self.trace_flags,
+ "resource": json.loads(self.resource.to_json()),
+ },
+ indent=indent,
+ )
+
+ @property
+ def dropped_attributes(self) -> int:
+ if self.attributes:
+ return self.attributes.dropped
+ return 0
+
+
+class LogData:
+    """Readable LogRecord data plus the associated InstrumentationScope."""
+
+ def __init__(
+ self,
+ log_record: LogRecord,
+ instrumentation_scope: InstrumentationScope,
+ ):
+ self.log_record = log_record
+ self.instrumentation_scope = instrumentation_scope
+
+
+class LogRecordProcessor(abc.ABC):
+ """Interface to hook the log record emitting action.
+
+ Log processors can be registered directly using
+ :func:`LoggerProvider.add_log_record_processor` and they are invoked
+ in the same order as they were registered.
+ """
+
+ @abc.abstractmethod
+ def emit(self, log_data: LogData):
+ """Emits the `LogData`"""
+
+ @abc.abstractmethod
+ def shutdown(self):
+        """Called when a :class:`opentelemetry.sdk._logs.Logger` is shut down"""
+
+ @abc.abstractmethod
+ def force_flush(self, timeout_millis: int = 30000):
+        """Export all received logs that have not yet been exported to the
+        configured Exporter.
+
+ Args:
+ timeout_millis: The maximum amount of time to wait for logs to be
+ exported.
+
+ Returns:
+ False if the timeout is exceeded, True otherwise.
+ """
+
+
+# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
+# pylint:disable=no-member
+class SynchronousMultiLogRecordProcessor(LogRecordProcessor):
+    """Implementation of :class:`LogRecordProcessor` that forwards all received
+ events to a list of log processors sequentially.
+
+ The underlying log processors are called in sequential order as they were
+ added.
+ """
+
+ def __init__(self):
+ # use a tuple to avoid race conditions when adding a new log and
+ # iterating through it on "emit".
+ self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...]
+ self._lock = threading.Lock()
+
+ def add_log_record_processor(
+ self, log_record_processor: LogRecordProcessor
+ ) -> None:
+        """Adds a LogRecordProcessor to the list of log processors handled by this instance"""
+ with self._lock:
+ self._log_record_processors += (log_record_processor,)
+
+ def emit(self, log_data: LogData) -> None:
+ for lp in self._log_record_processors:
+ lp.emit(log_data)
+
+ def shutdown(self) -> None:
+        """Shut down the log processors one by one"""
+ for lp in self._log_record_processors:
+ lp.shutdown()
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Force flush the log processors one by one
+
+ Args:
+ timeout_millis: The maximum amount of time to wait for logs to be
+                exported. If the first n log processors exceed the timeout,
+                the remaining log processors will not be flushed.
+
+ Returns:
+            True if all the log processors flush the logs within the timeout,
+ False otherwise.
+ """
+ deadline_ns = time_ns() + timeout_millis * 1000000
+ for lp in self._log_record_processors:
+ current_ts = time_ns()
+ if current_ts >= deadline_ns:
+ return False
+
+ if not lp.force_flush((deadline_ns - current_ts) // 1000000):
+ return False
+
+ return True
+
+
+class ConcurrentMultiLogRecordProcessor(LogRecordProcessor):
+ """Implementation of :class:`LogRecordProcessor` that forwards all received
+ events to a list of log processors in parallel.
+
+ Calls to the underlying log processors are forwarded in parallel by
+ submitting them to a thread pool executor and waiting until each log
+    processor has finished its work.
+
+ Args:
+ max_workers: The number of threads managed by the thread pool executor
+ and thus defining how many log processors can work in parallel.
+ """
+
+ def __init__(self, max_workers: int = 2):
+ # use a tuple to avoid race conditions when adding a new log and
+ # iterating through it on "emit".
+ self._log_record_processors = () # type: Tuple[LogRecordProcessor, ...]
+ self._lock = threading.Lock()
+ self._executor = concurrent.futures.ThreadPoolExecutor(
+ max_workers=max_workers
+ )
+
+ def add_log_record_processor(
+ self, log_record_processor: LogRecordProcessor
+ ):
+ with self._lock:
+ self._log_record_processors += (log_record_processor,)
+
+ def _submit_and_wait(
+ self,
+ func: Callable[[LogRecordProcessor], Callable[..., None]],
+ *args: Any,
+ **kwargs: Any,
+ ):
+ futures = []
+ for lp in self._log_record_processors:
+ future = self._executor.submit(func(lp), *args, **kwargs)
+ futures.append(future)
+ for future in futures:
+ future.result()
+
+ def emit(self, log_data: LogData):
+ self._submit_and_wait(lambda lp: lp.emit, log_data)
+
+ def shutdown(self):
+ self._submit_and_wait(lambda lp: lp.shutdown)
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Force flush the log processors in parallel.
+
+ Args:
+ timeout_millis: The maximum amount of time to wait for logs to be
+ exported.
+
+ Returns:
+            True if all the log processors flush the logs within the timeout,
+ False otherwise.
+ """
+ futures = []
+ for lp in self._log_record_processors:
+ future = self._executor.submit(lp.force_flush, timeout_millis)
+ futures.append(future)
+
+ done_futures, not_done_futures = concurrent.futures.wait(
+ futures, timeout_millis / 1e3
+ )
+
+ if not_done_futures:
+ return False
+
+ for future in done_futures:
+ if not future.result():
+ return False
+
+ return True
+
+
+# skip natural LogRecord attributes
+# http://docs.python.org/library/logging.html#logrecord-attributes
+_RESERVED_ATTRS = frozenset(
+ (
+ "asctime",
+ "args",
+ "created",
+ "exc_info",
+ "exc_text",
+ "filename",
+ "funcName",
+ "getMessage",
+ "message",
+ "levelname",
+ "levelno",
+ "lineno",
+ "module",
+ "msecs",
+ "msg",
+ "name",
+ "pathname",
+ "process",
+ "processName",
+ "relativeCreated",
+ "stack_info",
+ "thread",
+ "threadName",
+ "taskName",
+ )
+)
+
+
+class LoggingHandler(logging.Handler):
+ """A handler class which writes logging records, in OTLP format, to
+ a network destination or file. Supports signals from the `logging` module.
+ https://docs.python.org/3/library/logging.html
+ """
+
+ def __init__(
+ self,
+ level=logging.NOTSET,
+ logger_provider=None,
+ ) -> None:
+ super().__init__(level=level)
+ self._logger_provider = logger_provider or get_logger_provider()
+
+ @staticmethod
+ def _get_attributes(record: logging.LogRecord) -> Attributes:
+ attributes = {
+ k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS
+ }
+
+ # Add standard code attributes for logs.
+ attributes[SpanAttributes.CODE_FILEPATH] = record.pathname
+ attributes[SpanAttributes.CODE_FUNCTION] = record.funcName
+ attributes[SpanAttributes.CODE_LINENO] = record.lineno
+
+ if record.exc_info:
+ exctype, value, tb = record.exc_info
+ if exctype is not None:
+ attributes[SpanAttributes.EXCEPTION_TYPE] = exctype.__name__
+ if value is not None and value.args:
+ attributes[SpanAttributes.EXCEPTION_MESSAGE] = str(
+ value.args[0]
+ )
+ if tb is not None:
+ # https://github.com/open-telemetry/opentelemetry-specification/blob/9fa7c656b26647b27e485a6af7e38dc716eba98a/specification/trace/semantic_conventions/exceptions.md#stacktrace-representation
+ attributes[SpanAttributes.EXCEPTION_STACKTRACE] = "".join(
+ traceback.format_exception(*record.exc_info)
+ )
+ return attributes
+
+ def _translate(self, record: logging.LogRecord) -> LogRecord:
+ timestamp = int(record.created * 1e9)
+        observed_timestamp = time_ns()
+ span_context = get_current_span().get_span_context()
+ attributes = self._get_attributes(record)
+ severity_number = std_to_otel(record.levelno)
+ if self.formatter:
+ body = self.format(record)
+ else:
+ # `record.getMessage()` uses `record.msg` as a template to format
+ # `record.args` into. There is a special case in `record.getMessage()`
+ # where it will only attempt formatting if args are provided,
+ # otherwise, it just stringifies `record.msg`.
+ #
+ # Since the OTLP body field has a type of 'any' and the logging module
+ # is sometimes used in such a way that objects incorrectly end up
+ # set as record.msg, in those cases we would like to bypass
+ # `record.getMessage()` completely and set the body to the object
+ # itself instead of its string representation.
+ # For more background, see: https://github.com/open-telemetry/opentelemetry-python/pull/4216
+ if not record.args and not isinstance(record.msg, str):
+ # no args are provided so it's *mostly* safe to use the message template as the body
+ body = record.msg
+ else:
+ body = record.getMessage()
+
+ # related to https://github.com/open-telemetry/opentelemetry-python/issues/3548
+ # Severity Text = WARN as defined in https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#displaying-severity.
+ level_name = (
+ "WARN" if record.levelname == "WARNING" else record.levelname
+ )
+
+ logger = get_logger(record.name, logger_provider=self._logger_provider)
+ return LogRecord(
+ timestamp=timestamp,
+            observed_timestamp=observed_timestamp,
+ trace_id=span_context.trace_id,
+ span_id=span_context.span_id,
+ trace_flags=span_context.trace_flags,
+ severity_text=level_name,
+ severity_number=severity_number,
+ body=body,
+ resource=logger.resource,
+ attributes=attributes,
+ )
+
+ def emit(self, record: logging.LogRecord) -> None:
+ """
+ Emit a record. Skip emitting if logger is NoOp.
+
+ The record is translated to OTel format, and then sent across the pipeline.
+ """
+ logger = get_logger(record.name, logger_provider=self._logger_provider)
+ if not isinstance(logger, NoOpLogger):
+ logger.emit(self._translate(record))
+
+ def flush(self) -> None:
+ """
+        Flushes the logging output. Skip flushing if the logger_provider has no force_flush method.
+ """
+ if hasattr(self._logger_provider, "force_flush") and callable(
+ self._logger_provider.force_flush
+ ):
+ self._logger_provider.force_flush()
+
+
+class Logger(APILogger):
+ def __init__(
+ self,
+ resource: Resource,
+ multi_log_record_processor: Union[
+ SynchronousMultiLogRecordProcessor,
+ ConcurrentMultiLogRecordProcessor,
+ ],
+ instrumentation_scope: InstrumentationScope,
+ ):
+ super().__init__(
+ instrumentation_scope.name,
+ instrumentation_scope.version,
+ instrumentation_scope.schema_url,
+ instrumentation_scope.attributes,
+ )
+ self._resource = resource
+ self._multi_log_record_processor = multi_log_record_processor
+ self._instrumentation_scope = instrumentation_scope
+
+ @property
+ def resource(self):
+ return self._resource
+
+ def emit(self, record: LogRecord):
+ """Emits the :class:`LogData` by associating :class:`LogRecord`
+ and instrumentation info.
+ """
+ log_data = LogData(record, self._instrumentation_scope)
+ self._multi_log_record_processor.emit(log_data)
+
+
+class LoggerProvider(APILoggerProvider):
+ def __init__(
+ self,
+ resource: Resource | None = None,
+ shutdown_on_exit: bool = True,
+ multi_log_record_processor: SynchronousMultiLogRecordProcessor
+ | ConcurrentMultiLogRecordProcessor
+ | None = None,
+ ):
+ if resource is None:
+ self._resource = Resource.create({})
+ else:
+ self._resource = resource
+ self._multi_log_record_processor = (
+ multi_log_record_processor or SynchronousMultiLogRecordProcessor()
+ )
+ disabled = environ.get(OTEL_SDK_DISABLED, "")
+ self._disabled = disabled.lower().strip() == "true"
+ self._at_exit_handler = None
+ if shutdown_on_exit:
+ self._at_exit_handler = atexit.register(self.shutdown)
+ self._logger_cache = {}
+ self._logger_cache_lock = Lock()
+
+ @property
+ def resource(self):
+ return self._resource
+
+ def _get_logger_no_cache(
+ self,
+ name: str,
+ version: str | None = None,
+ schema_url: str | None = None,
+ attributes: Attributes | None = None,
+ ) -> Logger:
+ return Logger(
+ self._resource,
+ self._multi_log_record_processor,
+ InstrumentationScope(
+ name,
+ version,
+ schema_url,
+ attributes,
+ ),
+ )
+
+ def _get_logger_cached(
+ self,
+ name: str,
+ version: str | None = None,
+ schema_url: str | None = None,
+ ) -> Logger:
+ with self._logger_cache_lock:
+ key = (name, version, schema_url)
+ if key in self._logger_cache:
+ return self._logger_cache[key]
+
+ self._logger_cache[key] = self._get_logger_no_cache(
+ name, version, schema_url
+ )
+ return self._logger_cache[key]
+
+ def get_logger(
+ self,
+ name: str,
+ version: str | None = None,
+ schema_url: str | None = None,
+ attributes: Attributes | None = None,
+ ) -> Logger:
+ if self._disabled:
+ return NoOpLogger(
+ name,
+ version=version,
+ schema_url=schema_url,
+ attributes=attributes,
+ )
+ if attributes is None:
+ return self._get_logger_cached(name, version, schema_url)
+ return self._get_logger_no_cache(name, version, schema_url, attributes)
+
+ def add_log_record_processor(
+ self, log_record_processor: LogRecordProcessor
+ ):
+ """Registers a new :class:`LogRecordProcessor` for this `LoggerProvider` instance.
+
+ The log processors are invoked in the same order they are registered.
+ """
+ self._multi_log_record_processor.add_log_record_processor(
+ log_record_processor
+ )
+
+ def shutdown(self):
+ """Shuts down the log processors."""
+ self._multi_log_record_processor.shutdown()
+ if self._at_exit_handler is not None:
+ atexit.unregister(self._at_exit_handler)
+ self._at_exit_handler = None
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Force flush the log processors.
+
+ Args:
+ timeout_millis: The maximum amount of time to wait for logs to be
+ exported.
+
+ Returns:
+            True if all the log processors flush the logs within the timeout,
+ False otherwise.
+ """
+ return self._multi_log_record_processor.force_flush(timeout_millis)
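
The `LogLimits`/`LogRecord` machinery above silently truncates attributes and raises `LogDroppedAttributesWarning` (filtered to "once" at import time). A small sketch of the behavior, using only names defined in this file — an explicit constructor argument takes precedence over `OTEL_ATTRIBUTE_COUNT_LIMIT`, and the record reports its own drop count:

import warnings

from opentelemetry.sdk._logs import (
    LogDroppedAttributesWarning,
    LogLimits,
    LogRecord,
)

# An explicit argument overrides OTEL_ATTRIBUTE_COUNT_LIMIT;
# LogLimits.UNSET would lift the limit entirely.
limits = LogLimits(max_attributes=1)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always", LogDroppedAttributesWarning)
    record = LogRecord(
        body="over the limit",
        attributes={"a": 1, "b": 2},  # one of these will be dropped
        limits=limits,
    )

assert record.dropped_attributes == 1
assert any(
    issubclass(w.category, LogDroppedAttributesWarning) for w in caught
)
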
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
new file mode 100644
index 00000000..434dc745
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/__init__.py
@@ -0,0 +1,462 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import abc
+import collections
+import enum
+import logging
+import os
+import sys
+import threading
+from os import environ, linesep
+from time import time_ns
+from typing import IO, Callable, Deque, List, Optional, Sequence
+
+from opentelemetry.context import (
+ _SUPPRESS_INSTRUMENTATION_KEY,
+ attach,
+ detach,
+ set_value,
+)
+from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
+from opentelemetry.sdk.environment_variables import (
+ OTEL_BLRP_EXPORT_TIMEOUT,
+ OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+ OTEL_BLRP_MAX_QUEUE_SIZE,
+ OTEL_BLRP_SCHEDULE_DELAY,
+)
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+ "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+_logger = logging.getLogger(__name__)
+
+
+class LogExportResult(enum.Enum):
+ SUCCESS = 0
+ FAILURE = 1
+
+
+class LogExporter(abc.ABC):
+ """Interface for exporting logs.
+
+ Interface to be implemented by services that want to export logs received
+ in their own format.
+
+    To export data this MUST be registered to the :class:`opentelemetry.sdk._logs.Logger` using a
+ log processor.
+ """
+
+ @abc.abstractmethod
+ def export(self, batch: Sequence[LogData]):
+ """Exports a batch of logs.
+
+ Args:
+ batch: The list of `LogData` objects to be exported
+
+ Returns:
+ The result of the export
+ """
+
+ @abc.abstractmethod
+ def shutdown(self):
+ """Shuts down the exporter.
+
+ Called when the SDK is shut down.
+ """
+
+
+class ConsoleLogExporter(LogExporter):
+ """Implementation of :class:`LogExporter` that prints log records to the
+ console.
+
+    This class can be used for diagnostic purposes. It prints the exported
+    log records to the console (stdout by default).
+ """
+
+ def __init__(
+ self,
+ out: IO = sys.stdout,
+ formatter: Callable[[LogRecord], str] = lambda record: record.to_json()
+ + linesep,
+ ):
+ self.out = out
+ self.formatter = formatter
+
+ def export(self, batch: Sequence[LogData]):
+ for data in batch:
+ self.out.write(self.formatter(data.log_record))
+ self.out.flush()
+ return LogExportResult.SUCCESS
+
+ def shutdown(self):
+ pass
+
+
+class SimpleLogRecordProcessor(LogRecordProcessor):
+ """This is an implementation of LogRecordProcessor which passes
+ received logs in the export-friendly LogData representation to the
+ configured LogExporter, as soon as they are emitted.
+ """
+
+ def __init__(self, exporter: LogExporter):
+ self._exporter = exporter
+ self._shutdown = False
+
+ def emit(self, log_data: LogData):
+ if self._shutdown:
+            _logger.warning("Processor is already shut down, ignoring call")
+ return
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ self._exporter.export((log_data,))
+ except Exception: # pylint: disable=broad-exception-caught
+ _logger.exception("Exception while exporting logs.")
+ detach(token)
+
+ def shutdown(self):
+ self._shutdown = True
+ self._exporter.shutdown()
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=no-self-use
+ return True
+
+
+class _FlushRequest:
+ __slots__ = ["event", "num_log_records"]
+
+ def __init__(self):
+ self.event = threading.Event()
+ self.num_log_records = 0
+
+
+_BSP_RESET_ONCE = Once()
+
+
+class BatchLogRecordProcessor(LogRecordProcessor):
+    """This is an implementation of LogRecordProcessor that batches received
+    logs in the export-friendly LogData representation and sends them to the
+    configured LogExporter on a schedule or when the batch fills up.
+
+ `BatchLogRecordProcessor` is configurable with the following environment
+ variables which correspond to constructor parameters:
+
+ - :envvar:`OTEL_BLRP_SCHEDULE_DELAY`
+ - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
+ - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
+ - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
+ """
+
+ _queue: Deque[LogData]
+ _flush_request: _FlushRequest | None
+ _log_records: List[LogData | None]
+
+ def __init__(
+ self,
+ exporter: LogExporter,
+ schedule_delay_millis: float | None = None,
+ max_export_batch_size: int | None = None,
+ export_timeout_millis: float | None = None,
+ max_queue_size: int | None = None,
+ ):
+ if max_queue_size is None:
+ max_queue_size = BatchLogRecordProcessor._default_max_queue_size()
+
+ if schedule_delay_millis is None:
+ schedule_delay_millis = (
+ BatchLogRecordProcessor._default_schedule_delay_millis()
+ )
+
+ if max_export_batch_size is None:
+ max_export_batch_size = (
+ BatchLogRecordProcessor._default_max_export_batch_size()
+ )
+
+ if export_timeout_millis is None:
+ export_timeout_millis = (
+ BatchLogRecordProcessor._default_export_timeout_millis()
+ )
+
+ BatchLogRecordProcessor._validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ )
+
+ self._exporter = exporter
+ self._max_queue_size = max_queue_size
+ self._schedule_delay_millis = schedule_delay_millis
+ self._max_export_batch_size = max_export_batch_size
+ self._export_timeout_millis = export_timeout_millis
+ self._queue = collections.deque([], max_queue_size)
+ self._worker_thread = threading.Thread(
+ name="OtelBatchLogRecordProcessor",
+ target=self.worker,
+ daemon=True,
+ )
+ self._condition = threading.Condition(threading.Lock())
+ self._shutdown = False
+ self._flush_request = None
+ self._log_records = [None] * self._max_export_batch_size
+ self._worker_thread.start()
+ if hasattr(os, "register_at_fork"):
+ os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
+ self._pid = os.getpid()
+
+ def _at_fork_reinit(self):
+ self._condition = threading.Condition(threading.Lock())
+ self._queue.clear()
+ self._worker_thread = threading.Thread(
+ name="OtelBatchLogRecordProcessor",
+ target=self.worker,
+ daemon=True,
+ )
+ self._worker_thread.start()
+ self._pid = os.getpid()
+
+ def worker(self):
+ timeout = self._schedule_delay_millis / 1e3
+ flush_request: Optional[_FlushRequest] = None
+ while not self._shutdown:
+ with self._condition:
+ if self._shutdown:
+ # shutdown may have been called, avoid further processing
+ break
+ flush_request = self._get_and_unset_flush_request()
+ if (
+ len(self._queue) < self._max_export_batch_size
+ and flush_request is None
+ ):
+ self._condition.wait(timeout)
+
+ flush_request = self._get_and_unset_flush_request()
+ if not self._queue:
+ timeout = self._schedule_delay_millis / 1e3
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+ continue
+ if self._shutdown:
+ break
+
+ start_ns = time_ns()
+ self._export(flush_request)
+ end_ns = time_ns()
+                # subtract the duration of this export call from the next timeout
+ timeout = self._schedule_delay_millis / 1e3 - (
+ (end_ns - start_ns) / 1e9
+ )
+
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+
+ # there might have been a new flush request while export was running
+ # and before the done flag switched to true
+ with self._condition:
+ shutdown_flush_request = self._get_and_unset_flush_request()
+
+ # flush the remaining logs
+ self._drain_queue()
+ self._notify_flush_request_finished(flush_request)
+ self._notify_flush_request_finished(shutdown_flush_request)
+
+ def _export(self, flush_request: Optional[_FlushRequest] = None):
+        """Exports logs considering the given flush_request.
+
+        If flush_request is not None, logs are exported in batches until the
+        number of exported logs reaches or exceeds the number of logs in
+        flush_request; otherwise at most max_export_batch_size logs are exported.
+ """
+ if flush_request is None:
+ self._export_batch()
+ return
+
+ num_log_records = flush_request.num_log_records
+ while self._queue:
+ exported = self._export_batch()
+ num_log_records -= exported
+
+ if num_log_records <= 0:
+ break
+
+ def _export_batch(self) -> int:
+ """Exports at most max_export_batch_size logs and returns the number of
+ exported logs.
+ """
+ idx = 0
+ while idx < self._max_export_batch_size and self._queue:
+ record = self._queue.pop()
+ self._log_records[idx] = record
+ idx += 1
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ self._exporter.export(self._log_records[:idx]) # type: ignore
+ except Exception: # pylint: disable=broad-exception-caught
+ _logger.exception("Exception while exporting logs.")
+ detach(token)
+
+ for index in range(idx):
+ self._log_records[index] = None
+ return idx
+
+ def _drain_queue(self):
+        """Export all elements until the queue is empty.
+
+        Can only be called from the worker thread context because it invokes
+        `export`, which is not thread safe.
+ """
+ while self._queue:
+ self._export_batch()
+
+ def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
+ flush_request = self._flush_request
+ self._flush_request = None
+ if flush_request is not None:
+ flush_request.num_log_records = len(self._queue)
+ return flush_request
+
+ @staticmethod
+ def _notify_flush_request_finished(
+ flush_request: Optional[_FlushRequest] = None,
+ ):
+ if flush_request is not None:
+ flush_request.event.set()
+
+ def _get_or_create_flush_request(self) -> _FlushRequest:
+ if self._flush_request is None:
+ self._flush_request = _FlushRequest()
+ return self._flush_request
+
+ def emit(self, log_data: LogData) -> None:
+        """Adds the `LogData` to the queue and notifies the waiting threads
+        when the queue size reaches max_export_batch_size.
+ """
+ if self._shutdown:
+ return
+ if self._pid != os.getpid():
+ _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
+
+ self._queue.appendleft(log_data)
+ if len(self._queue) >= self._max_export_batch_size:
+ with self._condition:
+ self._condition.notify()
+
+ def shutdown(self):
+ self._shutdown = True
+ with self._condition:
+ self._condition.notify_all()
+ self._worker_thread.join()
+ self._exporter.shutdown()
+
+ def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
+ if timeout_millis is None:
+ timeout_millis = self._export_timeout_millis
+ if self._shutdown:
+ return True
+
+ with self._condition:
+ flush_request = self._get_or_create_flush_request()
+ self._condition.notify_all()
+
+ ret = flush_request.event.wait(timeout_millis / 1e3)
+ if not ret:
+ _logger.warning("Timeout was exceeded in force_flush().")
+ return ret
+
+ @staticmethod
+ def _default_max_queue_size():
+ try:
+ return int(
+ environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_MAX_QUEUE_SIZE,
+ _DEFAULT_MAX_QUEUE_SIZE,
+ )
+ return _DEFAULT_MAX_QUEUE_SIZE
+
+ @staticmethod
+ def _default_schedule_delay_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
+ )
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_SCHEDULE_DELAY,
+ _DEFAULT_SCHEDULE_DELAY_MILLIS,
+ )
+ return _DEFAULT_SCHEDULE_DELAY_MILLIS
+
+ @staticmethod
+ def _default_max_export_batch_size():
+ try:
+ return int(
+ environ.get(
+ OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ return _DEFAULT_MAX_EXPORT_BATCH_SIZE
+
+ @staticmethod
+ def _default_export_timeout_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
+ )
+ )
+ except ValueError:
+ _logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BLRP_EXPORT_TIMEOUT,
+ _DEFAULT_EXPORT_TIMEOUT_MILLIS,
+ )
+ return _DEFAULT_EXPORT_TIMEOUT_MILLIS
+
+ @staticmethod
+ def _validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ ):
+ if max_queue_size <= 0:
+ raise ValueError("max_queue_size must be a positive integer.")
+
+ if schedule_delay_millis <= 0:
+ raise ValueError("schedule_delay_millis must be positive.")
+
+ if max_export_batch_size <= 0:
+ raise ValueError(
+ "max_export_batch_size must be a positive integer."
+ )
+
+ if max_export_batch_size > max_queue_size:
+ raise ValueError(
+ "max_export_batch_size must be less than or equal to max_queue_size."
+ )
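
For production pipelines, the `BatchLogRecordProcessor` above is the usual choice. A hedged sketch of wiring it up — batch parameters may also come from the `OTEL_BLRP_*` environment variables, with explicit constructor arguments taking precedence, as in the defaulting logic above:

import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)

provider = LoggerProvider()
set_logger_provider(provider)

# Records are queued and exported by a background thread, either every
# second or as soon as 64 records have accumulated.
provider.add_log_record_processor(
    BatchLogRecordProcessor(
        ConsoleLogExporter(),
        schedule_delay_millis=1000,
        max_export_batch_size=64,
    )
)

logging.getLogger().addHandler(LoggingHandler(logger_provider=provider))
logging.getLogger(__name__).error("batched, then printed as JSON")

provider.force_flush()  # block until the queue is drained (or timeout)
provider.shutdown()
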
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
new file mode 100644
index 00000000..68cb6b73
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py
@@ -0,0 +1,51 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
+
+
+class InMemoryLogExporter(LogExporter):
+ """Implementation of :class:`.LogExporter` that stores logs in memory.
+
+ This class can be used for testing purposes. It stores the exported logs
+ in a list in memory that can be retrieved using the
+ :func:`.get_finished_logs` method.
+ """
+
+ def __init__(self):
+ self._logs = []
+ self._lock = threading.Lock()
+ self._stopped = False
+
+ def clear(self) -> None:
+ with self._lock:
+ self._logs.clear()
+
+ def get_finished_logs(self) -> typing.Tuple[LogData, ...]:
+ with self._lock:
+ return tuple(self._logs)
+
+ def export(self, batch: typing.Sequence[LogData]) -> LogExportResult:
+ if self._stopped:
+ return LogExportResult.FAILURE
+ with self._lock:
+ self._logs.extend(batch)
+ return LogExportResult.SUCCESS
+
+ def shutdown(self) -> None:
+ self._stopped = True
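
`InMemoryLogExporter` is aimed at tests; paired with `SimpleLogRecordProcessor`, emitted records become synchronously inspectable. A minimal sketch (the scope name "test-scope" is illustrative, and `OTEL_SDK_DISABLED` is assumed unset):

from opentelemetry.sdk._logs import LoggerProvider, LogRecord
from opentelemetry.sdk._logs.export import (
    InMemoryLogExporter,
    SimpleLogRecordProcessor,
)

exporter = InMemoryLogExporter()
provider = LoggerProvider(shutdown_on_exit=False)
provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))

# Emit directly through an SDK logger; the simple processor exports inline.
provider.get_logger("test-scope").emit(LogRecord(body="captured"))

finished = exporter.get_finished_logs()
assert len(finished) == 1
assert finished[0].log_record.body == "captured"

exporter.clear()
assert exporter.get_finished_logs() == ()
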
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py
new file mode 100644
index 00000000..37a9eca7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/_logs/export/__init__.py
@@ -0,0 +1,35 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from opentelemetry.sdk._logs._internal.export import (
+ BatchLogRecordProcessor,
+ ConsoleLogExporter,
+ LogExporter,
+ LogExportResult,
+ SimpleLogRecordProcessor,
+)
+
+# The in_memory_log_exporter module is kept in a separate file to avoid a circular import.
+from opentelemetry.sdk._logs._internal.export.in_memory_log_exporter import (
+ InMemoryLogExporter,
+)
+
+__all__ = [
+ "BatchLogRecordProcessor",
+ "ConsoleLogExporter",
+ "LogExporter",
+ "LogExportResult",
+ "SimpleLogRecordProcessor",
+ "InMemoryLogExporter",
+]