aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_exporter.py29
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_state.py70
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat.py77
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py417
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py69
6 files changed, 662 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_exporter.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_exporter.py
new file mode 100644
index 00000000..c476d3ac
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_exporter.py
@@ -0,0 +1,29 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+from typing import Optional
+from opentelemetry.sdk.metrics.export import DataPointT
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+
+from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
+from azure.monitor.opentelemetry.exporter import AzureMonitorMetricExporter
+from azure.monitor.opentelemetry.exporter._constants import _STATSBEAT_METRIC_NAME_MAPPINGS
+
+
+class _StatsBeatExporter(AzureMonitorMetricExporter):
+
+ def _point_to_envelope(
+ self,
+ point: DataPointT,
+ name: str,
+ resource: Optional[Resource] = None,
+ scope: Optional[InstrumentationScope] = None,
+ ) -> Optional[TelemetryItem]:
+ # map statsbeat name from OpenTelemetry name
+ name = _STATSBEAT_METRIC_NAME_MAPPINGS[name]
+ return super()._point_to_envelope(
+ point,
+ name,
+ resource,
+ None,
+ )
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_state.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_state.py
new file mode 100644
index 00000000..e555ba71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_state.py
@@ -0,0 +1,70 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import os
+import threading
+from typing import Dict, Union
+
+from azure.monitor.opentelemetry.exporter._constants import _APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL
+
+_REQUESTS_MAP: Dict[str, Union[int, Dict[int, int]]] = {}
+_REQUESTS_MAP_LOCK = threading.Lock()
+
+_STATSBEAT_STATE = {
+ "INITIAL_FAILURE_COUNT": 0,
+ "INITIAL_SUCCESS": False,
+ "SHUTDOWN": False,
+ "CUSTOM_EVENTS_FEATURE_SET": False,
+ "LIVE_METRICS_FEATURE_SET": False,
+}
+_STATSBEAT_STATE_LOCK = threading.Lock()
+_STATSBEAT_FAILURE_COUNT_THRESHOLD = 3
+
+
+def is_statsbeat_enabled():
+ disabled = os.environ.get(_APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL)
+ return disabled is None or disabled.lower() != "true"
+
+
+def increment_statsbeat_initial_failure_count(): # pylint: disable=name-too-long
+ with _STATSBEAT_STATE_LOCK:
+ _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] += 1
+
+
+def increment_and_check_statsbeat_failure_count(): # pylint: disable=name-too-long
+ increment_statsbeat_initial_failure_count()
+ return get_statsbeat_initial_failure_count() >= _STATSBEAT_FAILURE_COUNT_THRESHOLD
+
+
+def get_statsbeat_initial_failure_count():
+ return _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"]
+
+
+def set_statsbeat_initial_success(success):
+ with _STATSBEAT_STATE_LOCK:
+ _STATSBEAT_STATE["INITIAL_SUCCESS"] = success
+
+
+def get_statsbeat_initial_success():
+ return _STATSBEAT_STATE["INITIAL_SUCCESS"]
+
+
+def get_statsbeat_shutdown():
+ return _STATSBEAT_STATE["SHUTDOWN"]
+
+
+def get_statsbeat_custom_events_feature_set():
+ return _STATSBEAT_STATE["CUSTOM_EVENTS_FEATURE_SET"]
+
+
+def set_statsbeat_custom_events_feature_set():
+ with _STATSBEAT_STATE_LOCK:
+ _STATSBEAT_STATE["CUSTOM_EVENTS_FEATURE_SET"] = True
+
+
+def get_statsbeat_live_metrics_feature_set():
+ return _STATSBEAT_STATE["LIVE_METRICS_FEATURE_SET"]
+
+
+def set_statsbeat_live_metrics_feature_set():
+ with _STATSBEAT_STATE_LOCK:
+ _STATSBEAT_STATE["LIVE_METRICS_FEATURE_SET"] = True
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat.py
new file mode 100644
index 00000000..e6dcee0c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat.py
@@ -0,0 +1,77 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import threading
+
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.sdk.resources import Resource
+
+from azure.monitor.opentelemetry.exporter.statsbeat._exporter import _StatsBeatExporter
+from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat_metrics import _StatsbeatMetrics
+from azure.monitor.opentelemetry.exporter.statsbeat._state import (
+ _STATSBEAT_STATE,
+ _STATSBEAT_STATE_LOCK,
+)
+from azure.monitor.opentelemetry.exporter.statsbeat._utils import (
+ _get_stats_connection_string,
+ _get_stats_long_export_interval,
+ _get_stats_short_export_interval,
+)
+
+
+_STATSBEAT_METRICS = None
+_STATSBEAT_LOCK = threading.Lock()
+
+
+# pylint: disable=global-statement
+# pylint: disable=protected-access
+def collect_statsbeat_metrics(exporter) -> None:
+ global _STATSBEAT_METRICS
+ # Only start statsbeat if did not exist before
+ if _STATSBEAT_METRICS is None:
+ with _STATSBEAT_LOCK:
+ statsbeat_exporter = _StatsBeatExporter(
+ connection_string=_get_stats_connection_string(exporter._endpoint),
+ disable_offline_storage=exporter._disable_offline_storage,
+ )
+ reader = PeriodicExportingMetricReader(
+ statsbeat_exporter,
+ export_interval_millis=_get_stats_short_export_interval() * 1000, # 15m by default
+ )
+ mp = MeterProvider(
+ metric_readers=[reader],
+ resource=Resource.get_empty(),
+ )
+ # long_interval_threshold represents how many collects for short interval
+ # should have passed before a long interval collect
+ long_interval_threshold = _get_stats_long_export_interval() // _get_stats_short_export_interval()
+ _STATSBEAT_METRICS = _StatsbeatMetrics(
+ mp,
+ exporter._instrumentation_key,
+ exporter._endpoint,
+ exporter._disable_offline_storage,
+ long_interval_threshold,
+ exporter._credential is not None,
+ exporter._distro_version,
+ )
+ # Export some initial stats on program start
+ mp.force_flush()
+ # initialize non-initial stats
+ _STATSBEAT_METRICS.init_non_initial_metrics()
+
+
+def shutdown_statsbeat_metrics() -> None:
+ global _STATSBEAT_METRICS
+ shutdown_success = False
+ if _STATSBEAT_METRICS is not None:
+ with _STATSBEAT_LOCK:
+ try:
+ if _STATSBEAT_METRICS._meter_provider is not None:
+ _STATSBEAT_METRICS._meter_provider.shutdown()
+ _STATSBEAT_METRICS = None
+ shutdown_success = True
+ except: # pylint: disable=bare-except
+ pass
+ if shutdown_success:
+ with _STATSBEAT_STATE_LOCK:
+ _STATSBEAT_STATE["SHUTDOWN"] = True
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py
new file mode 100644
index 00000000..db84e01b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py
@@ -0,0 +1,417 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+from enum import Enum
+import json
+import os
+import platform
+import re
+import sys
+import threading
+from typing import Any, Dict, Iterable, List
+
+import requests # pylint: disable=networking-import-outside-azure-core-transport
+
+from opentelemetry.metrics import CallbackOptions, Observation
+from opentelemetry.sdk.metrics import MeterProvider
+
+from azure.monitor.opentelemetry.exporter import VERSION
+from azure.monitor.opentelemetry.exporter._constants import (
+ _ATTACH_METRIC_NAME,
+ _FEATURE_METRIC_NAME,
+ _REQ_DURATION_NAME,
+ _REQ_EXCEPTION_NAME,
+ _REQ_FAILURE_NAME,
+ _REQ_RETRY_NAME,
+ _REQ_SUCCESS_NAME,
+ _REQ_THROTTLE_NAME,
+ _WEBSITE_HOME_STAMPNAME,
+ _WEBSITE_HOSTNAME,
+ _WEBSITE_SITE_NAME,
+ _AKS_ARM_NAMESPACE_ID,
+)
+from azure.monitor.opentelemetry.exporter.statsbeat._state import (
+ _REQUESTS_MAP_LOCK,
+ _REQUESTS_MAP,
+ get_statsbeat_live_metrics_feature_set,
+ get_statsbeat_custom_events_feature_set,
+)
+from azure.monitor.opentelemetry.exporter import _utils
+
+# cSpell:disable
+
+_AIMS_URI = "http://169.254.169.254/metadata/instance/compute"
+_AIMS_API_VERSION = "api-version=2017-12-01"
+_AIMS_FORMAT = "format=json"
+
+_ENDPOINT_TYPES = ["breeze"]
+
+
+class _RP_Names(Enum):
+ APP_SERVICE = "appsvc"
+ FUNCTIONS = "functions"
+ AKS = "aks"
+ VM = "vm"
+ UNKNOWN = "unknown"
+
+
+_HOST_PATTERN = re.compile("^https?://(?:www\\.)?([^/.]+)")
+
+
+class _FEATURE_TYPES:
+ FEATURE = 0
+ INSTRUMENTATION = 1
+
+
+class _StatsbeatFeature:
+ NONE = 0
+ DISK_RETRY = 1
+ AAD = 2
+ CUSTOM_EVENTS_EXTENSION = 4
+ DISTRO = 8
+ LIVE_METRICS = 16
+
+
+class _AttachTypes:
+ MANUAL = "Manual"
+ INTEGRATED = "IntegratedAuto"
+ STANDALONE = "StandaloneAuto"
+
+
+# pylint: disable=R0902
+class _StatsbeatMetrics:
+
+ _COMMON_ATTRIBUTES: Dict[str, Any] = {
+ "rp": _RP_Names.UNKNOWN.value,
+ "attach": _AttachTypes.MANUAL,
+ "cikey": None,
+ "runtimeVersion": platform.python_version(),
+ "os": platform.system(),
+ "language": "python",
+ "version": VERSION,
+ }
+
+ _NETWORK_ATTRIBUTES: Dict[str, Any] = {
+ "endpoint": _ENDPOINT_TYPES[0], # breeze
+ "host": None,
+ }
+
+ _FEATURE_ATTRIBUTES: Dict[str, Any] = {
+ "feature": None, # 64-bit long, bits represent features enabled
+ "type": _FEATURE_TYPES.FEATURE,
+ }
+
+ _INSTRUMENTATION_ATTRIBUTES: Dict[str, Any] = {
+ "feature": 0, # 64-bit long, bits represent instrumentations used
+ "type": _FEATURE_TYPES.INSTRUMENTATION,
+ }
+
+ def __init__(
+ self,
+ meter_provider: MeterProvider,
+ instrumentation_key: str,
+ endpoint: str,
+ disable_offline_storage: bool,
+ long_interval_threshold: int,
+ has_credential: bool,
+ distro_version: str = "",
+ ) -> None:
+ self._ikey = instrumentation_key
+ self._feature = _StatsbeatFeature.NONE
+ if not disable_offline_storage:
+ self._feature |= _StatsbeatFeature.DISK_RETRY
+ if has_credential:
+ self._feature |= _StatsbeatFeature.AAD
+ if distro_version:
+ self._feature |= _StatsbeatFeature.DISTRO
+ if get_statsbeat_custom_events_feature_set():
+ self._feature |= _StatsbeatFeature.CUSTOM_EVENTS_EXTENSION
+ if get_statsbeat_live_metrics_feature_set():
+ self._feature |= _StatsbeatFeature.LIVE_METRICS
+ self._ikey = instrumentation_key
+ self._meter_provider = meter_provider
+ self._meter = self._meter_provider.get_meter(__name__)
+ self._long_interval_threshold = long_interval_threshold
+ # Start internal count at the max size for initial statsbeat export
+ self._long_interval_count_map = {
+ _ATTACH_METRIC_NAME[0]: sys.maxsize,
+ _FEATURE_METRIC_NAME[0]: sys.maxsize,
+ }
+ self._long_interval_lock = threading.Lock()
+ _StatsbeatMetrics._COMMON_ATTRIBUTES["cikey"] = instrumentation_key
+ if _utils._is_attach_enabled():
+ _StatsbeatMetrics._COMMON_ATTRIBUTES["attach"] = _AttachTypes.INTEGRATED
+ _StatsbeatMetrics._NETWORK_ATTRIBUTES["host"] = _shorten_host(endpoint)
+ _StatsbeatMetrics._FEATURE_ATTRIBUTES["feature"] = self._feature
+ _StatsbeatMetrics._INSTRUMENTATION_ATTRIBUTES["feature"] = _utils.get_instrumentations()
+
+ self._vm_retry = True # True if we want to attempt to find if in VM
+ self._vm_data: Dict[str, str] = {}
+
+ # Initial metrics - metrics exported on application start
+
+ # Attach metrics - metrics related to identifying which rp is application being run in
+ self._attach_metric = self._meter.create_observable_gauge(
+ _ATTACH_METRIC_NAME[0],
+ callbacks=[self._get_attach_metric],
+ unit="",
+ description="Statsbeat metric tracking tracking rp information",
+ )
+
+ # Feature metrics - metrics related to features/instrumentations being used
+ self._feature_metric = self._meter.create_observable_gauge(
+ _FEATURE_METRIC_NAME[0],
+ callbacks=[self._get_feature_metric],
+ unit="",
+ description="Statsbeat metric tracking tracking enabled features",
+ )
+
+ # pylint: disable=unused-argument
+ # pylint: disable=protected-access
+ def _get_attach_metric(self, options: CallbackOptions) -> Iterable[Observation]:
+ observations: List[Observation] = []
+ # Check if it is time to observe long interval metrics
+ if not self._meets_long_interval_threshold(_ATTACH_METRIC_NAME[0]):
+ return observations
+ rp = ""
+ rpId = ""
+ os_type = platform.system()
+ # rp, rpId
+ if _utils._is_on_app_service():
+ # Web apps
+ rp = _RP_Names.APP_SERVICE.value
+ rpId = "{}/{}".format(os.environ.get(_WEBSITE_SITE_NAME), os.environ.get(_WEBSITE_HOME_STAMPNAME, ""))
+ elif _utils._is_on_functions():
+ # Function apps
+ rp = _RP_Names.FUNCTIONS.value
+ rpId = os.environ.get(_WEBSITE_HOSTNAME, "")
+ elif _utils._is_on_aks():
+ # AKS
+ rp = _RP_Names.AKS.value
+ rpId = os.environ.get(_AKS_ARM_NAMESPACE_ID, "")
+ elif self._vm_retry and self._get_azure_compute_metadata():
+ # VM
+ rp = _RP_Names.VM.value
+ rpId = "{}/{}".format(self._vm_data.get("vmId", ""), self._vm_data.get("subscriptionId", ""))
+ os_type = self._vm_data.get("osType", "")
+ else:
+ # Not in any rp or VM metadata failed
+ rp = _RP_Names.UNKNOWN.value
+ rpId = _RP_Names.UNKNOWN.value
+
+ _StatsbeatMetrics._COMMON_ATTRIBUTES["rp"] = rp
+ _StatsbeatMetrics._COMMON_ATTRIBUTES["os"] = os_type or platform.system()
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes["rpId"] = rpId
+ observations.append(Observation(1, dict(attributes))) # type: ignore
+ return observations
+
+ def _get_azure_compute_metadata(self) -> bool:
+ try:
+ request_url = "{0}?{1}&{2}".format(_AIMS_URI, _AIMS_API_VERSION, _AIMS_FORMAT)
+ response = requests.get(request_url, headers={"MetaData": "True"}, timeout=0.2)
+ except (requests.exceptions.ConnectionError, requests.Timeout):
+ # Not in VM
+ self._vm_retry = False
+ return False
+ except requests.exceptions.RequestException:
+ self._vm_retry = True # retry
+ return False
+
+ try:
+ text = response.text
+ self._vm_data = json.loads(text)
+ except Exception: # pylint: disable=broad-except
+ # Error in reading response body, retry
+ self._vm_retry = True
+ return False
+
+ # Vm data is perpetually updated
+ self._vm_retry = True
+ return True
+
+ # pylint: disable=unused-argument
+ def _get_feature_metric(self, options: CallbackOptions) -> Iterable[Observation]:
+ observations: List[Observation] = []
+ # Check if it is time to observe long interval metrics
+ if not self._meets_long_interval_threshold(_FEATURE_METRIC_NAME[0]):
+ return observations
+ # Feature metric
+ # Check if any features were enabled during runtime
+ if get_statsbeat_custom_events_feature_set():
+ self._feature |= _StatsbeatFeature.CUSTOM_EVENTS_EXTENSION
+ _StatsbeatMetrics._FEATURE_ATTRIBUTES["feature"] = self._feature
+ if get_statsbeat_live_metrics_feature_set():
+ self._feature |= _StatsbeatFeature.LIVE_METRICS
+ _StatsbeatMetrics._FEATURE_ATTRIBUTES["feature"] = self._feature
+
+ # Don't send observation if no features enabled
+ if self._feature is not _StatsbeatFeature.NONE:
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._FEATURE_ATTRIBUTES) # type: ignore
+ observations.append(Observation(1, dict(attributes))) # type: ignore
+
+ # instrumentation metric
+ # Don't send observation if no instrumentations enabled
+ instrumentation_bits = _utils.get_instrumentations()
+ if instrumentation_bits != 0:
+ _StatsbeatMetrics._INSTRUMENTATION_ATTRIBUTES["feature"] = instrumentation_bits
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._INSTRUMENTATION_ATTRIBUTES) # type: ignore
+ observations.append(Observation(1, dict(attributes))) # type: ignore
+
+ return observations
+
+ def _meets_long_interval_threshold(self, name) -> bool:
+ with self._long_interval_lock:
+ # if long interval theshold not met, it is not time to export
+ # statsbeat metrics that are long intervals
+ count = self._long_interval_count_map.get(name, sys.maxsize)
+ if count < self._long_interval_threshold:
+ return False
+ # reset the count if long interval theshold is met
+ self._long_interval_count_map[name] = 0
+ return True
+
+ # pylint: disable=W0201
+ def init_non_initial_metrics(self):
+ # Network metrics - metrics related to request calls to ingestion service
+ self._success_count = self._meter.create_observable_gauge(
+ _REQ_SUCCESS_NAME[0],
+ callbacks=[self._get_success_count],
+ unit="count",
+ description="Statsbeat metric tracking request success count",
+ )
+ self._failure_count = self._meter.create_observable_gauge(
+ _REQ_FAILURE_NAME[0],
+ callbacks=[self._get_failure_count],
+ unit="count",
+ description="Statsbeat metric tracking request failure count",
+ )
+ self._retry_count = self._meter.create_observable_gauge(
+ _REQ_RETRY_NAME[0],
+ callbacks=[self._get_retry_count],
+ unit="count",
+ description="Statsbeat metric tracking request retry count",
+ )
+ self._throttle_count = self._meter.create_observable_gauge(
+ _REQ_THROTTLE_NAME[0],
+ callbacks=[self._get_throttle_count],
+ unit="count",
+ description="Statsbeat metric tracking request throttle count",
+ )
+ self._exception_count = self._meter.create_observable_gauge(
+ _REQ_EXCEPTION_NAME[0],
+ callbacks=[self._get_exception_count],
+ unit="count",
+ description="Statsbeat metric tracking request exception count",
+ )
+ self._average_duration = self._meter.create_observable_gauge(
+ _REQ_DURATION_NAME[0],
+ callbacks=[self._get_average_duration],
+ unit="avg",
+ description="Statsbeat metric tracking average request duration",
+ )
+
+ # pylint: disable=unused-argument
+ def _get_success_count(self, options: CallbackOptions) -> Iterable[Observation]:
+ # get_success_count is special in such that it is the indicator of when
+ # a short interval collection has happened, which is why we increment
+ # the long_interval_count when it is called
+ with self._long_interval_lock:
+ for name, count in self._long_interval_count_map.items():
+ self._long_interval_count_map[name] = count + 1
+ observations = []
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._NETWORK_ATTRIBUTES)
+ attributes["statusCode"] = 200
+ with _REQUESTS_MAP_LOCK:
+ # only observe if value is not 0
+ count = _REQUESTS_MAP.get(_REQ_SUCCESS_NAME[1], 0) # type: ignore
+ if count != 0:
+ observations.append(Observation(int(count), dict(attributes)))
+ _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 0
+ return observations
+
+ # pylint: disable=unused-argument
+ def _get_failure_count(self, options: CallbackOptions) -> Iterable[Observation]:
+ observations = []
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._NETWORK_ATTRIBUTES)
+ with _REQUESTS_MAP_LOCK:
+ for code, count in _REQUESTS_MAP.get(_REQ_FAILURE_NAME[1], {}).items(): # type: ignore
+ # only observe if value is not 0
+ if count != 0:
+ attributes["statusCode"] = code
+ observations.append(Observation(int(count), dict(attributes)))
+ _REQUESTS_MAP[_REQ_FAILURE_NAME[1]][code] = 0 # type: ignore
+ return observations
+
+ # pylint: disable=unused-argument
+ def _get_average_duration(self, options: CallbackOptions) -> Iterable[Observation]:
+ observations = []
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._NETWORK_ATTRIBUTES)
+ with _REQUESTS_MAP_LOCK:
+ interval_duration = _REQUESTS_MAP.get(_REQ_DURATION_NAME[1], 0)
+ interval_count = _REQUESTS_MAP.get("count", 0)
+ # only observe if value is not 0
+ if interval_duration > 0 and interval_count > 0: # type: ignore
+ result = interval_duration / interval_count # type: ignore
+ observations.append(Observation(result * 1000, dict(attributes)))
+ _REQUESTS_MAP[_REQ_DURATION_NAME[1]] = 0
+ _REQUESTS_MAP["count"] = 0
+ return observations
+
+ # pylint: disable=unused-argument
+ def _get_retry_count(self, options: CallbackOptions) -> Iterable[Observation]:
+ observations = []
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._NETWORK_ATTRIBUTES)
+ with _REQUESTS_MAP_LOCK:
+ for code, count in _REQUESTS_MAP.get(_REQ_RETRY_NAME[1], {}).items(): # type: ignore
+ # only observe if value is not 0
+ if count != 0:
+ attributes["statusCode"] = code
+ observations.append(Observation(int(count), dict(attributes)))
+ _REQUESTS_MAP[_REQ_RETRY_NAME[1]][code] = 0 # type: ignore
+ return observations
+
+ # pylint: disable=unused-argument
+ def _get_throttle_count(self, options: CallbackOptions) -> Iterable[Observation]:
+ observations = []
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._NETWORK_ATTRIBUTES)
+ with _REQUESTS_MAP_LOCK:
+ for code, count in _REQUESTS_MAP.get(_REQ_THROTTLE_NAME[1], {}).items(): # type: ignore
+ # only observe if value is not 0
+ if count != 0:
+ attributes["statusCode"] = code
+ observations.append(Observation(int(count), dict(attributes)))
+ _REQUESTS_MAP[_REQ_THROTTLE_NAME[1]][code] = 0 # type: ignore
+ return observations
+
+ # pylint: disable=unused-argument
+ def _get_exception_count(self, options: CallbackOptions) -> Iterable[Observation]:
+ observations = []
+ attributes = dict(_StatsbeatMetrics._COMMON_ATTRIBUTES)
+ attributes.update(_StatsbeatMetrics._NETWORK_ATTRIBUTES)
+ with _REQUESTS_MAP_LOCK:
+ for code, count in _REQUESTS_MAP.get(_REQ_EXCEPTION_NAME[1], {}).items(): # type: ignore
+ # only observe if value is not 0
+ if count != 0:
+ attributes["exceptionType"] = code
+ observations.append(Observation(int(count), dict(attributes)))
+ _REQUESTS_MAP[_REQ_EXCEPTION_NAME[1]][code] = 0 # type: ignore
+ return observations
+
+
+def _shorten_host(host: str) -> str:
+ if not host:
+ host = ""
+ match = _HOST_PATTERN.match(host)
+ if match:
+ host = match.group(1)
+ return host
+
+
+# cSpell:enable
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py
new file mode 100644
index 00000000..d1607c12
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py
@@ -0,0 +1,69 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import os
+
+from azure.monitor.opentelemetry.exporter._constants import (
+ _APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME,
+ _APPLICATIONINSIGHTS_STATS_LONG_EXPORT_INTERVAL_ENV_NAME,
+ _APPLICATIONINSIGHTS_STATS_SHORT_EXPORT_INTERVAL_ENV_NAME,
+ _DEFAULT_NON_EU_STATS_CONNECTION_STRING,
+ _DEFAULT_EU_STATS_CONNECTION_STRING,
+ _DEFAULT_STATS_SHORT_EXPORT_INTERVAL,
+ _DEFAULT_STATS_LONG_EXPORT_INTERVAL,
+ _EU_ENDPOINTS,
+ _REQ_DURATION_NAME,
+ _REQ_SUCCESS_NAME,
+)
+from azure.monitor.opentelemetry.exporter.statsbeat._state import (
+ _REQUESTS_MAP_LOCK,
+ _REQUESTS_MAP,
+)
+
+
+def _get_stats_connection_string(endpoint: str) -> str:
+ cs_env = os.environ.get(_APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME)
+ if cs_env:
+ return cs_env
+ for endpoint_location in _EU_ENDPOINTS:
+ if endpoint_location in endpoint:
+ # Use statsbeat EU endpoint if user is in EU region
+ return _DEFAULT_EU_STATS_CONNECTION_STRING
+ return _DEFAULT_NON_EU_STATS_CONNECTION_STRING
+
+
+# seconds
+def _get_stats_short_export_interval() -> int:
+ ei_env = os.environ.get(_APPLICATIONINSIGHTS_STATS_SHORT_EXPORT_INTERVAL_ENV_NAME)
+ if ei_env:
+ try:
+ return int(ei_env)
+ except ValueError:
+ return _DEFAULT_STATS_SHORT_EXPORT_INTERVAL
+ return _DEFAULT_STATS_SHORT_EXPORT_INTERVAL
+
+
+# seconds
+def _get_stats_long_export_interval() -> int:
+ ei_env = os.environ.get(_APPLICATIONINSIGHTS_STATS_LONG_EXPORT_INTERVAL_ENV_NAME)
+ if ei_env:
+ try:
+ return int(ei_env)
+ except ValueError:
+ return _DEFAULT_STATS_LONG_EXPORT_INTERVAL
+ return _DEFAULT_STATS_LONG_EXPORT_INTERVAL
+
+
+def _update_requests_map(type_name, value):
+ # value can be either a count, duration, status_code or exc_name
+ with _REQUESTS_MAP_LOCK:
+ # Mapping is {type_name: count/duration}
+ if type_name in (_REQ_SUCCESS_NAME[1], "count", _REQ_DURATION_NAME[1]): # success, count, duration
+ _REQUESTS_MAP[type_name] = _REQUESTS_MAP.get(type_name, 0) + value
+ else: # exception, failure, retry, throttle
+ prev = 0
+ # Mapping is {type_name: {value: count}
+ if _REQUESTS_MAP.get(type_name):
+ prev = _REQUESTS_MAP.get(type_name).get(value, 0)
+ else:
+ _REQUESTS_MAP[type_name] = {}
+ _REQUESTS_MAP[type_name][value] = prev + 1