diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/opentelemetry/sdk/util')
3 files changed, 393 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/__init__.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/__init__.py new file mode 100644 index 00000000..68f10ddc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/__init__.py @@ -0,0 +1,152 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import threading +from collections import deque +from collections.abc import MutableMapping, Sequence +from typing import Optional + +from deprecated import deprecated + + +def ns_to_iso_str(nanoseconds): + """Get an ISO 8601 string from time_ns value.""" + ts = datetime.datetime.fromtimestamp( + nanoseconds / 1e9, tz=datetime.timezone.utc + ) + return ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + +def get_dict_as_key(labels): + """Converts a dict to be used as a unique key""" + return tuple( + sorted( + map( + lambda kv: ( + (kv[0], tuple(kv[1])) if isinstance(kv[1], list) else kv + ), + labels.items(), + ) + ) + ) + + +class BoundedList(Sequence): + """An append only list with a fixed max size. + + Calls to `append` and `extend` will drop the oldest elements if there is + not enough room. + """ + + def __init__(self, maxlen: Optional[int]): + self.dropped = 0 + self._dq = deque(maxlen=maxlen) # type: deque + self._lock = threading.Lock() + + def __repr__(self): + return f"{type(self).__name__}({list(self._dq)}, maxlen={self._dq.maxlen})" + + def __getitem__(self, index): + return self._dq[index] + + def __len__(self): + return len(self._dq) + + def __iter__(self): + with self._lock: + return iter(deque(self._dq)) + + def append(self, item): + with self._lock: + if ( + self._dq.maxlen is not None + and len(self._dq) == self._dq.maxlen + ): + self.dropped += 1 + self._dq.append(item) + + def extend(self, seq): + with self._lock: + if self._dq.maxlen is not None: + to_drop = len(seq) + len(self._dq) - self._dq.maxlen + if to_drop > 0: + self.dropped += to_drop + self._dq.extend(seq) + + @classmethod + def from_seq(cls, maxlen, seq): + seq = tuple(seq) + bounded_list = cls(maxlen) + bounded_list.extend(seq) + return bounded_list + + +@deprecated(version="1.4.0") # type: ignore +class BoundedDict(MutableMapping): + """An ordered dict with a fixed max capacity. + + Oldest elements are dropped when the dict is full and a new element is + added. + """ + + def __init__(self, maxlen: Optional[int]): + if maxlen is not None: + if not isinstance(maxlen, int): + raise ValueError + if maxlen < 0: + raise ValueError + self.maxlen = maxlen + self.dropped = 0 + self._dict = {} # type: dict + self._lock = threading.Lock() # type: threading.Lock + + def __repr__(self): + return ( + f"{type(self).__name__}({dict(self._dict)}, maxlen={self.maxlen})" + ) + + def __getitem__(self, key): + return self._dict[key] + + def __setitem__(self, key, value): + with self._lock: + if self.maxlen is not None and self.maxlen == 0: + self.dropped += 1 + return + + if key in self._dict: + del self._dict[key] + elif self.maxlen is not None and len(self._dict) == self.maxlen: + del self._dict[next(iter(self._dict.keys()))] + self.dropped += 1 + self._dict[key] = value + + def __delitem__(self, key): + del self._dict[key] + + def __iter__(self): + with self._lock: + return iter(self._dict.copy()) + + def __len__(self): + return len(self._dict) + + @classmethod + def from_map(cls, maxlen, mapping): + mapping = dict(mapping) + bounded_dict = cls(maxlen) + for key, value in mapping.items(): + bounded_dict[key] = value + return bounded_dict diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/__init__.pyi b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/__init__.pyi new file mode 100644 index 00000000..55042fcf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/__init__.pyi @@ -0,0 +1,74 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import ( + Iterable, + Iterator, + Mapping, + MutableMapping, + Sequence, + TypeVar, + overload, +) + +from opentelemetry.util.types import AttributesAsKey, AttributeValue + +_T = TypeVar("_T") +_KT = TypeVar("_KT") +_VT = TypeVar("_VT") + +def ns_to_iso_str(nanoseconds: int) -> str: ... +def get_dict_as_key( + labels: Mapping[str, AttributeValue], +) -> AttributesAsKey: ... + +# pylint: disable=no-self-use +class BoundedList(Sequence[_T]): + """An append only list with a fixed max size. + + Calls to `append` and `extend` will drop the oldest elements if there is + not enough room. + """ + + dropped: int + def __init__(self, maxlen: int): ... + def insert(self, index: int, value: _T) -> None: ... + @overload + def __getitem__(self, i: int) -> _T: ... + @overload + def __getitem__(self, s: slice) -> Sequence[_T]: ... + def __len__(self) -> int: ... + def append(self, item: _T) -> None: ... + def extend(self, seq: Sequence[_T]) -> None: ... + @classmethod + def from_seq(cls, maxlen: int, seq: Iterable[_T]) -> BoundedList[_T]: ... # pylint: disable=undefined-variable + +class BoundedDict(MutableMapping[_KT, _VT]): + """An ordered dict with a fixed max capacity. + + Oldest elements are dropped when the dict is full and a new element is + added. + """ + + dropped: int + def __init__(self, maxlen: int): ... + def __getitem__(self, k: _KT) -> _VT: ... + def __setitem__(self, k: _KT, v: _VT) -> None: ... + def __delitem__(self, v: _KT) -> None: ... + def __iter__(self) -> Iterator[_KT]: ... + def __len__(self) -> int: ... + @classmethod + def from_map( + cls, maxlen: int, mapping: Mapping[_KT, _VT] + ) -> BoundedDict[_KT, _VT]: ... # pylint: disable=undefined-variable diff --git a/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/instrumentation.py b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/instrumentation.py new file mode 100644 index 00000000..6b45bf2a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/opentelemetry/sdk/util/instrumentation.py @@ -0,0 +1,167 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from json import dumps +from typing import Optional + +from deprecated import deprecated + +from opentelemetry.attributes import BoundedAttributes +from opentelemetry.util.types import Attributes + + +class InstrumentationInfo: + """Immutable information about an instrumentation library module. + + See `opentelemetry.trace.TracerProvider.get_tracer` for the meaning of these + properties. + """ + + __slots__ = ("_name", "_version", "_schema_url") + + @deprecated(version="1.11.1", reason="You should use InstrumentationScope") + def __init__( + self, + name: str, + version: Optional[str] = None, + schema_url: Optional[str] = None, + ): + self._name = name + self._version = version + if schema_url is None: + schema_url = "" + self._schema_url = schema_url + + def __repr__(self): + return f"{type(self).__name__}({self._name}, {self._version}, {self._schema_url})" + + def __hash__(self): + return hash((self._name, self._version, self._schema_url)) + + def __eq__(self, value): + return type(value) is type(self) and ( + self._name, + self._version, + self._schema_url, + ) == (value._name, value._version, value._schema_url) + + def __lt__(self, value): + if type(value) is not type(self): + return NotImplemented + return (self._name, self._version, self._schema_url) < ( + value._name, + value._version, + value._schema_url, + ) + + @property + def schema_url(self) -> Optional[str]: + return self._schema_url + + @property + def version(self) -> Optional[str]: + return self._version + + @property + def name(self) -> str: + return self._name + + +class InstrumentationScope: + """A logical unit of the application code with which the emitted telemetry can be + associated. + + See `opentelemetry.trace.TracerProvider.get_tracer` for the meaning of these + properties. + """ + + __slots__ = ("_name", "_version", "_schema_url", "_attributes") + + def __init__( + self, + name: str, + version: Optional[str] = None, + schema_url: Optional[str] = None, + attributes: Optional[Attributes] = None, + ) -> None: + self._name = name + self._version = version + if schema_url is None: + schema_url = "" + self._schema_url = schema_url + self._attributes = BoundedAttributes(attributes=attributes) + + def __repr__(self) -> str: + return f"{type(self).__name__}({self._name}, {self._version}, {self._schema_url}, {self._attributes})" + + def __hash__(self) -> int: + return hash((self._name, self._version, self._schema_url)) + + def __eq__(self, value: object) -> bool: + if not isinstance(value, InstrumentationScope): + return NotImplemented + return ( + self._name, + self._version, + self._schema_url, + self._attributes, + ) == ( + value._name, + value._version, + value._schema_url, + value._attributes, + ) + + def __lt__(self, value: object) -> bool: + if not isinstance(value, InstrumentationScope): + return NotImplemented + return ( + self._name, + self._version, + self._schema_url, + self._attributes, + ) < ( + value._name, + value._version, + value._schema_url, + value._attributes, + ) + + @property + def schema_url(self) -> Optional[str]: + return self._schema_url + + @property + def version(self) -> Optional[str]: + return self._version + + @property + def name(self) -> str: + return self._name + + @property + def attributes(self) -> Attributes: + return self._attributes + + def to_json(self, indent: Optional[int] = 4) -> str: + return dumps( + { + "name": self._name, + "version": self._version, + "schema_url": self._schema_url, + "attributes": ( + dict(self._attributes) if bool(self._attributes) else None + ), + }, + indent=indent, + ) |