aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace')
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/__init__.py1305
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py517
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py61
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/id_generator.py60
-rw-r--r--.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/sampling.py453
5 files changed, 2396 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/__init__.py
new file mode 100644
index 00000000..3ac45806
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/__init__.py
@@ -0,0 +1,1305 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=too-many-lines
+import abc
+import atexit
+import concurrent.futures
+import json
+import logging
+import threading
+import traceback
+import typing
+from os import environ
+from time import time_ns
+from types import MappingProxyType, TracebackType
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterator,
+ List,
+ Mapping,
+ MutableMapping,
+ Optional,
+ Sequence,
+ Tuple,
+ Type,
+ Union,
+)
+from warnings import filterwarnings
+
+from deprecated import deprecated
+
+from opentelemetry import context as context_api
+from opentelemetry import trace as trace_api
+from opentelemetry.attributes import BoundedAttributes
+from opentelemetry.sdk import util
+from opentelemetry.sdk.environment_variables import (
+ OTEL_ATTRIBUTE_COUNT_LIMIT,
+ OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+ OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT,
+ OTEL_LINK_ATTRIBUTE_COUNT_LIMIT,
+ OTEL_SDK_DISABLED,
+ OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
+ OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+ OTEL_SPAN_EVENT_COUNT_LIMIT,
+ OTEL_SPAN_LINK_COUNT_LIMIT,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import sampling
+from opentelemetry.sdk.trace.id_generator import IdGenerator, RandomIdGenerator
+from opentelemetry.sdk.util import BoundedList
+from opentelemetry.sdk.util.instrumentation import (
+ InstrumentationInfo,
+ InstrumentationScope,
+)
+from opentelemetry.semconv.attributes.exception_attributes import (
+ EXCEPTION_ESCAPED,
+ EXCEPTION_MESSAGE,
+ EXCEPTION_STACKTRACE,
+ EXCEPTION_TYPE,
+)
+from opentelemetry.trace import NoOpTracer, SpanContext
+from opentelemetry.trace.status import Status, StatusCode
+from opentelemetry.util import types
+from opentelemetry.util._decorator import _agnosticcontextmanager
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT = 128
+_DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT = 128
+_DEFAULT_OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT = 128
+_DEFAULT_OTEL_LINK_ATTRIBUTE_COUNT_LIMIT = 128
+_DEFAULT_OTEL_SPAN_EVENT_COUNT_LIMIT = 128
+_DEFAULT_OTEL_SPAN_LINK_COUNT_LIMIT = 128
+
+
+_ENV_VALUE_UNSET = ""
+
+
+class SpanProcessor:
+ """Interface which allows hooks for SDK's `Span` start and end method
+ invocations.
+
+ Span processors can be registered directly using
+ :func:`TracerProvider.add_span_processor` and they are invoked
+ in the same order as they were registered.
+ """
+
+ def on_start(
+ self,
+ span: "Span",
+ parent_context: Optional[context_api.Context] = None,
+ ) -> None:
+ """Called when a :class:`opentelemetry.trace.Span` is started.
+
+ This method is called synchronously on the thread that starts the
+ span, therefore it should not block or throw an exception.
+
+ Args:
+ span: The :class:`opentelemetry.trace.Span` that just started.
+ parent_context: The parent context of the span that just started.
+ """
+
+ def on_end(self, span: "ReadableSpan") -> None:
+ """Called when a :class:`opentelemetry.trace.Span` is ended.
+
+ This method is called synchronously on the thread that ends the
+ span, therefore it should not block or throw an exception.
+
+ Args:
+ span: The :class:`opentelemetry.trace.Span` that just ended.
+ """
+
+ def shutdown(self) -> None:
+ """Called when a :class:`opentelemetry.sdk.trace.TracerProvider` is shutdown."""
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Export all ended spans to the configured Exporter that have not yet
+ been exported.
+
+ Args:
+ timeout_millis: The maximum amount of time to wait for spans to be
+ exported.
+
+ Returns:
+ False if the timeout is exceeded, True otherwise.
+ """
+
+
+# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
+# pylint:disable=no-member
+class SynchronousMultiSpanProcessor(SpanProcessor):
+ """Implementation of class:`SpanProcessor` that forwards all received
+ events to a list of span processors sequentially.
+
+ The underlying span processors are called in sequential order as they were
+ added.
+ """
+
+ _span_processors: Tuple[SpanProcessor, ...]
+
+ def __init__(self):
+ # use a tuple to avoid race conditions when adding a new span and
+ # iterating through it on "on_start" and "on_end".
+ self._span_processors = ()
+ self._lock = threading.Lock()
+
+ def add_span_processor(self, span_processor: SpanProcessor) -> None:
+ """Adds a SpanProcessor to the list handled by this instance."""
+ with self._lock:
+ self._span_processors += (span_processor,)
+
+ def on_start(
+ self,
+ span: "Span",
+ parent_context: Optional[context_api.Context] = None,
+ ) -> None:
+ for sp in self._span_processors:
+ sp.on_start(span, parent_context=parent_context)
+
+ def on_end(self, span: "ReadableSpan") -> None:
+ for sp in self._span_processors:
+ sp.on_end(span)
+
+ def shutdown(self) -> None:
+ """Sequentially shuts down all underlying span processors."""
+ for sp in self._span_processors:
+ sp.shutdown()
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Sequentially calls force_flush on all underlying
+ :class:`SpanProcessor`
+
+ Args:
+ timeout_millis: The maximum amount of time over all span processors
+ to wait for spans to be exported. In case the first n span
+ processors exceeded the timeout followup span processors will be
+ skipped.
+
+ Returns:
+ True if all span processors flushed their spans within the
+ given timeout, False otherwise.
+ """
+ deadline_ns = time_ns() + timeout_millis * 1000000
+ for sp in self._span_processors:
+ current_time_ns = time_ns()
+ if current_time_ns >= deadline_ns:
+ return False
+
+ if not sp.force_flush((deadline_ns - current_time_ns) // 1000000):
+ return False
+
+ return True
+
+
+class ConcurrentMultiSpanProcessor(SpanProcessor):
+ """Implementation of :class:`SpanProcessor` that forwards all received
+ events to a list of span processors in parallel.
+
+ Calls to the underlying span processors are forwarded in parallel by
+ submitting them to a thread pool executor and waiting until each span
+ processor finished its work.
+
+ Args:
+ num_threads: The number of threads managed by the thread pool executor
+ and thus defining how many span processors can work in parallel.
+ """
+
+ def __init__(self, num_threads: int = 2):
+ # use a tuple to avoid race conditions when adding a new span and
+ # iterating through it on "on_start" and "on_end".
+ self._span_processors = () # type: Tuple[SpanProcessor, ...]
+ self._lock = threading.Lock()
+ self._executor = concurrent.futures.ThreadPoolExecutor(
+ max_workers=num_threads
+ )
+
+ def add_span_processor(self, span_processor: SpanProcessor) -> None:
+ """Adds a SpanProcessor to the list handled by this instance."""
+ with self._lock:
+ self._span_processors += (span_processor,)
+
+ def _submit_and_await(
+ self,
+ func: Callable[[SpanProcessor], Callable[..., None]],
+ *args: Any,
+ **kwargs: Any,
+ ):
+ futures = []
+ for sp in self._span_processors:
+ future = self._executor.submit(func(sp), *args, **kwargs)
+ futures.append(future)
+ for future in futures:
+ future.result()
+
+ def on_start(
+ self,
+ span: "Span",
+ parent_context: Optional[context_api.Context] = None,
+ ) -> None:
+ self._submit_and_await(
+ lambda sp: sp.on_start, span, parent_context=parent_context
+ )
+
+ def on_end(self, span: "ReadableSpan") -> None:
+ self._submit_and_await(lambda sp: sp.on_end, span)
+
+ def shutdown(self) -> None:
+ """Shuts down all underlying span processors in parallel."""
+ self._submit_and_await(lambda sp: sp.shutdown)
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Calls force_flush on all underlying span processors in parallel.
+
+ Args:
+ timeout_millis: The maximum amount of time to wait for spans to be
+ exported.
+
+ Returns:
+ True if all span processors flushed their spans within the given
+ timeout, False otherwise.
+ """
+ futures = []
+ for sp in self._span_processors: # type: SpanProcessor
+ future = self._executor.submit(sp.force_flush, timeout_millis)
+ futures.append(future)
+
+ timeout_sec = timeout_millis / 1e3
+ done_futures, not_done_futures = concurrent.futures.wait(
+ futures, timeout_sec
+ )
+ if not_done_futures:
+ return False
+
+ for future in done_futures:
+ if not future.result():
+ return False
+
+ return True
+
+
+class EventBase(abc.ABC):
+ def __init__(self, name: str, timestamp: Optional[int] = None) -> None:
+ self._name = name
+ if timestamp is None:
+ self._timestamp = time_ns()
+ else:
+ self._timestamp = timestamp
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def timestamp(self) -> int:
+ return self._timestamp
+
+ @property
+ @abc.abstractmethod
+ def attributes(self) -> types.Attributes:
+ pass
+
+
+class Event(EventBase):
+ """A text annotation with a set of attributes. The attributes of an event
+ are immutable.
+
+ Args:
+ name: Name of the event.
+ attributes: Attributes of the event.
+ timestamp: Timestamp of the event. If `None` it will filled
+ automatically.
+ """
+
+ def __init__(
+ self,
+ name: str,
+ attributes: types.Attributes = None,
+ timestamp: Optional[int] = None,
+ limit: Optional[int] = _DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
+ ) -> None:
+ super().__init__(name, timestamp)
+ self._attributes = attributes
+
+ @property
+ def attributes(self) -> types.Attributes:
+ return self._attributes
+
+ @property
+ def dropped_attributes(self) -> int:
+ if isinstance(self._attributes, BoundedAttributes):
+ return self._attributes.dropped
+ return 0
+
+
+def _check_span_ended(func):
+ def wrapper(self, *args, **kwargs):
+ already_ended = False
+ with self._lock: # pylint: disable=protected-access
+ if self._end_time is None: # pylint: disable=protected-access
+ func(self, *args, **kwargs)
+ else:
+ already_ended = True
+
+ if already_ended:
+ logger.warning("Tried calling %s on an ended span.", func.__name__)
+
+ return wrapper
+
+
+def _is_valid_link(context: SpanContext, attributes: types.Attributes) -> bool:
+ return bool(
+ context and (context.is_valid or (attributes or context.trace_state))
+ )
+
+
+class ReadableSpan:
+ """Provides read-only access to span attributes.
+
+ Users should NOT be creating these objects directly. `ReadableSpan`s are created as
+ a direct result from using the tracing pipeline via the `Tracer`.
+
+ """
+
+ def __init__(
+ self,
+ name: str,
+ context: Optional[trace_api.SpanContext] = None,
+ parent: Optional[trace_api.SpanContext] = None,
+ resource: Optional[Resource] = None,
+ attributes: types.Attributes = None,
+ events: Sequence[Event] = (),
+ links: Sequence[trace_api.Link] = (),
+ kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL,
+ instrumentation_info: Optional[InstrumentationInfo] = None,
+ status: Status = Status(StatusCode.UNSET),
+ start_time: Optional[int] = None,
+ end_time: Optional[int] = None,
+ instrumentation_scope: Optional[InstrumentationScope] = None,
+ ) -> None:
+ self._name = name
+ self._context = context
+ self._kind = kind
+ self._instrumentation_info = instrumentation_info
+ self._instrumentation_scope = instrumentation_scope
+ self._parent = parent
+ self._start_time = start_time
+ self._end_time = end_time
+ self._attributes = attributes
+ self._events = events
+ self._links = links
+ if resource is None:
+ self._resource = Resource.create({})
+ else:
+ self._resource = resource
+ self._status = status
+
+ @property
+ def dropped_attributes(self) -> int:
+ if isinstance(self._attributes, BoundedAttributes):
+ return self._attributes.dropped
+ return 0
+
+ @property
+ def dropped_events(self) -> int:
+ if isinstance(self._events, BoundedList):
+ return self._events.dropped
+ return 0
+
+ @property
+ def dropped_links(self) -> int:
+ if isinstance(self._links, BoundedList):
+ return self._links.dropped
+ return 0
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ def get_span_context(self):
+ return self._context
+
+ @property
+ def context(self):
+ return self._context
+
+ @property
+ def kind(self) -> trace_api.SpanKind:
+ return self._kind
+
+ @property
+ def parent(self) -> Optional[trace_api.SpanContext]:
+ return self._parent
+
+ @property
+ def start_time(self) -> Optional[int]:
+ return self._start_time
+
+ @property
+ def end_time(self) -> Optional[int]:
+ return self._end_time
+
+ @property
+ def status(self) -> trace_api.Status:
+ return self._status
+
+ @property
+ def attributes(self) -> types.Attributes:
+ return MappingProxyType(self._attributes or {})
+
+ @property
+ def events(self) -> Sequence[Event]:
+ return tuple(event for event in self._events)
+
+ @property
+ def links(self) -> Sequence[trace_api.Link]:
+ return tuple(link for link in self._links)
+
+ @property
+ def resource(self) -> Resource:
+ return self._resource
+
+ @property
+ @deprecated(
+ version="1.11.1", reason="You should use instrumentation_scope"
+ )
+ def instrumentation_info(self) -> Optional[InstrumentationInfo]:
+ return self._instrumentation_info
+
+ @property
+ def instrumentation_scope(self) -> Optional[InstrumentationScope]:
+ return self._instrumentation_scope
+
+ def to_json(self, indent: Optional[int] = 4):
+ parent_id = None
+ if self.parent is not None:
+ parent_id = f"0x{trace_api.format_span_id(self.parent.span_id)}"
+
+ start_time = None
+ if self._start_time:
+ start_time = util.ns_to_iso_str(self._start_time)
+
+ end_time = None
+ if self._end_time:
+ end_time = util.ns_to_iso_str(self._end_time)
+
+ status = {
+ "status_code": str(self._status.status_code.name),
+ }
+ if self._status.description:
+ status["description"] = self._status.description
+
+ f_span = {
+ "name": self._name,
+ "context": (
+ self._format_context(self._context) if self._context else None
+ ),
+ "kind": str(self.kind),
+ "parent_id": parent_id,
+ "start_time": start_time,
+ "end_time": end_time,
+ "status": status,
+ "attributes": self._format_attributes(self._attributes),
+ "events": self._format_events(self._events),
+ "links": self._format_links(self._links),
+ "resource": json.loads(self.resource.to_json()),
+ }
+
+ return json.dumps(f_span, indent=indent)
+
+ @staticmethod
+ def _format_context(context: SpanContext) -> Dict[str, str]:
+ return {
+ "trace_id": f"0x{trace_api.format_trace_id(context.trace_id)}",
+ "span_id": f"0x{trace_api.format_span_id(context.span_id)}",
+ "trace_state": repr(context.trace_state),
+ }
+
+ @staticmethod
+ def _format_attributes(
+ attributes: types.Attributes,
+ ) -> Optional[Dict[str, Any]]:
+ if attributes is not None and not isinstance(attributes, dict):
+ return dict(attributes)
+ return attributes
+
+ @staticmethod
+ def _format_events(events: Sequence[Event]) -> List[Dict[str, Any]]:
+ return [
+ {
+ "name": event.name,
+ "timestamp": util.ns_to_iso_str(event.timestamp),
+ "attributes": Span._format_attributes( # pylint: disable=protected-access
+ event.attributes
+ ),
+ }
+ for event in events
+ ]
+
+ @staticmethod
+ def _format_links(links: Sequence[trace_api.Link]) -> List[Dict[str, Any]]:
+ return [
+ {
+ "context": Span._format_context( # pylint: disable=protected-access
+ link.context
+ ),
+ "attributes": Span._format_attributes( # pylint: disable=protected-access
+ link.attributes
+ ),
+ }
+ for link in links
+ ]
+
+
+class SpanLimits:
+ """The limits that should be enforce on recorded data such as events, links, attributes etc.
+
+ This class does not enforce any limits itself. It only provides an a way read limits from env,
+ default values and from user provided arguments.
+
+ All limit arguments must be either a non-negative integer, ``None`` or ``SpanLimits.UNSET``.
+
+ - All limit arguments are optional.
+ - If a limit argument is not set, the class will try to read its value from the corresponding
+ environment variable.
+ - If the environment variable is not set, the default value, if any, will be used.
+
+ Limit precedence:
+
+ - If a model specific limit is set, it will be used.
+ - Else if the corresponding global limit is set, it will be used.
+ - Else if the model specific limit has a default value, the default value will be used.
+ - Else if the global limit has a default value, the default value will be used.
+
+ Args:
+ max_attributes: Maximum number of attributes that can be added to a span, event, and link.
+ Environment variable: OTEL_ATTRIBUTE_COUNT_LIMIT
+ Default: {_DEFAULT_ATTRIBUTE_COUNT_LIMIT}
+ max_events: Maximum number of events that can be added to a Span.
+ Environment variable: OTEL_SPAN_EVENT_COUNT_LIMIT
+ Default: {_DEFAULT_SPAN_EVENT_COUNT_LIMIT}
+ max_links: Maximum number of links that can be added to a Span.
+ Environment variable: OTEL_SPAN_LINK_COUNT_LIMIT
+ Default: {_DEFAULT_SPAN_LINK_COUNT_LIMIT}
+ max_span_attributes: Maximum number of attributes that can be added to a Span.
+ Environment variable: OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT
+ Default: {_DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT}
+ max_event_attributes: Maximum number of attributes that can be added to an Event.
+ Default: {_DEFAULT_OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT}
+ max_link_attributes: Maximum number of attributes that can be added to a Link.
+ Default: {_DEFAULT_OTEL_LINK_ATTRIBUTE_COUNT_LIMIT}
+ max_attribute_length: Maximum length an attribute value can have. Values longer than
+ the specified length will be truncated.
+ max_span_attribute_length: Maximum length a span attribute value can have. Values longer than
+ the specified length will be truncated.
+ """
+
+ UNSET = -1
+
+ def __init__(
+ self,
+ max_attributes: Optional[int] = None,
+ max_events: Optional[int] = None,
+ max_links: Optional[int] = None,
+ max_span_attributes: Optional[int] = None,
+ max_event_attributes: Optional[int] = None,
+ max_link_attributes: Optional[int] = None,
+ max_attribute_length: Optional[int] = None,
+ max_span_attribute_length: Optional[int] = None,
+ ):
+ # span events and links count
+ self.max_events = self._from_env_if_absent(
+ max_events,
+ OTEL_SPAN_EVENT_COUNT_LIMIT,
+ _DEFAULT_OTEL_SPAN_EVENT_COUNT_LIMIT,
+ )
+ self.max_links = self._from_env_if_absent(
+ max_links,
+ OTEL_SPAN_LINK_COUNT_LIMIT,
+ _DEFAULT_OTEL_SPAN_LINK_COUNT_LIMIT,
+ )
+
+ # attribute count
+ global_max_attributes = self._from_env_if_absent(
+ max_attributes, OTEL_ATTRIBUTE_COUNT_LIMIT
+ )
+ self.max_attributes = (
+ global_max_attributes
+ if global_max_attributes is not None
+ else _DEFAULT_OTEL_ATTRIBUTE_COUNT_LIMIT
+ )
+
+ self.max_span_attributes = self._from_env_if_absent(
+ max_span_attributes,
+ OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
+ (
+ global_max_attributes
+ if global_max_attributes is not None
+ else _DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT
+ ),
+ )
+ self.max_event_attributes = self._from_env_if_absent(
+ max_event_attributes,
+ OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT,
+ (
+ global_max_attributes
+ if global_max_attributes is not None
+ else _DEFAULT_OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT
+ ),
+ )
+ self.max_link_attributes = self._from_env_if_absent(
+ max_link_attributes,
+ OTEL_LINK_ATTRIBUTE_COUNT_LIMIT,
+ (
+ global_max_attributes
+ if global_max_attributes is not None
+ else _DEFAULT_OTEL_LINK_ATTRIBUTE_COUNT_LIMIT
+ ),
+ )
+
+ # attribute length
+ self.max_attribute_length = self._from_env_if_absent(
+ max_attribute_length,
+ OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+ )
+ self.max_span_attribute_length = self._from_env_if_absent(
+ max_span_attribute_length,
+ OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT,
+ # use global attribute length limit as default
+ self.max_attribute_length,
+ )
+
+ def __repr__(self):
+ return f"{type(self).__name__}(max_span_attributes={self.max_span_attributes}, max_events_attributes={self.max_event_attributes}, max_link_attributes={self.max_link_attributes}, max_attributes={self.max_attributes}, max_events={self.max_events}, max_links={self.max_links}, max_attribute_length={self.max_attribute_length})"
+
+ @classmethod
+ def _from_env_if_absent(
+ cls, value: Optional[int], env_var: str, default: Optional[int] = None
+ ) -> Optional[int]:
+ if value == cls.UNSET:
+ return None
+
+ err_msg = "{} must be a non-negative integer but got {}"
+
+ # if no value is provided for the limit, try to load it from env
+ if value is None:
+ # return default value if env var is not set
+ if env_var not in environ:
+ return default
+
+ str_value = environ.get(env_var, "").strip().lower()
+ if str_value == _ENV_VALUE_UNSET:
+ return None
+
+ try:
+ value = int(str_value)
+ except ValueError:
+ raise ValueError(err_msg.format(env_var, str_value))
+
+ if value < 0:
+ raise ValueError(err_msg.format(env_var, value))
+ return value
+
+
+_UnsetLimits = SpanLimits(
+ max_attributes=SpanLimits.UNSET,
+ max_events=SpanLimits.UNSET,
+ max_links=SpanLimits.UNSET,
+ max_span_attributes=SpanLimits.UNSET,
+ max_event_attributes=SpanLimits.UNSET,
+ max_link_attributes=SpanLimits.UNSET,
+ max_attribute_length=SpanLimits.UNSET,
+ max_span_attribute_length=SpanLimits.UNSET,
+)
+
+# not removed for backward compat. please use SpanLimits instead.
+SPAN_ATTRIBUTE_COUNT_LIMIT = SpanLimits._from_env_if_absent( # pylint: disable=protected-access
+ None,
+ OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
+ _DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
+)
+
+
+class Span(trace_api.Span, ReadableSpan):
+ """See `opentelemetry.trace.Span`.
+
+ Users should create `Span` objects via the `Tracer` instead of this
+ constructor.
+
+ Args:
+ name: The name of the operation this span represents
+ context: The immutable span context
+ parent: This span's parent's `opentelemetry.trace.SpanContext`, or
+ None if this is a root span
+ sampler: The sampler used to create this span
+ trace_config: TODO
+ resource: Entity producing telemetry
+ attributes: The span's attributes to be exported
+ events: Timestamped events to be exported
+ links: Links to other spans to be exported
+ span_processor: `SpanProcessor` to invoke when starting and ending
+ this `Span`.
+ limits: `SpanLimits` instance that was passed to the `TracerProvider`
+ """
+
+ def __new__(cls, *args, **kwargs):
+ if cls is Span:
+ raise TypeError("Span must be instantiated via a tracer.")
+ return super().__new__(cls)
+
+ # pylint: disable=too-many-locals
+ def __init__(
+ self,
+ name: str,
+ context: trace_api.SpanContext,
+ parent: Optional[trace_api.SpanContext] = None,
+ sampler: Optional[sampling.Sampler] = None,
+ trace_config: None = None, # TODO
+ resource: Optional[Resource] = None,
+ attributes: types.Attributes = None,
+ events: Optional[Sequence[Event]] = None,
+ links: Sequence[trace_api.Link] = (),
+ kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL,
+ span_processor: SpanProcessor = SpanProcessor(),
+ instrumentation_info: Optional[InstrumentationInfo] = None,
+ record_exception: bool = True,
+ set_status_on_exception: bool = True,
+ limits=_UnsetLimits,
+ instrumentation_scope: Optional[InstrumentationScope] = None,
+ ) -> None:
+ if resource is None:
+ resource = Resource.create({})
+ super().__init__(
+ name=name,
+ context=context,
+ parent=parent,
+ kind=kind,
+ resource=resource,
+ instrumentation_info=instrumentation_info,
+ instrumentation_scope=instrumentation_scope,
+ )
+ self._sampler = sampler
+ self._trace_config = trace_config
+ self._record_exception = record_exception
+ self._set_status_on_exception = set_status_on_exception
+ self._span_processor = span_processor
+ self._limits = limits
+ self._lock = threading.Lock()
+ self._attributes = BoundedAttributes(
+ self._limits.max_span_attributes,
+ attributes,
+ immutable=False,
+ max_value_len=self._limits.max_span_attribute_length,
+ )
+ self._events = self._new_events()
+ if events:
+ for event in events:
+ event._attributes = BoundedAttributes(
+ self._limits.max_event_attributes,
+ event.attributes,
+ max_value_len=self._limits.max_attribute_length,
+ )
+ self._events.append(event)
+
+ self._links = self._new_links(links)
+
+ def __repr__(self):
+ return f'{type(self).__name__}(name="{self._name}", context={self._context})'
+
+ def _new_events(self):
+ return BoundedList(self._limits.max_events)
+
+ def _new_links(self, links: Sequence[trace_api.Link]):
+ if not links:
+ return BoundedList(self._limits.max_links)
+
+ valid_links = []
+ for link in links:
+ if link and _is_valid_link(link.context, link.attributes):
+ # pylint: disable=protected-access
+ link._attributes = BoundedAttributes(
+ self._limits.max_link_attributes,
+ link.attributes,
+ max_value_len=self._limits.max_attribute_length,
+ )
+ valid_links.append(link)
+
+ return BoundedList.from_seq(self._limits.max_links, valid_links)
+
+ def get_span_context(self):
+ return self._context
+
+ def set_attributes(
+ self, attributes: Mapping[str, types.AttributeValue]
+ ) -> None:
+ with self._lock:
+ if self._end_time is not None:
+ logger.warning("Setting attribute on ended span.")
+ return
+
+ for key, value in attributes.items():
+ self._attributes[key] = value
+
+ def set_attribute(self, key: str, value: types.AttributeValue) -> None:
+ return self.set_attributes({key: value})
+
+ @_check_span_ended
+ def _add_event(self, event: EventBase) -> None:
+ self._events.append(event)
+
+ def add_event(
+ self,
+ name: str,
+ attributes: types.Attributes = None,
+ timestamp: Optional[int] = None,
+ ) -> None:
+ attributes = BoundedAttributes(
+ self._limits.max_event_attributes,
+ attributes,
+ max_value_len=self._limits.max_attribute_length,
+ )
+ self._add_event(
+ Event(
+ name=name,
+ attributes=attributes,
+ timestamp=timestamp,
+ )
+ )
+
+ @_check_span_ended
+ def _add_link(self, link: trace_api.Link) -> None:
+ self._links.append(link)
+
+ def add_link(
+ self,
+ context: SpanContext,
+ attributes: types.Attributes = None,
+ ) -> None:
+ if not _is_valid_link(context, attributes):
+ return
+
+ attributes = BoundedAttributes(
+ self._limits.max_link_attributes,
+ attributes,
+ max_value_len=self._limits.max_attribute_length,
+ )
+ self._add_link(
+ trace_api.Link(
+ context=context,
+ attributes=attributes,
+ )
+ )
+
+ def _readable_span(self) -> ReadableSpan:
+ return ReadableSpan(
+ name=self._name,
+ context=self._context,
+ parent=self._parent,
+ resource=self._resource,
+ attributes=self._attributes,
+ events=self._events,
+ links=self._links,
+ kind=self.kind,
+ status=self._status,
+ start_time=self._start_time,
+ end_time=self._end_time,
+ instrumentation_info=self._instrumentation_info,
+ instrumentation_scope=self._instrumentation_scope,
+ )
+
+ def start(
+ self,
+ start_time: Optional[int] = None,
+ parent_context: Optional[context_api.Context] = None,
+ ) -> None:
+ with self._lock:
+ if self._start_time is not None:
+ logger.warning("Calling start() on a started span.")
+ return
+ self._start_time = (
+ start_time if start_time is not None else time_ns()
+ )
+
+ self._span_processor.on_start(self, parent_context=parent_context)
+
+ def end(self, end_time: Optional[int] = None) -> None:
+ with self._lock:
+ if self._start_time is None:
+ raise RuntimeError("Calling end() on a not started span.")
+ if self._end_time is not None:
+ logger.warning("Calling end() on an ended span.")
+ return
+
+ self._end_time = end_time if end_time is not None else time_ns()
+
+ self._span_processor.on_end(self._readable_span())
+
+ @_check_span_ended
+ def update_name(self, name: str) -> None:
+ self._name = name
+
+ def is_recording(self) -> bool:
+ return self._end_time is None
+
+ @_check_span_ended
+ def set_status(
+ self,
+ status: typing.Union[Status, StatusCode],
+ description: typing.Optional[str] = None,
+ ) -> None:
+ # Ignore future calls if status is already set to OK
+ # Ignore calls to set to StatusCode.UNSET
+ if isinstance(status, Status):
+ if (
+ self._status
+ and self._status.status_code is StatusCode.OK
+ or status.status_code is StatusCode.UNSET
+ ):
+ return
+ if description is not None:
+ logger.warning(
+ "Description %s ignored. Use either `Status` or `(StatusCode, Description)`",
+ description,
+ )
+ self._status = status
+ elif isinstance(status, StatusCode):
+ if (
+ self._status
+ and self._status.status_code is StatusCode.OK
+ or status is StatusCode.UNSET
+ ):
+ return
+ self._status = Status(status, description)
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ """Ends context manager and calls `end` on the `Span`."""
+ if exc_val is not None and self.is_recording():
+ # Record the exception as an event
+ # pylint:disable=protected-access
+ if self._record_exception:
+ self.record_exception(exception=exc_val, escaped=True)
+ # Records status if span is used as context manager
+ # i.e. with tracer.start_span() as span:
+ if self._set_status_on_exception:
+ self.set_status(
+ Status(
+ status_code=StatusCode.ERROR,
+ description=f"{exc_type.__name__}: {exc_val}",
+ )
+ )
+
+ super().__exit__(exc_type, exc_val, exc_tb)
+
+ def record_exception(
+ self,
+ exception: BaseException,
+ attributes: types.Attributes = None,
+ timestamp: Optional[int] = None,
+ escaped: bool = False,
+ ) -> None:
+ """Records an exception as a span event."""
+ # TODO: keep only exception as first argument after baseline is 3.10
+ stacktrace = "".join(
+ traceback.format_exception(
+ type(exception), value=exception, tb=exception.__traceback__
+ )
+ )
+ module = type(exception).__module__
+ qualname = type(exception).__qualname__
+ exception_type = (
+ f"{module}.{qualname}"
+ if module and module != "builtins"
+ else qualname
+ )
+ _attributes: MutableMapping[str, types.AttributeValue] = {
+ EXCEPTION_TYPE: exception_type,
+ EXCEPTION_MESSAGE: str(exception),
+ EXCEPTION_STACKTRACE: stacktrace,
+ EXCEPTION_ESCAPED: str(escaped),
+ }
+ if attributes:
+ _attributes.update(attributes)
+ self.add_event(
+ name="exception", attributes=_attributes, timestamp=timestamp
+ )
+
+
+class _Span(Span):
+ """Protected implementation of `opentelemetry.trace.Span`.
+
+ This constructor exists to prevent the instantiation of the `Span` class
+ by other mechanisms than through the `Tracer`.
+ """
+
+
+class Tracer(trace_api.Tracer):
+ """See `opentelemetry.trace.Tracer`."""
+
+ def __init__(
+ self,
+ sampler: sampling.Sampler,
+ resource: Resource,
+ span_processor: Union[
+ SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor
+ ],
+ id_generator: IdGenerator,
+ instrumentation_info: InstrumentationInfo,
+ span_limits: SpanLimits,
+ instrumentation_scope: InstrumentationScope,
+ ) -> None:
+ self.sampler = sampler
+ self.resource = resource
+ self.span_processor = span_processor
+ self.id_generator = id_generator
+ self.instrumentation_info = instrumentation_info
+ self._span_limits = span_limits
+ self._instrumentation_scope = instrumentation_scope
+
+ @_agnosticcontextmanager # pylint: disable=protected-access
+ def start_as_current_span(
+ self,
+ name: str,
+ context: Optional[context_api.Context] = None,
+ kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL,
+ attributes: types.Attributes = None,
+ links: Optional[Sequence[trace_api.Link]] = (),
+ start_time: Optional[int] = None,
+ record_exception: bool = True,
+ set_status_on_exception: bool = True,
+ end_on_exit: bool = True,
+ ) -> Iterator[trace_api.Span]:
+ span = self.start_span(
+ name=name,
+ context=context,
+ kind=kind,
+ attributes=attributes,
+ links=links,
+ start_time=start_time,
+ record_exception=record_exception,
+ set_status_on_exception=set_status_on_exception,
+ )
+ with trace_api.use_span(
+ span,
+ end_on_exit=end_on_exit,
+ record_exception=record_exception,
+ set_status_on_exception=set_status_on_exception,
+ ) as span:
+ yield span
+
+ def start_span( # pylint: disable=too-many-locals
+ self,
+ name: str,
+ context: Optional[context_api.Context] = None,
+ kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL,
+ attributes: types.Attributes = None,
+ links: Optional[Sequence[trace_api.Link]] = (),
+ start_time: Optional[int] = None,
+ record_exception: bool = True,
+ set_status_on_exception: bool = True,
+ ) -> trace_api.Span:
+ parent_span_context = trace_api.get_current_span(
+ context
+ ).get_span_context()
+
+ if parent_span_context is not None and not isinstance(
+ parent_span_context, trace_api.SpanContext
+ ):
+ raise TypeError(
+ "parent_span_context must be a SpanContext or None."
+ )
+
+ # is_valid determines root span
+ if parent_span_context is None or not parent_span_context.is_valid:
+ parent_span_context = None
+ trace_id = self.id_generator.generate_trace_id()
+ else:
+ trace_id = parent_span_context.trace_id
+
+ # The sampler decides whether to create a real or no-op span at the
+ # time of span creation. No-op spans do not record events, and are not
+ # exported.
+ # The sampler may also add attributes to the newly-created span, e.g.
+ # to include information about the sampling result.
+ # The sampler may also modify the parent span context's tracestate
+ sampling_result = self.sampler.should_sample(
+ context, trace_id, name, kind, attributes, links
+ )
+
+ trace_flags = (
+ trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED)
+ if sampling_result.decision.is_sampled()
+ else trace_api.TraceFlags(trace_api.TraceFlags.DEFAULT)
+ )
+ span_context = trace_api.SpanContext(
+ trace_id,
+ self.id_generator.generate_span_id(),
+ is_remote=False,
+ trace_flags=trace_flags,
+ trace_state=sampling_result.trace_state,
+ )
+
+ # Only record if is_recording() is true
+ if sampling_result.decision.is_recording():
+ # pylint:disable=protected-access
+ span = _Span(
+ name=name,
+ context=span_context,
+ parent=parent_span_context,
+ sampler=self.sampler,
+ resource=self.resource,
+ attributes=sampling_result.attributes.copy(),
+ span_processor=self.span_processor,
+ kind=kind,
+ links=links,
+ instrumentation_info=self.instrumentation_info,
+ record_exception=record_exception,
+ set_status_on_exception=set_status_on_exception,
+ limits=self._span_limits,
+ instrumentation_scope=self._instrumentation_scope,
+ )
+ span.start(start_time=start_time, parent_context=context)
+ else:
+ span = trace_api.NonRecordingSpan(context=span_context)
+ return span
+
+
+class TracerProvider(trace_api.TracerProvider):
+ """See `opentelemetry.trace.TracerProvider`."""
+
+ def __init__(
+ self,
+ sampler: Optional[sampling.Sampler] = None,
+ resource: Optional[Resource] = None,
+ shutdown_on_exit: bool = True,
+ active_span_processor: Union[
+ SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor, None
+ ] = None,
+ id_generator: Optional[IdGenerator] = None,
+ span_limits: Optional[SpanLimits] = None,
+ ) -> None:
+ self._active_span_processor = (
+ active_span_processor or SynchronousMultiSpanProcessor()
+ )
+ if id_generator is None:
+ self.id_generator = RandomIdGenerator()
+ else:
+ self.id_generator = id_generator
+ if resource is None:
+ self._resource = Resource.create({})
+ else:
+ self._resource = resource
+ if not sampler:
+ sampler = sampling._get_from_env_or_default()
+ self.sampler = sampler
+ self._span_limits = span_limits or SpanLimits()
+ disabled = environ.get(OTEL_SDK_DISABLED, "")
+ self._disabled = disabled.lower().strip() == "true"
+ self._atexit_handler = None
+
+ if shutdown_on_exit:
+ self._atexit_handler = atexit.register(self.shutdown)
+
+ @property
+ def resource(self) -> Resource:
+ return self._resource
+
+ def get_tracer(
+ self,
+ instrumenting_module_name: str,
+ instrumenting_library_version: typing.Optional[str] = None,
+ schema_url: typing.Optional[str] = None,
+ attributes: typing.Optional[types.Attributes] = None,
+ ) -> "trace_api.Tracer":
+ if self._disabled:
+ return NoOpTracer()
+ if not instrumenting_module_name: # Reject empty strings too.
+ instrumenting_module_name = ""
+ logger.error("get_tracer called with missing module name.")
+ if instrumenting_library_version is None:
+ instrumenting_library_version = ""
+
+ filterwarnings(
+ "ignore",
+ message=(
+ r"Call to deprecated method __init__. \(You should use "
+ r"InstrumentationScope\) -- Deprecated since version 1.11.1."
+ ),
+ category=DeprecationWarning,
+ module="opentelemetry.sdk.trace",
+ )
+
+ instrumentation_info = InstrumentationInfo(
+ instrumenting_module_name,
+ instrumenting_library_version,
+ schema_url,
+ )
+
+ return Tracer(
+ self.sampler,
+ self.resource,
+ self._active_span_processor,
+ self.id_generator,
+ instrumentation_info,
+ self._span_limits,
+ InstrumentationScope(
+ instrumenting_module_name,
+ instrumenting_library_version,
+ schema_url,
+ attributes,
+ ),
+ )
+
+ def add_span_processor(self, span_processor: SpanProcessor) -> None:
+ """Registers a new :class:`SpanProcessor` for this `TracerProvider`.
+
+ The span processors are invoked in the same order they are registered.
+ """
+
+ # no lock here because add_span_processor is thread safe for both
+ # SynchronousMultiSpanProcessor and ConcurrentMultiSpanProcessor.
+ self._active_span_processor.add_span_processor(span_processor)
+
+ def shutdown(self) -> None:
+ """Shut down the span processors added to the tracer provider."""
+ self._active_span_processor.shutdown()
+ if self._atexit_handler is not None:
+ atexit.unregister(self._atexit_handler)
+ self._atexit_handler = None
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Requests the active span processor to process all spans that have not
+ yet been processed.
+
+ By default force flush is called sequentially on all added span
+ processors. This means that span processors further back in the list
+ have less time to flush their spans.
+ To have span processors flush their spans in parallel it is possible to
+ initialize the tracer provider with an instance of
+ `ConcurrentMultiSpanProcessor` at the cost of using multiple threads.
+
+ Args:
+ timeout_millis: The maximum amount of time to wait for spans to be
+ processed.
+
+ Returns:
+ False if the timeout is exceeded, True otherwise.
+ """
+ return self._active_span_processor.force_flush(timeout_millis)
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py
new file mode 100644
index 00000000..47d1769a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/__init__.py
@@ -0,0 +1,517 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import collections
+import logging
+import os
+import sys
+import threading
+import typing
+from enum import Enum
+from os import environ, linesep
+from time import time_ns
+
+from opentelemetry.context import (
+ _SUPPRESS_INSTRUMENTATION_KEY,
+ Context,
+ attach,
+ detach,
+ set_value,
+)
+from opentelemetry.sdk.environment_variables import (
+ OTEL_BSP_EXPORT_TIMEOUT,
+ OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+ OTEL_BSP_MAX_QUEUE_SIZE,
+ OTEL_BSP_SCHEDULE_DELAY,
+)
+from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
+from opentelemetry.util._once import Once
+
+_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
+_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
+_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
+_DEFAULT_MAX_QUEUE_SIZE = 2048
+_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
+ "Unable to parse value for %s as integer. Defaulting to %s."
+)
+
+logger = logging.getLogger(__name__)
+
+
+class SpanExportResult(Enum):
+ SUCCESS = 0
+ FAILURE = 1
+
+
+class SpanExporter:
+ """Interface for exporting spans.
+
+ Interface to be implemented by services that want to export spans recorded
+ in their own format.
+
+ To export data this MUST be registered to the :class`opentelemetry.sdk.trace.Tracer` using a
+ `SimpleSpanProcessor` or a `BatchSpanProcessor`.
+ """
+
+ def export(
+ self, spans: typing.Sequence[ReadableSpan]
+ ) -> "SpanExportResult":
+ """Exports a batch of telemetry data.
+
+ Args:
+ spans: The list of `opentelemetry.trace.Span` objects to be exported
+
+ Returns:
+ The result of the export
+ """
+
+ def shutdown(self) -> None:
+ """Shuts down the exporter.
+
+ Called when the SDK is shut down.
+ """
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ """Hint to ensure that the export of any spans the exporter has received
+ prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably
+ before returning from this method.
+ """
+
+
+class SimpleSpanProcessor(SpanProcessor):
+ """Simple SpanProcessor implementation.
+
+ SimpleSpanProcessor is an implementation of `SpanProcessor` that
+ passes ended spans directly to the configured `SpanExporter`.
+ """
+
+ def __init__(self, span_exporter: SpanExporter):
+ self.span_exporter = span_exporter
+
+ def on_start(
+ self, span: Span, parent_context: typing.Optional[Context] = None
+ ) -> None:
+ pass
+
+ def on_end(self, span: ReadableSpan) -> None:
+ if not span.context.trace_flags.sampled:
+ return
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ self.span_exporter.export((span,))
+ # pylint: disable=broad-exception-caught
+ except Exception:
+ logger.exception("Exception while exporting Span.")
+ detach(token)
+
+ def shutdown(self) -> None:
+ self.span_exporter.shutdown()
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ # pylint: disable=unused-argument
+ return True
+
+
+class _FlushRequest:
+ """Represents a request for the BatchSpanProcessor to flush spans."""
+
+ __slots__ = ["event", "num_spans"]
+
+ def __init__(self):
+ self.event = threading.Event()
+ self.num_spans = 0
+
+
+_BSP_RESET_ONCE = Once()
+
+
+class BatchSpanProcessor(SpanProcessor):
+ """Batch span processor implementation.
+
+ `BatchSpanProcessor` is an implementation of `SpanProcessor` that
+ batches ended spans and pushes them to the configured `SpanExporter`.
+
+ `BatchSpanProcessor` is configurable with the following environment
+ variables which correspond to constructor parameters:
+
+ - :envvar:`OTEL_BSP_SCHEDULE_DELAY`
+ - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
+ - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
+ - :envvar:`OTEL_BSP_EXPORT_TIMEOUT`
+ """
+
+ def __init__(
+ self,
+ span_exporter: SpanExporter,
+ max_queue_size: int | None = None,
+ schedule_delay_millis: float | None = None,
+ max_export_batch_size: int | None = None,
+ export_timeout_millis: float | None = None,
+ ):
+ if max_queue_size is None:
+ max_queue_size = BatchSpanProcessor._default_max_queue_size()
+
+ if schedule_delay_millis is None:
+ schedule_delay_millis = (
+ BatchSpanProcessor._default_schedule_delay_millis()
+ )
+
+ if max_export_batch_size is None:
+ max_export_batch_size = (
+ BatchSpanProcessor._default_max_export_batch_size()
+ )
+
+ if export_timeout_millis is None:
+ export_timeout_millis = (
+ BatchSpanProcessor._default_export_timeout_millis()
+ )
+
+ BatchSpanProcessor._validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ )
+
+ self.span_exporter = span_exporter
+ self.queue = collections.deque([], max_queue_size) # type: typing.Deque[Span]
+ self.worker_thread = threading.Thread(
+ name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+ )
+ self.condition = threading.Condition(threading.Lock())
+ self._flush_request = None # type: typing.Optional[_FlushRequest]
+ self.schedule_delay_millis = schedule_delay_millis
+ self.max_export_batch_size = max_export_batch_size
+ self.max_queue_size = max_queue_size
+ self.export_timeout_millis = export_timeout_millis
+ self.done = False
+ # flag that indicates that spans are being dropped
+ self._spans_dropped = False
+ # precallocated list to send spans to exporter
+ self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
+ self.worker_thread.start()
+ if hasattr(os, "register_at_fork"):
+ os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
+ self._pid = os.getpid()
+
+ def on_start(
+ self, span: Span, parent_context: Context | None = None
+ ) -> None:
+ pass
+
+ def on_end(self, span: ReadableSpan) -> None:
+ if self.done:
+ logger.warning("Already shutdown, dropping span.")
+ return
+ if not span.context.trace_flags.sampled:
+ return
+ if self._pid != os.getpid():
+ _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
+
+ if len(self.queue) == self.max_queue_size:
+ if not self._spans_dropped:
+ logger.warning("Queue is full, likely spans will be dropped.")
+ self._spans_dropped = True
+
+ self.queue.appendleft(span)
+
+ if len(self.queue) >= self.max_export_batch_size:
+ with self.condition:
+ self.condition.notify()
+
+ def _at_fork_reinit(self):
+ self.condition = threading.Condition(threading.Lock())
+ self.queue.clear()
+
+ # worker_thread is local to a process, only the thread that issued fork continues
+ # to exist. A new worker thread must be started in child process.
+ self.worker_thread = threading.Thread(
+ name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+ )
+ self.worker_thread.start()
+ self._pid = os.getpid()
+
+ def worker(self):
+ timeout = self.schedule_delay_millis / 1e3
+ flush_request = None # type: typing.Optional[_FlushRequest]
+ while not self.done:
+ with self.condition:
+ if self.done:
+ # done flag may have changed, avoid waiting
+ break
+ flush_request = self._get_and_unset_flush_request()
+ if (
+ len(self.queue) < self.max_export_batch_size
+ and flush_request is None
+ ):
+ self.condition.wait(timeout)
+ flush_request = self._get_and_unset_flush_request()
+ if not self.queue:
+ # spurious notification, let's wait again, reset timeout
+ timeout = self.schedule_delay_millis / 1e3
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+ continue
+ if self.done:
+ # missing spans will be sent when calling flush
+ break
+
+ # subtract the duration of this export call to the next timeout
+ start = time_ns()
+ self._export(flush_request)
+ end = time_ns()
+ duration = (end - start) / 1e9
+ timeout = self.schedule_delay_millis / 1e3 - duration
+
+ self._notify_flush_request_finished(flush_request)
+ flush_request = None
+
+ # there might have been a new flush request while export was running
+ # and before the done flag switched to true
+ with self.condition:
+ shutdown_flush_request = self._get_and_unset_flush_request()
+
+ # be sure that all spans are sent
+ self._drain_queue()
+ self._notify_flush_request_finished(flush_request)
+ self._notify_flush_request_finished(shutdown_flush_request)
+
+ def _get_and_unset_flush_request(
+ self,
+ ) -> typing.Optional[_FlushRequest]:
+ """Returns the current flush request and makes it invisible to the
+ worker thread for subsequent calls.
+ """
+ flush_request = self._flush_request
+ self._flush_request = None
+ if flush_request is not None:
+ flush_request.num_spans = len(self.queue)
+ return flush_request
+
+ @staticmethod
+ def _notify_flush_request_finished(
+ flush_request: typing.Optional[_FlushRequest],
+ ):
+ """Notifies the flush initiator(s) waiting on the given request/event
+ that the flush operation was finished.
+ """
+ if flush_request is not None:
+ flush_request.event.set()
+
+ def _get_or_create_flush_request(self) -> _FlushRequest:
+ """Either returns the current active flush event or creates a new one.
+
+ The flush event will be visible and read by the worker thread before an
+ export operation starts. Callers of a flush operation may wait on the
+ returned event to be notified when the flush/export operation was
+ finished.
+
+ This method is not thread-safe, i.e. callers need to take care about
+ synchronization/locking.
+ """
+ if self._flush_request is None:
+ self._flush_request = _FlushRequest()
+ return self._flush_request
+
+ def _export(self, flush_request: typing.Optional[_FlushRequest]):
+ """Exports spans considering the given flush_request.
+
+ In case of a given flush_requests spans are exported in batches until
+ the number of exported spans reached or exceeded the number of spans in
+ the flush request.
+ In no flush_request was given at most max_export_batch_size spans are
+ exported.
+ """
+ if not flush_request:
+ self._export_batch()
+ return
+
+ num_spans = flush_request.num_spans
+ while self.queue:
+ num_exported = self._export_batch()
+ num_spans -= num_exported
+
+ if num_spans <= 0:
+ break
+
+ def _export_batch(self) -> int:
+ """Exports at most max_export_batch_size spans and returns the number of
+ exported spans.
+ """
+ idx = 0
+ # currently only a single thread acts as consumer, so queue.pop() will
+ # not raise an exception
+ while idx < self.max_export_batch_size and self.queue:
+ self.spans_list[idx] = self.queue.pop()
+ idx += 1
+ token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+ try:
+ # Ignore type b/c the Optional[None]+slicing is too "clever"
+ # for mypy
+ self.span_exporter.export(self.spans_list[:idx]) # type: ignore
+ except Exception: # pylint: disable=broad-exception-caught
+ logger.exception("Exception while exporting Span batch.")
+ detach(token)
+
+ # clean up list
+ for index in range(idx):
+ self.spans_list[index] = None
+ return idx
+
+ def _drain_queue(self):
+ """Export all elements until queue is empty.
+
+ Can only be called from the worker thread context because it invokes
+ `export` that is not thread safe.
+ """
+ while self.queue:
+ self._export_batch()
+
+ def force_flush(self, timeout_millis: int | None = None) -> bool:
+ if timeout_millis is None:
+ timeout_millis = self.export_timeout_millis
+
+ if self.done:
+ logger.warning("Already shutdown, ignoring call to force_flush().")
+ return True
+
+ with self.condition:
+ flush_request = self._get_or_create_flush_request()
+ # signal the worker thread to flush and wait for it to finish
+ self.condition.notify_all()
+
+ # wait for token to be processed
+ ret = flush_request.event.wait(timeout_millis / 1e3)
+ if not ret:
+ logger.warning("Timeout was exceeded in force_flush().")
+ return ret
+
+ def shutdown(self) -> None:
+ # signal the worker thread to finish and then wait for it
+ self.done = True
+ with self.condition:
+ self.condition.notify_all()
+ self.worker_thread.join()
+ self.span_exporter.shutdown()
+
+ @staticmethod
+ def _default_max_queue_size():
+ try:
+ return int(
+ environ.get(OTEL_BSP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_MAX_QUEUE_SIZE,
+ _DEFAULT_MAX_QUEUE_SIZE,
+ )
+ return _DEFAULT_MAX_QUEUE_SIZE
+
+ @staticmethod
+ def _default_schedule_delay_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BSP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
+ )
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_SCHEDULE_DELAY,
+ _DEFAULT_SCHEDULE_DELAY_MILLIS,
+ )
+ return _DEFAULT_SCHEDULE_DELAY_MILLIS
+
+ @staticmethod
+ def _default_max_export_batch_size():
+ try:
+ return int(
+ environ.get(
+ OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
+ _DEFAULT_MAX_EXPORT_BATCH_SIZE,
+ )
+ return _DEFAULT_MAX_EXPORT_BATCH_SIZE
+
+ @staticmethod
+ def _default_export_timeout_millis():
+ try:
+ return int(
+ environ.get(
+ OTEL_BSP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
+ )
+ )
+ except ValueError:
+ logger.exception(
+ _ENV_VAR_INT_VALUE_ERROR_MESSAGE,
+ OTEL_BSP_EXPORT_TIMEOUT,
+ _DEFAULT_EXPORT_TIMEOUT_MILLIS,
+ )
+ return _DEFAULT_EXPORT_TIMEOUT_MILLIS
+
+ @staticmethod
+ def _validate_arguments(
+ max_queue_size, schedule_delay_millis, max_export_batch_size
+ ):
+ if max_queue_size <= 0:
+ raise ValueError("max_queue_size must be a positive integer.")
+
+ if schedule_delay_millis <= 0:
+ raise ValueError("schedule_delay_millis must be positive.")
+
+ if max_export_batch_size <= 0:
+ raise ValueError(
+ "max_export_batch_size must be a positive integer."
+ )
+
+ if max_export_batch_size > max_queue_size:
+ raise ValueError(
+ "max_export_batch_size must be less than or equal to max_queue_size."
+ )
+
+
+class ConsoleSpanExporter(SpanExporter):
+ """Implementation of :class:`SpanExporter` that prints spans to the
+ console.
+
+ This class can be used for diagnostic purposes. It prints the exported
+ spans to the console STDOUT.
+ """
+
+ def __init__(
+ self,
+ service_name: str | None = None,
+ out: typing.IO = sys.stdout,
+ formatter: typing.Callable[
+ [ReadableSpan], str
+ ] = lambda span: span.to_json() + linesep,
+ ):
+ self.out = out
+ self.formatter = formatter
+ self.service_name = service_name
+
+ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
+ for span in spans:
+ self.out.write(self.formatter(span))
+ self.out.flush()
+ return SpanExportResult.SUCCESS
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ return True
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py
new file mode 100644
index 00000000..c28ecfd2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/export/in_memory_span_exporter.py
@@ -0,0 +1,61 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import typing
+
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+
+
+class InMemorySpanExporter(SpanExporter):
+ """Implementation of :class:`.SpanExporter` that stores spans in memory.
+
+ This class can be used for testing purposes. It stores the exported spans
+ in a list in memory that can be retrieved using the
+ :func:`.get_finished_spans` method.
+ """
+
+ def __init__(self) -> None:
+ self._finished_spans: typing.List[ReadableSpan] = []
+ self._stopped = False
+ self._lock = threading.Lock()
+
+ def clear(self) -> None:
+ """Clear list of collected spans."""
+ with self._lock:
+ self._finished_spans.clear()
+
+ def get_finished_spans(self) -> typing.Tuple[ReadableSpan, ...]:
+ """Get list of collected spans."""
+ with self._lock:
+ return tuple(self._finished_spans)
+
+ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
+ """Stores a list of spans in memory."""
+ if self._stopped:
+ return SpanExportResult.FAILURE
+ with self._lock:
+ self._finished_spans.extend(spans)
+ return SpanExportResult.SUCCESS
+
+ def shutdown(self) -> None:
+ """Shut downs the exporter.
+
+ Calls to export after the exporter has been shut down will fail.
+ """
+ self._stopped = True
+
+ def force_flush(self, timeout_millis: int = 30000) -> bool:
+ return True
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/id_generator.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/id_generator.py
new file mode 100644
index 00000000..cd1f89bc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/id_generator.py
@@ -0,0 +1,60 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import random
+
+from opentelemetry import trace
+
+
+class IdGenerator(abc.ABC):
+ @abc.abstractmethod
+ def generate_span_id(self) -> int:
+ """Get a new span ID.
+
+ Returns:
+ A 64-bit int for use as a span ID
+ """
+
+ @abc.abstractmethod
+ def generate_trace_id(self) -> int:
+ """Get a new trace ID.
+
+ Implementations should at least make the 64 least significant bits
+ uniformly random. Samplers like the `TraceIdRatioBased` sampler rely on
+ this randomness to make sampling decisions.
+
+ See `the specification on TraceIdRatioBased <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#traceidratiobased>`_.
+
+ Returns:
+ A 128-bit int for use as a trace ID
+ """
+
+
+class RandomIdGenerator(IdGenerator):
+ """The default ID generator for TracerProvider which randomly generates all
+ bits when generating IDs.
+ """
+
+ def generate_span_id(self) -> int:
+ span_id = random.getrandbits(64)
+ while span_id == trace.INVALID_SPAN_ID:
+ span_id = random.getrandbits(64)
+ return span_id
+
+ def generate_trace_id(self) -> int:
+ trace_id = random.getrandbits(128)
+ while trace_id == trace.INVALID_TRACE_ID:
+ trace_id = random.getrandbits(128)
+ return trace_id
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/sampling.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/sampling.py
new file mode 100644
index 00000000..fb6990a0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/trace/sampling.py
@@ -0,0 +1,453 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+For general information about sampling, see `the specification <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#sampling>`_.
+
+OpenTelemetry provides two types of samplers:
+
+- `StaticSampler`
+- `TraceIdRatioBased`
+
+A `StaticSampler` always returns the same sampling result regardless of the conditions. Both possible StaticSamplers are already created:
+
+- Always sample spans: ALWAYS_ON
+- Never sample spans: ALWAYS_OFF
+
+A `TraceIdRatioBased` sampler makes a random sampling result based on the sampling probability given.
+
+If the span being sampled has a parent, `ParentBased` will respect the parent delegate sampler. Otherwise, it returns the sampling result from the given root sampler.
+
+Currently, sampling results are always made during the creation of the span. However, this might not always be the case in the future (see `OTEP #115 <https://github.com/open-telemetry/oteps/pull/115>`_).
+
+Custom samplers can be created by subclassing `Sampler` and implementing `Sampler.should_sample` as well as `Sampler.get_description`.
+
+Samplers are able to modify the `opentelemetry.trace.span.TraceState` of the parent of the span being created. For custom samplers, it is suggested to implement `Sampler.should_sample` to utilize the
+parent span context's `opentelemetry.trace.span.TraceState` and pass into the `SamplingResult` instead of the explicit trace_state field passed into the parameter of `Sampler.should_sample`.
+
+To use a sampler, pass it into the tracer provider constructor. For example:
+
+.. code:: python
+
+ from opentelemetry import trace
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import (
+ ConsoleSpanExporter,
+ SimpleSpanProcessor,
+ )
+ from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
+
+ # sample 1 in every 1000 traces
+ sampler = TraceIdRatioBased(1/1000)
+
+ # set the sampler onto the global tracer provider
+ trace.set_tracer_provider(TracerProvider(sampler=sampler))
+
+ # set up an exporter for sampled spans
+ trace.get_tracer_provider().add_span_processor(
+ SimpleSpanProcessor(ConsoleSpanExporter())
+ )
+
+ # created spans will now be sampled by the TraceIdRatioBased sampler
+ with trace.get_tracer(__name__).start_as_current_span("Test Span"):
+ ...
+
+The tracer sampler can also be configured via environment variables ``OTEL_TRACES_SAMPLER`` and ``OTEL_TRACES_SAMPLER_ARG`` (only if applicable).
+The list of built-in values for ``OTEL_TRACES_SAMPLER`` are:
+
+ * always_on - Sampler that always samples spans, regardless of the parent span's sampling decision.
+ * always_off - Sampler that never samples spans, regardless of the parent span's sampling decision.
+ * traceidratio - Sampler that samples probabilistically based on rate.
+ * parentbased_always_on - (default) Sampler that respects its parent span's sampling decision, but otherwise always samples.
+ * parentbased_always_off - Sampler that respects its parent span's sampling decision, but otherwise never samples.
+ * parentbased_traceidratio - Sampler that respects its parent span's sampling decision, but otherwise samples probabilistically based on rate.
+
+Sampling probability can be set with ``OTEL_TRACES_SAMPLER_ARG`` if the sampler is traceidratio or parentbased_traceidratio. Rate must be in the range [0.0,1.0]. When not provided rate will be set to
+1.0 (maximum rate possible).
+
+Prev example but with environment variables. Please make sure to set the env ``OTEL_TRACES_SAMPLER=traceidratio`` and ``OTEL_TRACES_SAMPLER_ARG=0.001``.
+
+.. code:: python
+
+ from opentelemetry import trace
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import (
+ ConsoleSpanExporter,
+ SimpleSpanProcessor,
+ )
+
+ trace.set_tracer_provider(TracerProvider())
+
+ # set up an exporter for sampled spans
+ trace.get_tracer_provider().add_span_processor(
+ SimpleSpanProcessor(ConsoleSpanExporter())
+ )
+
+ # created spans will now be sampled by the TraceIdRatioBased sampler with rate 1/1000.
+ with trace.get_tracer(__name__).start_as_current_span("Test Span"):
+ ...
+
+When utilizing a configurator, you can configure a custom sampler. In order to create a configurable custom sampler, create an entry point for the custom sampler
+factory method or function under the entry point group, ``opentelemetry_traces_sampler``. The custom sampler factory method must be of type ``Callable[[str], Sampler]``, taking a single string argument and
+returning a Sampler object. The single input will come from the string value of the ``OTEL_TRACES_SAMPLER_ARG`` environment variable. If ``OTEL_TRACES_SAMPLER_ARG`` is not configured, the input will
+be an empty string. For example:
+
+.. code:: python
+
+ setup(
+ ...
+ entry_points={
+ ...
+ "opentelemetry_traces_sampler": [
+ "custom_sampler_name = path.to.sampler.factory.method:CustomSamplerFactory.get_sampler"
+ ]
+ }
+ )
+ # ...
+ class CustomRatioSampler(Sampler):
+ def __init__(rate):
+ # ...
+ # ...
+ class CustomSamplerFactory:
+ @staticmethod
+ def get_sampler(sampler_argument):
+ try:
+ rate = float(sampler_argument)
+ return CustomSampler(rate)
+ except ValueError: # In case argument is empty string.
+ return CustomSampler(0.5)
+
+In order to configure you application with a custom sampler's entry point, set the ``OTEL_TRACES_SAMPLER`` environment variable to the key name of the entry point. For example, to configured the
+above sampler, set ``OTEL_TRACES_SAMPLER=custom_sampler_name`` and ``OTEL_TRACES_SAMPLER_ARG=0.5``.
+"""
+
+import abc
+import enum
+import os
+from logging import getLogger
+from types import MappingProxyType
+from typing import Optional, Sequence
+
+# pylint: disable=unused-import
+from opentelemetry.context import Context
+from opentelemetry.sdk.environment_variables import (
+ OTEL_TRACES_SAMPLER,
+ OTEL_TRACES_SAMPLER_ARG,
+)
+from opentelemetry.trace import Link, SpanKind, get_current_span
+from opentelemetry.trace.span import TraceState
+from opentelemetry.util.types import Attributes
+
+_logger = getLogger(__name__)
+
+
+class Decision(enum.Enum):
+ # IsRecording() == false, span will not be recorded and all events and attributes will be dropped.
+ DROP = 0
+ # IsRecording() == true, but Sampled flag MUST NOT be set.
+ RECORD_ONLY = 1
+ # IsRecording() == true AND Sampled flag` MUST be set.
+ RECORD_AND_SAMPLE = 2
+
+ def is_recording(self):
+ return self in (Decision.RECORD_ONLY, Decision.RECORD_AND_SAMPLE)
+
+ def is_sampled(self):
+ return self is Decision.RECORD_AND_SAMPLE
+
+
+class SamplingResult:
+ """A sampling result as applied to a newly-created Span.
+
+ Args:
+ decision: A sampling decision based off of whether the span is recorded
+ and the sampled flag in trace flags in the span context.
+ attributes: Attributes to add to the `opentelemetry.trace.Span`.
+ trace_state: The tracestate used for the `opentelemetry.trace.Span`.
+ Could possibly have been modified by the sampler.
+ """
+
+ def __repr__(self) -> str:
+ return f"{type(self).__name__}({str(self.decision)}, attributes={str(self.attributes)})"
+
+ def __init__(
+ self,
+ decision: Decision,
+ attributes: "Attributes" = None,
+ trace_state: Optional["TraceState"] = None,
+ ) -> None:
+ self.decision = decision
+ if attributes is None:
+ self.attributes = MappingProxyType({})
+ else:
+ self.attributes = MappingProxyType(attributes)
+ self.trace_state = trace_state
+
+
+class Sampler(abc.ABC):
+ @abc.abstractmethod
+ def should_sample(
+ self,
+ parent_context: Optional["Context"],
+ trace_id: int,
+ name: str,
+ kind: Optional[SpanKind] = None,
+ attributes: Attributes = None,
+ links: Optional[Sequence["Link"]] = None,
+ trace_state: Optional["TraceState"] = None,
+ ) -> "SamplingResult":
+ pass
+
+ @abc.abstractmethod
+ def get_description(self) -> str:
+ pass
+
+
+class StaticSampler(Sampler):
+ """Sampler that always returns the same decision."""
+
+ def __init__(self, decision: "Decision") -> None:
+ self._decision = decision
+
+ def should_sample(
+ self,
+ parent_context: Optional["Context"],
+ trace_id: int,
+ name: str,
+ kind: Optional[SpanKind] = None,
+ attributes: Attributes = None,
+ links: Optional[Sequence["Link"]] = None,
+ trace_state: Optional["TraceState"] = None,
+ ) -> "SamplingResult":
+ if self._decision is Decision.DROP:
+ attributes = None
+ return SamplingResult(
+ self._decision,
+ attributes,
+ _get_parent_trace_state(parent_context),
+ )
+
+ def get_description(self) -> str:
+ if self._decision is Decision.DROP:
+ return "AlwaysOffSampler"
+ return "AlwaysOnSampler"
+
+
+ALWAYS_OFF = StaticSampler(Decision.DROP)
+"""Sampler that never samples spans, regardless of the parent span's sampling decision."""
+
+ALWAYS_ON = StaticSampler(Decision.RECORD_AND_SAMPLE)
+"""Sampler that always samples spans, regardless of the parent span's sampling decision."""
+
+
+class TraceIdRatioBased(Sampler):
+ """
+ Sampler that makes sampling decisions probabilistically based on `rate`.
+
+ Args:
+ rate: Probability (between 0 and 1) that a span will be sampled
+ """
+
+ def __init__(self, rate: float):
+ if rate < 0.0 or rate > 1.0:
+ raise ValueError("Probability must be in range [0.0, 1.0].")
+ self._rate = rate
+ self._bound = self.get_bound_for_rate(self._rate)
+
+ # For compatibility with 64 bit trace IDs, the sampler checks the 64
+ # low-order bits of the trace ID to decide whether to sample a given trace.
+ TRACE_ID_LIMIT = (1 << 64) - 1
+
+ @classmethod
+ def get_bound_for_rate(cls, rate: float) -> int:
+ return round(rate * (cls.TRACE_ID_LIMIT + 1))
+
+ @property
+ def rate(self) -> float:
+ return self._rate
+
+ @property
+ def bound(self) -> int:
+ return self._bound
+
+ def should_sample(
+ self,
+ parent_context: Optional["Context"],
+ trace_id: int,
+ name: str,
+ kind: Optional[SpanKind] = None,
+ attributes: Attributes = None,
+ links: Optional[Sequence["Link"]] = None,
+ trace_state: Optional["TraceState"] = None,
+ ) -> "SamplingResult":
+ decision = Decision.DROP
+ if trace_id & self.TRACE_ID_LIMIT < self.bound:
+ decision = Decision.RECORD_AND_SAMPLE
+ if decision is Decision.DROP:
+ attributes = None
+ return SamplingResult(
+ decision,
+ attributes,
+ _get_parent_trace_state(parent_context),
+ )
+
+ def get_description(self) -> str:
+ return f"TraceIdRatioBased{{{self._rate}}}"
+
+
+class ParentBased(Sampler):
+ """
+ If a parent is set, applies the respective delegate sampler.
+ Otherwise, uses the root provided at initialization to make a
+ decision.
+
+ Args:
+ root: Sampler called for spans with no parent (root spans).
+ remote_parent_sampled: Sampler called for a remote sampled parent.
+ remote_parent_not_sampled: Sampler called for a remote parent that is
+ not sampled.
+ local_parent_sampled: Sampler called for a local sampled parent.
+ local_parent_not_sampled: Sampler called for a local parent that is
+ not sampled.
+ """
+
+ def __init__(
+ self,
+ root: Sampler,
+ remote_parent_sampled: Sampler = ALWAYS_ON,
+ remote_parent_not_sampled: Sampler = ALWAYS_OFF,
+ local_parent_sampled: Sampler = ALWAYS_ON,
+ local_parent_not_sampled: Sampler = ALWAYS_OFF,
+ ):
+ self._root = root
+ self._remote_parent_sampled = remote_parent_sampled
+ self._remote_parent_not_sampled = remote_parent_not_sampled
+ self._local_parent_sampled = local_parent_sampled
+ self._local_parent_not_sampled = local_parent_not_sampled
+
+ def should_sample(
+ self,
+ parent_context: Optional["Context"],
+ trace_id: int,
+ name: str,
+ kind: Optional[SpanKind] = None,
+ attributes: Attributes = None,
+ links: Optional[Sequence["Link"]] = None,
+ trace_state: Optional["TraceState"] = None,
+ ) -> "SamplingResult":
+ parent_span_context = get_current_span(
+ parent_context
+ ).get_span_context()
+ # default to the root sampler
+ sampler = self._root
+ # respect the sampling and remote flag of the parent if present
+ if parent_span_context is not None and parent_span_context.is_valid:
+ if parent_span_context.is_remote:
+ if parent_span_context.trace_flags.sampled:
+ sampler = self._remote_parent_sampled
+ else:
+ sampler = self._remote_parent_not_sampled
+ else:
+ if parent_span_context.trace_flags.sampled:
+ sampler = self._local_parent_sampled
+ else:
+ sampler = self._local_parent_not_sampled
+
+ return sampler.should_sample(
+ parent_context=parent_context,
+ trace_id=trace_id,
+ name=name,
+ kind=kind,
+ attributes=attributes,
+ links=links,
+ )
+
+ def get_description(self):
+ return f"ParentBased{{root:{self._root.get_description()},remoteParentSampled:{self._remote_parent_sampled.get_description()},remoteParentNotSampled:{self._remote_parent_not_sampled.get_description()},localParentSampled:{self._local_parent_sampled.get_description()},localParentNotSampled:{self._local_parent_not_sampled.get_description()}}}"
+
+
+DEFAULT_OFF = ParentBased(ALWAYS_OFF)
+"""Sampler that respects its parent span's sampling decision, but otherwise never samples."""
+
+DEFAULT_ON = ParentBased(ALWAYS_ON)
+"""Sampler that respects its parent span's sampling decision, but otherwise always samples."""
+
+
+class ParentBasedTraceIdRatio(ParentBased):
+ """
+ Sampler that respects its parent span's sampling decision, but otherwise
+ samples probabilistically based on `rate`.
+ """
+
+ def __init__(self, rate: float):
+ root = TraceIdRatioBased(rate=rate)
+ super().__init__(root=root)
+
+
+class _AlwaysOff(StaticSampler):
+ def __init__(self, _):
+ super().__init__(Decision.DROP)
+
+
+class _AlwaysOn(StaticSampler):
+ def __init__(self, _):
+ super().__init__(Decision.RECORD_AND_SAMPLE)
+
+
+class _ParentBasedAlwaysOff(ParentBased):
+ def __init__(self, _):
+ super().__init__(ALWAYS_OFF)
+
+
+class _ParentBasedAlwaysOn(ParentBased):
+ def __init__(self, _):
+ super().__init__(ALWAYS_ON)
+
+
+_KNOWN_SAMPLERS = {
+ "always_on": ALWAYS_ON,
+ "always_off": ALWAYS_OFF,
+ "parentbased_always_on": DEFAULT_ON,
+ "parentbased_always_off": DEFAULT_OFF,
+ "traceidratio": TraceIdRatioBased,
+ "parentbased_traceidratio": ParentBasedTraceIdRatio,
+}
+
+
+def _get_from_env_or_default() -> Sampler:
+ trace_sampler = os.getenv(
+ OTEL_TRACES_SAMPLER, "parentbased_always_on"
+ ).lower()
+ if trace_sampler not in _KNOWN_SAMPLERS:
+ _logger.warning("Couldn't recognize sampler %s.", trace_sampler)
+ trace_sampler = "parentbased_always_on"
+
+ if trace_sampler in ("traceidratio", "parentbased_traceidratio"):
+ try:
+ rate = float(os.getenv(OTEL_TRACES_SAMPLER_ARG))
+ except (ValueError, TypeError):
+ _logger.warning("Could not convert TRACES_SAMPLER_ARG to float.")
+ rate = 1.0
+ return _KNOWN_SAMPLERS[trace_sampler](rate)
+
+ return _KNOWN_SAMPLERS[trace_sampler]
+
+
+def _get_parent_trace_state(
+ parent_context: Optional[Context],
+) -> Optional["TraceState"]:
+ parent_span_context = get_current_span(parent_context).get_span_context()
+ if parent_span_context is None or not parent_span_context.is_valid:
+ return None
+ return parent_span_context.trace_state