diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace')
5 files changed, 2396 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/__init__.py new file mode 100644 index 00000000..3ac45806 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/__init__.py @@ -0,0 +1,1305 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=too-many-lines +import abc +import atexit +import concurrent.futures +import json +import logging +import threading +import traceback +import typing +from os import environ +from time import time_ns +from types import MappingProxyType, TracebackType +from typing import ( + Any, + Callable, + Dict, + Iterator, + List, + Mapping, + MutableMapping, + Optional, + Sequence, + Tuple, + Type, + Union, +) +from warnings import filterwarnings + +from deprecated import deprecated + +from opentelemetry import context as context_api +from opentelemetry import trace as trace_api +from opentelemetry.attributes import BoundedAttributes +from opentelemetry.sdk import util +from opentelemetry.sdk.environment_variables import ( + OTEL_ATTRIBUTE_COUNT_LIMIT, + OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, + OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT, + OTEL_LINK_ATTRIBUTE_COUNT_LIMIT, + OTEL_SDK_DISABLED, + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, + OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, + OTEL_SPAN_EVENT_COUNT_LIMIT, + OTEL_SPAN_LINK_COUNT_LIMIT, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import sampling +from opentelemetry.sdk.trace.id_generator import IdGenerator, RandomIdGenerator +from opentelemetry.sdk.util import BoundedList +from opentelemetry.sdk.util.instrumentation import ( + InstrumentationInfo, + InstrumentationScope, +) +from opentelemetry.semconv.attributes.exception_attributes import ( + EXCEPTION_ESCAPED, + EXCEPTION_MESSAGE, + EXCEPTION_STACKTRACE, + EXCEPTION_TYPE, +) +from opentelemetry.trace import NoOpTracer, SpanContext +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util import types +from opentelemetry.util._decorator import _agnosticcontextmanager + +logger = logging.getLogger(__name__) + +_DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT = 128 +_DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT = 128 +_DEFAULT_OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT = 128 +_DEFAULT_OTEL_LINK_ATTRIBUTE_COUNT_LIMIT = 128 +_DEFAULT_OTEL_SPAN_EVENT_COUNT_LIMIT = 128 +_DEFAULT_OTEL_SPAN_LINK_COUNT_LIMIT = 128 + + +_ENV_VALUE_UNSET = "" + + +class SpanProcessor: + """Interface which allows hooks for SDK's `Span` start and end method + invocations. + + Span processors can be registered directly using + :func:`TracerProvider.add_span_processor` and they are invoked + in the same order as they were registered. + """ + + def on_start( + self, + span: "Span", + parent_context: Optional[context_api.Context] = None, + ) -> None: + """Called when a :class:`opentelemetry.trace.Span` is started. + + This method is called synchronously on the thread that starts the + span, therefore it should not block or throw an exception. + + Args: + span: The :class:`opentelemetry.trace.Span` that just started. + parent_context: The parent context of the span that just started. + """ + + def on_end(self, span: "ReadableSpan") -> None: + """Called when a :class:`opentelemetry.trace.Span` is ended. + + This method is called synchronously on the thread that ends the + span, therefore it should not block or throw an exception. + + Args: + span: The :class:`opentelemetry.trace.Span` that just ended. + """ + + def shutdown(self) -> None: + """Called when a :class:`opentelemetry.sdk.trace.TracerProvider` is shutdown.""" + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Export all ended spans to the configured Exporter that have not yet + been exported. + + Args: + timeout_millis: The maximum amount of time to wait for spans to be + exported. + + Returns: + False if the timeout is exceeded, True otherwise. + """ + + +# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved +# pylint:disable=no-member +class SynchronousMultiSpanProcessor(SpanProcessor): + """Implementation of class:`SpanProcessor` that forwards all received + events to a list of span processors sequentially. + + The underlying span processors are called in sequential order as they were + added. + """ + + _span_processors: Tuple[SpanProcessor, ...] + + def __init__(self): + # use a tuple to avoid race conditions when adding a new span and + # iterating through it on "on_start" and "on_end". + self._span_processors = () + self._lock = threading.Lock() + + def add_span_processor(self, span_processor: SpanProcessor) -> None: + """Adds a SpanProcessor to the list handled by this instance.""" + with self._lock: + self._span_processors += (span_processor,) + + def on_start( + self, + span: "Span", + parent_context: Optional[context_api.Context] = None, + ) -> None: + for sp in self._span_processors: + sp.on_start(span, parent_context=parent_context) + + def on_end(self, span: "ReadableSpan") -> None: + for sp in self._span_processors: + sp.on_end(span) + + def shutdown(self) -> None: + """Sequentially shuts down all underlying span processors.""" + for sp in self._span_processors: + sp.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Sequentially calls force_flush on all underlying + :class:`SpanProcessor` + + Args: + timeout_millis: The maximum amount of time over all span processors + to wait for spans to be exported. In case the first n span + processors exceeded the timeout followup span processors will be + skipped. + + Returns: + True if all span processors flushed their spans within the + given timeout, False otherwise. + """ + deadline_ns = time_ns() + timeout_millis * 1000000 + for sp in self._span_processors: + current_time_ns = time_ns() + if current_time_ns >= deadline_ns: + return False + + if not sp.force_flush((deadline_ns - current_time_ns) // 1000000): + return False + + return True + + +class ConcurrentMultiSpanProcessor(SpanProcessor): + """Implementation of :class:`SpanProcessor` that forwards all received + events to a list of span processors in parallel. + + Calls to the underlying span processors are forwarded in parallel by + submitting them to a thread pool executor and waiting until each span + processor finished its work. + + Args: + num_threads: The number of threads managed by the thread pool executor + and thus defining how many span processors can work in parallel. + """ + + def __init__(self, num_threads: int = 2): + # use a tuple to avoid race conditions when adding a new span and + # iterating through it on "on_start" and "on_end". + self._span_processors = () # type: Tuple[SpanProcessor, ...] + self._lock = threading.Lock() + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=num_threads + ) + + def add_span_processor(self, span_processor: SpanProcessor) -> None: + """Adds a SpanProcessor to the list handled by this instance.""" + with self._lock: + self._span_processors += (span_processor,) + + def _submit_and_await( + self, + func: Callable[[SpanProcessor], Callable[..., None]], + *args: Any, + **kwargs: Any, + ): + futures = [] + for sp in self._span_processors: + future = self._executor.submit(func(sp), *args, **kwargs) + futures.append(future) + for future in futures: + future.result() + + def on_start( + self, + span: "Span", + parent_context: Optional[context_api.Context] = None, + ) -> None: + self._submit_and_await( + lambda sp: sp.on_start, span, parent_context=parent_context + ) + + def on_end(self, span: "ReadableSpan") -> None: + self._submit_and_await(lambda sp: sp.on_end, span) + + def shutdown(self) -> None: + """Shuts down all underlying span processors in parallel.""" + self._submit_and_await(lambda sp: sp.shutdown) + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Calls force_flush on all underlying span processors in parallel. + + Args: + timeout_millis: The maximum amount of time to wait for spans to be + exported. + + Returns: + True if all span processors flushed their spans within the given + timeout, False otherwise. + """ + futures = [] + for sp in self._span_processors: # type: SpanProcessor + future = self._executor.submit(sp.force_flush, timeout_millis) + futures.append(future) + + timeout_sec = timeout_millis / 1e3 + done_futures, not_done_futures = concurrent.futures.wait( + futures, timeout_sec + ) + if not_done_futures: + return False + + for future in done_futures: + if not future.result(): + return False + + return True + + +class EventBase(abc.ABC): + def __init__(self, name: str, timestamp: Optional[int] = None) -> None: + self._name = name + if timestamp is None: + self._timestamp = time_ns() + else: + self._timestamp = timestamp + + @property + def name(self) -> str: + return self._name + + @property + def timestamp(self) -> int: + return self._timestamp + + @property + @abc.abstractmethod + def attributes(self) -> types.Attributes: + pass + + +class Event(EventBase): + """A text annotation with a set of attributes. The attributes of an event + are immutable. + + Args: + name: Name of the event. + attributes: Attributes of the event. + timestamp: Timestamp of the event. If `None` it will filled + automatically. + """ + + def __init__( + self, + name: str, + attributes: types.Attributes = None, + timestamp: Optional[int] = None, + limit: Optional[int] = _DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, + ) -> None: + super().__init__(name, timestamp) + self._attributes = attributes + + @property + def attributes(self) -> types.Attributes: + return self._attributes + + @property + def dropped_attributes(self) -> int: + if isinstance(self._attributes, BoundedAttributes): + return self._attributes.dropped + return 0 + + +def _check_span_ended(func): + def wrapper(self, *args, **kwargs): + already_ended = False + with self._lock: # pylint: disable=protected-access + if self._end_time is None: # pylint: disable=protected-access + func(self, *args, **kwargs) + else: + already_ended = True + + if already_ended: + logger.warning("Tried calling %s on an ended span.", func.__name__) + + return wrapper + + +def _is_valid_link(context: SpanContext, attributes: types.Attributes) -> bool: + return bool( + context and (context.is_valid or (attributes or context.trace_state)) + ) + + +class ReadableSpan: + """Provides read-only access to span attributes. + + Users should NOT be creating these objects directly. `ReadableSpan`s are created as + a direct result from using the tracing pipeline via the `Tracer`. + + """ + + def __init__( + self, + name: str, + context: Optional[trace_api.SpanContext] = None, + parent: Optional[trace_api.SpanContext] = None, + resource: Optional[Resource] = None, + attributes: types.Attributes = None, + events: Sequence[Event] = (), + links: Sequence[trace_api.Link] = (), + kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, + instrumentation_info: Optional[InstrumentationInfo] = None, + status: Status = Status(StatusCode.UNSET), + start_time: Optional[int] = None, + end_time: Optional[int] = None, + instrumentation_scope: Optional[InstrumentationScope] = None, + ) -> None: + self._name = name + self._context = context + self._kind = kind + self._instrumentation_info = instrumentation_info + self._instrumentation_scope = instrumentation_scope + self._parent = parent + self._start_time = start_time + self._end_time = end_time + self._attributes = attributes + self._events = events + self._links = links + if resource is None: + self._resource = Resource.create({}) + else: + self._resource = resource + self._status = status + + @property + def dropped_attributes(self) -> int: + if isinstance(self._attributes, BoundedAttributes): + return self._attributes.dropped + return 0 + + @property + def dropped_events(self) -> int: + if isinstance(self._events, BoundedList): + return self._events.dropped + return 0 + + @property + def dropped_links(self) -> int: + if isinstance(self._links, BoundedList): + return self._links.dropped + return 0 + + @property + def name(self) -> str: + return self._name + + def get_span_context(self): + return self._context + + @property + def context(self): + return self._context + + @property + def kind(self) -> trace_api.SpanKind: + return self._kind + + @property + def parent(self) -> Optional[trace_api.SpanContext]: + return self._parent + + @property + def start_time(self) -> Optional[int]: + return self._start_time + + @property + def end_time(self) -> Optional[int]: + return self._end_time + + @property + def status(self) -> trace_api.Status: + return self._status + + @property + def attributes(self) -> types.Attributes: + return MappingProxyType(self._attributes or {}) + + @property + def events(self) -> Sequence[Event]: + return tuple(event for event in self._events) + + @property + def links(self) -> Sequence[trace_api.Link]: + return tuple(link for link in self._links) + + @property + def resource(self) -> Resource: + return self._resource + + @property + @deprecated( + version="1.11.1", reason="You should use instrumentation_scope" + ) + def instrumentation_info(self) -> Optional[InstrumentationInfo]: + return self._instrumentation_info + + @property + def instrumentation_scope(self) -> Optional[InstrumentationScope]: + return self._instrumentation_scope + + def to_json(self, indent: Optional[int] = 4): + parent_id = None + if self.parent is not None: + parent_id = f"0x{trace_api.format_span_id(self.parent.span_id)}" + + start_time = None + if self._start_time: + start_time = util.ns_to_iso_str(self._start_time) + + end_time = None + if self._end_time: + end_time = util.ns_to_iso_str(self._end_time) + + status = { + "status_code": str(self._status.status_code.name), + } + if self._status.description: + status["description"] = self._status.description + + f_span = { + "name": self._name, + "context": ( + self._format_context(self._context) if self._context else None + ), + "kind": str(self.kind), + "parent_id": parent_id, + "start_time": start_time, + "end_time": end_time, + "status": status, + "attributes": self._format_attributes(self._attributes), + "events": self._format_events(self._events), + "links": self._format_links(self._links), + "resource": json.loads(self.resource.to_json()), + } + + return json.dumps(f_span, indent=indent) + + @staticmethod + def _format_context(context: SpanContext) -> Dict[str, str]: + return { + "trace_id": f"0x{trace_api.format_trace_id(context.trace_id)}", + "span_id": f"0x{trace_api.format_span_id(context.span_id)}", + "trace_state": repr(context.trace_state), + } + + @staticmethod + def _format_attributes( + attributes: types.Attributes, + ) -> Optional[Dict[str, Any]]: + if attributes is not None and not isinstance(attributes, dict): + return dict(attributes) + return attributes + + @staticmethod + def _format_events(events: Sequence[Event]) -> List[Dict[str, Any]]: + return [ + { + "name": event.name, + "timestamp": util.ns_to_iso_str(event.timestamp), + "attributes": Span._format_attributes( # pylint: disable=protected-access + event.attributes + ), + } + for event in events + ] + + @staticmethod + def _format_links(links: Sequence[trace_api.Link]) -> List[Dict[str, Any]]: + return [ + { + "context": Span._format_context( # pylint: disable=protected-access + link.context + ), + "attributes": Span._format_attributes( # pylint: disable=protected-access + link.attributes + ), + } + for link in links + ] + + +class SpanLimits: + """The limits that should be enforce on recorded data such as events, links, attributes etc. + + This class does not enforce any limits itself. It only provides an a way read limits from env, + default values and from user provided arguments. + + All limit arguments must be either a non-negative integer, ``None`` or ``SpanLimits.UNSET``. + + - All limit arguments are optional. + - If a limit argument is not set, the class will try to read its value from the corresponding + environment variable. + - If the environment variable is not set, the default value, if any, will be used. + + Limit precedence: + + - If a model specific limit is set, it will be used. + - Else if the corresponding global limit is set, it will be used. + - Else if the model specific limit has a default value, the default value will be used. + - Else if the global limit has a default value, the default value will be used. + + Args: + max_attributes: Maximum number of attributes that can be added to a span, event, and link. + Environment variable: OTEL_ATTRIBUTE_COUNT_LIMIT + Default: {_DEFAULT_ATTRIBUTE_COUNT_LIMIT} + max_events: Maximum number of events that can be added to a Span. + Environment variable: OTEL_SPAN_EVENT_COUNT_LIMIT + Default: {_DEFAULT_SPAN_EVENT_COUNT_LIMIT} + max_links: Maximum number of links that can be added to a Span. + Environment variable: OTEL_SPAN_LINK_COUNT_LIMIT + Default: {_DEFAULT_SPAN_LINK_COUNT_LIMIT} + max_span_attributes: Maximum number of attributes that can be added to a Span. + Environment variable: OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT + Default: {_DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT} + max_event_attributes: Maximum number of attributes that can be added to an Event. + Default: {_DEFAULT_OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT} + max_link_attributes: Maximum number of attributes that can be added to a Link. + Default: {_DEFAULT_OTEL_LINK_ATTRIBUTE_COUNT_LIMIT} + max_attribute_length: Maximum length an attribute value can have. Values longer than + the specified length will be truncated. + max_span_attribute_length: Maximum length a span attribute value can have. Values longer than + the specified length will be truncated. + """ + + UNSET = -1 + + def __init__( + self, + max_attributes: Optional[int] = None, + max_events: Optional[int] = None, + max_links: Optional[int] = None, + max_span_attributes: Optional[int] = None, + max_event_attributes: Optional[int] = None, + max_link_attributes: Optional[int] = None, + max_attribute_length: Optional[int] = None, + max_span_attribute_length: Optional[int] = None, + ): + # span events and links count + self.max_events = self._from_env_if_absent( + max_events, + OTEL_SPAN_EVENT_COUNT_LIMIT, + _DEFAULT_OTEL_SPAN_EVENT_COUNT_LIMIT, + ) + self.max_links = self._from_env_if_absent( + max_links, + OTEL_SPAN_LINK_COUNT_LIMIT, + _DEFAULT_OTEL_SPAN_LINK_COUNT_LIMIT, + ) + + # attribute count + global_max_attributes = self._from_env_if_absent( + max_attributes, OTEL_ATTRIBUTE_COUNT_LIMIT + ) + self.max_attributes = ( + global_max_attributes + if global_max_attributes is not None + else _DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT + ) + + self.max_span_attributes = self._from_env_if_absent( + max_span_attributes, + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, + ( + global_max_attributes + if global_max_attributes is not None + else _DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT + ), + ) + self.max_event_attributes = self._from_env_if_absent( + max_event_attributes, + OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT, + ( + global_max_attributes + if global_max_attributes is not None + else _DEFAULT_OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT + ), + ) + self.max_link_attributes = self._from_env_if_absent( + max_link_attributes, + OTEL_LINK_ATTRIBUTE_COUNT_LIMIT, + ( + global_max_attributes + if global_max_attributes is not None + else _DEFAULT_OTEL_LINK_ATTRIBUTE_COUNT_LIMIT + ), + ) + + # attribute length + self.max_attribute_length = self._from_env_if_absent( + max_attribute_length, + OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, + ) + self.max_span_attribute_length = self._from_env_if_absent( + max_span_attribute_length, + OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, + # use global attribute length limit as default + self.max_attribute_length, + ) + + def __repr__(self): + return f"{type(self).__name__}(max_span_attributes={self.max_span_attributes}, max_events_attributes={self.max_event_attributes}, max_link_attributes={self.max_link_attributes}, max_attributes={self.max_attributes}, max_events={self.max_events}, max_links={self.max_links}, max_attribute_length={self.max_attribute_length})" + + @classmethod + def _from_env_if_absent( + cls, value: Optional[int], env_var: str, default: Optional[int] = None + ) -> Optional[int]: + if value == cls.UNSET: + return None + + err_msg = "{} must be a non-negative integer but got {}" + + # if no value is provided for the limit, try to load it from env + if value is None: + # return default value if env var is not set + if env_var not in environ: + return default + + str_value = environ.get(env_var, "").strip().lower() + if str_value == _ENV_VALUE_UNSET: + return None + + try: + value = int(str_value) + except ValueError: + raise ValueError(err_msg.format(env_var, str_value)) + + if value < 0: + raise ValueError(err_msg.format(env_var, value)) + return value + + +_UnsetLimits = SpanLimits( + max_attributes=SpanLimits.UNSET, + max_events=SpanLimits.UNSET, + max_links=SpanLimits.UNSET, + max_span_attributes=SpanLimits.UNSET, + max_event_attributes=SpanLimits.UNSET, + max_link_attributes=SpanLimits.UNSET, + max_attribute_length=SpanLimits.UNSET, + max_span_attribute_length=SpanLimits.UNSET, +) + +# not removed for backward compat. please use SpanLimits instead. +SPAN_ATTRIBUTE_COUNT_LIMIT = SpanLimits._from_env_if_absent( # pylint: disable=protected-access + None, + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, + _DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, +) + + +class Span(trace_api.Span, ReadableSpan): + """See `opentelemetry.trace.Span`. + + Users should create `Span` objects via the `Tracer` instead of this + constructor. + + Args: + name: The name of the operation this span represents + context: The immutable span context + parent: This span's parent's `opentelemetry.trace.SpanContext`, or + None if this is a root span + sampler: The sampler used to create this span + trace_config: TODO + resource: Entity producing telemetry + attributes: The span's attributes to be exported + events: Timestamped events to be exported + links: Links to other spans to be exported + span_processor: `SpanProcessor` to invoke when starting and ending + this `Span`. + limits: `SpanLimits` instance that was passed to the `TracerProvider` + """ + + def __new__(cls, *args, **kwargs): + if cls is Span: + raise TypeError("Span must be instantiated via a tracer.") + return super().__new__(cls) + + # pylint: disable=too-many-locals + def __init__( + self, + name: str, + context: trace_api.SpanContext, + parent: Optional[trace_api.SpanContext] = None, + sampler: Optional[sampling.Sampler] = None, + trace_config: None = None, # TODO + resource: Optional[Resource] = None, + attributes: types.Attributes = None, + events: Optional[Sequence[Event]] = None, + links: Sequence[trace_api.Link] = (), + kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, + span_processor: SpanProcessor = SpanProcessor(), + instrumentation_info: Optional[InstrumentationInfo] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + limits=_UnsetLimits, + instrumentation_scope: Optional[InstrumentationScope] = None, + ) -> None: + if resource is None: + resource = Resource.create({}) + super().__init__( + name=name, + context=context, + parent=parent, + kind=kind, + resource=resource, + instrumentation_info=instrumentation_info, + instrumentation_scope=instrumentation_scope, + ) + self._sampler = sampler + self._trace_config = trace_config + self._record_exception = record_exception + self._set_status_on_exception = set_status_on_exception + self._span_processor = span_processor + self._limits = limits + self._lock = threading.Lock() + self._attributes = BoundedAttributes( + self._limits.max_span_attributes, + attributes, + immutable=False, + max_value_len=self._limits.max_span_attribute_length, + ) + self._events = self._new_events() + if events: + for event in events: + event._attributes = BoundedAttributes( + self._limits.max_event_attributes, + event.attributes, + max_value_len=self._limits.max_attribute_length, + ) + self._events.append(event) + + self._links = self._new_links(links) + + def __repr__(self): + return f'{type(self).__name__}(name="{self._name}", context={self._context})' + + def _new_events(self): + return BoundedList(self._limits.max_events) + + def _new_links(self, links: Sequence[trace_api.Link]): + if not links: + return BoundedList(self._limits.max_links) + + valid_links = [] + for link in links: + if link and _is_valid_link(link.context, link.attributes): + # pylint: disable=protected-access + link._attributes = BoundedAttributes( + self._limits.max_link_attributes, + link.attributes, + max_value_len=self._limits.max_attribute_length, + ) + valid_links.append(link) + + return BoundedList.from_seq(self._limits.max_links, valid_links) + + def get_span_context(self): + return self._context + + def set_attributes( + self, attributes: Mapping[str, types.AttributeValue] + ) -> None: + with self._lock: + if self._end_time is not None: + logger.warning("Setting attribute on ended span.") + return + + for key, value in attributes.items(): + self._attributes[key] = value + + def set_attribute(self, key: str, value: types.AttributeValue) -> None: + return self.set_attributes({key: value}) + + @_check_span_ended + def _add_event(self, event: EventBase) -> None: + self._events.append(event) + + def add_event( + self, + name: str, + attributes: types.Attributes = None, + timestamp: Optional[int] = None, + ) -> None: + attributes = BoundedAttributes( + self._limits.max_event_attributes, + attributes, + max_value_len=self._limits.max_attribute_length, + ) + self._add_event( + Event( + name=name, + attributes=attributes, + timestamp=timestamp, + ) + ) + + @_check_span_ended + def _add_link(self, link: trace_api.Link) -> None: + self._links.append(link) + + def add_link( + self, + context: SpanContext, + attributes: types.Attributes = None, + ) -> None: + if not _is_valid_link(context, attributes): + return + + attributes = BoundedAttributes( + self._limits.max_link_attributes, + attributes, + max_value_len=self._limits.max_attribute_length, + ) + self._add_link( + trace_api.Link( + context=context, + attributes=attributes, + ) + ) + + def _readable_span(self) -> ReadableSpan: + return ReadableSpan( + name=self._name, + context=self._context, + parent=self._parent, + resource=self._resource, + attributes=self._attributes, + events=self._events, + links=self._links, + kind=self.kind, + status=self._status, + start_time=self._start_time, + end_time=self._end_time, + instrumentation_info=self._instrumentation_info, + instrumentation_scope=self._instrumentation_scope, + ) + + def start( + self, + start_time: Optional[int] = None, + parent_context: Optional[context_api.Context] = None, + ) -> None: + with self._lock: + if self._start_time is not None: + logger.warning("Calling start() on a started span.") + return + self._start_time = ( + start_time if start_time is not None else time_ns() + ) + + self._span_processor.on_start(self, parent_context=parent_context) + + def end(self, end_time: Optional[int] = None) -> None: + with self._lock: + if self._start_time is None: + raise RuntimeError("Calling end() on a not started span.") + if self._end_time is not None: + logger.warning("Calling end() on an ended span.") + return + + self._end_time = end_time if end_time is not None else time_ns() + + self._span_processor.on_end(self._readable_span()) + + @_check_span_ended + def update_name(self, name: str) -> None: + self._name = name + + def is_recording(self) -> bool: + return self._end_time is None + + @_check_span_ended + def set_status( + self, + status: typing.Union[Status, StatusCode], + description: typing.Optional[str] = None, + ) -> None: + # Ignore future calls if status is already set to OK + # Ignore calls to set to StatusCode.UNSET + if isinstance(status, Status): + if ( + self._status + and self._status.status_code is StatusCode.OK + or status.status_code is StatusCode.UNSET + ): + return + if description is not None: + logger.warning( + "Description %s ignored. Use either `Status` or `(StatusCode, Description)`", + description, + ) + self._status = status + elif isinstance(status, StatusCode): + if ( + self._status + and self._status.status_code is StatusCode.OK + or status is StatusCode.UNSET + ): + return + self._status = Status(status, description) + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + """Ends context manager and calls `end` on the `Span`.""" + if exc_val is not None and self.is_recording(): + # Record the exception as an event + # pylint:disable=protected-access + if self._record_exception: + self.record_exception(exception=exc_val, escaped=True) + # Records status if span is used as context manager + # i.e. with tracer.start_span() as span: + if self._set_status_on_exception: + self.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{exc_type.__name__}: {exc_val}", + ) + ) + + super().__exit__(exc_type, exc_val, exc_tb) + + def record_exception( + self, + exception: BaseException, + attributes: types.Attributes = None, + timestamp: Optional[int] = None, + escaped: bool = False, + ) -> None: + """Records an exception as a span event.""" + # TODO: keep only exception as first argument after baseline is 3.10 + stacktrace = "".join( + traceback.format_exception( + type(exception), value=exception, tb=exception.__traceback__ + ) + ) + module = type(exception).__module__ + qualname = type(exception).__qualname__ + exception_type = ( + f"{module}.{qualname}" + if module and module != "builtins" + else qualname + ) + _attributes: MutableMapping[str, types.AttributeValue] = { + EXCEPTION_TYPE: exception_type, + EXCEPTION_MESSAGE: str(exception), + EXCEPTION_STACKTRACE: stacktrace, + EXCEPTION_ESCAPED: str(escaped), + } + if attributes: + _attributes.update(attributes) + self.add_event( + name="exception", attributes=_attributes, timestamp=timestamp + ) + + +class _Span(Span): + """Protected implementation of `opentelemetry.trace.Span`. + + This constructor exists to prevent the instantiation of the `Span` class + by other mechanisms than through the `Tracer`. + """ + + +class Tracer(trace_api.Tracer): + """See `opentelemetry.trace.Tracer`.""" + + def __init__( + self, + sampler: sampling.Sampler, + resource: Resource, + span_processor: Union[ + SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor + ], + id_generator: IdGenerator, + instrumentation_info: InstrumentationInfo, + span_limits: SpanLimits, + instrumentation_scope: InstrumentationScope, + ) -> None: + self.sampler = sampler + self.resource = resource + self.span_processor = span_processor + self.id_generator = id_generator + self.instrumentation_info = instrumentation_info + self._span_limits = span_limits + self._instrumentation_scope = instrumentation_scope + + @_agnosticcontextmanager # pylint: disable=protected-access + def start_as_current_span( + self, + name: str, + context: Optional[context_api.Context] = None, + kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, + attributes: types.Attributes = None, + links: Optional[Sequence[trace_api.Link]] = (), + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + ) -> Iterator[trace_api.Span]: + span = self.start_span( + name=name, + context=context, + kind=kind, + attributes=attributes, + links=links, + start_time=start_time, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) + with trace_api.use_span( + span, + end_on_exit=end_on_exit, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) as span: + yield span + + def start_span( # pylint: disable=too-many-locals + self, + name: str, + context: Optional[context_api.Context] = None, + kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, + attributes: types.Attributes = None, + links: Optional[Sequence[trace_api.Link]] = (), + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + ) -> trace_api.Span: + parent_span_context = trace_api.get_current_span( + context + ).get_span_context() + + if parent_span_context is not None and not isinstance( + parent_span_context, trace_api.SpanContext + ): + raise TypeError( + "parent_span_context must be a SpanContext or None." + ) + + # is_valid determines root span + if parent_span_context is None or not parent_span_context.is_valid: + parent_span_context = None + trace_id = self.id_generator.generate_trace_id() + else: + trace_id = parent_span_context.trace_id + + # The sampler decides whether to create a real or no-op span at the + # time of span creation. No-op spans do not record events, and are not + # exported. + # The sampler may also add attributes to the newly-created span, e.g. + # to include information about the sampling result. + # The sampler may also modify the parent span context's tracestate + sampling_result = self.sampler.should_sample( + context, trace_id, name, kind, attributes, links + ) + + trace_flags = ( + trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED) + if sampling_result.decision.is_sampled() + else trace_api.TraceFlags(trace_api.TraceFlags.DEFAULT) + ) + span_context = trace_api.SpanContext( + trace_id, + self.id_generator.generate_span_id(), + is_remote=False, + trace_flags=trace_flags, + trace_state=sampling_result.trace_state, + ) + + # Only record if is_recording() is true + if sampling_result.decision.is_recording(): + # pylint:disable=protected-access + span = _Span( + name=name, + context=span_context, + parent=parent_span_context, + sampler=self.sampler, + resource=self.resource, + attributes=sampling_result.attributes.copy(), + span_processor=self.span_processor, + kind=kind, + links=links, + instrumentation_info=self.instrumentation_info, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + limits=self._span_limits, + instrumentation_scope=self._instrumentation_scope, + ) + span.start(start_time=start_time, parent_context=context) + else: + span = trace_api.NonRecordingSpan(context=span_context) + return span + + +class TracerProvider(trace_api.TracerProvider): + """See `opentelemetry.trace.TracerProvider`.""" + + def __init__( + self, + sampler: Optional[sampling.Sampler] = None, + resource: Optional[Resource] = None, + shutdown_on_exit: bool = True, + active_span_processor: Union[ + SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor, None + ] = None, + id_generator: Optional[IdGenerator] = None, + span_limits: Optional[SpanLimits] = None, + ) -> None: + self._active_span_processor = ( + active_span_processor or SynchronousMultiSpanProcessor() + ) + if id_generator is None: + self.id_generator = RandomIdGenerator() + else: + self.id_generator = id_generator + if resource is None: + self._resource = Resource.create({}) + else: + self._resource = resource + if not sampler: + sampler = sampling._get_from_env_or_default() + self.sampler = sampler + self._span_limits = span_limits or SpanLimits() + disabled = environ.get(OTEL_SDK_DISABLED, "") + self._disabled = disabled.lower().strip() == "true" + self._atexit_handler = None + + if shutdown_on_exit: + self._atexit_handler = atexit.register(self.shutdown) + + @property + def resource(self) -> Resource: + return self._resource + + def get_tracer( + self, + instrumenting_module_name: str, + instrumenting_library_version: typing.Optional[str] = None, + schema_url: typing.Optional[str] = None, + attributes: typing.Optional[types.Attributes] = None, + ) -> "trace_api.Tracer": + if self._disabled: + return NoOpTracer() + if not instrumenting_module_name: # Reject empty strings too. + instrumenting_module_name = "" + logger.error("get_tracer called with missing module name.") + if instrumenting_library_version is None: + instrumenting_library_version = "" + + filterwarnings( + "ignore", + message=( + r"Call to deprecated method __init__. \(You should use " + r"InstrumentationScope\) -- Deprecated since version 1.11.1." + ), + category=DeprecationWarning, + module="opentelemetry.sdk.trace", + ) + + instrumentation_info = InstrumentationInfo( + instrumenting_module_name, + instrumenting_library_version, + schema_url, + ) + + return Tracer( + self.sampler, + self.resource, + self._active_span_processor, + self.id_generator, + instrumentation_info, + self._span_limits, + InstrumentationScope( + instrumenting_module_name, + instrumenting_library_version, + schema_url, + attributes, + ), + ) + + def add_span_processor(self, span_processor: SpanProcessor) -> None: + """Registers a new :class:`SpanProcessor` for this `TracerProvider`. + + The span processors are invoked in the same order they are registered. + """ + + # no lock here because add_span_processor is thread safe for both + # SynchronousMultiSpanProcessor and ConcurrentMultiSpanProcessor. + self._active_span_processor.add_span_processor(span_processor) + + def shutdown(self) -> None: + """Shut down the span processors added to the tracer provider.""" + self._active_span_processor.shutdown() + if self._atexit_handler is not None: + atexit.unregister(self._atexit_handler) + self._atexit_handler = None + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Requests the active span processor to process all spans that have not + yet been processed. + + By default force flush is called sequentially on all added span + processors. This means that span processors further back in the list + have less time to flush their spans. + To have span processors flush their spans in parallel it is possible to + initialize the tracer provider with an instance of + `ConcurrentMultiSpanProcessor` at the cost of using multiple threads. + + Args: + timeout_millis: The maximum amount of time to wait for spans to be + processed. + + Returns: + False if the timeout is exceeded, True otherwise. + """ + return self._active_span_processor.force_flush(timeout_millis) diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py new file mode 100644 index 00000000..47d1769a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py @@ -0,0 +1,517 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import collections +import logging +import os +import sys +import threading +import typing +from enum import Enum +from os import environ, linesep +from time import time_ns + +from opentelemetry.context import ( + _SUPPRESS_INSTRUMENTATION_KEY, + Context, + attach, + detach, + set_value, +) +from opentelemetry.sdk.environment_variables import ( + OTEL_BSP_EXPORT_TIMEOUT, + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, + OTEL_BSP_MAX_QUEUE_SIZE, + OTEL_BSP_SCHEDULE_DELAY, +) +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor +from opentelemetry.util._once import Once + +_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000 +_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512 +_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000 +_DEFAULT_MAX_QUEUE_SIZE = 2048 +_ENV_VAR_INT_VALUE_ERROR_MESSAGE = ( + "Unable to parse value for %s as integer. Defaulting to %s." +) + +logger = logging.getLogger(__name__) + + +class SpanExportResult(Enum): + SUCCESS = 0 + FAILURE = 1 + + +class SpanExporter: + """Interface for exporting spans. + + Interface to be implemented by services that want to export spans recorded + in their own format. + + To export data this MUST be registered to the :class`opentelemetry.sdk.trace.Tracer` using a + `SimpleSpanProcessor` or a `BatchSpanProcessor`. + """ + + def export( + self, spans: typing.Sequence[ReadableSpan] + ) -> "SpanExportResult": + """Exports a batch of telemetry data. + + Args: + spans: The list of `opentelemetry.trace.Span` objects to be exported + + Returns: + The result of the export + """ + + def shutdown(self) -> None: + """Shuts down the exporter. + + Called when the SDK is shut down. + """ + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Hint to ensure that the export of any spans the exporter has received + prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably + before returning from this method. + """ + + +class SimpleSpanProcessor(SpanProcessor): + """Simple SpanProcessor implementation. + + SimpleSpanProcessor is an implementation of `SpanProcessor` that + passes ended spans directly to the configured `SpanExporter`. + """ + + def __init__(self, span_exporter: SpanExporter): + self.span_exporter = span_exporter + + def on_start( + self, span: Span, parent_context: typing.Optional[Context] = None + ) -> None: + pass + + def on_end(self, span: ReadableSpan) -> None: + if not span.context.trace_flags.sampled: + return + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + self.span_exporter.export((span,)) + # pylint: disable=broad-exception-caught + except Exception: + logger.exception("Exception while exporting Span.") + detach(token) + + def shutdown(self) -> None: + self.span_exporter.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + # pylint: disable=unused-argument + return True + + +class _FlushRequest: + """Represents a request for the BatchSpanProcessor to flush spans.""" + + __slots__ = ["event", "num_spans"] + + def __init__(self): + self.event = threading.Event() + self.num_spans = 0 + + +_BSP_RESET_ONCE = Once() + + +class BatchSpanProcessor(SpanProcessor): + """Batch span processor implementation. + + `BatchSpanProcessor` is an implementation of `SpanProcessor` that + batches ended spans and pushes them to the configured `SpanExporter`. + + `BatchSpanProcessor` is configurable with the following environment + variables which correspond to constructor parameters: + + - :envvar:`OTEL_BSP_SCHEDULE_DELAY` + - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` + - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` + - :envvar:`OTEL_BSP_EXPORT_TIMEOUT` + """ + + def __init__( + self, + span_exporter: SpanExporter, + max_queue_size: int | None = None, + schedule_delay_millis: float | None = None, + max_export_batch_size: int | None = None, + export_timeout_millis: float | None = None, + ): + if max_queue_size is None: + max_queue_size = BatchSpanProcessor._default_max_queue_size() + + if schedule_delay_millis is None: + schedule_delay_millis = ( + BatchSpanProcessor._default_schedule_delay_millis() + ) + + if max_export_batch_size is None: + max_export_batch_size = ( + BatchSpanProcessor._default_max_export_batch_size() + ) + + if export_timeout_millis is None: + export_timeout_millis = ( + BatchSpanProcessor._default_export_timeout_millis() + ) + + BatchSpanProcessor._validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ) + + self.span_exporter = span_exporter + self.queue = collections.deque([], max_queue_size) # type: typing.Deque[Span] + self.worker_thread = threading.Thread( + name="OtelBatchSpanProcessor", target=self.worker, daemon=True + ) + self.condition = threading.Condition(threading.Lock()) + self._flush_request = None # type: typing.Optional[_FlushRequest] + self.schedule_delay_millis = schedule_delay_millis + self.max_export_batch_size = max_export_batch_size + self.max_queue_size = max_queue_size + self.export_timeout_millis = export_timeout_millis + self.done = False + # flag that indicates that spans are being dropped + self._spans_dropped = False + # precallocated list to send spans to exporter + self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] + self.worker_thread.start() + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + self._pid = os.getpid() + + def on_start( + self, span: Span, parent_context: Context | None = None + ) -> None: + pass + + def on_end(self, span: ReadableSpan) -> None: + if self.done: + logger.warning("Already shutdown, dropping span.") + return + if not span.context.trace_flags.sampled: + return + if self._pid != os.getpid(): + _BSP_RESET_ONCE.do_once(self._at_fork_reinit) + + if len(self.queue) == self.max_queue_size: + if not self._spans_dropped: + logger.warning("Queue is full, likely spans will be dropped.") + self._spans_dropped = True + + self.queue.appendleft(span) + + if len(self.queue) >= self.max_export_batch_size: + with self.condition: + self.condition.notify() + + def _at_fork_reinit(self): + self.condition = threading.Condition(threading.Lock()) + self.queue.clear() + + # worker_thread is local to a process, only the thread that issued fork continues + # to exist. A new worker thread must be started in child process. + self.worker_thread = threading.Thread( + name="OtelBatchSpanProcessor", target=self.worker, daemon=True + ) + self.worker_thread.start() + self._pid = os.getpid() + + def worker(self): + timeout = self.schedule_delay_millis / 1e3 + flush_request = None # type: typing.Optional[_FlushRequest] + while not self.done: + with self.condition: + if self.done: + # done flag may have changed, avoid waiting + break + flush_request = self._get_and_unset_flush_request() + if ( + len(self.queue) < self.max_export_batch_size + and flush_request is None + ): + self.condition.wait(timeout) + flush_request = self._get_and_unset_flush_request() + if not self.queue: + # spurious notification, let's wait again, reset timeout + timeout = self.schedule_delay_millis / 1e3 + self._notify_flush_request_finished(flush_request) + flush_request = None + continue + if self.done: + # missing spans will be sent when calling flush + break + + # subtract the duration of this export call to the next timeout + start = time_ns() + self._export(flush_request) + end = time_ns() + duration = (end - start) / 1e9 + timeout = self.schedule_delay_millis / 1e3 - duration + + self._notify_flush_request_finished(flush_request) + flush_request = None + + # there might have been a new flush request while export was running + # and before the done flag switched to true + with self.condition: + shutdown_flush_request = self._get_and_unset_flush_request() + + # be sure that all spans are sent + self._drain_queue() + self._notify_flush_request_finished(flush_request) + self._notify_flush_request_finished(shutdown_flush_request) + + def _get_and_unset_flush_request( + self, + ) -> typing.Optional[_FlushRequest]: + """Returns the current flush request and makes it invisible to the + worker thread for subsequent calls. + """ + flush_request = self._flush_request + self._flush_request = None + if flush_request is not None: + flush_request.num_spans = len(self.queue) + return flush_request + + @staticmethod + def _notify_flush_request_finished( + flush_request: typing.Optional[_FlushRequest], + ): + """Notifies the flush initiator(s) waiting on the given request/event + that the flush operation was finished. + """ + if flush_request is not None: + flush_request.event.set() + + def _get_or_create_flush_request(self) -> _FlushRequest: + """Either returns the current active flush event or creates a new one. + + The flush event will be visible and read by the worker thread before an + export operation starts. Callers of a flush operation may wait on the + returned event to be notified when the flush/export operation was + finished. + + This method is not thread-safe, i.e. callers need to take care about + synchronization/locking. + """ + if self._flush_request is None: + self._flush_request = _FlushRequest() + return self._flush_request + + def _export(self, flush_request: typing.Optional[_FlushRequest]): + """Exports spans considering the given flush_request. + + In case of a given flush_requests spans are exported in batches until + the number of exported spans reached or exceeded the number of spans in + the flush request. + In no flush_request was given at most max_export_batch_size spans are + exported. + """ + if not flush_request: + self._export_batch() + return + + num_spans = flush_request.num_spans + while self.queue: + num_exported = self._export_batch() + num_spans -= num_exported + + if num_spans <= 0: + break + + def _export_batch(self) -> int: + """Exports at most max_export_batch_size spans and returns the number of + exported spans. + """ + idx = 0 + # currently only a single thread acts as consumer, so queue.pop() will + # not raise an exception + while idx < self.max_export_batch_size and self.queue: + self.spans_list[idx] = self.queue.pop() + idx += 1 + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + # Ignore type b/c the Optional[None]+slicing is too "clever" + # for mypy + self.span_exporter.export(self.spans_list[:idx]) # type: ignore + except Exception: # pylint: disable=broad-exception-caught + logger.exception("Exception while exporting Span batch.") + detach(token) + + # clean up list + for index in range(idx): + self.spans_list[index] = None + return idx + + def _drain_queue(self): + """Export all elements until queue is empty. + + Can only be called from the worker thread context because it invokes + `export` that is not thread safe. + """ + while self.queue: + self._export_batch() + + def force_flush(self, timeout_millis: int | None = None) -> bool: + if timeout_millis is None: + timeout_millis = self.export_timeout_millis + + if self.done: + logger.warning("Already shutdown, ignoring call to force_flush().") + return True + + with self.condition: + flush_request = self._get_or_create_flush_request() + # signal the worker thread to flush and wait for it to finish + self.condition.notify_all() + + # wait for token to be processed + ret = flush_request.event.wait(timeout_millis / 1e3) + if not ret: + logger.warning("Timeout was exceeded in force_flush().") + return ret + + def shutdown(self) -> None: + # signal the worker thread to finish and then wait for it + self.done = True + with self.condition: + self.condition.notify_all() + self.worker_thread.join() + self.span_exporter.shutdown() + + @staticmethod + def _default_max_queue_size(): + try: + return int( + environ.get(OTEL_BSP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_MAX_QUEUE_SIZE, + _DEFAULT_MAX_QUEUE_SIZE, + ) + return _DEFAULT_MAX_QUEUE_SIZE + + @staticmethod + def _default_schedule_delay_millis(): + try: + return int( + environ.get( + OTEL_BSP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS + ) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_SCHEDULE_DELAY, + _DEFAULT_SCHEDULE_DELAY_MILLIS, + ) + return _DEFAULT_SCHEDULE_DELAY_MILLIS + + @staticmethod + def _default_max_export_batch_size(): + try: + return int( + environ.get( + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + return _DEFAULT_MAX_EXPORT_BATCH_SIZE + + @staticmethod + def _default_export_timeout_millis(): + try: + return int( + environ.get( + OTEL_BSP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS + ) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_EXPORT_TIMEOUT, + _DEFAULT_EXPORT_TIMEOUT_MILLIS, + ) + return _DEFAULT_EXPORT_TIMEOUT_MILLIS + + @staticmethod + def _validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ): + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError( + "max_export_batch_size must be a positive integer." + ) + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less than or equal to max_queue_size." + ) + + +class ConsoleSpanExporter(SpanExporter): + """Implementation of :class:`SpanExporter` that prints spans to the + console. + + This class can be used for diagnostic purposes. It prints the exported + spans to the console STDOUT. + """ + + def __init__( + self, + service_name: str | None = None, + out: typing.IO = sys.stdout, + formatter: typing.Callable[ + [ReadableSpan], str + ] = lambda span: span.to_json() + linesep, + ): + self.out = out + self.formatter = formatter + self.service_name = service_name + + def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: + for span in spans: + self.out.write(self.formatter(span)) + self.out.flush() + return SpanExportResult.SUCCESS + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py new file mode 100644 index 00000000..c28ecfd2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py @@ -0,0 +1,61 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import typing + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult + + +class InMemorySpanExporter(SpanExporter): + """Implementation of :class:`.SpanExporter` that stores spans in memory. + + This class can be used for testing purposes. It stores the exported spans + in a list in memory that can be retrieved using the + :func:`.get_finished_spans` method. + """ + + def __init__(self) -> None: + self._finished_spans: typing.List[ReadableSpan] = [] + self._stopped = False + self._lock = threading.Lock() + + def clear(self) -> None: + """Clear list of collected spans.""" + with self._lock: + self._finished_spans.clear() + + def get_finished_spans(self) -> typing.Tuple[ReadableSpan, ...]: + """Get list of collected spans.""" + with self._lock: + return tuple(self._finished_spans) + + def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: + """Stores a list of spans in memory.""" + if self._stopped: + return SpanExportResult.FAILURE + with self._lock: + self._finished_spans.extend(spans) + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + """Shut downs the exporter. + + Calls to export after the exporter has been shut down will fail. + """ + self._stopped = True + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/id_generator.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/id_generator.py new file mode 100644 index 00000000..cd1f89bc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/id_generator.py @@ -0,0 +1,60 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import random + +from opentelemetry import trace + + +class IdGenerator(abc.ABC): + @abc.abstractmethod + def generate_span_id(self) -> int: + """Get a new span ID. + + Returns: + A 64-bit int for use as a span ID + """ + + @abc.abstractmethod + def generate_trace_id(self) -> int: + """Get a new trace ID. + + Implementations should at least make the 64 least significant bits + uniformly random. Samplers like the `TraceIdRatioBased` sampler rely on + this randomness to make sampling decisions. + + See `the specification on TraceIdRatioBased <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#traceidratiobased>`_. + + Returns: + A 128-bit int for use as a trace ID + """ + + +class RandomIdGenerator(IdGenerator): + """The default ID generator for TracerProvider which randomly generates all + bits when generating IDs. + """ + + def generate_span_id(self) -> int: + span_id = random.getrandbits(64) + while span_id == trace.INVALID_SPAN_ID: + span_id = random.getrandbits(64) + return span_id + + def generate_trace_id(self) -> int: + trace_id = random.getrandbits(128) + while trace_id == trace.INVALID_TRACE_ID: + trace_id = random.getrandbits(128) + return trace_id diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/sampling.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/sampling.py new file mode 100644 index 00000000..fb6990a0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/sampling.py @@ -0,0 +1,453 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +For general information about sampling, see `the specification <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#sampling>`_. + +OpenTelemetry provides two types of samplers: + +- `StaticSampler` +- `TraceIdRatioBased` + +A `StaticSampler` always returns the same sampling result regardless of the conditions. Both possible StaticSamplers are already created: + +- Always sample spans: ALWAYS_ON +- Never sample spans: ALWAYS_OFF + +A `TraceIdRatioBased` sampler makes a random sampling result based on the sampling probability given. + +If the span being sampled has a parent, `ParentBased` will respect the parent delegate sampler. Otherwise, it returns the sampling result from the given root sampler. + +Currently, sampling results are always made during the creation of the span. However, this might not always be the case in the future (see `OTEP #115 <https://github.com/open-telemetry/oteps/pull/115>`_). + +Custom samplers can be created by subclassing `Sampler` and implementing `Sampler.should_sample` as well as `Sampler.get_description`. + +Samplers are able to modify the `opentelemetry.trace.span.TraceState` of the parent of the span being created. For custom samplers, it is suggested to implement `Sampler.should_sample` to utilize the +parent span context's `opentelemetry.trace.span.TraceState` and pass into the `SamplingResult` instead of the explicit trace_state field passed into the parameter of `Sampler.should_sample`. + +To use a sampler, pass it into the tracer provider constructor. For example: + +.. code:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + from opentelemetry.sdk.trace.sampling import TraceIdRatioBased + + # sample 1 in every 1000 traces + sampler = TraceIdRatioBased(1/1000) + + # set the sampler onto the global tracer provider + trace.set_tracer_provider(TracerProvider(sampler=sampler)) + + # set up an exporter for sampled spans + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # created spans will now be sampled by the TraceIdRatioBased sampler + with trace.get_tracer(__name__).start_as_current_span("Test Span"): + ... + +The tracer sampler can also be configured via environment variables ``OTEL_TRACES_SAMPLER`` and ``OTEL_TRACES_SAMPLER_ARG`` (only if applicable). +The list of built-in values for ``OTEL_TRACES_SAMPLER`` are: + + * always_on - Sampler that always samples spans, regardless of the parent span's sampling decision. + * always_off - Sampler that never samples spans, regardless of the parent span's sampling decision. + * traceidratio - Sampler that samples probabilistically based on rate. + * parentbased_always_on - (default) Sampler that respects its parent span's sampling decision, but otherwise always samples. + * parentbased_always_off - Sampler that respects its parent span's sampling decision, but otherwise never samples. + * parentbased_traceidratio - Sampler that respects its parent span's sampling decision, but otherwise samples probabilistically based on rate. + +Sampling probability can be set with ``OTEL_TRACES_SAMPLER_ARG`` if the sampler is traceidratio or parentbased_traceidratio. Rate must be in the range [0.0,1.0]. When not provided rate will be set to +1.0 (maximum rate possible). + +Prev example but with environment variables. Please make sure to set the env ``OTEL_TRACES_SAMPLER=traceidratio`` and ``OTEL_TRACES_SAMPLER_ARG=0.001``. + +.. code:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + + trace.set_tracer_provider(TracerProvider()) + + # set up an exporter for sampled spans + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # created spans will now be sampled by the TraceIdRatioBased sampler with rate 1/1000. + with trace.get_tracer(__name__).start_as_current_span("Test Span"): + ... + +When utilizing a configurator, you can configure a custom sampler. In order to create a configurable custom sampler, create an entry point for the custom sampler +factory method or function under the entry point group, ``opentelemetry_traces_sampler``. The custom sampler factory method must be of type ``Callable[[str], Sampler]``, taking a single string argument and +returning a Sampler object. The single input will come from the string value of the ``OTEL_TRACES_SAMPLER_ARG`` environment variable. If ``OTEL_TRACES_SAMPLER_ARG`` is not configured, the input will +be an empty string. For example: + +.. code:: python + + setup( + ... + entry_points={ + ... + "opentelemetry_traces_sampler": [ + "custom_sampler_name = path.to.sampler.factory.method:CustomSamplerFactory.get_sampler" + ] + } + ) + # ... + class CustomRatioSampler(Sampler): + def __init__(rate): + # ... + # ... + class CustomSamplerFactory: + @staticmethod + def get_sampler(sampler_argument): + try: + rate = float(sampler_argument) + return CustomSampler(rate) + except ValueError: # In case argument is empty string. + return CustomSampler(0.5) + +In order to configure you application with a custom sampler's entry point, set the ``OTEL_TRACES_SAMPLER`` environment variable to the key name of the entry point. For example, to configured the +above sampler, set ``OTEL_TRACES_SAMPLER=custom_sampler_name`` and ``OTEL_TRACES_SAMPLER_ARG=0.5``. +""" + +import abc +import enum +import os +from logging import getLogger +from types import MappingProxyType +from typing import Optional, Sequence + +# pylint: disable=unused-import +from opentelemetry.context import Context +from opentelemetry.sdk.environment_variables import ( + OTEL_TRACES_SAMPLER, + OTEL_TRACES_SAMPLER_ARG, +) +from opentelemetry.trace import Link, SpanKind, get_current_span +from opentelemetry.trace.span import TraceState +from opentelemetry.util.types import Attributes + +_logger = getLogger(__name__) + + +class Decision(enum.Enum): + # IsRecording() == false, span will not be recorded and all events and attributes will be dropped. + DROP = 0 + # IsRecording() == true, but Sampled flag MUST NOT be set. + RECORD_ONLY = 1 + # IsRecording() == true AND Sampled flag` MUST be set. + RECORD_AND_SAMPLE = 2 + + def is_recording(self): + return self in (Decision.RECORD_ONLY, Decision.RECORD_AND_SAMPLE) + + def is_sampled(self): + return self is Decision.RECORD_AND_SAMPLE + + +class SamplingResult: + """A sampling result as applied to a newly-created Span. + + Args: + decision: A sampling decision based off of whether the span is recorded + and the sampled flag in trace flags in the span context. + attributes: Attributes to add to the `opentelemetry.trace.Span`. + trace_state: The tracestate used for the `opentelemetry.trace.Span`. + Could possibly have been modified by the sampler. + """ + + def __repr__(self) -> str: + return f"{type(self).__name__}({str(self.decision)}, attributes={str(self.attributes)})" + + def __init__( + self, + decision: Decision, + attributes: "Attributes" = None, + trace_state: Optional["TraceState"] = None, + ) -> None: + self.decision = decision + if attributes is None: + self.attributes = MappingProxyType({}) + else: + self.attributes = MappingProxyType(attributes) + self.trace_state = trace_state + + +class Sampler(abc.ABC): + @abc.abstractmethod + def should_sample( + self, + parent_context: Optional["Context"], + trace_id: int, + name: str, + kind: Optional[SpanKind] = None, + attributes: Attributes = None, + links: Optional[Sequence["Link"]] = None, + trace_state: Optional["TraceState"] = None, + ) -> "SamplingResult": + pass + + @abc.abstractmethod + def get_description(self) -> str: + pass + + +class StaticSampler(Sampler): + """Sampler that always returns the same decision.""" + + def __init__(self, decision: "Decision") -> None: + self._decision = decision + + def should_sample( + self, + parent_context: Optional["Context"], + trace_id: int, + name: str, + kind: Optional[SpanKind] = None, + attributes: Attributes = None, + links: Optional[Sequence["Link"]] = None, + trace_state: Optional["TraceState"] = None, + ) -> "SamplingResult": + if self._decision is Decision.DROP: + attributes = None + return SamplingResult( + self._decision, + attributes, + _get_parent_trace_state(parent_context), + ) + + def get_description(self) -> str: + if self._decision is Decision.DROP: + return "AlwaysOffSampler" + return "AlwaysOnSampler" + + +ALWAYS_OFF = StaticSampler(Decision.DROP) +"""Sampler that never samples spans, regardless of the parent span's sampling decision.""" + +ALWAYS_ON = StaticSampler(Decision.RECORD_AND_SAMPLE) +"""Sampler that always samples spans, regardless of the parent span's sampling decision.""" + + +class TraceIdRatioBased(Sampler): + """ + Sampler that makes sampling decisions probabilistically based on `rate`. + + Args: + rate: Probability (between 0 and 1) that a span will be sampled + """ + + def __init__(self, rate: float): + if rate < 0.0 or rate > 1.0: + raise ValueError("Probability must be in range [0.0, 1.0].") + self._rate = rate + self._bound = self.get_bound_for_rate(self._rate) + + # For compatibility with 64 bit trace IDs, the sampler checks the 64 + # low-order bits of the trace ID to decide whether to sample a given trace. + TRACE_ID_LIMIT = (1 << 64) - 1 + + @classmethod + def get_bound_for_rate(cls, rate: float) -> int: + return round(rate * (cls.TRACE_ID_LIMIT + 1)) + + @property + def rate(self) -> float: + return self._rate + + @property + def bound(self) -> int: + return self._bound + + def should_sample( + self, + parent_context: Optional["Context"], + trace_id: int, + name: str, + kind: Optional[SpanKind] = None, + attributes: Attributes = None, + links: Optional[Sequence["Link"]] = None, + trace_state: Optional["TraceState"] = None, + ) -> "SamplingResult": + decision = Decision.DROP + if trace_id & self.TRACE_ID_LIMIT < self.bound: + decision = Decision.RECORD_AND_SAMPLE + if decision is Decision.DROP: + attributes = None + return SamplingResult( + decision, + attributes, + _get_parent_trace_state(parent_context), + ) + + def get_description(self) -> str: + return f"TraceIdRatioBased{{{self._rate}}}" + + +class ParentBased(Sampler): + """ + If a parent is set, applies the respective delegate sampler. + Otherwise, uses the root provided at initialization to make a + decision. + + Args: + root: Sampler called for spans with no parent (root spans). + remote_parent_sampled: Sampler called for a remote sampled parent. + remote_parent_not_sampled: Sampler called for a remote parent that is + not sampled. + local_parent_sampled: Sampler called for a local sampled parent. + local_parent_not_sampled: Sampler called for a local parent that is + not sampled. + """ + + def __init__( + self, + root: Sampler, + remote_parent_sampled: Sampler = ALWAYS_ON, + remote_parent_not_sampled: Sampler = ALWAYS_OFF, + local_parent_sampled: Sampler = ALWAYS_ON, + local_parent_not_sampled: Sampler = ALWAYS_OFF, + ): + self._root = root + self._remote_parent_sampled = remote_parent_sampled + self._remote_parent_not_sampled = remote_parent_not_sampled + self._local_parent_sampled = local_parent_sampled + self._local_parent_not_sampled = local_parent_not_sampled + + def should_sample( + self, + parent_context: Optional["Context"], + trace_id: int, + name: str, + kind: Optional[SpanKind] = None, + attributes: Attributes = None, + links: Optional[Sequence["Link"]] = None, + trace_state: Optional["TraceState"] = None, + ) -> "SamplingResult": + parent_span_context = get_current_span( + parent_context + ).get_span_context() + # default to the root sampler + sampler = self._root + # respect the sampling and remote flag of the parent if present + if parent_span_context is not None and parent_span_context.is_valid: + if parent_span_context.is_remote: + if parent_span_context.trace_flags.sampled: + sampler = self._remote_parent_sampled + else: + sampler = self._remote_parent_not_sampled + else: + if parent_span_context.trace_flags.sampled: + sampler = self._local_parent_sampled + else: + sampler = self._local_parent_not_sampled + + return sampler.should_sample( + parent_context=parent_context, + trace_id=trace_id, + name=name, + kind=kind, + attributes=attributes, + links=links, + ) + + def get_description(self): + return f"ParentBased{{root:{self._root.get_description()},remoteParentSampled:{self._remote_parent_sampled.get_description()},remoteParentNotSampled:{self._remote_parent_not_sampled.get_description()},localParentSampled:{self._local_parent_sampled.get_description()},localParentNotSampled:{self._local_parent_not_sampled.get_description()}}}" + + +DEFAULT_OFF = ParentBased(ALWAYS_OFF) +"""Sampler that respects its parent span's sampling decision, but otherwise never samples.""" + +DEFAULT_ON = ParentBased(ALWAYS_ON) +"""Sampler that respects its parent span's sampling decision, but otherwise always samples.""" + + +class ParentBasedTraceIdRatio(ParentBased): + """ + Sampler that respects its parent span's sampling decision, but otherwise + samples probabilistically based on `rate`. + """ + + def __init__(self, rate: float): + root = TraceIdRatioBased(rate=rate) + super().__init__(root=root) + + +class _AlwaysOff(StaticSampler): + def __init__(self, _): + super().__init__(Decision.DROP) + + +class _AlwaysOn(StaticSampler): + def __init__(self, _): + super().__init__(Decision.RECORD_AND_SAMPLE) + + +class _ParentBasedAlwaysOff(ParentBased): + def __init__(self, _): + super().__init__(ALWAYS_OFF) + + +class _ParentBasedAlwaysOn(ParentBased): + def __init__(self, _): + super().__init__(ALWAYS_ON) + + +_KNOWN_SAMPLERS = { + "always_on": ALWAYS_ON, + "always_off": ALWAYS_OFF, + "parentbased_always_on": DEFAULT_ON, + "parentbased_always_off": DEFAULT_OFF, + "traceidratio": TraceIdRatioBased, + "parentbased_traceidratio": ParentBasedTraceIdRatio, +} + + +def _get_from_env_or_default() -> Sampler: + trace_sampler = os.getenv( + OTEL_TRACES_SAMPLER, "parentbased_always_on" + ).lower() + if trace_sampler not in _KNOWN_SAMPLERS: + _logger.warning("Couldn't recognize sampler %s.", trace_sampler) + trace_sampler = "parentbased_always_on" + + if trace_sampler in ("traceidratio", "parentbased_traceidratio"): + try: + rate = float(os.getenv(OTEL_TRACES_SAMPLER_ARG)) + except (ValueError, TypeError): + _logger.warning("Could not convert TRACES_SAMPLER_ARG to float.") + rate = 1.0 + return _KNOWN_SAMPLERS[trace_sampler](rate) + + return _KNOWN_SAMPLERS[trace_sampler] + + +def _get_parent_trace_state( + parent_context: Optional[Context], +) -> Optional["TraceState"]: + parent_span_context = get_current_span(parent_context).get_span_context() + if parent_span_context is None or not parent_span_context.is_valid: + return None + return parent_span_context.trace_state |