aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/_utils.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/_utils.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/_utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/_utils.py302
1 files changed, 302 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/_utils.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/_utils.py
new file mode 100644
index 00000000..588fe6e3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/_utils.py
@@ -0,0 +1,302 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+
+import datetime
+import locale
+from os import environ
+from os.path import isdir
+import platform
+import threading
+import time
+import warnings
+from typing import Callable, Dict, Any
+
+from opentelemetry.semconv.attributes.service_attributes import SERVICE_NAME
+from opentelemetry.semconv.resource import ResourceAttributes
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util import ns_to_iso_str
+from opentelemetry.util.types import Attributes
+
+from azure.core.pipeline.policies import BearerTokenCredentialPolicy
+from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys, TelemetryItem
+from azure.monitor.opentelemetry.exporter._version import VERSION as ext_version
+from azure.monitor.opentelemetry.exporter._constants import (
+ _AKS_ARM_NAMESPACE_ID,
+ _DEFAULT_AAD_SCOPE,
+ _INSTRUMENTATIONS_BIT_MAP,
+ _FUNCTIONS_WORKER_RUNTIME,
+ _PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY,
+ _WEBSITE_SITE_NAME,
+)
+
+
+opentelemetry_version = ""
+
+# Workaround for missing version file
+try:
+ from importlib.metadata import version
+
+ opentelemetry_version = version("opentelemetry-sdk")
+except ImportError:
+ # Temporary workaround for <Py3.8
+ # importlib-metadata causing issues in CI
+ import pkg_resources # type: ignore
+
+ opentelemetry_version = pkg_resources.get_distribution("opentelemetry-sdk").version
+
+
+# Azure App Service
+
+
+def _is_on_app_service():
+ return environ.get(_WEBSITE_SITE_NAME) is not None
+
+
+# Functions
+
+
+def _is_on_functions():
+ return environ.get(_FUNCTIONS_WORKER_RUNTIME) is not None
+
+
+# AKS
+
+
+def _is_on_aks():
+ return _AKS_ARM_NAMESPACE_ID in environ
+
+
+# Attach
+
+
+def _is_attach_enabled():
+ if _is_on_app_service():
+ return isdir("/agents/python/")
+ if _is_on_functions():
+ return environ.get(_PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY) == "true"
+ return False
+
+
+def _get_sdk_version_prefix():
+ sdk_version_prefix = ""
+ rp = "u"
+ if _is_on_functions():
+ rp = "f"
+ elif _is_on_app_service():
+ rp = "a"
+ # TODO: Add VM scenario outside statsbeat
+ # elif _is_on_vm():
+ # rp = 'v'
+ elif _is_on_aks():
+ rp = "k"
+
+ os = "u"
+ system = platform.system()
+ if system == "Linux":
+ os = "l"
+ elif system == "Windows":
+ os = "w"
+
+ attach_type = "m"
+ if _is_attach_enabled():
+ attach_type = "i"
+ sdk_version_prefix = "{}{}{}_".format(rp, os, attach_type)
+
+ return sdk_version_prefix
+
+
+def _get_sdk_version():
+ return "{}py{}:otel{}:ext{}".format(
+ _get_sdk_version_prefix(), platform.python_version(), opentelemetry_version, ext_version
+ )
+
+
+def _getlocale():
+ try:
+ with warnings.catch_warnings():
+ # temporary work-around for https://github.com/python/cpython/issues/82986
+ # by continuing to use getdefaultlocale() even though it has been deprecated.
+ # we ignore the deprecation warnings to reduce noise
+ warnings.simplefilter("ignore", category=DeprecationWarning)
+ return locale.getdefaultlocale()[0]
+ except AttributeError:
+ # locale.getlocal() has issues on Windows: https://github.com/python/cpython/issues/82986
+ # Use this as a fallback if locale.getdefaultlocale() doesn't exist (>Py3.13)
+ return locale.getlocale()[0]
+
+
+azure_monitor_context = {
+ ContextTagKeys.AI_DEVICE_ID: platform.node(),
+ ContextTagKeys.AI_DEVICE_LOCALE: _getlocale(),
+ ContextTagKeys.AI_DEVICE_OS_VERSION: platform.version(),
+ ContextTagKeys.AI_DEVICE_TYPE: "Other",
+ ContextTagKeys.AI_INTERNAL_SDK_VERSION: _get_sdk_version(),
+}
+
+
+def ns_to_duration(nanoseconds: int) -> str:
+ value = (nanoseconds + 500000) // 1000000 # duration in milliseconds
+ value, milliseconds = divmod(value, 1000)
+ value, seconds = divmod(value, 60)
+ value, minutes = divmod(value, 60)
+ days, hours = divmod(value, 24)
+ return "{:d}.{:02d}:{:02d}:{:02d}.{:03d}".format(days, hours, minutes, seconds, milliseconds)
+
+
+# Replicate .netDateTime.Ticks(), which is the UTC time, expressed as the number
+# of 100-nanosecond intervals that have elapsed since 12:00:00 midnight on
+# January 1, 0001.
+def _ticks_since_dot_net_epoch():
+ # Since time.time() is the elapsed time since UTC January 1, 1970, we have
+ # to shift this start time, and then multiply by 10^7 to get the number of
+ # 100-nanosecond intervals
+ shift_time = int((datetime.datetime(1970, 1, 1, 0, 0, 0) - datetime.datetime(1, 1, 1, 0, 0, 0)).total_seconds()) * (
+ 10**7
+ )
+ # Add shift time to 100-ns intervals since time.time()
+ return int(time.time() * (10**7)) + shift_time
+
+
+_INSTRUMENTATIONS_BIT_MASK = 0
+_INSTRUMENTATIONS_BIT_MASK_LOCK = threading.Lock()
+
+
+def get_instrumentations():
+ return _INSTRUMENTATIONS_BIT_MASK
+
+
+def add_instrumentation(instrumentation_name: str):
+ with _INSTRUMENTATIONS_BIT_MASK_LOCK:
+ global _INSTRUMENTATIONS_BIT_MASK # pylint: disable=global-statement
+ instrumentation_bits = _INSTRUMENTATIONS_BIT_MAP.get(instrumentation_name, 0)
+ _INSTRUMENTATIONS_BIT_MASK |= instrumentation_bits
+
+
+def remove_instrumentation(instrumentation_name: str):
+ with _INSTRUMENTATIONS_BIT_MASK_LOCK:
+ global _INSTRUMENTATIONS_BIT_MASK # pylint: disable=global-statement
+ instrumentation_bits = _INSTRUMENTATIONS_BIT_MAP.get(instrumentation_name, 0)
+ _INSTRUMENTATIONS_BIT_MASK &= ~instrumentation_bits
+
+
+class PeriodicTask(threading.Thread):
+ """Thread that periodically calls a given function.
+
+ :type interval: int or float
+ :param interval: Seconds between calls to the function.
+
+ :type function: function
+ :param function: The function to call.
+
+ :type args: list
+ :param args: The args passed in while calling `function`.
+
+ :type kwargs: dict
+ :param args: The kwargs passed in while calling `function`.
+ """
+
+ def __init__(self, interval: int, function: Callable, *args: Any, **kwargs: Any):
+ super().__init__(name=kwargs.pop("name", None))
+ self.interval = interval
+ self.function = function
+ self.args = args or [] # type: ignore
+ self.kwargs = kwargs or {}
+ self.finished = threading.Event()
+
+ def run(self):
+ wait_time = self.interval
+ while not self.finished.wait(wait_time):
+ start_time = time.time()
+ self.function(*self.args, **self.kwargs)
+ elapsed_time = time.time() - start_time
+ wait_time = max(self.interval - elapsed_time, 0)
+
+ def cancel(self):
+ self.finished.set()
+
+
+def _create_telemetry_item(timestamp: int) -> TelemetryItem:
+ return TelemetryItem(
+ name="",
+ instrumentation_key="",
+ tags=dict(azure_monitor_context), # type: ignore
+ time=ns_to_iso_str(timestamp), # type: ignore
+ )
+
+
+def _populate_part_a_fields(resource: Resource):
+ tags = {}
+ if resource and resource.attributes:
+ service_name = resource.attributes.get(SERVICE_NAME)
+ service_namespace = resource.attributes.get(ResourceAttributes.SERVICE_NAMESPACE)
+ service_instance_id = resource.attributes.get(ResourceAttributes.SERVICE_INSTANCE_ID)
+ device_id = resource.attributes.get(ResourceAttributes.DEVICE_ID)
+ device_model = resource.attributes.get(ResourceAttributes.DEVICE_MODEL_NAME)
+ device_make = resource.attributes.get(ResourceAttributes.DEVICE_MANUFACTURER)
+ app_version = resource.attributes.get(ResourceAttributes.SERVICE_VERSION)
+ if service_name:
+ if service_namespace:
+ tags[ContextTagKeys.AI_CLOUD_ROLE] = str(service_namespace) + "." + str(service_name)
+ else:
+ tags[ContextTagKeys.AI_CLOUD_ROLE] = service_name # type: ignore
+ if service_instance_id:
+ tags[ContextTagKeys.AI_CLOUD_ROLE_INSTANCE] = service_instance_id # type: ignore
+ else:
+ tags[ContextTagKeys.AI_CLOUD_ROLE_INSTANCE] = platform.node() # hostname default
+ tags[ContextTagKeys.AI_INTERNAL_NODE_NAME] = tags[ContextTagKeys.AI_CLOUD_ROLE_INSTANCE]
+ if device_id:
+ tags[ContextTagKeys.AI_DEVICE_ID] = device_id # type: ignore
+ if device_model:
+ tags[ContextTagKeys.AI_DEVICE_MODEL] = device_model # type: ignore
+ if device_make:
+ tags[ContextTagKeys.AI_DEVICE_OEM_NAME] = device_make # type: ignore
+ if app_version:
+ tags[ContextTagKeys.AI_APPLICATION_VER] = app_version # type: ignore
+
+ return tags
+
+
+# pylint: disable=W0622
+def _filter_custom_properties(properties: Attributes, filter=None) -> Dict[str, str]:
+ truncated_properties: Dict[str, str] = {}
+ if not properties:
+ return truncated_properties
+ for key, val in properties.items():
+ # Apply filter function
+ if filter is not None:
+ if not filter(key, val):
+ continue
+ # Apply truncation rules
+ # Max key length is 150, value is 8192
+ if not key or len(key) > 150 or val is None:
+ continue
+ truncated_properties[key] = str(val)[:8192]
+ return truncated_properties
+
+
+def _get_auth_policy(credential, default_auth_policy, aad_audience=None):
+ if credential:
+ if hasattr(credential, "get_token"):
+ return BearerTokenCredentialPolicy(
+ credential,
+ _get_scope(aad_audience),
+ )
+ raise ValueError("Must pass in valid TokenCredential.")
+ return default_auth_policy
+
+
+def _get_scope(aad_audience=None):
+ # The AUDIENCE is a url that identifies Azure Monitor in a specific cloud
+ # (For example: "https://monitor.azure.com/").
+ # The SCOPE is the audience + the permission
+ # (For example: "https://monitor.azure.com//.default").
+ return _DEFAULT_AAD_SCOPE if not aad_audience else "{}/.default".format(aad_audience)
+
+
+class Singleton(type):
+ _instance = None
+
+ def __call__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
+ return cls._instance