aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/_base.py435
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py244
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py291
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py553
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py98
-rw-r--r--.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_utils.py321
10 files changed, 1942 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/_base.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/_base.py
new file mode 100644
index 00000000..d3da36ba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/_base.py
@@ -0,0 +1,435 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import logging
+import os
+import tempfile
+import time
+from enum import Enum
+from typing import List, Optional, Any
+from urllib.parse import urlparse
+
+from azure.core.exceptions import HttpResponseError, ServiceRequestError
+from azure.core.pipeline.policies import (
+ ContentDecodePolicy,
+ HttpLoggingPolicy,
+ RedirectPolicy,
+ RequestIdPolicy,
+)
+from azure.monitor.opentelemetry.exporter._generated import AzureMonitorClient
+from azure.monitor.opentelemetry.exporter._generated._configuration import AzureMonitorClientConfiguration
+from azure.monitor.opentelemetry.exporter._generated.models import (
+ MessageData,
+ MetricsData,
+ MonitorDomain,
+ RemoteDependencyData,
+ RequestData,
+ TelemetryEventData,
+ TelemetryExceptionData,
+ TelemetryItem,
+)
+from azure.monitor.opentelemetry.exporter._constants import (
+ _AZURE_MONITOR_DISTRO_VERSION_ARG,
+ _INVALID_STATUS_CODES,
+ _REACHED_INGESTION_STATUS_CODES,
+ _REDIRECT_STATUS_CODES,
+ _REQ_DURATION_NAME,
+ _REQ_EXCEPTION_NAME,
+ _REQ_FAILURE_NAME,
+ _REQ_RETRY_NAME,
+ _REQ_SUCCESS_NAME,
+ _REQ_THROTTLE_NAME,
+ _RETRYABLE_STATUS_CODES,
+ _THROTTLE_STATUS_CODES,
+)
+from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
+from azure.monitor.opentelemetry.exporter._storage import LocalFileStorage
+from azure.monitor.opentelemetry.exporter._utils import _get_auth_policy
+from azure.monitor.opentelemetry.exporter.statsbeat._state import (
+ get_statsbeat_initial_success,
+ get_statsbeat_shutdown,
+ increment_and_check_statsbeat_failure_count,
+ is_statsbeat_enabled,
+ set_statsbeat_initial_success,
+)
+from azure.monitor.opentelemetry.exporter.statsbeat._utils import _update_requests_map
+
+logger = logging.getLogger(__name__)
+
+_AZURE_TEMPDIR_PREFIX = "Microsoft/AzureMonitor"
+_TEMPDIR_PREFIX = "opentelemetry-python-"
+_SERVICE_API_LATEST = "2020-09-15_Preview"
+
+
+class ExportResult(Enum):
+ SUCCESS = 0
+ FAILED_RETRYABLE = 1
+ FAILED_NOT_RETRYABLE = 2
+
+
+# pylint: disable=broad-except
+# pylint: disable=too-many-instance-attributes
+# pylint: disable=C0301
+class BaseExporter:
+ """Azure Monitor base exporter for OpenTelemetry."""
+
+ def __init__(self, **kwargs: Any) -> None:
+ """Azure Monitor base exporter for OpenTelemetry.
+
+ :keyword str api_version: The service API version used. Defaults to latest.
+ :keyword str connection_string: The connection string used for your Application Insights resource.
+ :keyword ManagedIdentityCredential/ClientSecretCredential credential: Token credential, such as ManagedIdentityCredential or ClientSecretCredential, used for Azure Active Directory (AAD) authentication. Defaults to None.
+ :keyword bool disable_offline_storage: Determines whether to disable storing failed telemetry records for retry. Defaults to `False`.
+ :keyword str storage_directory: Storage path in which to store retry files. Defaults to `<tempfile.gettempdir()>/opentelemetry-python-<your-instrumentation-key>`.
+ :rtype: None
+ """
+ parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string"))
+
+ self._api_version = kwargs.get("api_version") or _SERVICE_API_LATEST
+ self._credential = kwargs.get("credential")
+ self._consecutive_redirects = 0 # To prevent circular redirects
+ self._disable_offline_storage = kwargs.get("disable_offline_storage", False)
+ self._endpoint = parsed_connection_string.endpoint
+ self._instrumentation_key = parsed_connection_string.instrumentation_key
+ self._aad_audience = parsed_connection_string.aad_audience
+ self._storage_maintenance_period = kwargs.get(
+ "storage_maintenance_period", 60
+ ) # Maintenance interval in seconds.
+ self._storage_max_size = kwargs.get(
+ "storage_max_size", 50 * 1024 * 1024
+ ) # Maximum size in bytes (default 50MiB)
+ self._storage_min_retry_interval = kwargs.get(
+ "storage_min_retry_interval", 60
+ ) # minimum retry interval in seconds
+ temp_suffix = self._instrumentation_key or ""
+ if "storage_directory" in kwargs:
+ self._storage_directory = kwargs.get("storage_directory")
+ elif not self._disable_offline_storage:
+ self._storage_directory = os.path.join(
+ tempfile.gettempdir(), _AZURE_TEMPDIR_PREFIX, _TEMPDIR_PREFIX + temp_suffix
+ )
+ else:
+ self._storage_directory = None
+ self._storage_retention_period = kwargs.get(
+ "storage_retention_period", 48 * 60 * 60
+ ) # Retention period in seconds (default 48 hrs)
+ self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds
+ self._distro_version = kwargs.get(
+ _AZURE_MONITOR_DISTRO_VERSION_ARG, ""
+ ) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version.
+
+ config = AzureMonitorClientConfiguration(self._endpoint, **kwargs)
+ policies = [
+ RequestIdPolicy(**kwargs),
+ config.headers_policy,
+ config.user_agent_policy,
+ config.proxy_policy,
+ ContentDecodePolicy(**kwargs),
+ # Handle redirects in exporter, set new endpoint if redirected
+ RedirectPolicy(permit_redirects=False),
+ config.retry_policy,
+ _get_auth_policy(self._credential, config.authentication_policy, self._aad_audience),
+ config.custom_hook_policy,
+ config.logging_policy,
+ # Explicitly disabling to avoid infinite loop of Span creation when data is exported
+ # DistributedTracingPolicy(**kwargs),
+ config.http_logging_policy or HttpLoggingPolicy(**kwargs),
+ ]
+
+ self.client = AzureMonitorClient(
+ host=self._endpoint, connection_timeout=self._timeout, policies=policies, **kwargs
+ )
+ self.storage = None
+ if not self._disable_offline_storage:
+ self.storage = LocalFileStorage(
+ path=self._storage_directory,
+ max_size=self._storage_max_size,
+ maintenance_period=self._storage_maintenance_period,
+ retention_period=self._storage_retention_period,
+ name="{} Storage".format(self.__class__.__name__),
+ lease_period=self._storage_min_retry_interval,
+ )
+ # specifies whether current exporter is used for collection of instrumentation metrics
+ self._instrumentation_collection = kwargs.get("instrumentation_collection", False)
+ # statsbeat initialization
+ if self._should_collect_stats():
+ # Import here to avoid circular dependencies
+ from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
+
+ collect_statsbeat_metrics(self)
+
+ def _transmit_from_storage(self) -> None:
+ if not self.storage:
+ return
+ for blob in self.storage.gets():
+ # give a few more seconds for blob lease operation
+ # to reduce the chance of race (for perf consideration)
+ if blob.lease(self._timeout + 5):
+ envelopes = [_format_storage_telemetry_item(TelemetryItem.from_dict(x)) for x in blob.get()]
+ result = self._transmit(envelopes)
+ if result == ExportResult.FAILED_RETRYABLE:
+ blob.lease(1)
+ else:
+ blob.delete()
+
+ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None:
+ if self.storage:
+ if result == ExportResult.FAILED_RETRYABLE:
+ envelopes_to_store = [x.as_dict() for x in envelopes]
+ self.storage.put(envelopes_to_store)
+ elif result == ExportResult.SUCCESS:
+ # Try to send any cached events
+ self._transmit_from_storage()
+
+ # pylint: disable=too-many-branches
+ # pylint: disable=too-many-nested-blocks
+ # pylint: disable=too-many-statements
+ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
+ """
+ Transmit the data envelopes to the ingestion service.
+
+ Returns an ExportResult, this function should never
+ throw an exception.
+ :param envelopes: The list of telemetry items to transmit.
+ :type envelopes: list of ~azure.monitor.opentelemetry.exporter._generated.models.TelemetryItem
+ :return: The result of the export.
+ :rtype: ~azure.monitor.opentelemetry.exporter.export._base._ExportResult
+ """
+ if len(envelopes) > 0:
+ result = ExportResult.SUCCESS
+ # Track whether or not exporter has successfully reached ingestion
+ # Currently only used for statsbeat exporter to detect shutdown cases
+ reach_ingestion = False
+ start_time = time.time()
+ try:
+ track_response = self.client.track(envelopes)
+ if not track_response.errors: # 200
+ self._consecutive_redirects = 0
+ if not self._is_stats_exporter():
+ logger.info(
+ "Transmission succeeded: Item received: %s. Items accepted: %s",
+ track_response.items_received,
+ track_response.items_accepted,
+ )
+ if self._should_collect_stats():
+ _update_requests_map(_REQ_SUCCESS_NAME[1], 1)
+ reach_ingestion = True
+ result = ExportResult.SUCCESS
+ else: # 206
+ reach_ingestion = True
+ resend_envelopes = []
+ for error in track_response.errors:
+ if _is_retryable_code(error.status_code):
+ resend_envelopes.append(envelopes[error.index]) # type: ignore
+ else:
+ if not self._is_stats_exporter():
+ logger.error(
+ "Data drop %s: %s %s.",
+ error.status_code,
+ error.message,
+ envelopes[error.index] if error.index is not None else "",
+ )
+ if self.storage and resend_envelopes:
+ envelopes_to_store = [x.as_dict() for x in resend_envelopes]
+ self.storage.put(envelopes_to_store, 0)
+ self._consecutive_redirects = 0
+ # Mark as not retryable because we already write to storage here
+ result = ExportResult.FAILED_NOT_RETRYABLE
+ except HttpResponseError as response_error:
+ # HttpResponseError is raised when a response is received
+ if _reached_ingestion_code(response_error.status_code):
+ reach_ingestion = True
+ if _is_retryable_code(response_error.status_code):
+ if self._should_collect_stats():
+ _update_requests_map(_REQ_RETRY_NAME[1], value=response_error.status_code)
+ result = ExportResult.FAILED_RETRYABLE
+ elif _is_throttle_code(response_error.status_code):
+ if self._should_collect_stats():
+ _update_requests_map(_REQ_THROTTLE_NAME[1], value=response_error.status_code)
+ result = ExportResult.FAILED_NOT_RETRYABLE
+ elif _is_redirect_code(response_error.status_code):
+ self._consecutive_redirects = self._consecutive_redirects + 1
+ # pylint: disable=W0212
+ if self._consecutive_redirects < self.client._config.redirect_policy.max_redirects: # type: ignore
+ if response_error.response and response_error.response.headers: # type: ignore
+ redirect_has_headers = True
+ location = response_error.response.headers.get("location") # type: ignore
+ url = urlparse(location)
+ else:
+ redirect_has_headers = False
+ if redirect_has_headers and url.scheme and url.netloc: # pylint: disable=E0606
+ # Change the host to the new redirected host
+ self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212
+ # Attempt to export again
+ result = self._transmit(envelopes)
+ else:
+ if not self._is_stats_exporter():
+ logger.error(
+ "Error parsing redirect information.",
+ )
+ result = ExportResult.FAILED_NOT_RETRYABLE
+ else:
+ if not self._is_stats_exporter():
+ logger.error(
+ "Error sending telemetry because of circular redirects. "
+ "Please check the integrity of your connection string."
+ )
+ # If redirect but did not return, exception occurred
+ if self._should_collect_stats():
+ _update_requests_map(_REQ_EXCEPTION_NAME[1], value="Circular Redirect")
+ result = ExportResult.FAILED_NOT_RETRYABLE
+ else:
+ # Any other status code counts as failure (non-retryable)
+ # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey, etc.)
+ # 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string
+ if self._should_collect_stats():
+ _update_requests_map(_REQ_FAILURE_NAME[1], value=response_error.status_code)
+ if not self._is_stats_exporter():
+ logger.error(
+ "Non-retryable server side error: %s.",
+ response_error.message,
+ )
+ if _is_invalid_code(response_error.status_code):
+ # Shutdown statsbeat on invalid code from customer endpoint
+ # Import here to avoid circular dependencies
+ from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import (
+ shutdown_statsbeat_metrics,
+ )
+
+ shutdown_statsbeat_metrics()
+ result = ExportResult.FAILED_NOT_RETRYABLE
+ except ServiceRequestError as request_error:
+ # Errors when we're fairly sure that the server did not receive the
+ # request, so it should be safe to retry.
+ # ServiceRequestError is raised by azure.core for these cases
+ logger.warning("Retrying due to server request error: %s.", request_error.message)
+ if self._should_collect_stats():
+ exc_type = request_error.exc_type
+ if exc_type is None or exc_type is type(None):
+ exc_type = request_error.__class__.__name__ # type: ignore
+ _update_requests_map(_REQ_EXCEPTION_NAME[1], value=exc_type)
+ result = ExportResult.FAILED_RETRYABLE
+ except Exception as ex:
+ logger.exception("Envelopes could not be exported and are not retryable: %s.")
+ if self._should_collect_stats():
+ _update_requests_map(_REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__)
+ result = ExportResult.FAILED_NOT_RETRYABLE
+ finally:
+ if self._should_collect_stats():
+ end_time = time.time()
+ _update_requests_map("count", 1)
+ _update_requests_map(_REQ_DURATION_NAME[1], value=end_time - start_time)
+ if self._is_statsbeat_initializing_state():
+ # Update statsbeat initial success state if reached ingestion
+ if reach_ingestion:
+ set_statsbeat_initial_success(True)
+ else:
+ # if didn't reach ingestion, increment and check if failure threshold
+ # has been reached during attempting statsbeat initialization
+ if increment_and_check_statsbeat_failure_count():
+ # Import here to avoid circular dependencies
+ from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import (
+ shutdown_statsbeat_metrics,
+ )
+
+ shutdown_statsbeat_metrics()
+ # pylint: disable=lost-exception
+ return ExportResult.FAILED_NOT_RETRYABLE # pylint: disable=W0134
+ # pylint: disable=lost-exception
+ return result # pylint: disable=W0134
+
+ # No spans to export
+ self._consecutive_redirects = 0
+ return ExportResult.SUCCESS
+
+ # check to see whether its the case of stats collection
+ def _should_collect_stats(self):
+ return (
+ is_statsbeat_enabled()
+ and not get_statsbeat_shutdown()
+ and not self._is_stats_exporter()
+ and not self._instrumentation_collection
+ )
+
+ # check to see if statsbeat is in "attempting to be initialized" state
+ def _is_statsbeat_initializing_state(self):
+ return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success()
+
+ def _is_stats_exporter(self):
+ return self.__class__.__name__ == "_StatsBeatExporter"
+
+
+def _is_invalid_code(response_code: Optional[int]) -> bool:
+ """Determine if response is a invalid response.
+
+ :param int response_code: HTTP response code
+ :return: True if response is a invalid response
+ :rtype: bool
+ """
+ return response_code in _INVALID_STATUS_CODES
+
+
+def _is_redirect_code(response_code: Optional[int]) -> bool:
+ """Determine if response is a redirect response.
+
+ :param int response_code: HTTP response code
+ :return: True if response is a redirect response
+ :rtype: bool
+ """
+ return response_code in _REDIRECT_STATUS_CODES
+
+
+def _is_retryable_code(response_code: Optional[int]) -> bool:
+ """Determine if response is retryable.
+
+ :param int response_code: HTTP response code
+ :return: True if response is retryable
+ :rtype: bool
+ """
+ return response_code in _RETRYABLE_STATUS_CODES
+
+
+def _is_throttle_code(response_code: Optional[int]) -> bool:
+ """Determine if response is throttle response.
+
+ :param int response_code: HTTP response code
+ :return: True if response is throttle response
+ :rtype: bool
+ """
+ return response_code in _THROTTLE_STATUS_CODES
+
+
+def _reached_ingestion_code(response_code: Optional[int]) -> bool:
+ """Determine if response indicates ingestion service has been reached.
+
+ :param int response_code: HTTP response code
+ :return: True if response indicates ingestion service has been reached
+ :rtype: bool
+ """
+ return response_code in _REACHED_INGESTION_STATUS_CODES
+
+
+_MONITOR_DOMAIN_MAPPING = {
+ "EventData": TelemetryEventData,
+ "ExceptionData": TelemetryExceptionData,
+ "MessageData": MessageData,
+ "MetricData": MetricsData,
+ "RemoteDependencyData": RemoteDependencyData,
+ "RequestData": RequestData,
+}
+
+
+# from_dict() deserializes incorrectly, format TelemetryItem correctly after it
+# is called
+def _format_storage_telemetry_item(item: TelemetryItem) -> TelemetryItem:
+ # After TelemetryItem.from_dict, all base_data fields are stored in
+ # additional_properties as a dict instead of in item.data.base_data itself
+ # item.data.base_data is also of type MonitorDomain instead of a child class
+ if hasattr(item, "data") and item.data is not None:
+ if hasattr(item.data, "base_data") and isinstance(item.data.base_data, MonitorDomain):
+ if hasattr(item.data, "base_type") and isinstance(item.data.base_type, str):
+ base_type = _MONITOR_DOMAIN_MAPPING.get(item.data.base_type)
+ # Apply deserialization of additional_properties and store that as base_data
+ if base_type:
+ item.data.base_data = base_type.from_dict(item.data.base_data.additional_properties) # type: ignore
+ item.data.base_data.additional_properties = None # type: ignore
+ return item
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py
new file mode 100644
index 00000000..e7ae4e89
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py
@@ -0,0 +1,244 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import json
+import logging
+from typing import Optional, Sequence, Any
+
+from opentelemetry._logs.severity import SeverityNumber
+from opentelemetry.semconv.attributes.exception_attributes import (
+ EXCEPTION_ESCAPED,
+ EXCEPTION_MESSAGE,
+ EXCEPTION_STACKTRACE,
+ EXCEPTION_TYPE,
+)
+from opentelemetry.sdk._logs import LogData
+from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
+
+from azure.monitor.opentelemetry.exporter import _utils
+from azure.monitor.opentelemetry.exporter._constants import (
+ _EXCEPTION_ENVELOPE_NAME,
+ _MESSAGE_ENVELOPE_NAME,
+)
+from azure.monitor.opentelemetry.exporter._generated.models import (
+ ContextTagKeys,
+ MessageData,
+ MonitorBase,
+ TelemetryEventData,
+ TelemetryExceptionData,
+ TelemetryExceptionDetails,
+ TelemetryItem,
+)
+from azure.monitor.opentelemetry.exporter.export._base import (
+ BaseExporter,
+ ExportResult,
+)
+from azure.monitor.opentelemetry.exporter.export.trace import _utils as trace_utils
+from azure.monitor.opentelemetry.exporter._constants import (
+ _APPLICATION_INSIGHTS_EVENT_MARKER_ATTRIBUTE,
+ _MICROSOFT_CUSTOM_EVENT_NAME,
+)
+from azure.monitor.opentelemetry.exporter.statsbeat._state import (
+ get_statsbeat_shutdown,
+ get_statsbeat_custom_events_feature_set,
+ is_statsbeat_enabled,
+ set_statsbeat_custom_events_feature_set,
+)
+
+_logger = logging.getLogger(__name__)
+
+_DEFAULT_SPAN_ID = 0
+_DEFAULT_TRACE_ID = 0
+
+__all__ = ["AzureMonitorLogExporter"]
+
+
+class AzureMonitorLogExporter(BaseExporter, LogExporter):
+ """Azure Monitor Log exporter for OpenTelemetry."""
+
+ def export(self, batch: Sequence[LogData], **kwargs: Any) -> LogExportResult: # pylint: disable=unused-argument
+ """Export log data.
+
+ :param batch: OpenTelemetry LogData(s) to export.
+ :type batch: ~typing.Sequence[~opentelemetry._logs.LogData]
+ :return: The result of the export.
+ :rtype: ~opentelemetry.sdk._logs.export.LogData
+ """
+ envelopes = [self._log_to_envelope(log) for log in batch]
+ try:
+ result = self._transmit(envelopes)
+ self._handle_transmit_from_storage(envelopes, result)
+ return _get_log_export_result(result)
+ except Exception: # pylint: disable=broad-except
+ _logger.exception("Exception occurred while exporting the data.")
+ return _get_log_export_result(ExportResult.FAILED_NOT_RETRYABLE)
+
+ def shutdown(self) -> None:
+ """Shuts down the exporter.
+
+ Called when the SDK is shut down.
+ """
+ if self.storage:
+ self.storage.close()
+
+ def _log_to_envelope(self, log_data: LogData) -> TelemetryItem:
+ envelope = _convert_log_to_envelope(log_data)
+ envelope.instrumentation_key = self._instrumentation_key
+ return envelope
+
+ # pylint: disable=docstring-keyword-should-match-keyword-only
+ @classmethod
+ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorLogExporter":
+ """
+ Create an AzureMonitorLogExporter from a connection string. This is the
+ recommended way of instantiation if a connection string is passed in
+ explicitly. If a user wants to use a connection string provided by
+ environment variable, the constructor of the exporter can be called
+ directly.
+
+ :param str conn_str: The connection string to be used for
+ authentication.
+ :keyword str api_version: The service API version used. Defaults to
+ latest.
+ :return: an instance of ~AzureMonitorLogExporter
+ :rtype: ~azure.monitor.opentelemetry.exporter.AzureMonitorLogExporter
+ """
+ return cls(connection_string=conn_str, **kwargs)
+
+
+def _log_data_is_event(log_data: LogData) -> bool:
+ log_record = log_data.log_record
+ is_event = None
+ if log_record.attributes:
+ is_event = log_record.attributes.get(_MICROSOFT_CUSTOM_EVENT_NAME) or \
+ log_record.attributes.get(_APPLICATION_INSIGHTS_EVENT_MARKER_ATTRIBUTE) # type: ignore
+ return is_event is not None
+
+
+# pylint: disable=protected-access
+def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
+ log_record = log_data.log_record
+ time_stamp = log_record.timestamp if log_record.timestamp is not None else log_record.observed_timestamp
+ envelope = _utils._create_telemetry_item(time_stamp)
+ envelope.tags.update(_utils._populate_part_a_fields(log_record.resource)) # type: ignore
+ envelope.tags[ContextTagKeys.AI_OPERATION_ID] = "{:032x}".format( # type: ignore
+ log_record.trace_id or _DEFAULT_TRACE_ID
+ )
+ envelope.tags[ContextTagKeys.AI_OPERATION_PARENT_ID] = "{:016x}".format( # type: ignore
+ log_record.span_id or _DEFAULT_SPAN_ID
+ )
+ # Special use case: Customers want to be able to set location ip on log records
+ location_ip = trace_utils._get_location_ip(log_record.attributes)
+ if location_ip:
+ envelope.tags[ContextTagKeys.AI_LOCATION_IP] = location_ip # type: ignore
+ properties = _utils._filter_custom_properties(
+ log_record.attributes, lambda key, val: not _is_ignored_attribute(key)
+ )
+ exc_type = exc_message = stack_trace = None
+ if log_record.attributes:
+ exc_type = log_record.attributes.get(EXCEPTION_TYPE)
+ exc_message = log_record.attributes.get(EXCEPTION_MESSAGE)
+ stack_trace = log_record.attributes.get(EXCEPTION_STACKTRACE)
+ severity_level = _get_severity_level(log_record.severity_number)
+
+ # Exception telemetry
+ if exc_type is not None or exc_message is not None:
+ envelope.name = _EXCEPTION_ENVELOPE_NAME
+ has_full_stack = stack_trace is not None
+ if not exc_type:
+ exc_type = "Exception"
+ # Log body takes priority for message
+ if log_record.body:
+ message = _map_body_to_message(log_record.body)
+ elif exc_message:
+ message = exc_message # type: ignore
+ else:
+ message = "Exception"
+ exc_details = TelemetryExceptionDetails(
+ type_name=str(exc_type)[:1024], # type: ignore
+ message=str(message)[:32768],
+ has_full_stack=has_full_stack,
+ stack=str(stack_trace)[:32768],
+ )
+ data = TelemetryExceptionData( # type: ignore
+ severity_level=severity_level, # type: ignore
+ properties=properties,
+ exceptions=[exc_details],
+ )
+ envelope.data = MonitorBase(base_data=data, base_type="ExceptionData")
+ elif _log_data_is_event(log_data): # Event telemetry
+ _set_statsbeat_custom_events_feature()
+ envelope.name = "Microsoft.ApplicationInsights.Event"
+ event_name = ""
+ if log_record.attributes.get(_MICROSOFT_CUSTOM_EVENT_NAME): # type: ignore
+ event_name = str(log_record.attributes.get(_MICROSOFT_CUSTOM_EVENT_NAME)) # type: ignore
+ else:
+ event_name = _map_body_to_message(log_record.body)
+ data = TelemetryEventData( # type: ignore
+ name=event_name,
+ properties=properties,
+ )
+ envelope.data = MonitorBase(base_data=data, base_type="EventData")
+ else: # Message telemetry
+ envelope.name = _MESSAGE_ENVELOPE_NAME
+ # pylint: disable=line-too-long
+ # Severity number: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber
+ data = MessageData( # type: ignore
+ message=_map_body_to_message(log_record.body),
+ severity_level=severity_level, # type: ignore
+ properties=properties,
+ )
+ envelope.data = MonitorBase(base_data=data, base_type="MessageData")
+
+ return envelope
+
+
+def _get_log_export_result(result: ExportResult) -> LogExportResult:
+ if result == ExportResult.SUCCESS:
+ return LogExportResult.SUCCESS
+ return LogExportResult.FAILURE
+
+
+# pylint: disable=line-too-long
+# Common schema: https://github.com/microsoft/common-schema/blob/main/v4.0/Mappings/AzureMonitor-AI.md#exceptionseveritylevel
+# SeverityNumber specs: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber
+def _get_severity_level(severity_number: Optional[SeverityNumber]):
+ if severity_number is None or severity_number.value < 9:
+ return 0
+ return int((severity_number.value - 1) / 4 - 1)
+
+
+def _map_body_to_message(log_body: Any) -> str:
+ if not log_body:
+ return ""
+
+ if isinstance(log_body, str):
+ return log_body[:32768]
+
+ if isinstance(log_body, Exception):
+ return str(log_body)[:32768]
+
+ try:
+ return json.dumps(log_body)[:32768]
+ except: # pylint: disable=bare-except
+ return str(log_body)[:32768]
+
+
+def _is_ignored_attribute(key: str) -> bool:
+ return key in _IGNORED_ATTRS
+
+
+_IGNORED_ATTRS = frozenset(
+ (
+ EXCEPTION_TYPE,
+ EXCEPTION_MESSAGE,
+ EXCEPTION_STACKTRACE,
+ EXCEPTION_ESCAPED,
+ _APPLICATION_INSIGHTS_EVENT_MARKER_ATTRIBUTE,
+ _MICROSOFT_CUSTOM_EVENT_NAME,
+ )
+)
+
+
+def _set_statsbeat_custom_events_feature():
+ if is_statsbeat_enabled() and not get_statsbeat_shutdown() and not get_statsbeat_custom_events_feature_set():
+ set_statsbeat_custom_events_feature_set()
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py
new file mode 100644
index 00000000..98ed6a47
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py
@@ -0,0 +1,291 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import logging
+import os
+
+from typing import Dict, Optional, Union, Any
+
+from opentelemetry.util.types import Attributes
+from opentelemetry.sdk.metrics import (
+ Counter,
+ Histogram,
+ ObservableCounter,
+ ObservableGauge,
+ ObservableUpDownCounter,
+ UpDownCounter,
+)
+from opentelemetry.sdk.metrics.export import (
+ AggregationTemporality,
+ DataPointT,
+ HistogramDataPoint,
+ MetricExporter,
+ MetricExportResult,
+ MetricsData as OTMetricsData,
+ NumberDataPoint,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+from opentelemetry.semconv.attributes.http_attributes import HTTP_RESPONSE_STATUS_CODE
+from opentelemetry.semconv.metrics import MetricInstruments
+from opentelemetry.semconv.metrics.http_metrics import (
+ HTTP_CLIENT_REQUEST_DURATION,
+ HTTP_SERVER_REQUEST_DURATION,
+)
+from opentelemetry.semconv.trace import SpanAttributes
+
+from azure.monitor.opentelemetry.exporter._constants import (
+ _APPLICATIONINSIGHTS_METRIC_NAMESPACE_OPT_IN,
+ _AUTOCOLLECTED_INSTRUMENT_NAMES,
+ _METRIC_ENVELOPE_NAME,
+)
+from azure.monitor.opentelemetry.exporter import _utils
+from azure.monitor.opentelemetry.exporter._generated.models import (
+ MetricDataPoint,
+ MetricsData,
+ MonitorBase,
+ TelemetryItem,
+)
+from azure.monitor.opentelemetry.exporter.export._base import (
+ BaseExporter,
+ ExportResult,
+)
+from azure.monitor.opentelemetry.exporter.export.trace import _utils as trace_utils
+
+
+_logger = logging.getLogger(__name__)
+
+__all__ = ["AzureMonitorMetricExporter"]
+
+
+APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = {
+ Counter: AggregationTemporality.DELTA,
+ Histogram: AggregationTemporality.DELTA,
+ ObservableCounter: AggregationTemporality.DELTA,
+ ObservableGauge: AggregationTemporality.CUMULATIVE,
+ ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
+ UpDownCounter: AggregationTemporality.CUMULATIVE,
+}
+
+
+class AzureMonitorMetricExporter(BaseExporter, MetricExporter):
+ """Azure Monitor Metric exporter for OpenTelemetry."""
+
+ def __init__(self, **kwargs: Any) -> None:
+ BaseExporter.__init__(self, **kwargs)
+ MetricExporter.__init__(
+ self,
+ preferred_temporality=APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore
+ preferred_aggregation=kwargs.get("preferred_aggregation"), # type: ignore
+ )
+
+ # pylint: disable=R1702
+ def export(
+ self,
+ metrics_data: OTMetricsData,
+ timeout_millis: float = 10_000,
+ **kwargs: Any,
+ ) -> MetricExportResult:
+ """Exports a batch of metric data
+
+ :param metrics_data: OpenTelemetry Metric(s) to export.
+ :type metrics_data: Sequence[~opentelemetry.sdk.metrics._internal.point.MetricsData]
+ :param timeout_millis: The maximum amount of time to wait for each export. Not currently used.
+ :type timeout_millis: float
+ :return: The result of the export.
+ :rtype: ~opentelemetry.sdk.metrics.export.MetricExportResult
+ """
+ envelopes = []
+ if metrics_data is None:
+ return MetricExportResult.SUCCESS
+ for resource_metric in metrics_data.resource_metrics:
+ for scope_metric in resource_metric.scope_metrics:
+ for metric in scope_metric.metrics:
+ for point in metric.data.data_points:
+ if point is not None:
+ envelope = self._point_to_envelope(
+ point,
+ metric.name,
+ resource_metric.resource,
+ scope_metric.scope,
+ )
+ if envelope is not None:
+ envelopes.append(envelope)
+ try:
+ result = self._transmit(envelopes)
+ self._handle_transmit_from_storage(envelopes, result)
+ return _get_metric_export_result(result)
+ except Exception: # pylint: disable=broad-except
+ _logger.exception("Exception occurred while exporting the data.")
+ return _get_metric_export_result(ExportResult.FAILED_NOT_RETRYABLE)
+
+ def force_flush(
+ self,
+ timeout_millis: float = 10_000,
+ ) -> bool:
+ # Ensure that export of any metrics currently received by the exporter are completed as soon as possible.
+
+ return True
+
+ def shutdown(
+ self,
+ timeout_millis: float = 30_000,
+ **kwargs: Any,
+ ) -> None:
+ """Shuts down the exporter.
+
+ Called when the SDK is shut down.
+
+ :param timeout_millis: The maximum amount of time to wait for shutdown. Not currently used.
+ :type timeout_millis: float
+ """
+ if self.storage:
+ self.storage.close()
+
+ def _point_to_envelope(
+ self,
+ point: DataPointT,
+ name: str,
+ resource: Optional[Resource] = None,
+ scope: Optional[InstrumentationScope] = None,
+ ) -> Optional[TelemetryItem]:
+ envelope = _convert_point_to_envelope(point, name, resource, scope)
+ if name in _AUTOCOLLECTED_INSTRUMENT_NAMES:
+ envelope = _handle_std_metric_envelope(envelope, name, point.attributes) # type: ignore
+ if envelope is not None:
+ envelope.instrumentation_key = self._instrumentation_key
+ return envelope
+
+ # pylint: disable=docstring-keyword-should-match-keyword-only
+ @classmethod
+ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorMetricExporter":
+ """
+ Create an AzureMonitorMetricExporter from a connection string. This is
+ the recommended way of instantiation if a connection string is passed in
+ explicitly. If a user wants to use a connection string provided by
+ environment variable, the constructor of the exporter can be called
+ directly.
+
+ :param str conn_str: The connection string to be used for
+ authentication.
+ :keyword str api_version: The service API version used. Defaults to
+ latest.
+ :return: An instance of ~AzureMonitorMetricExporter
+ :rtype: ~azure.monitor.opentelemetry.exporter.AzureMonitorMetricExporter
+ """
+ return cls(connection_string=conn_str, **kwargs)
+
+
+# pylint: disable=protected-access
+def _convert_point_to_envelope(
+ point: DataPointT, name: str, resource: Optional[Resource] = None, scope: Optional[InstrumentationScope] = None
+) -> TelemetryItem:
+ envelope = _utils._create_telemetry_item(point.time_unix_nano)
+ envelope.name = _METRIC_ENVELOPE_NAME
+ envelope.tags.update(_utils._populate_part_a_fields(resource)) # type: ignore
+ namespace = None
+ if scope is not None and _is_metric_namespace_opted_in():
+ namespace = str(scope.name)[:256]
+ value: Union[int, float] = 0
+ count = 1
+ min_ = None
+ max_ = None
+ # std_dev = None
+
+ if isinstance(point, NumberDataPoint):
+ value = point.value
+ elif isinstance(point, HistogramDataPoint):
+ value = point.sum
+ count = int(point.count)
+ min_ = point.min
+ max_ = point.max
+
+ # truncation logic
+ properties = _utils._filter_custom_properties(point.attributes)
+
+ data_point = MetricDataPoint(
+ name=str(name)[:1024],
+ namespace=namespace,
+ value=value,
+ count=count,
+ min=min_,
+ max=max_,
+ )
+
+ data = MetricsData(
+ properties=properties,
+ metrics=[data_point],
+ )
+
+ envelope.data = MonitorBase(base_data=data, base_type="MetricData")
+
+ return envelope
+
+
+def _handle_std_metric_envelope(
+ envelope: TelemetryItem,
+ name: str,
+ attributes: Attributes,
+) -> Optional[TelemetryItem]:
+ properties: Dict[str, str] = {}
+ tags = envelope.tags
+ if not attributes:
+ attributes = {}
+ status_code = attributes.get(HTTP_RESPONSE_STATUS_CODE) or attributes.get(SpanAttributes.HTTP_STATUS_CODE)
+ if status_code:
+ try:
+ status_code = int(status_code) # type: ignore
+ except ValueError:
+ status_code = 0
+ else:
+ status_code = 0
+ if name in (HTTP_CLIENT_REQUEST_DURATION, MetricInstruments.HTTP_CLIENT_DURATION):
+ properties["_MS.MetricId"] = "dependencies/duration"
+ properties["_MS.IsAutocollected"] = "True"
+ properties["Dependency.Type"] = "http"
+ properties["Dependency.Success"] = str(_is_status_code_success(status_code)) # type: ignore
+ target, _ = trace_utils._get_target_and_path_for_http_dependency(attributes)
+ properties["dependency/target"] = target # type: ignore
+ properties["dependency/resultCode"] = str(status_code)
+ properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"] # type: ignore
+ properties["cloud/roleName"] = tags["ai.cloud.role"] # type: ignore
+ elif name in (HTTP_SERVER_REQUEST_DURATION, MetricInstruments.HTTP_SERVER_DURATION):
+ properties["_MS.MetricId"] = "requests/duration"
+ properties["_MS.IsAutocollected"] = "True"
+ properties["request/resultCode"] = str(status_code)
+ # TODO: Change to symbol once released in upstream
+ if attributes.get("user_agent.synthetic.type"):
+ properties["operation/synthetic"] = "True"
+ properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"] # type: ignore
+ properties["cloud/roleName"] = tags["ai.cloud.role"] # type: ignore
+ properties["Request.Success"] = str(_is_status_code_success(status_code)) # type: ignore
+ else:
+ # Any other autocollected metrics are not supported yet for standard metrics
+ # We ignore these envelopes in these cases
+ return None
+
+ # TODO: rpc, database, messaging
+
+ envelope.data.base_data.properties = properties # type: ignore
+
+ return envelope
+
+
+def _is_status_code_success(status_code: Optional[str]) -> bool:
+ if status_code is None or status_code == 0:
+ return False
+ try:
+ # Success criteria based solely off status code is True only if status_code < 400
+ # for both client and server spans
+ return int(status_code) < 400
+ except ValueError:
+ return False
+
+
+def _is_metric_namespace_opted_in() -> bool:
+ return os.environ.get(_APPLICATIONINSIGHTS_METRIC_NAMESPACE_OPT_IN, "False").lower() == "true"
+
+
+def _get_metric_export_result(result: ExportResult) -> MetricExportResult:
+ if result == ExportResult.SUCCESS:
+ return MetricExportResult.SUCCESS
+ return MetricExportResult.FAILURE
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py
new file mode 100644
index 00000000..c1d51b7d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py
@@ -0,0 +1,553 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+from os import environ
+import json
+import logging
+from time import time_ns
+from typing import no_type_check, Any, Dict, List, Sequence
+from urllib.parse import urlparse
+
+from opentelemetry.semconv.attributes.client_attributes import CLIENT_ADDRESS
+from opentelemetry.semconv.attributes.http_attributes import (
+ HTTP_REQUEST_METHOD,
+ HTTP_RESPONSE_STATUS_CODE,
+)
+from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
+from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+from opentelemetry.trace import SpanKind, get_tracer_provider
+
+from azure.monitor.opentelemetry.exporter._constants import (
+ _APPLICATIONINSIGHTS_OPENTELEMETRY_RESOURCE_METRIC_DISABLED,
+ _AZURE_SDK_NAMESPACE_NAME,
+ _AZURE_SDK_OPENTELEMETRY_NAME,
+ _INSTRUMENTATION_SUPPORTING_METRICS_LIST,
+ _SAMPLE_RATE_KEY,
+ _METRIC_ENVELOPE_NAME,
+ _MESSAGE_ENVELOPE_NAME,
+ _REQUEST_ENVELOPE_NAME,
+ _EXCEPTION_ENVELOPE_NAME,
+ _REMOTE_DEPENDENCY_ENVELOPE_NAME,
+)
+from azure.monitor.opentelemetry.exporter import _utils
+from azure.monitor.opentelemetry.exporter._generated.models import (
+ ContextTagKeys,
+ MessageData,
+ MetricDataPoint,
+ MetricsData,
+ MonitorBase,
+ RemoteDependencyData,
+ RequestData,
+ TelemetryExceptionData,
+ TelemetryExceptionDetails,
+ TelemetryItem,
+)
+from azure.monitor.opentelemetry.exporter.export._base import (
+ BaseExporter,
+ ExportResult,
+)
+from . import _utils as trace_utils
+
+
+_logger = logging.getLogger(__name__)
+
+__all__ = ["AzureMonitorTraceExporter"]
+
+_STANDARD_OPENTELEMETRY_ATTRIBUTE_PREFIXES = [
+ "http.",
+ "db.",
+ "message.",
+ "messaging.",
+ "rpc.",
+ "enduser.",
+ "net.",
+ "peer.",
+ "exception.",
+ "thread.",
+ "fass.",
+ "code.",
+]
+
+_STANDARD_OPENTELEMETRY_HTTP_ATTRIBUTES = [
+ "client.address",
+ "client.port",
+ "server.address",
+ "server.port",
+ "url.full",
+ "url.path",
+ "url.query",
+ "url.scheme",
+ "url.template",
+ "error.type",
+ "network.local.address",
+ "network.local.port",
+ "network.protocol.name",
+ "network.peer.address",
+ "network.peer.port",
+ "network.protocol.version",
+ "network.transport",
+ "user_agent.original",
+ "user_agent.synthetic.type",
+]
+
+_STANDARD_AZURE_MONITOR_ATTRIBUTES = [
+ _SAMPLE_RATE_KEY,
+]
+
+
+class AzureMonitorTraceExporter(BaseExporter, SpanExporter):
+ """Azure Monitor Trace exporter for OpenTelemetry."""
+
+ def __init__(self, **kwargs: Any):
+ self._tracer_provider = kwargs.pop("tracer_provider", None)
+ super().__init__(**kwargs)
+
+ def export(
+ self, spans: Sequence[ReadableSpan], **kwargs: Any # pylint: disable=unused-argument
+ ) -> SpanExportResult:
+ """Export span data.
+
+ :param spans: Open Telemetry Spans to export.
+ :type spans: ~typing.Sequence[~opentelemetry.trace.Span]
+ :return: The result of the export.
+ :rtype: ~opentelemetry.sdk.trace.export.SpanExportResult
+ """
+ envelopes = []
+ if spans and self._should_collect_otel_resource_metric():
+ resource = None
+ try:
+ tracer_provider = self._tracer_provider or get_tracer_provider()
+ resource = tracer_provider.resource # type: ignore
+ envelopes.append(self._get_otel_resource_envelope(resource))
+ except AttributeError as e:
+ _logger.exception("Failed to derive Resource from Tracer Provider: %s", e)
+ for span in spans:
+ envelopes.append(self._span_to_envelope(span))
+ envelopes.extend(self._span_events_to_envelopes(span))
+ try:
+ result = self._transmit(envelopes)
+ self._handle_transmit_from_storage(envelopes, result)
+ return _get_trace_export_result(result)
+ except Exception: # pylint: disable=broad-except
+ _logger.exception("Exception occurred while exporting the data.")
+ return _get_trace_export_result(ExportResult.FAILED_NOT_RETRYABLE)
+
+ def shutdown(self) -> None:
+ """Shuts down the exporter.
+
+ Called when the SDK is shut down.
+ """
+ if self.storage:
+ self.storage.close()
+
+ # pylint: disable=protected-access
+ def _get_otel_resource_envelope(self, resource: Resource) -> TelemetryItem:
+ attributes: Dict[str, str] = {}
+ if resource:
+ attributes = resource.attributes
+ envelope = _utils._create_telemetry_item(time_ns())
+ envelope.name = _METRIC_ENVELOPE_NAME
+ envelope.tags.update(_utils._populate_part_a_fields(resource)) # pylint: disable=W0212
+ envelope.instrumentation_key = self._instrumentation_key
+ data_point = MetricDataPoint(
+ name="_OTELRESOURCE_"[:1024],
+ value=0,
+ )
+
+ data = MetricsData(
+ properties=attributes,
+ metrics=[data_point],
+ )
+
+ envelope.data = MonitorBase(base_data=data, base_type="MetricData")
+
+ return envelope
+
+ def _span_to_envelope(self, span: ReadableSpan) -> TelemetryItem:
+ envelope = _convert_span_to_envelope(span)
+ envelope.instrumentation_key = self._instrumentation_key
+ return envelope # type: ignore
+
+ def _span_events_to_envelopes(self, span: ReadableSpan) -> Sequence[TelemetryItem]:
+ if not span or len(span.events) == 0:
+ return []
+ envelopes = _convert_span_events_to_envelopes(span)
+ for envelope in envelopes:
+ envelope.instrumentation_key = self._instrumentation_key
+ return envelopes
+
+ def _should_collect_otel_resource_metric(self):
+ disabled = environ.get(_APPLICATIONINSIGHTS_OPENTELEMETRY_RESOURCE_METRIC_DISABLED)
+ return disabled is None or disabled.lower() != "true"
+
+ # pylint: disable=docstring-keyword-should-match-keyword-only
+ @classmethod
+ def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorTraceExporter":
+ """
+ Create an AzureMonitorTraceExporter from a connection string. This is
+ the recommended way of instantiation if a connection string is passed in
+ explicitly. If a user wants to use a connection string provided by
+ environment variable, the constructor of the exporter can be called
+ directly.
+
+ :param str conn_str: The connection string to be used for
+ authentication.
+ :keyword str api_version: The service API version used. Defaults to
+ latest.
+ :return: an instance of ~AzureMonitorTraceExporter
+ :rtype: ~azure.monitor.opentelemetry.exporter.AzureMonitorTraceExporter
+ """
+ return cls(connection_string=conn_str, **kwargs)
+
+
+# pylint: disable=too-many-statements
+# pylint: disable=too-many-branches
+# pylint: disable=protected-access
+# mypy: disable-error-code="assignment,attr-defined,index,operator,union-attr"
+@no_type_check
+def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
+ # Update instrumentation bitmap if span was generated from instrumentation
+ _check_instrumentation_span(span)
+ duration = 0
+ start_time = 0
+ if span.start_time:
+ start_time = span.start_time
+ if span.end_time:
+ duration = span.end_time - span.start_time
+ envelope = _utils._create_telemetry_item(start_time)
+ envelope.tags.update(_utils._populate_part_a_fields(span.resource))
+ envelope.tags[ContextTagKeys.AI_OPERATION_ID] = "{:032x}".format(span.context.trace_id)
+ if SpanAttributes.ENDUSER_ID in span.attributes:
+ envelope.tags[ContextTagKeys.AI_USER_ID] = span.attributes[SpanAttributes.ENDUSER_ID]
+ if span.parent and span.parent.span_id:
+ envelope.tags[ContextTagKeys.AI_OPERATION_PARENT_ID] = "{:016x}".format(span.parent.span_id)
+ if span.kind in (SpanKind.CONSUMER, SpanKind.SERVER):
+ envelope.name = _REQUEST_ENVELOPE_NAME
+ data = RequestData(
+ name=span.name,
+ id="{:016x}".format(span.context.span_id),
+ duration=_utils.ns_to_duration(duration),
+ response_code="0",
+ success=span.status.is_ok,
+ properties={},
+ measurements={},
+ )
+ envelope.data = MonitorBase(base_data=data, base_type="RequestData")
+ envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = span.name
+ location_ip = trace_utils._get_location_ip(span.attributes)
+ if location_ip:
+ envelope.tags[ContextTagKeys.AI_LOCATION_IP] = location_ip
+ if _AZURE_SDK_NAMESPACE_NAME in span.attributes: # Azure specific resources
+ # Currently only eventhub and servicebus are supported (kind CONSUMER)
+ data.source = trace_utils._get_azure_sdk_target_source(span.attributes)
+ if span.links:
+ total = 0
+ for link in span.links:
+ attributes = link.attributes
+ enqueued_time = attributes.get("enqueuedTime")
+ if isinstance(enqueued_time, int):
+ difference = (start_time / 1000000) - enqueued_time
+ total += difference
+ data.measurements["timeSinceEnqueued"] = max(0, total / len(span.links))
+ elif HTTP_REQUEST_METHOD in span.attributes or SpanAttributes.HTTP_METHOD in span.attributes: # HTTP
+ path = ""
+ user_agent = trace_utils._get_user_agent(span.attributes)
+ if user_agent:
+ # TODO: Not exposed in Swagger, need to update def
+ envelope.tags["ai.user.userAgent"] = user_agent
+ # url
+ url = trace_utils._get_url_for_http_request(span.attributes)
+ data.url = url
+ # Http specific logic for ai.operation.name
+ if SpanAttributes.HTTP_ROUTE in span.attributes:
+ envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = "{} {}".format(
+ span.attributes.get(HTTP_REQUEST_METHOD) or span.attributes.get(SpanAttributes.HTTP_METHOD),
+ span.attributes[SpanAttributes.HTTP_ROUTE],
+ )
+ elif url:
+ try:
+ parse_url = urlparse(url)
+ path = parse_url.path
+ if not path:
+ path = "/"
+ envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = "{} {}".format(
+ span.attributes.get(HTTP_REQUEST_METHOD) or span.attributes.get(SpanAttributes.HTTP_METHOD),
+ path,
+ )
+ except Exception: # pylint: disable=broad-except
+ pass
+ status_code = span.attributes.get(HTTP_RESPONSE_STATUS_CODE) \
+ or span.attributes.get(SpanAttributes.HTTP_STATUS_CODE)
+ if status_code:
+ try:
+ status_code = int(status_code) # type: ignore
+ except ValueError:
+ status_code = 0
+ else:
+ status_code = 0
+ data.response_code = str(status_code)
+ # Success criteria for server spans depends on span.success and the actual status code
+ data.success = span.status.is_ok and status_code and status_code not in range(400, 500)
+ elif SpanAttributes.MESSAGING_SYSTEM in span.attributes: # Messaging
+ if span.attributes.get(SpanAttributes.MESSAGING_DESTINATION):
+ if span.attributes.get(CLIENT_ADDRESS) or span.attributes.get(SpanAttributes.NET_PEER_NAME):
+ data.source = "{}/{}".format(
+ span.attributes.get(CLIENT_ADDRESS) or span.attributes.get(SpanAttributes.NET_PEER_NAME),
+ span.attributes.get(SpanAttributes.MESSAGING_DESTINATION),
+ )
+ elif span.attributes.get(SpanAttributes.NET_PEER_IP):
+ data.source = "{}/{}".format(
+ span.attributes[SpanAttributes.NET_PEER_IP],
+ span.attributes.get(SpanAttributes.MESSAGING_DESTINATION),
+ )
+ else:
+ data.source = span.attributes.get(SpanAttributes.MESSAGING_DESTINATION, "")
+ # Apply truncation
+ # See https://github.com/MohanGsk/ApplicationInsights-Home/tree/master/EndpointSpecs/Schemas/Bond
+ if envelope.tags.get(ContextTagKeys.AI_OPERATION_NAME):
+ data.name = envelope.tags[ContextTagKeys.AI_OPERATION_NAME][:1024]
+ if data.response_code:
+ data.response_code = data.response_code[:1024]
+ if data.source:
+ data.source = data.source[:1024]
+ if data.url:
+ data.url = data.url[:2048]
+ else: # INTERNAL, CLIENT, PRODUCER
+ envelope.name = _REMOTE_DEPENDENCY_ENVELOPE_NAME
+ # TODO: ai.operation.name for non-server spans
+ time = 0
+ if span.end_time and span.start_time:
+ time = span.end_time - span.start_time
+ data = RemoteDependencyData( # type: ignore
+ name=span.name,
+ id="{:016x}".format(span.context.span_id),
+ result_code="0",
+ duration=_utils.ns_to_duration(time),
+ success=span.status.is_ok, # Success depends only on span status
+ properties={},
+ )
+ envelope.data = MonitorBase(base_data=data, base_type="RemoteDependencyData")
+ target = trace_utils._get_target_for_dependency_from_peer(span.attributes)
+ if span.kind is SpanKind.CLIENT:
+ if _AZURE_SDK_NAMESPACE_NAME in span.attributes: # Azure specific resources
+ # Currently only eventhub and servicebus are supported
+ # https://github.com/Azure/azure-sdk-for-python/issues/9256
+ data.type = span.attributes[_AZURE_SDK_NAMESPACE_NAME]
+ data.target = trace_utils._get_azure_sdk_target_source(span.attributes)
+ elif HTTP_REQUEST_METHOD in span.attributes or SpanAttributes.HTTP_METHOD in span.attributes: # HTTP
+ data.type = "HTTP"
+ user_agent = trace_utils._get_user_agent(span.attributes)
+ if user_agent:
+ # TODO: Not exposed in Swagger, need to update def
+ envelope.tags["ai.user.userAgent"] = user_agent
+ url = trace_utils._get_url_for_http_dependency(span.attributes)
+ # data
+ if url:
+ data.data = url
+ target, path = trace_utils._get_target_and_path_for_http_dependency(
+ span.attributes,
+ url,
+ )
+ # http specific logic for name
+ if path:
+ data.name = "{} {}".format(
+ span.attributes.get(HTTP_REQUEST_METHOD) or \
+ span.attributes.get(SpanAttributes.HTTP_METHOD),
+ path,
+ )
+ status_code = span.attributes.get(HTTP_RESPONSE_STATUS_CODE) or \
+ span.attributes.get(SpanAttributes.HTTP_STATUS_CODE)
+ if status_code:
+ try:
+ status_code = int(status_code) # type: ignore
+ except ValueError:
+ status_code = 0
+ else:
+ status_code = 0
+ data.result_code = str(status_code)
+ elif SpanAttributes.DB_SYSTEM in span.attributes: # Database
+ db_system = span.attributes[SpanAttributes.DB_SYSTEM]
+ if db_system == DbSystemValues.MYSQL.value:
+ data.type = "mysql"
+ elif db_system == DbSystemValues.POSTGRESQL.value:
+ data.type = "postgresql"
+ elif db_system == DbSystemValues.MONGODB.value:
+ data.type = "mongodb"
+ elif db_system == DbSystemValues.REDIS.value:
+ data.type = "redis"
+ elif trace_utils._is_sql_db(str(db_system)):
+ data.type = "SQL"
+ else:
+ data.type = db_system
+ # data is the full statement or operation
+ if SpanAttributes.DB_STATEMENT in span.attributes:
+ data.data = span.attributes[SpanAttributes.DB_STATEMENT]
+ elif SpanAttributes.DB_OPERATION in span.attributes:
+ data.data = span.attributes[SpanAttributes.DB_OPERATION]
+ # db specific logic for target
+ target = trace_utils._get_target_for_db_dependency(
+ target, # type: ignore
+ db_system, # type: ignore
+ span.attributes,
+ )
+ elif SpanAttributes.MESSAGING_SYSTEM in span.attributes: # Messaging
+ data.type = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
+ target = trace_utils._get_target_for_messaging_dependency(
+ target, # type: ignore
+ span.attributes,
+ )
+ elif SpanAttributes.RPC_SYSTEM in span.attributes: # Rpc
+ data.type = SpanAttributes.RPC_SYSTEM
+ target = trace_utils._get_target_for_rpc_dependency(
+ target, # type: ignore
+ span.attributes,
+ )
+ elif gen_ai_attributes.GEN_AI_SYSTEM in span.attributes: # GenAI
+ data.type = span.attributes[gen_ai_attributes.GEN_AI_SYSTEM]
+ else:
+ data.type = "N/A"
+ elif span.kind is SpanKind.PRODUCER: # Messaging
+ # Currently only eventhub and servicebus are supported that produce PRODUCER spans
+ if _AZURE_SDK_NAMESPACE_NAME in span.attributes:
+ data.type = "Queue Message | {}".format(span.attributes[_AZURE_SDK_NAMESPACE_NAME])
+ target = trace_utils._get_azure_sdk_target_source(span.attributes)
+ else:
+ data.type = "Queue Message"
+ msg_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
+ if msg_system:
+ data.type += " | {}".format(msg_system)
+ target = trace_utils._get_target_for_messaging_dependency(
+ target, # type: ignore
+ span.attributes,
+ )
+ else: # SpanKind.INTERNAL
+ data.type = "InProc"
+ if _AZURE_SDK_NAMESPACE_NAME in span.attributes:
+ data.type += " | {}".format(span.attributes[_AZURE_SDK_NAMESPACE_NAME])
+ # Apply truncation
+ # See https://github.com/MohanGsk/ApplicationInsights-Home/tree/master/EndpointSpecs/Schemas/Bond
+ if data.name:
+ data.name = str(data.name)[:1024]
+ if data.result_code:
+ data.result_code = str(data.result_code)[:1024]
+ if data.data:
+ data.data = str(data.data)[:8192]
+ if data.type:
+ data.type = str(data.type)[:1024]
+ if target:
+ data.target = str(target)[:1024]
+
+ # sampleRate
+ if _SAMPLE_RATE_KEY in span.attributes:
+ envelope.sample_rate = span.attributes[_SAMPLE_RATE_KEY]
+
+ data.properties = _utils._filter_custom_properties(
+ span.attributes, lambda key, val: not _is_standard_attribute(key)
+ )
+
+ # Standard metrics special properties
+ # Only add the property if span was generated from instrumentation that supports metrics collection
+ if (
+ span.instrumentation_scope is not None
+ and span.instrumentation_scope.name in _INSTRUMENTATION_SUPPORTING_METRICS_LIST
+ ):
+ data.properties["_MS.ProcessedByMetricExtractors"] = "True"
+
+ if span.links:
+ # Max length for value is 8192
+ # Since links are a fixed length (80) in json, max number of links would be 102
+ links: List[Dict[str, str]] = []
+ for link in span.links:
+ if len(links) > 102:
+ break
+ operation_id = "{:032x}".format(link.context.trace_id)
+ span_id = "{:016x}".format(link.context.span_id)
+ links.append({"operation_Id": operation_id, "id": span_id})
+ data.properties["_MS.links"] = json.dumps(links)
+ return envelope
+
+
+# pylint: disable=protected-access
+def _convert_span_events_to_envelopes(span: ReadableSpan) -> Sequence[TelemetryItem]:
+ envelopes = []
+ for event in span.events:
+ envelope = _utils._create_telemetry_item(event.timestamp)
+ envelope.tags.update(_utils._populate_part_a_fields(span.resource))
+ envelope.tags[ContextTagKeys.AI_OPERATION_ID] = "{:032x}".format(span.context.trace_id)
+ if span.context and span.context.span_id:
+ envelope.tags[ContextTagKeys.AI_OPERATION_PARENT_ID] = "{:016x}".format(span.context.span_id)
+
+ # sampleRate
+ if span.attributes and _SAMPLE_RATE_KEY in span.attributes:
+ envelope.sample_rate = span.attributes[_SAMPLE_RATE_KEY]
+
+ properties = _utils._filter_custom_properties(
+ event.attributes, lambda key, val: not _is_standard_attribute(key)
+ )
+ if event.name == "exception":
+ envelope.name = _EXCEPTION_ENVELOPE_NAME
+ exc_type = exc_message = stack_trace = None
+ if event.attributes:
+ exc_type = event.attributes.get(SpanAttributes.EXCEPTION_TYPE)
+ exc_message = event.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
+ stack_trace = event.attributes.get(SpanAttributes.EXCEPTION_STACKTRACE)
+ if not exc_type:
+ exc_type = "Exception"
+ if not exc_message:
+ exc_message = "Exception"
+ has_full_stack = stack_trace is not None
+ exc_details = TelemetryExceptionDetails(
+ type_name=str(exc_type)[:1024],
+ message=str(exc_message)[:32768],
+ has_full_stack=has_full_stack,
+ stack=str(stack_trace)[:32768],
+ )
+ data = TelemetryExceptionData(
+ properties=properties,
+ exceptions=[exc_details],
+ )
+ envelope.data = MonitorBase(base_data=data, base_type="ExceptionData")
+ else:
+ envelope.name = _MESSAGE_ENVELOPE_NAME
+ data = MessageData( # type: ignore
+ message=str(event.name)[:32768],
+ properties=properties,
+ )
+ envelope.data = MonitorBase(base_data=data, base_type="MessageData")
+
+ envelopes.append(envelope)
+
+ return envelopes
+
+
+def _check_instrumentation_span(span: ReadableSpan) -> None:
+ # Special use-case for spans generated from azure-sdk services
+ # Identified by having az.namespace as a span attribute
+ if span.attributes and _AZURE_SDK_NAMESPACE_NAME in span.attributes:
+ _utils.add_instrumentation(_AZURE_SDK_OPENTELEMETRY_NAME)
+ return
+ if span.instrumentation_scope is None:
+ return
+ # All instrumentation scope names from OpenTelemetry instrumentations have
+ # `opentelemetry.instrumentation.` as a prefix
+ if span.instrumentation_scope.name.startswith("opentelemetry.instrumentation."):
+ # The string after the prefix is the name of the instrumentation
+ name = span.instrumentation_scope.name.split("opentelemetry.instrumentation.", 1)[1]
+ # Update the bit map to indicate instrumentation is being used
+ _utils.add_instrumentation(name)
+
+
+def _is_standard_attribute(key: str) -> bool:
+ for prefix in _STANDARD_OPENTELEMETRY_ATTRIBUTE_PREFIXES:
+ if key.startswith(prefix):
+ return True
+ return key in _STANDARD_AZURE_MONITOR_ATTRIBUTES or \
+ key in _STANDARD_OPENTELEMETRY_HTTP_ATTRIBUTES
+
+
+def _get_trace_export_result(result: ExportResult) -> SpanExportResult:
+ if result == ExportResult.SUCCESS:
+ return SpanExportResult.SUCCESS
+ return SpanExportResult.FAILURE
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py
new file mode 100644
index 00000000..0b41e28e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py
@@ -0,0 +1,98 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+from typing import Optional, Sequence
+
+# pylint:disable=no-name-in-module
+from fixedint import Int32
+
+from opentelemetry.context import Context
+from opentelemetry.trace import Link, SpanKind, format_trace_id
+from opentelemetry.sdk.trace.sampling import (
+ Decision,
+ Sampler,
+ SamplingResult,
+ _get_parent_trace_state,
+)
+from opentelemetry.trace.span import TraceState
+from opentelemetry.util.types import Attributes
+
+from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY
+
+
+_HASH = 5381
+_INTEGER_MAX: int = Int32.maxval
+_INTEGER_MIN: int = Int32.minval
+
+
+# Sampler is responsible for the following:
+# Implements same trace id hashing algorithm so that traces are sampled the same across multiple nodes (via AI SDKS)
+# Adds item count to span attribute if span is sampled (needed for ingestion service)
+# Inherits from the Sampler interface as defined by OpenTelemetry
+# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#sampler
+class ApplicationInsightsSampler(Sampler):
+ """Sampler that implements the same probability sampling algorithm as the ApplicationInsights SDKs."""
+
+ # sampling_ratio must take a value in the range [0,1]
+ def __init__(self, sampling_ratio: float = 1.0):
+ if not 0.0 <= sampling_ratio <= 1.0:
+ raise ValueError("sampling_ratio must be in the range [0,1]")
+ self._ratio = sampling_ratio
+ self._sample_rate = sampling_ratio * 100
+
+ # pylint:disable=C0301
+ # See https://github.com/microsoft/Telemetry-Collection-Spec/blob/main/OpenTelemetry/trace/ApplicationInsightsSampler.md
+ def should_sample(
+ self,
+ parent_context: Optional[Context],
+ trace_id: int,
+ name: str,
+ kind: Optional[SpanKind] = None,
+ attributes: Attributes = None,
+ links: Optional[Sequence["Link"]] = None,
+ trace_state: Optional["TraceState"] = None,
+ ) -> "SamplingResult":
+ if self._sample_rate == 0:
+ decision = Decision.DROP
+ elif self._sample_rate == 100.0:
+ decision = Decision.RECORD_AND_SAMPLE
+ else:
+ # Determine if should sample from ratio and traceId
+ sample_score = self._get_DJB2_sample_score(format_trace_id(trace_id).lower())
+ if sample_score < self._ratio:
+ decision = Decision.RECORD_AND_SAMPLE
+ else:
+ decision = Decision.DROP
+ # Add sample rate as span attribute
+ if attributes is None:
+ attributes = {}
+ attributes[_SAMPLE_RATE_KEY] = self._sample_rate # type: ignore
+ return SamplingResult(
+ decision,
+ attributes,
+ _get_parent_trace_state(parent_context), # type: ignore
+ )
+
+ def _get_DJB2_sample_score(self, trace_id_hex: str) -> float:
+ # This algorithm uses 32bit integers
+ hash_value = Int32(_HASH)
+ for char in trace_id_hex:
+ hash_value = ((hash_value << 5) + hash_value) + ord(char)
+
+ if hash_value == _INTEGER_MIN:
+ hash_value = int(_INTEGER_MAX)
+ else:
+ hash_value = abs(hash_value)
+
+ # divide by _INTEGER_MAX for value between 0 and 1 for sampling score
+ return float(hash_value) / _INTEGER_MAX
+
+ def get_description(self) -> str:
+ return "ApplicationInsightsSampler{}".format(self._ratio)
+
+
+def azure_monitor_opentelemetry_sampler_factory(sampler_argument): # pylint: disable=name-too-long
+ try:
+ rate = float(sampler_argument)
+ return ApplicationInsightsSampler(rate)
+ except (ValueError, TypeError):
+ return ApplicationInsightsSampler()
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_utils.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_utils.py
new file mode 100644
index 00000000..de012cfd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_utils.py
@@ -0,0 +1,321 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+
+from typing import no_type_check, Optional, Tuple
+from urllib.parse import urlparse
+
+from opentelemetry.semconv.attributes import (
+ client_attributes,
+ server_attributes,
+ url_attributes,
+ user_agent_attributes,
+)
+from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
+from opentelemetry.util.types import Attributes
+
+
+# pylint:disable=too-many-return-statements
+def _get_default_port_db(db_system: str) -> int:
+ if db_system == DbSystemValues.POSTGRESQL.value:
+ return 5432
+ if db_system == DbSystemValues.CASSANDRA.value:
+ return 9042
+ if db_system in (DbSystemValues.MARIADB.value, DbSystemValues.MYSQL.value):
+ return 3306
+ if db_system == DbSystemValues.MSSQL.value:
+ return 1433
+ # TODO: Add in memcached
+ if db_system == "memcached":
+ return 11211
+ if db_system == DbSystemValues.DB2.value:
+ return 50000
+ if db_system == DbSystemValues.ORACLE.value:
+ return 1521
+ if db_system == DbSystemValues.H2.value:
+ return 8082
+ if db_system == DbSystemValues.DERBY.value:
+ return 1527
+ if db_system == DbSystemValues.REDIS.value:
+ return 6379
+ return 0
+
+
+def _get_default_port_http(attributes: Attributes) -> int:
+ scheme = _get_http_scheme(attributes)
+ if scheme == "http":
+ return 80
+ if scheme == "https":
+ return 443
+ return 0
+
+
+def _is_sql_db(db_system: str) -> bool:
+ return db_system in (
+ DbSystemValues.DB2.value,
+ DbSystemValues.DERBY.value,
+ DbSystemValues.MARIADB.value,
+ DbSystemValues.MSSQL.value,
+ DbSystemValues.ORACLE.value,
+ DbSystemValues.SQLITE.value,
+ DbSystemValues.OTHER_SQL.value,
+ # spell-checker:ignore HSQLDB
+ DbSystemValues.HSQLDB.value,
+ DbSystemValues.H2.value,
+ )
+
+
+def _get_azure_sdk_target_source(attributes: Attributes) -> Optional[str]:
+ # Currently logic only works for ServiceBus and EventHub
+ if attributes:
+ # New semconv attributes: https://github.com/Azure/azure-sdk-for-python/pull/29203
+ # TODO: Keep track of when azure-sdk supports stable semconv for these fields
+ peer_address = attributes.get("net.peer.name") or attributes.get("peer.address")
+ destination = attributes.get("messaging.destination.name") or attributes.get("message_bus.destination")
+ if peer_address and destination:
+ return str(peer_address) + "/" + str(destination)
+ return None
+
+
+def _get_http_scheme(attributes: Attributes) -> Optional[str]:
+ if attributes:
+ scheme = attributes.get(url_attributes.URL_SCHEME) or \
+ attributes.get(SpanAttributes.HTTP_SCHEME)
+ if scheme:
+ return str(scheme)
+ return None
+
+
+# Dependency
+
+
+@no_type_check
+def _get_url_for_http_dependency(attributes: Attributes) -> Optional[str]:
+ url = ""
+ if attributes:
+ # Stable sem conv only supports populating url from `url.full`
+ if url_attributes.URL_FULL in attributes:
+ return attributes[url_attributes.URL_FULL]
+ if SpanAttributes.HTTP_URL in attributes:
+ return attributes[SpanAttributes.HTTP_URL]
+ # Scheme
+ scheme = _get_http_scheme(attributes)
+ if scheme and SpanAttributes.HTTP_TARGET in attributes:
+ http_target = attributes[SpanAttributes.HTTP_TARGET]
+ if SpanAttributes.HTTP_HOST in attributes:
+ url = "{}://{}{}".format(
+ str(scheme),
+ attributes[SpanAttributes.HTTP_HOST],
+ http_target,
+ )
+ elif SpanAttributes.NET_PEER_PORT in attributes:
+ peer_port = attributes[SpanAttributes.NET_PEER_PORT]
+ if SpanAttributes.NET_PEER_NAME in attributes:
+ peer_name = attributes[SpanAttributes.NET_PEER_NAME]
+ url = "{}://{}:{}{}".format(
+ scheme,
+ peer_name,
+ peer_port,
+ http_target,
+ )
+ elif SpanAttributes.NET_PEER_IP in attributes:
+ peer_ip = attributes[SpanAttributes.NET_PEER_IP]
+ url = "{}://{}:{}{}".format(
+ scheme,
+ peer_ip,
+ peer_port,
+ http_target,
+ )
+ return url
+
+
+@no_type_check
+def _get_target_for_dependency_from_peer(attributes: Attributes) -> Optional[str]:
+ target = ""
+ if attributes:
+ if SpanAttributes.PEER_SERVICE in attributes:
+ target = attributes[SpanAttributes.PEER_SERVICE]
+ else:
+ if SpanAttributes.NET_PEER_NAME in attributes:
+ target = attributes[SpanAttributes.NET_PEER_NAME]
+ elif SpanAttributes.NET_PEER_IP in attributes:
+ target = attributes[SpanAttributes.NET_PEER_IP]
+ if SpanAttributes.NET_PEER_PORT in attributes:
+ port = attributes[SpanAttributes.NET_PEER_PORT]
+ # TODO: check default port for rpc
+ # This logic assumes default ports never conflict across dependency types
+ if port != _get_default_port_http(attributes) and \
+ port != _get_default_port_db(str(attributes.get(SpanAttributes.DB_SYSTEM))):
+ target = "{}:{}".format(target, port)
+ return target
+
+
+@no_type_check
+def _get_target_and_path_for_http_dependency(
+ attributes: Attributes,
+ url: Optional[str] = "", # Usually populated by _get_url_for_http_dependency()
+) -> Tuple[Optional[str], str]:
+ parsed_url = None
+ target = ""
+ path = "/"
+ default_port = _get_default_port_http(attributes)
+ # Find path from url
+ if not url:
+ url = _get_url_for_http_dependency(attributes)
+ try:
+ parsed_url = urlparse(url)
+ if parsed_url.path:
+ path = parsed_url.path
+ except Exception: # pylint: disable=broad-except
+ pass
+ # Derive target
+ if attributes:
+ # Target from server.*
+ if server_attributes.SERVER_ADDRESS in attributes:
+ target = attributes[server_attributes.SERVER_ADDRESS]
+ server_port = attributes.get(server_attributes.SERVER_PORT)
+ # if not default port, include port in target
+ if server_port != default_port:
+ target = "{}:{}".format(target, server_port)
+ # Target from peer.service
+ elif SpanAttributes.PEER_SERVICE in attributes:
+ target = attributes[SpanAttributes.PEER_SERVICE]
+ # Target from http.host
+ elif SpanAttributes.HTTP_HOST in attributes:
+ host = attributes[SpanAttributes.HTTP_HOST]
+ try:
+ # urlparse insists on absolute URLs starting with "//"
+ # This logic assumes host does not include a "//"
+ host_name = urlparse("//" + str(host))
+ # Ignore port from target if default port
+ if host_name.port == default_port:
+ target = host_name.hostname
+ else:
+ # Else include the whole host as the target
+ target = str(host)
+ except Exception: # pylint: disable=broad-except
+ pass
+ elif parsed_url:
+ # Target from httpUrl
+ if parsed_url.port and parsed_url.port == default_port:
+ if parsed_url.hostname:
+ target = parsed_url.hostname
+ elif parsed_url.netloc:
+ target = parsed_url.netloc
+ if not target:
+ # Get target from peer.* attributes that are NOT peer.service
+ target = _get_target_for_dependency_from_peer(attributes)
+ return (target, path)
+
+
+@no_type_check
+def _get_target_for_db_dependency(
+ target: Optional[str],
+ db_system: Optional[str],
+ attributes: Attributes,
+) -> Optional[str]:
+ if attributes:
+ db_name = attributes.get(SpanAttributes.DB_NAME)
+ if db_name:
+ if not target:
+ target = str(db_name)
+ else:
+ target = "{}|{}".format(target, db_name)
+ elif not target:
+ target = db_system
+ return target
+
+
+@no_type_check
+def _get_target_for_messaging_dependency(target: Optional[str], attributes: Attributes) -> Optional[str]:
+ if attributes:
+ if not target:
+ if SpanAttributes.MESSAGING_DESTINATION in attributes:
+ target = str(attributes[SpanAttributes.MESSAGING_DESTINATION])
+ elif SpanAttributes.MESSAGING_SYSTEM in attributes:
+ target = str(attributes[SpanAttributes.MESSAGING_SYSTEM])
+ return target
+
+
+@no_type_check
+def _get_target_for_rpc_dependency(target: Optional[str], attributes: Attributes) -> Optional[str]:
+ if attributes:
+ if not target:
+ if SpanAttributes.RPC_SYSTEM in attributes:
+ target = str(attributes[SpanAttributes.RPC_SYSTEM])
+ return target
+
+
+# Request
+
+@no_type_check
+def _get_location_ip(attributes: Attributes) -> Optional[str]:
+ return attributes.get(client_attributes.CLIENT_ADDRESS) or \
+ attributes.get(SpanAttributes.HTTP_CLIENT_IP) or \
+ attributes.get(SpanAttributes.NET_PEER_IP) # We assume non-http spans don't have http related attributes
+
+
+@no_type_check
+def _get_user_agent(attributes: Attributes) -> Optional[str]:
+ return attributes.get(user_agent_attributes.USER_AGENT_ORIGINAL) or \
+ attributes.get(SpanAttributes.HTTP_USER_AGENT)
+
+
+@no_type_check
+def _get_url_for_http_request(attributes: Attributes) -> Optional[str]:
+ url = ""
+ if attributes:
+ # Url
+ if url_attributes.URL_FULL in attributes:
+ return attributes[url_attributes.URL_FULL]
+ if SpanAttributes.HTTP_URL in attributes:
+ return attributes[SpanAttributes.HTTP_URL]
+ # Scheme
+ scheme = _get_http_scheme(attributes)
+ # Target
+ http_target = ""
+ if url_attributes.URL_PATH in attributes:
+ http_target = attributes.get(url_attributes.URL_PATH, "")
+ if http_target and url_attributes.URL_QUERY in attributes:
+ http_target = "{}?{}".format(
+ http_target,
+ attributes.get(url_attributes.URL_QUERY, "")
+ )
+ elif SpanAttributes.HTTP_TARGET in attributes:
+ http_target = attributes.get(SpanAttributes.HTTP_TARGET)
+ if scheme and http_target:
+ # Host
+ http_host = ""
+ if server_attributes.SERVER_ADDRESS in attributes:
+ http_host = attributes.get(server_attributes.SERVER_ADDRESS, "")
+ if http_host and server_attributes.SERVER_PORT in attributes:
+ http_host = "{}:{}".format(
+ http_host,
+ attributes.get(server_attributes.SERVER_PORT, "")
+ )
+ elif SpanAttributes.HTTP_HOST in attributes:
+ http_host = attributes.get(SpanAttributes.HTTP_HOST, "")
+ if http_host:
+ url = "{}://{}{}".format(
+ scheme,
+ http_host,
+ http_target,
+ )
+ elif SpanAttributes.HTTP_SERVER_NAME in attributes:
+ server_name = attributes[SpanAttributes.HTTP_SERVER_NAME]
+ host_port = attributes.get(SpanAttributes.NET_HOST_PORT, "")
+ url = "{}://{}:{}{}".format(
+ scheme,
+ server_name,
+ host_port,
+ http_target,
+ )
+ elif SpanAttributes.NET_HOST_NAME in attributes:
+ host_name = attributes[SpanAttributes.NET_HOST_NAME]
+ host_port = attributes.get(SpanAttributes.NET_HOST_PORT, "")
+ url = "{}://{}:{}{}".format(
+ scheme,
+ host_name,
+ host_port,
+ http_target,
+ )
+ return url