diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export')
10 files changed, 1942 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/__init__.py diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/_base.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/_base.py new file mode 100644 index 00000000..d3da36ba --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/_base.py @@ -0,0 +1,435 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import logging +import os +import tempfile +import time +from enum import Enum +from typing import List, Optional, Any +from urllib.parse import urlparse + +from azure.core.exceptions import HttpResponseError, ServiceRequestError +from azure.core.pipeline.policies import ( + ContentDecodePolicy, + HttpLoggingPolicy, + RedirectPolicy, + RequestIdPolicy, +) +from azure.monitor.opentelemetry.exporter._generated import AzureMonitorClient +from azure.monitor.opentelemetry.exporter._generated._configuration import AzureMonitorClientConfiguration +from azure.monitor.opentelemetry.exporter._generated.models import ( + MessageData, + MetricsData, + MonitorDomain, + RemoteDependencyData, + RequestData, + TelemetryEventData, + TelemetryExceptionData, + TelemetryItem, +) +from azure.monitor.opentelemetry.exporter._constants import ( + _AZURE_MONITOR_DISTRO_VERSION_ARG, + _INVALID_STATUS_CODES, + _REACHED_INGESTION_STATUS_CODES, + _REDIRECT_STATUS_CODES, + _REQ_DURATION_NAME, + _REQ_EXCEPTION_NAME, + _REQ_FAILURE_NAME, + _REQ_RETRY_NAME, + _REQ_SUCCESS_NAME, + _REQ_THROTTLE_NAME, + _RETRYABLE_STATUS_CODES, + _THROTTLE_STATUS_CODES, +) +from 
azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser +from azure.monitor.opentelemetry.exporter._storage import LocalFileStorage +from azure.monitor.opentelemetry.exporter._utils import _get_auth_policy +from azure.monitor.opentelemetry.exporter.statsbeat._state import ( + get_statsbeat_initial_success, + get_statsbeat_shutdown, + increment_and_check_statsbeat_failure_count, + is_statsbeat_enabled, + set_statsbeat_initial_success, +) +from azure.monitor.opentelemetry.exporter.statsbeat._utils import _update_requests_map + +logger = logging.getLogger(__name__) + +_AZURE_TEMPDIR_PREFIX = "Microsoft/AzureMonitor" +_TEMPDIR_PREFIX = "opentelemetry-python-" +_SERVICE_API_LATEST = "2020-09-15_Preview" + + +class ExportResult(Enum): + SUCCESS = 0 + FAILED_RETRYABLE = 1 + FAILED_NOT_RETRYABLE = 2 + + +# pylint: disable=broad-except +# pylint: disable=too-many-instance-attributes +# pylint: disable=C0301 +class BaseExporter: + """Azure Monitor base exporter for OpenTelemetry.""" + + def __init__(self, **kwargs: Any) -> None: + """Azure Monitor base exporter for OpenTelemetry. + + :keyword str api_version: The service API version used. Defaults to latest. + :keyword str connection_string: The connection string used for your Application Insights resource. + :keyword ManagedIdentityCredential/ClientSecretCredential credential: Token credential, such as ManagedIdentityCredential or ClientSecretCredential, used for Azure Active Directory (AAD) authentication. Defaults to None. + :keyword bool disable_offline_storage: Determines whether to disable storing failed telemetry records for retry. Defaults to `False`. + :keyword str storage_directory: Storage path in which to store retry files. Defaults to `<tempfile.gettempdir()>/opentelemetry-python-<your-instrumentation-key>`. 
+ :rtype: None + """ + parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string")) + + self._api_version = kwargs.get("api_version") or _SERVICE_API_LATEST + self._credential = kwargs.get("credential") + self._consecutive_redirects = 0 # To prevent circular redirects + self._disable_offline_storage = kwargs.get("disable_offline_storage", False) + self._endpoint = parsed_connection_string.endpoint + self._instrumentation_key = parsed_connection_string.instrumentation_key + self._aad_audience = parsed_connection_string.aad_audience + self._storage_maintenance_period = kwargs.get( + "storage_maintenance_period", 60 + ) # Maintenance interval in seconds. + self._storage_max_size = kwargs.get( + "storage_max_size", 50 * 1024 * 1024 + ) # Maximum size in bytes (default 50MiB) + self._storage_min_retry_interval = kwargs.get( + "storage_min_retry_interval", 60 + ) # minimum retry interval in seconds + temp_suffix = self._instrumentation_key or "" + if "storage_directory" in kwargs: + self._storage_directory = kwargs.get("storage_directory") + elif not self._disable_offline_storage: + self._storage_directory = os.path.join( + tempfile.gettempdir(), _AZURE_TEMPDIR_PREFIX, _TEMPDIR_PREFIX + temp_suffix + ) + else: + self._storage_directory = None + self._storage_retention_period = kwargs.get( + "storage_retention_period", 48 * 60 * 60 + ) # Retention period in seconds (default 48 hrs) + self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds + self._distro_version = kwargs.get( + _AZURE_MONITOR_DISTRO_VERSION_ARG, "" + ) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version. 
+ + config = AzureMonitorClientConfiguration(self._endpoint, **kwargs) + policies = [ + RequestIdPolicy(**kwargs), + config.headers_policy, + config.user_agent_policy, + config.proxy_policy, + ContentDecodePolicy(**kwargs), + # Handle redirects in exporter, set new endpoint if redirected + RedirectPolicy(permit_redirects=False), + config.retry_policy, + _get_auth_policy(self._credential, config.authentication_policy, self._aad_audience), + config.custom_hook_policy, + config.logging_policy, + # Explicitly disabling to avoid infinite loop of Span creation when data is exported + # DistributedTracingPolicy(**kwargs), + config.http_logging_policy or HttpLoggingPolicy(**kwargs), + ] + + self.client = AzureMonitorClient( + host=self._endpoint, connection_timeout=self._timeout, policies=policies, **kwargs + ) + self.storage = None + if not self._disable_offline_storage: + self.storage = LocalFileStorage( + path=self._storage_directory, + max_size=self._storage_max_size, + maintenance_period=self._storage_maintenance_period, + retention_period=self._storage_retention_period, + name="{} Storage".format(self.__class__.__name__), + lease_period=self._storage_min_retry_interval, + ) + # specifies whether current exporter is used for collection of instrumentation metrics + self._instrumentation_collection = kwargs.get("instrumentation_collection", False) + # statsbeat initialization + if self._should_collect_stats(): + # Import here to avoid circular dependencies + from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics + + collect_statsbeat_metrics(self) + + def _transmit_from_storage(self) -> None: + if not self.storage: + return + for blob in self.storage.gets(): + # give a few more seconds for blob lease operation + # to reduce the chance of race (for perf consideration) + if blob.lease(self._timeout + 5): + envelopes = [_format_storage_telemetry_item(TelemetryItem.from_dict(x)) for x in blob.get()] + result = 
self._transmit(envelopes) + if result == ExportResult.FAILED_RETRYABLE: + blob.lease(1) + else: + blob.delete() + + def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None: + if self.storage: + if result == ExportResult.FAILED_RETRYABLE: + envelopes_to_store = [x.as_dict() for x in envelopes] + self.storage.put(envelopes_to_store) + elif result == ExportResult.SUCCESS: + # Try to send any cached events + self._transmit_from_storage() + + # pylint: disable=too-many-branches + # pylint: disable=too-many-nested-blocks + # pylint: disable=too-many-statements + def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: + """ + Transmit the data envelopes to the ingestion service. + + Returns an ExportResult, this function should never + throw an exception. + :param envelopes: The list of telemetry items to transmit. + :type envelopes: list of ~azure.monitor.opentelemetry.exporter._generated.models.TelemetryItem + :return: The result of the export. + :rtype: ~azure.monitor.opentelemetry.exporter.export._base._ExportResult + """ + if len(envelopes) > 0: + result = ExportResult.SUCCESS + # Track whether or not exporter has successfully reached ingestion + # Currently only used for statsbeat exporter to detect shutdown cases + reach_ingestion = False + start_time = time.time() + try: + track_response = self.client.track(envelopes) + if not track_response.errors: # 200 + self._consecutive_redirects = 0 + if not self._is_stats_exporter(): + logger.info( + "Transmission succeeded: Item received: %s. 
Items accepted: %s", + track_response.items_received, + track_response.items_accepted, + ) + if self._should_collect_stats(): + _update_requests_map(_REQ_SUCCESS_NAME[1], 1) + reach_ingestion = True + result = ExportResult.SUCCESS + else: # 206 + reach_ingestion = True + resend_envelopes = [] + for error in track_response.errors: + if _is_retryable_code(error.status_code): + resend_envelopes.append(envelopes[error.index]) # type: ignore + else: + if not self._is_stats_exporter(): + logger.error( + "Data drop %s: %s %s.", + error.status_code, + error.message, + envelopes[error.index] if error.index is not None else "", + ) + if self.storage and resend_envelopes: + envelopes_to_store = [x.as_dict() for x in resend_envelopes] + self.storage.put(envelopes_to_store, 0) + self._consecutive_redirects = 0 + # Mark as not retryable because we already write to storage here + result = ExportResult.FAILED_NOT_RETRYABLE + except HttpResponseError as response_error: + # HttpResponseError is raised when a response is received + if _reached_ingestion_code(response_error.status_code): + reach_ingestion = True + if _is_retryable_code(response_error.status_code): + if self._should_collect_stats(): + _update_requests_map(_REQ_RETRY_NAME[1], value=response_error.status_code) + result = ExportResult.FAILED_RETRYABLE + elif _is_throttle_code(response_error.status_code): + if self._should_collect_stats(): + _update_requests_map(_REQ_THROTTLE_NAME[1], value=response_error.status_code) + result = ExportResult.FAILED_NOT_RETRYABLE + elif _is_redirect_code(response_error.status_code): + self._consecutive_redirects = self._consecutive_redirects + 1 + # pylint: disable=W0212 + if self._consecutive_redirects < self.client._config.redirect_policy.max_redirects: # type: ignore + if response_error.response and response_error.response.headers: # type: ignore + redirect_has_headers = True + location = response_error.response.headers.get("location") # type: ignore + url = urlparse(location) + else: + 
redirect_has_headers = False + if redirect_has_headers and url.scheme and url.netloc: # pylint: disable=E0606 + # Change the host to the new redirected host + self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212 + # Attempt to export again + result = self._transmit(envelopes) + else: + if not self._is_stats_exporter(): + logger.error( + "Error parsing redirect information.", + ) + result = ExportResult.FAILED_NOT_RETRYABLE + else: + if not self._is_stats_exporter(): + logger.error( + "Error sending telemetry because of circular redirects. " + "Please check the integrity of your connection string." + ) + # If redirect but did not return, exception occurred + if self._should_collect_stats(): + _update_requests_map(_REQ_EXCEPTION_NAME[1], value="Circular Redirect") + result = ExportResult.FAILED_NOT_RETRYABLE + else: + # Any other status code counts as failure (non-retryable) + # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey, etc.) + # 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string + if self._should_collect_stats(): + _update_requests_map(_REQ_FAILURE_NAME[1], value=response_error.status_code) + if not self._is_stats_exporter(): + logger.error( + "Non-retryable server side error: %s.", + response_error.message, + ) + if _is_invalid_code(response_error.status_code): + # Shutdown statsbeat on invalid code from customer endpoint + # Import here to avoid circular dependencies + from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import ( + shutdown_statsbeat_metrics, + ) + + shutdown_statsbeat_metrics() + result = ExportResult.FAILED_NOT_RETRYABLE + except ServiceRequestError as request_error: + # Errors when we're fairly sure that the server did not receive the + # request, so it should be safe to retry. 
+ # ServiceRequestError is raised by azure.core for these cases + logger.warning("Retrying due to server request error: %s.", request_error.message) + if self._should_collect_stats(): + exc_type = request_error.exc_type + if exc_type is None or exc_type is type(None): + exc_type = request_error.__class__.__name__ # type: ignore + _update_requests_map(_REQ_EXCEPTION_NAME[1], value=exc_type) + result = ExportResult.FAILED_RETRYABLE + except Exception as ex: + logger.exception("Envelopes could not be exported and are not retryable: %s.") + if self._should_collect_stats(): + _update_requests_map(_REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__) + result = ExportResult.FAILED_NOT_RETRYABLE + finally: + if self._should_collect_stats(): + end_time = time.time() + _update_requests_map("count", 1) + _update_requests_map(_REQ_DURATION_NAME[1], value=end_time - start_time) + if self._is_statsbeat_initializing_state(): + # Update statsbeat initial success state if reached ingestion + if reach_ingestion: + set_statsbeat_initial_success(True) + else: + # if didn't reach ingestion, increment and check if failure threshold + # has been reached during attempting statsbeat initialization + if increment_and_check_statsbeat_failure_count(): + # Import here to avoid circular dependencies + from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import ( + shutdown_statsbeat_metrics, + ) + + shutdown_statsbeat_metrics() + # pylint: disable=lost-exception + return ExportResult.FAILED_NOT_RETRYABLE # pylint: disable=W0134 + # pylint: disable=lost-exception + return result # pylint: disable=W0134 + + # No spans to export + self._consecutive_redirects = 0 + return ExportResult.SUCCESS + + # check to see whether its the case of stats collection + def _should_collect_stats(self): + return ( + is_statsbeat_enabled() + and not get_statsbeat_shutdown() + and not self._is_stats_exporter() + and not self._instrumentation_collection + ) + + # check to see if statsbeat is in "attempting to 
be initialized" state + def _is_statsbeat_initializing_state(self): + return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success() + + def _is_stats_exporter(self): + return self.__class__.__name__ == "_StatsBeatExporter" + + +def _is_invalid_code(response_code: Optional[int]) -> bool: + """Determine if response is a invalid response. + + :param int response_code: HTTP response code + :return: True if response is a invalid response + :rtype: bool + """ + return response_code in _INVALID_STATUS_CODES + + +def _is_redirect_code(response_code: Optional[int]) -> bool: + """Determine if response is a redirect response. + + :param int response_code: HTTP response code + :return: True if response is a redirect response + :rtype: bool + """ + return response_code in _REDIRECT_STATUS_CODES + + +def _is_retryable_code(response_code: Optional[int]) -> bool: + """Determine if response is retryable. + + :param int response_code: HTTP response code + :return: True if response is retryable + :rtype: bool + """ + return response_code in _RETRYABLE_STATUS_CODES + + +def _is_throttle_code(response_code: Optional[int]) -> bool: + """Determine if response is throttle response. + + :param int response_code: HTTP response code + :return: True if response is throttle response + :rtype: bool + """ + return response_code in _THROTTLE_STATUS_CODES + + +def _reached_ingestion_code(response_code: Optional[int]) -> bool: + """Determine if response indicates ingestion service has been reached. 
+ + :param int response_code: HTTP response code + :return: True if response indicates ingestion service has been reached + :rtype: bool + """ + return response_code in _REACHED_INGESTION_STATUS_CODES + + +_MONITOR_DOMAIN_MAPPING = { + "EventData": TelemetryEventData, + "ExceptionData": TelemetryExceptionData, + "MessageData": MessageData, + "MetricData": MetricsData, + "RemoteDependencyData": RemoteDependencyData, + "RequestData": RequestData, +} + + +# from_dict() deserializes incorrectly, format TelemetryItem correctly after it +# is called +def _format_storage_telemetry_item(item: TelemetryItem) -> TelemetryItem: + # After TelemetryItem.from_dict, all base_data fields are stored in + # additional_properties as a dict instead of in item.data.base_data itself + # item.data.base_data is also of type MonitorDomain instead of a child class + if hasattr(item, "data") and item.data is not None: + if hasattr(item.data, "base_data") and isinstance(item.data.base_data, MonitorDomain): + if hasattr(item.data, "base_type") and isinstance(item.data.base_type, str): + base_type = _MONITOR_DOMAIN_MAPPING.get(item.data.base_type) + # Apply deserialization of additional_properties and store that as base_data + if base_type: + item.data.base_data = base_type.from_dict(item.data.base_data.additional_properties) # type: ignore + item.data.base_data.additional_properties = None # type: ignore + return item diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/__init__.py diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py new file 
mode 100644 index 00000000..e7ae4e89 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py @@ -0,0 +1,244 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import json +import logging +from typing import Optional, Sequence, Any + +from opentelemetry._logs.severity import SeverityNumber +from opentelemetry.semconv.attributes.exception_attributes import ( + EXCEPTION_ESCAPED, + EXCEPTION_MESSAGE, + EXCEPTION_STACKTRACE, + EXCEPTION_TYPE, +) +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs.export import LogExporter, LogExportResult + +from azure.monitor.opentelemetry.exporter import _utils +from azure.monitor.opentelemetry.exporter._constants import ( + _EXCEPTION_ENVELOPE_NAME, + _MESSAGE_ENVELOPE_NAME, +) +from azure.monitor.opentelemetry.exporter._generated.models import ( + ContextTagKeys, + MessageData, + MonitorBase, + TelemetryEventData, + TelemetryExceptionData, + TelemetryExceptionDetails, + TelemetryItem, +) +from azure.monitor.opentelemetry.exporter.export._base import ( + BaseExporter, + ExportResult, +) +from azure.monitor.opentelemetry.exporter.export.trace import _utils as trace_utils +from azure.monitor.opentelemetry.exporter._constants import ( + _APPLICATION_INSIGHTS_EVENT_MARKER_ATTRIBUTE, + _MICROSOFT_CUSTOM_EVENT_NAME, +) +from azure.monitor.opentelemetry.exporter.statsbeat._state import ( + get_statsbeat_shutdown, + get_statsbeat_custom_events_feature_set, + is_statsbeat_enabled, + set_statsbeat_custom_events_feature_set, +) + +_logger = logging.getLogger(__name__) + +_DEFAULT_SPAN_ID = 0 +_DEFAULT_TRACE_ID = 0 + +__all__ = ["AzureMonitorLogExporter"] + + +class AzureMonitorLogExporter(BaseExporter, LogExporter): + """Azure Monitor Log exporter for OpenTelemetry.""" + + def export(self, batch: Sequence[LogData], **kwargs: Any) -> LogExportResult: # pylint: disable=unused-argument + """Export log data. 
+ + :param batch: OpenTelemetry LogData(s) to export. + :type batch: ~typing.Sequence[~opentelemetry._logs.LogData] + :return: The result of the export. + :rtype: ~opentelemetry.sdk._logs.export.LogData + """ + envelopes = [self._log_to_envelope(log) for log in batch] + try: + result = self._transmit(envelopes) + self._handle_transmit_from_storage(envelopes, result) + return _get_log_export_result(result) + except Exception: # pylint: disable=broad-except + _logger.exception("Exception occurred while exporting the data.") + return _get_log_export_result(ExportResult.FAILED_NOT_RETRYABLE) + + def shutdown(self) -> None: + """Shuts down the exporter. + + Called when the SDK is shut down. + """ + if self.storage: + self.storage.close() + + def _log_to_envelope(self, log_data: LogData) -> TelemetryItem: + envelope = _convert_log_to_envelope(log_data) + envelope.instrumentation_key = self._instrumentation_key + return envelope + + # pylint: disable=docstring-keyword-should-match-keyword-only + @classmethod + def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorLogExporter": + """ + Create an AzureMonitorLogExporter from a connection string. This is the + recommended way of instantiation if a connection string is passed in + explicitly. If a user wants to use a connection string provided by + environment variable, the constructor of the exporter can be called + directly. + + :param str conn_str: The connection string to be used for + authentication. + :keyword str api_version: The service API version used. Defaults to + latest. 
+ :return: an instance of ~AzureMonitorLogExporter + :rtype: ~azure.monitor.opentelemetry.exporter.AzureMonitorLogExporter + """ + return cls(connection_string=conn_str, **kwargs) + + +def _log_data_is_event(log_data: LogData) -> bool: + log_record = log_data.log_record + is_event = None + if log_record.attributes: + is_event = log_record.attributes.get(_MICROSOFT_CUSTOM_EVENT_NAME) or \ + log_record.attributes.get(_APPLICATION_INSIGHTS_EVENT_MARKER_ATTRIBUTE) # type: ignore + return is_event is not None + + +# pylint: disable=protected-access +def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem: + log_record = log_data.log_record + time_stamp = log_record.timestamp if log_record.timestamp is not None else log_record.observed_timestamp + envelope = _utils._create_telemetry_item(time_stamp) + envelope.tags.update(_utils._populate_part_a_fields(log_record.resource)) # type: ignore + envelope.tags[ContextTagKeys.AI_OPERATION_ID] = "{:032x}".format( # type: ignore + log_record.trace_id or _DEFAULT_TRACE_ID + ) + envelope.tags[ContextTagKeys.AI_OPERATION_PARENT_ID] = "{:016x}".format( # type: ignore + log_record.span_id or _DEFAULT_SPAN_ID + ) + # Special use case: Customers want to be able to set location ip on log records + location_ip = trace_utils._get_location_ip(log_record.attributes) + if location_ip: + envelope.tags[ContextTagKeys.AI_LOCATION_IP] = location_ip # type: ignore + properties = _utils._filter_custom_properties( + log_record.attributes, lambda key, val: not _is_ignored_attribute(key) + ) + exc_type = exc_message = stack_trace = None + if log_record.attributes: + exc_type = log_record.attributes.get(EXCEPTION_TYPE) + exc_message = log_record.attributes.get(EXCEPTION_MESSAGE) + stack_trace = log_record.attributes.get(EXCEPTION_STACKTRACE) + severity_level = _get_severity_level(log_record.severity_number) + + # Exception telemetry + if exc_type is not None or exc_message is not None: + envelope.name = _EXCEPTION_ENVELOPE_NAME + 
has_full_stack = stack_trace is not None + if not exc_type: + exc_type = "Exception" + # Log body takes priority for message + if log_record.body: + message = _map_body_to_message(log_record.body) + elif exc_message: + message = exc_message # type: ignore + else: + message = "Exception" + exc_details = TelemetryExceptionDetails( + type_name=str(exc_type)[:1024], # type: ignore + message=str(message)[:32768], + has_full_stack=has_full_stack, + stack=str(stack_trace)[:32768], + ) + data = TelemetryExceptionData( # type: ignore + severity_level=severity_level, # type: ignore + properties=properties, + exceptions=[exc_details], + ) + envelope.data = MonitorBase(base_data=data, base_type="ExceptionData") + elif _log_data_is_event(log_data): # Event telemetry + _set_statsbeat_custom_events_feature() + envelope.name = "Microsoft.ApplicationInsights.Event" + event_name = "" + if log_record.attributes.get(_MICROSOFT_CUSTOM_EVENT_NAME): # type: ignore + event_name = str(log_record.attributes.get(_MICROSOFT_CUSTOM_EVENT_NAME)) # type: ignore + else: + event_name = _map_body_to_message(log_record.body) + data = TelemetryEventData( # type: ignore + name=event_name, + properties=properties, + ) + envelope.data = MonitorBase(base_data=data, base_type="EventData") + else: # Message telemetry + envelope.name = _MESSAGE_ENVELOPE_NAME + # pylint: disable=line-too-long + # Severity number: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber + data = MessageData( # type: ignore + message=_map_body_to_message(log_record.body), + severity_level=severity_level, # type: ignore + properties=properties, + ) + envelope.data = MonitorBase(base_data=data, base_type="MessageData") + + return envelope + + +def _get_log_export_result(result: ExportResult) -> LogExportResult: + if result == ExportResult.SUCCESS: + return LogExportResult.SUCCESS + return LogExportResult.FAILURE + + +# pylint: disable=line-too-long +# Common 
schema: https://github.com/microsoft/common-schema/blob/main/v4.0/Mappings/AzureMonitor-AI.md#exceptionseveritylevel +# SeverityNumber specs: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber +def _get_severity_level(severity_number: Optional[SeverityNumber]): + if severity_number is None or severity_number.value < 9: + return 0 + return int((severity_number.value - 1) / 4 - 1) + + +def _map_body_to_message(log_body: Any) -> str: + if not log_body: + return "" + + if isinstance(log_body, str): + return log_body[:32768] + + if isinstance(log_body, Exception): + return str(log_body)[:32768] + + try: + return json.dumps(log_body)[:32768] + except: # pylint: disable=bare-except + return str(log_body)[:32768] + + +def _is_ignored_attribute(key: str) -> bool: + return key in _IGNORED_ATTRS + + +_IGNORED_ATTRS = frozenset( + ( + EXCEPTION_TYPE, + EXCEPTION_MESSAGE, + EXCEPTION_STACKTRACE, + EXCEPTION_ESCAPED, + _APPLICATION_INSIGHTS_EVENT_MARKER_ATTRIBUTE, + _MICROSOFT_CUSTOM_EVENT_NAME, + ) +) + + +def _set_statsbeat_custom_events_feature(): + if is_statsbeat_enabled() and not get_statsbeat_shutdown() and not get_statsbeat_custom_events_feature_set(): + set_statsbeat_custom_events_feature_set() diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/__init__.py diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py new file mode 100644 index 00000000..98ed6a47 --- /dev/null +++ 
b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py @@ -0,0 +1,291 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import logging +import os + +from typing import Dict, Optional, Union, Any + +from opentelemetry.util.types import Attributes +from opentelemetry.sdk.metrics import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + DataPointT, + HistogramDataPoint, + MetricExporter, + MetricExportResult, + MetricsData as OTMetricsData, + NumberDataPoint, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.semconv.attributes.http_attributes import HTTP_RESPONSE_STATUS_CODE +from opentelemetry.semconv.metrics import MetricInstruments +from opentelemetry.semconv.metrics.http_metrics import ( + HTTP_CLIENT_REQUEST_DURATION, + HTTP_SERVER_REQUEST_DURATION, +) +from opentelemetry.semconv.trace import SpanAttributes + +from azure.monitor.opentelemetry.exporter._constants import ( + _APPLICATIONINSIGHTS_METRIC_NAMESPACE_OPT_IN, + _AUTOCOLLECTED_INSTRUMENT_NAMES, + _METRIC_ENVELOPE_NAME, +) +from azure.monitor.opentelemetry.exporter import _utils +from azure.monitor.opentelemetry.exporter._generated.models import ( + MetricDataPoint, + MetricsData, + MonitorBase, + TelemetryItem, +) +from azure.monitor.opentelemetry.exporter.export._base import ( + BaseExporter, + ExportResult, +) +from azure.monitor.opentelemetry.exporter.export.trace import _utils as trace_utils + + +_logger = logging.getLogger(__name__) + +__all__ = ["AzureMonitorMetricExporter"] + + +APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = { + Counter: AggregationTemporality.DELTA, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + 
class AzureMonitorMetricExporter(BaseExporter, MetricExporter):
    """Azure Monitor Metric exporter for OpenTelemetry."""

    def __init__(self, **kwargs: Any) -> None:
        BaseExporter.__init__(self, **kwargs)
        MetricExporter.__init__(
            self,
            preferred_temporality=APPLICATION_INSIGHTS_METRIC_TEMPORALITIES,  # type: ignore
            preferred_aggregation=kwargs.get("preferred_aggregation"),  # type: ignore
        )

    # pylint: disable=R1702
    def export(
        self,
        metrics_data: OTMetricsData,
        timeout_millis: float = 10_000,
        **kwargs: Any,
    ) -> MetricExportResult:
        """Exports a batch of metric data

        :param metrics_data: OpenTelemetry Metric(s) to export.
        :type metrics_data: Sequence[~opentelemetry.sdk.metrics._internal.point.MetricsData]
        :param timeout_millis: The maximum amount of time to wait for each export. Not currently used.
        :type timeout_millis: float
        :return: The result of the export.
        :rtype: ~opentelemetry.sdk.metrics.export.MetricExportResult
        """
        if metrics_data is None:
            return MetricExportResult.SUCCESS
        envelopes = []
        # Walk the resource -> scope -> metric -> data-point hierarchy and
        # convert every data point into a telemetry envelope.
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for point in metric.data.data_points:
                        if point is None:
                            continue
                        envelope = self._point_to_envelope(
                            point,
                            metric.name,
                            resource_metric.resource,
                            scope_metric.scope,
                        )
                        if envelope is not None:
                            envelopes.append(envelope)
        try:
            result = self._transmit(envelopes)
            self._handle_transmit_from_storage(envelopes, result)
            return _get_metric_export_result(result)
        except Exception:  # pylint: disable=broad-except
            _logger.exception("Exception occurred while exporting the data.")
            return _get_metric_export_result(ExportResult.FAILED_NOT_RETRYABLE)

    def force_flush(
        self,
        timeout_millis: float = 10_000,
    ) -> bool:
        # Ensure that export of any metrics currently received by the exporter are completed as soon as possible.
        return True

    def shutdown(
        self,
        timeout_millis: float = 30_000,
        **kwargs: Any,
    ) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.

        :param timeout_millis: The maximum amount of time to wait for shutdown. Not currently used.
        :type timeout_millis: float
        """
        if self.storage:
            self.storage.close()

    def _point_to_envelope(
        self,
        point: DataPointT,
        name: str,
        resource: Optional[Resource] = None,
        scope: Optional[InstrumentationScope] = None,
    ) -> Optional[TelemetryItem]:
        # Convert the raw data point; autocollected instruments are then
        # post-processed into Application Insights "standard metric" shape,
        # which may drop the envelope (returns None for unsupported ones).
        envelope = _convert_point_to_envelope(point, name, resource, scope)
        if name in _AUTOCOLLECTED_INSTRUMENT_NAMES:
            envelope = _handle_std_metric_envelope(envelope, name, point.attributes)  # type: ignore
        if envelope is None:
            return None
        envelope.instrumentation_key = self._instrumentation_key
        return envelope

    # pylint: disable=docstring-keyword-should-match-keyword-only
    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorMetricExporter":
        """
        Create an AzureMonitorMetricExporter from a connection string. This is
        the recommended way of instantiation if a connection string is passed in
        explicitly. If a user wants to use a connection string provided by
        environment variable, the constructor of the exporter can be called
        directly.

        :param str conn_str: The connection string to be used for
            authentication.
        :keyword str api_version: The service API version used. Defaults to
            latest.
        :return: An instance of ~AzureMonitorMetricExporter
        :rtype: ~azure.monitor.opentelemetry.exporter.AzureMonitorMetricExporter
        """
        return cls(connection_string=conn_str, **kwargs)


# pylint: disable=protected-access
def _convert_point_to_envelope(
    point: DataPointT, name: str, resource: Optional[Resource] = None, scope: Optional[InstrumentationScope] = None
) -> TelemetryItem:
    """Map a single OpenTelemetry data point onto a MetricData envelope."""
    envelope = _utils._create_telemetry_item(point.time_unix_nano)
    envelope.name = _METRIC_ENVELOPE_NAME
    envelope.tags.update(_utils._populate_part_a_fields(resource))  # type: ignore
    # The metric namespace is only emitted when the user opts in via env var.
    namespace = None
    if scope is not None and _is_metric_namespace_opted_in():
        namespace = str(scope.name)[:256]
    value: Union[int, float] = 0
    count = 1
    min_ = None
    max_ = None
    # std_dev = None

    if isinstance(point, HistogramDataPoint):
        value = point.sum
        count = int(point.count)
        min_ = point.min
        max_ = point.max
    elif isinstance(point, NumberDataPoint):
        value = point.value

    # truncation logic
    properties = _utils._filter_custom_properties(point.attributes)

    data_point = MetricDataPoint(
        name=str(name)[:1024],
        namespace=namespace,
        value=value,
        count=count,
        min=min_,
        max=max_,
    )

    data = MetricsData(
        properties=properties,
        metrics=[data_point],
    )

    envelope.data = MonitorBase(base_data=data, base_type="MetricData")

    return envelope


def _handle_std_metric_envelope(
    envelope: TelemetryItem,
    name: str,
    attributes: Attributes,
) -> Optional[TelemetryItem]:
    """Decorate an autocollected-instrument envelope with the special
    properties Application Insights standard metrics require.

    Returns None when the instrument is not yet supported as a standard
    metric, signalling the caller to drop the envelope.
    """
    tags = envelope.tags
    attributes = attributes or {}
    raw_status = attributes.get(HTTP_RESPONSE_STATUS_CODE) or attributes.get(SpanAttributes.HTTP_STATUS_CODE)
    status_code = 0
    if raw_status:
        try:
            status_code = int(raw_status)  # type: ignore
        except ValueError:
            status_code = 0
    properties: Dict[str, str] = {}
    if name in (HTTP_CLIENT_REQUEST_DURATION, MetricInstruments.HTTP_CLIENT_DURATION):
        properties["_MS.MetricId"] = "dependencies/duration"
        properties["_MS.IsAutocollected"] = "True"
        properties["Dependency.Type"] = "http"
        properties["Dependency.Success"] = str(_is_status_code_success(status_code))  # type: ignore
        target, _ = trace_utils._get_target_and_path_for_http_dependency(attributes)
        properties["dependency/target"] = target  # type: ignore
        properties["dependency/resultCode"] = str(status_code)
        properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"]  # type: ignore
        properties["cloud/roleName"] = tags["ai.cloud.role"]  # type: ignore
    elif name in (HTTP_SERVER_REQUEST_DURATION, MetricInstruments.HTTP_SERVER_DURATION):
        properties["_MS.MetricId"] = "requests/duration"
        properties["_MS.IsAutocollected"] = "True"
        properties["request/resultCode"] = str(status_code)
        # TODO: Change to symbol once released in upstream
        if attributes.get("user_agent.synthetic.type"):
            properties["operation/synthetic"] = "True"
        properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"]  # type: ignore
        properties["cloud/roleName"] = tags["ai.cloud.role"]  # type: ignore
        properties["Request.Success"] = str(_is_status_code_success(status_code))  # type: ignore
    else:
        # Any other autocollected metrics are not supported yet for standard metrics
        # We ignore these envelopes in these cases
        return None

    # TODO: rpc, database, messaging

    envelope.data.base_data.properties = properties  # type: ignore

    return envelope
properties["_MS.IsAutocollected"] = "True" + properties["Dependency.Type"] = "http" + properties["Dependency.Success"] = str(_is_status_code_success(status_code)) # type: ignore + target, _ = trace_utils._get_target_and_path_for_http_dependency(attributes) + properties["dependency/target"] = target # type: ignore + properties["dependency/resultCode"] = str(status_code) + properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"] # type: ignore + properties["cloud/roleName"] = tags["ai.cloud.role"] # type: ignore + elif name in (HTTP_SERVER_REQUEST_DURATION, MetricInstruments.HTTP_SERVER_DURATION): + properties["_MS.MetricId"] = "requests/duration" + properties["_MS.IsAutocollected"] = "True" + properties["request/resultCode"] = str(status_code) + # TODO: Change to symbol once released in upstream + if attributes.get("user_agent.synthetic.type"): + properties["operation/synthetic"] = "True" + properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"] # type: ignore + properties["cloud/roleName"] = tags["ai.cloud.role"] # type: ignore + properties["Request.Success"] = str(_is_status_code_success(status_code)) # type: ignore + else: + # Any other autocollected metrics are not supported yet for standard metrics + # We ignore these envelopes in these cases + return None + + # TODO: rpc, database, messaging + + envelope.data.base_data.properties = properties # type: ignore + + return envelope + + +def _is_status_code_success(status_code: Optional[str]) -> bool: + if status_code is None or status_code == 0: + return False + try: + # Success criteria based solely off status code is True only if status_code < 400 + # for both client and server spans + return int(status_code) < 400 + except ValueError: + return False + + +def _is_metric_namespace_opted_in() -> bool: + return os.environ.get(_APPLICATIONINSIGHTS_METRIC_NAMESPACE_OPT_IN, "False").lower() == "true" + + +def _get_metric_export_result(result: ExportResult) -> MetricExportResult: + if result == 
ExportResult.SUCCESS: + return MetricExportResult.SUCCESS + return MetricExportResult.FAILURE diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/__init__.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/__init__.py diff --git a/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py new file mode 100644 index 00000000..c1d51b7d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py @@ -0,0 +1,553 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from os import environ +import json +import logging +from time import time_ns +from typing import no_type_check, Any, Dict, List, Sequence +from urllib.parse import urlparse + +from opentelemetry.semconv.attributes.client_attributes import CLIENT_ADDRESS +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_REQUEST_METHOD, + HTTP_RESPONSE_STATUS_CODE, +) +from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes +from opentelemetry.semconv._incubating.attributes import gen_ai_attributes +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.trace import SpanKind, get_tracer_provider + +from azure.monitor.opentelemetry.exporter._constants import ( + _APPLICATIONINSIGHTS_OPENTELEMETRY_RESOURCE_METRIC_DISABLED, + _AZURE_SDK_NAMESPACE_NAME, + _AZURE_SDK_OPENTELEMETRY_NAME, + _INSTRUMENTATION_SUPPORTING_METRICS_LIST, + _SAMPLE_RATE_KEY, + _METRIC_ENVELOPE_NAME, + 
_logger = logging.getLogger(__name__)

__all__ = ["AzureMonitorTraceExporter"]

# Attribute namespaces defined by the OpenTelemetry semantic conventions.
# Attributes starting with these prefixes are considered "standard" and are
# excluded from the customDimensions of exported telemetry.
_STANDARD_OPENTELEMETRY_ATTRIBUTE_PREFIXES = [
    "http.",
    "db.",
    "message.",
    "messaging.",
    "rpc.",
    "enduser.",
    "net.",
    "peer.",
    "exception.",
    "thread.",
    "faas.",  # FaaS semantic-convention namespace
    "fass.",  # historical typo of "faas."; retained so existing filtering behavior is unchanged
    "code.",
]

# Individual standard attribute keys (stable HTTP/network semconv) that do
# not share one of the prefixes above.
_STANDARD_OPENTELEMETRY_HTTP_ATTRIBUTES = [
    "client.address",
    "client.port",
    "server.address",
    "server.port",
    "url.full",
    "url.path",
    "url.query",
    "url.scheme",
    "url.template",
    "error.type",
    "network.local.address",
    "network.local.port",
    "network.protocol.name",
    "network.peer.address",
    "network.peer.port",
    "network.protocol.version",
    "network.transport",
    "user_agent.original",
    "user_agent.synthetic.type",
]

# Azure-Monitor-internal attributes that must never leak into customDimensions.
_STANDARD_AZURE_MONITOR_ATTRIBUTES = [
    _SAMPLE_RATE_KEY,
]


class AzureMonitorTraceExporter(BaseExporter, SpanExporter):
    """Azure Monitor Trace exporter for OpenTelemetry."""

    def __init__(self, **kwargs: Any):
        # Allow a specific TracerProvider to be injected (used by tests and
        # the distro); otherwise the globally registered provider is read at
        # export time.
        self._tracer_provider = kwargs.pop("tracer_provider", None)
        super().__init__(**kwargs)

    def export(
        self, spans: Sequence[ReadableSpan], **kwargs: Any  # pylint: disable=unused-argument
    ) -> SpanExportResult:
        """Export span data.

        :param spans: Open Telemetry Spans to export.
        :type spans: ~typing.Sequence[~opentelemetry.trace.Span]
        :return: The result of the export.
        :rtype: ~opentelemetry.sdk.trace.export.SpanExportResult
        """
        envelopes = []
        # Emit the one-off _OTELRESOURCE_ metric alongside the spans unless
        # disabled via environment variable.
        if spans and self._should_collect_otel_resource_metric():
            resource = None
            try:
                tracer_provider = self._tracer_provider or get_tracer_provider()
                resource = tracer_provider.resource  # type: ignore
                envelopes.append(self._get_otel_resource_envelope(resource))
            except AttributeError as e:
                _logger.exception("Failed to derive Resource from Tracer Provider: %s", e)
        for span in spans:
            envelopes.append(self._span_to_envelope(span))
            envelopes.extend(self._span_events_to_envelopes(span))
        try:
            result = self._transmit(envelopes)
            self._handle_transmit_from_storage(envelopes, result)
            return _get_trace_export_result(result)
        except Exception:  # pylint: disable=broad-except
            _logger.exception("Exception occurred while exporting the data.")
            return _get_trace_export_result(ExportResult.FAILED_NOT_RETRYABLE)

    def shutdown(self) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.
        """
        if self.storage:
            self.storage.close()

    # pylint: disable=protected-access
    def _get_otel_resource_envelope(self, resource: Resource) -> TelemetryItem:
        """Build the _OTELRESOURCE_ metric envelope that carries the OTel
        resource attributes as properties."""
        attributes: Dict[str, str] = {}
        if resource:
            attributes = resource.attributes
        envelope = _utils._create_telemetry_item(time_ns())
        envelope.name = _METRIC_ENVELOPE_NAME
        envelope.tags.update(_utils._populate_part_a_fields(resource))  # pylint: disable=W0212
        envelope.instrumentation_key = self._instrumentation_key
        # The metric value itself is meaningless; only the properties matter.
        data_point = MetricDataPoint(
            name="_OTELRESOURCE_"[:1024],
            value=0,
        )

        data = MetricsData(
            properties=attributes,
            metrics=[data_point],
        )

        envelope.data = MonitorBase(base_data=data, base_type="MetricData")

        return envelope

    def _span_to_envelope(self, span: ReadableSpan) -> TelemetryItem:
        # Convert one span and stamp it with this exporter's ikey.
        envelope = _convert_span_to_envelope(span)
        envelope.instrumentation_key = self._instrumentation_key
        return envelope  # type: ignore

    def _span_events_to_envelopes(self, span: ReadableSpan) -> Sequence[TelemetryItem]:
        # Span events become Message/Exception envelopes in their own right.
        if not span or len(span.events) == 0:
            return []
        envelopes = _convert_span_events_to_envelopes(span)
        for envelope in envelopes:
            envelope.instrumentation_key = self._instrumentation_key
        return envelopes

    def _should_collect_otel_resource_metric(self):
        # Collection is on unless explicitly disabled ("true") via env var.
        disabled = environ.get(_APPLICATIONINSIGHTS_OPENTELEMETRY_RESOURCE_METRIC_DISABLED)
        return disabled is None or disabled.lower() != "true"

    # pylint: disable=docstring-keyword-should-match-keyword-only
    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorTraceExporter":
        """
        Create an AzureMonitorTraceExporter from a connection string. This is
        the recommended way of instantiation if a connection string is passed in
        explicitly. If a user wants to use a connection string provided by
        environment variable, the constructor of the exporter can be called
        directly.

        :param str conn_str: The connection string to be used for
            authentication.
        :keyword str api_version: The service API version used. Defaults to
            latest.
        :return: an instance of ~AzureMonitorTraceExporter
        :rtype: ~azure.monitor.opentelemetry.exporter.AzureMonitorTraceExporter
        """
        return cls(connection_string=conn_str, **kwargs)
# pylint: disable=too-many-statements
# pylint: disable=too-many-branches
# pylint: disable=protected-access
# mypy: disable-error-code="assignment,attr-defined,index,operator,union-attr"
@no_type_check
def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
    """Convert a ReadableSpan into an Application Insights telemetry envelope.

    SERVER/CONSUMER spans become RequestData; all other kinds become
    RemoteDependencyData, with the dependency type derived from the span's
    semantic-convention attributes (HTTP, DB, messaging, RPC, GenAI, ...).
    """
    # Update instrumentation bitmap if span was generated from instrumentation
    _check_instrumentation_span(span)
    duration = 0
    start_time = 0
    if span.start_time:
        start_time = span.start_time
    if span.end_time:
        duration = span.end_time - span.start_time
    envelope = _utils._create_telemetry_item(start_time)
    envelope.tags.update(_utils._populate_part_a_fields(span.resource))
    # trace id -> ai.operation.id (32 hex chars), parent span id -> 16 hex chars
    envelope.tags[ContextTagKeys.AI_OPERATION_ID] = "{:032x}".format(span.context.trace_id)
    if SpanAttributes.ENDUSER_ID in span.attributes:
        envelope.tags[ContextTagKeys.AI_USER_ID] = span.attributes[SpanAttributes.ENDUSER_ID]
    if span.parent and span.parent.span_id:
        envelope.tags[ContextTagKeys.AI_OPERATION_PARENT_ID] = "{:016x}".format(span.parent.span_id)
    if span.kind in (SpanKind.CONSUMER, SpanKind.SERVER):
        # --- Request telemetry (incoming) ---
        envelope.name = _REQUEST_ENVELOPE_NAME
        data = RequestData(
            name=span.name,
            id="{:016x}".format(span.context.span_id),
            duration=_utils.ns_to_duration(duration),
            response_code="0",
            success=span.status.is_ok,
            properties={},
            measurements={},
        )
        envelope.data = MonitorBase(base_data=data, base_type="RequestData")
        envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = span.name
        location_ip = trace_utils._get_location_ip(span.attributes)
        if location_ip:
            envelope.tags[ContextTagKeys.AI_LOCATION_IP] = location_ip
        if _AZURE_SDK_NAMESPACE_NAME in span.attributes:  # Azure specific resources
            # Currently only eventhub and servicebus are supported (kind CONSUMER)
            data.source = trace_utils._get_azure_sdk_target_source(span.attributes)
            if span.links:
                # Average queueing delay across linked messages, using the
                # "enqueuedTime" link attribute (milliseconds epoch).
                total = 0
                for link in span.links:
                    attributes = link.attributes
                    enqueued_time = attributes.get("enqueuedTime")
                    if isinstance(enqueued_time, int):
                        # start_time is ns; /1e6 converts to ms for comparison
                        difference = (start_time / 1000000) - enqueued_time
                        total += difference
                data.measurements["timeSinceEnqueued"] = max(0, total / len(span.links))
        elif HTTP_REQUEST_METHOD in span.attributes or SpanAttributes.HTTP_METHOD in span.attributes:  # HTTP
            path = ""
            user_agent = trace_utils._get_user_agent(span.attributes)
            if user_agent:
                # TODO: Not exposed in Swagger, need to update def
                envelope.tags["ai.user.userAgent"] = user_agent
            # url
            url = trace_utils._get_url_for_http_request(span.attributes)
            data.url = url
            # Http specific logic for ai.operation.name
            if SpanAttributes.HTTP_ROUTE in span.attributes:
                envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = "{} {}".format(
                    span.attributes.get(HTTP_REQUEST_METHOD) or span.attributes.get(SpanAttributes.HTTP_METHOD),
                    span.attributes[SpanAttributes.HTTP_ROUTE],
                )
            elif url:
                try:
                    parse_url = urlparse(url)
                    path = parse_url.path
                    if not path:
                        path = "/"
                    envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = "{} {}".format(
                        span.attributes.get(HTTP_REQUEST_METHOD) or span.attributes.get(SpanAttributes.HTTP_METHOD),
                        path,
                    )
                except Exception:  # pylint: disable=broad-except
                    pass
            status_code = span.attributes.get(HTTP_RESPONSE_STATUS_CODE) \
                or span.attributes.get(SpanAttributes.HTTP_STATUS_CODE)
            if status_code:
                try:
                    status_code = int(status_code)  # type: ignore
                except ValueError:
                    status_code = 0
            else:
                status_code = 0
            data.response_code = str(status_code)
            # Success criteria for server spans depends on span.success and the actual status code
            # NOTE(review): 5xx codes fall outside range(400, 500) and so pass
            # this check; success then rests on span.status.is_ok — confirm intended.
            data.success = span.status.is_ok and status_code and status_code not in range(400, 500)
        elif SpanAttributes.MESSAGING_SYSTEM in span.attributes:  # Messaging
            if span.attributes.get(SpanAttributes.MESSAGING_DESTINATION):
                # source is "<peer>/<destination>" when a peer is known,
                # otherwise just the destination name.
                if span.attributes.get(CLIENT_ADDRESS) or span.attributes.get(SpanAttributes.NET_PEER_NAME):
                    data.source = "{}/{}".format(
                        span.attributes.get(CLIENT_ADDRESS) or span.attributes.get(SpanAttributes.NET_PEER_NAME),
                        span.attributes.get(SpanAttributes.MESSAGING_DESTINATION),
                    )
                elif span.attributes.get(SpanAttributes.NET_PEER_IP):
                    data.source = "{}/{}".format(
                        span.attributes[SpanAttributes.NET_PEER_IP],
                        span.attributes.get(SpanAttributes.MESSAGING_DESTINATION),
                    )
                else:
                    data.source = span.attributes.get(SpanAttributes.MESSAGING_DESTINATION, "")
        # Apply truncation
        # See https://github.com/MohanGsk/ApplicationInsights-Home/tree/master/EndpointSpecs/Schemas/Bond
        if envelope.tags.get(ContextTagKeys.AI_OPERATION_NAME):
            data.name = envelope.tags[ContextTagKeys.AI_OPERATION_NAME][:1024]
        if data.response_code:
            data.response_code = data.response_code[:1024]
        if data.source:
            data.source = data.source[:1024]
        if data.url:
            data.url = data.url[:2048]
    else:  # INTERNAL, CLIENT, PRODUCER
        # --- Dependency telemetry (outgoing / in-process) ---
        envelope.name = _REMOTE_DEPENDENCY_ENVELOPE_NAME
        # TODO: ai.operation.name for non-server spans
        time = 0
        if span.end_time and span.start_time:
            time = span.end_time - span.start_time
        data = RemoteDependencyData(  # type: ignore
            name=span.name,
            id="{:016x}".format(span.context.span_id),
            result_code="0",
            duration=_utils.ns_to_duration(time),
            success=span.status.is_ok,  # Success depends only on span status
            properties={},
        )
        envelope.data = MonitorBase(base_data=data, base_type="RemoteDependencyData")
        target = trace_utils._get_target_for_dependency_from_peer(span.attributes)
        if span.kind is SpanKind.CLIENT:
            if _AZURE_SDK_NAMESPACE_NAME in span.attributes:  # Azure specific resources
                # Currently only eventhub and servicebus are supported
                # https://github.com/Azure/azure-sdk-for-python/issues/9256
                data.type = span.attributes[_AZURE_SDK_NAMESPACE_NAME]
                data.target = trace_utils._get_azure_sdk_target_source(span.attributes)
            elif HTTP_REQUEST_METHOD in span.attributes or SpanAttributes.HTTP_METHOD in span.attributes:  # HTTP
                data.type = "HTTP"
                user_agent = trace_utils._get_user_agent(span.attributes)
                if user_agent:
                    # TODO: Not exposed in Swagger, need to update def
                    envelope.tags["ai.user.userAgent"] = user_agent
                url = trace_utils._get_url_for_http_dependency(span.attributes)
                # data
                if url:
                    data.data = url
                target, path = trace_utils._get_target_and_path_for_http_dependency(
                    span.attributes,
                    url,
                )
                # http specific logic for name
                if path:
                    data.name = "{} {}".format(
                        span.attributes.get(HTTP_REQUEST_METHOD) or \
                            span.attributes.get(SpanAttributes.HTTP_METHOD),
                        path,
                    )
                status_code = span.attributes.get(HTTP_RESPONSE_STATUS_CODE) or \
                    span.attributes.get(SpanAttributes.HTTP_STATUS_CODE)
                if status_code:
                    try:
                        status_code = int(status_code)  # type: ignore
                    except ValueError:
                        status_code = 0
                else:
                    status_code = 0
                data.result_code = str(status_code)
            elif SpanAttributes.DB_SYSTEM in span.attributes:  # Database
                db_system = span.attributes[SpanAttributes.DB_SYSTEM]
                # Map well-known db systems to AI dependency type names;
                # generic SQL engines collapse to "SQL".
                if db_system == DbSystemValues.MYSQL.value:
                    data.type = "mysql"
                elif db_system == DbSystemValues.POSTGRESQL.value:
                    data.type = "postgresql"
                elif db_system == DbSystemValues.MONGODB.value:
                    data.type = "mongodb"
                elif db_system == DbSystemValues.REDIS.value:
                    data.type = "redis"
                elif trace_utils._is_sql_db(str(db_system)):
                    data.type = "SQL"
                else:
                    data.type = db_system
                # data is the full statement or operation
                if SpanAttributes.DB_STATEMENT in span.attributes:
                    data.data = span.attributes[SpanAttributes.DB_STATEMENT]
                elif SpanAttributes.DB_OPERATION in span.attributes:
                    data.data = span.attributes[SpanAttributes.DB_OPERATION]
                # db specific logic for target
                target = trace_utils._get_target_for_db_dependency(
                    target,  # type: ignore
                    db_system,  # type: ignore
                    span.attributes,
                )
            elif SpanAttributes.MESSAGING_SYSTEM in span.attributes:  # Messaging
                data.type = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
                target = trace_utils._get_target_for_messaging_dependency(
                    target,  # type: ignore
                    span.attributes,
                )
            elif SpanAttributes.RPC_SYSTEM in span.attributes:  # Rpc
                # NOTE(review): this assigns the attribute KEY constant
                # ("rpc.system"), not its value — confirm intended.
                data.type = SpanAttributes.RPC_SYSTEM
                target = trace_utils._get_target_for_rpc_dependency(
                    target,  # type: ignore
                    span.attributes,
                )
            elif gen_ai_attributes.GEN_AI_SYSTEM in span.attributes:  # GenAI
                data.type = span.attributes[gen_ai_attributes.GEN_AI_SYSTEM]
            else:
                data.type = "N/A"
        elif span.kind is SpanKind.PRODUCER:  # Messaging
            # Currently only eventhub and servicebus are supported that produce PRODUCER spans
            if _AZURE_SDK_NAMESPACE_NAME in span.attributes:
                data.type = "Queue Message | {}".format(span.attributes[_AZURE_SDK_NAMESPACE_NAME])
                target = trace_utils._get_azure_sdk_target_source(span.attributes)
            else:
                data.type = "Queue Message"
                msg_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
                if msg_system:
                    data.type += " | {}".format(msg_system)
                target = trace_utils._get_target_for_messaging_dependency(
                    target,  # type: ignore
                    span.attributes,
                )
        else:  # SpanKind.INTERNAL
            data.type = "InProc"
            if _AZURE_SDK_NAMESPACE_NAME in span.attributes:
                data.type += " | {}".format(span.attributes[_AZURE_SDK_NAMESPACE_NAME])
    # Apply truncation
    # See https://github.com/MohanGsk/ApplicationInsights-Home/tree/master/EndpointSpecs/Schemas/Bond
    if data.name:
        data.name = str(data.name)[:1024]
    if data.result_code:
        data.result_code = str(data.result_code)[:1024]
    if data.data:
        data.data = str(data.data)[:8192]
    if data.type:
        data.type = str(data.type)[:1024]
    if target:
        data.target = str(target)[:1024]

    # sampleRate
    if _SAMPLE_RATE_KEY in span.attributes:
        envelope.sample_rate = span.attributes[_SAMPLE_RATE_KEY]

    # Non-standard attributes become customDimensions.
    data.properties = _utils._filter_custom_properties(
        span.attributes, lambda key, val: not _is_standard_attribute(key)
    )

    # Standard metrics special properties
    # Only add the property if span was generated from instrumentation that supports metrics collection
    if (
        span.instrumentation_scope is not None
        and span.instrumentation_scope.name in _INSTRUMENTATION_SUPPORTING_METRICS_LIST
    ):
        data.properties["_MS.ProcessedByMetricExtractors"] = "True"

    if span.links:
        # Max length for value is 8192
        # Since links are a fixed length (80) in json, max number of links would be 102
        links: List[Dict[str, str]] = []
        for link in span.links:
            if len(links) > 102:
                break
            operation_id = "{:032x}".format(link.context.trace_id)
            span_id = "{:016x}".format(link.context.span_id)
            links.append({"operation_Id": operation_id, "id": span_id})
        data.properties["_MS.links"] = json.dumps(links)
    return envelope


# pylint: disable=protected-access
def _convert_span_events_to_envelopes(span: ReadableSpan) -> Sequence[TelemetryItem]:
    """Convert each span event into its own envelope: "exception" events
    become ExceptionData, all other events become MessageData (trace)."""
    envelopes = []
    for event in span.events:
        envelope = _utils._create_telemetry_item(event.timestamp)
        envelope.tags.update(_utils._populate_part_a_fields(span.resource))
        envelope.tags[ContextTagKeys.AI_OPERATION_ID] = "{:032x}".format(span.context.trace_id)
        # The owning span becomes the parent operation of the event.
        if span.context and span.context.span_id:
            envelope.tags[ContextTagKeys.AI_OPERATION_PARENT_ID] = "{:016x}".format(span.context.span_id)

        # sampleRate
        if span.attributes and _SAMPLE_RATE_KEY in span.attributes:
            envelope.sample_rate = span.attributes[_SAMPLE_RATE_KEY]

        properties = _utils._filter_custom_properties(
            event.attributes, lambda key, val: not _is_standard_attribute(key)
        )
        if event.name == "exception":
            envelope.name = _EXCEPTION_ENVELOPE_NAME
            exc_type = exc_message = stack_trace = None
            if event.attributes:
                exc_type = event.attributes.get(SpanAttributes.EXCEPTION_TYPE)
                exc_message = event.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
                stack_trace = event.attributes.get(SpanAttributes.EXCEPTION_STACKTRACE)
            # Ingestion requires non-empty type and message.
            if not exc_type:
                exc_type = "Exception"
            if not exc_message:
                exc_message = "Exception"
            has_full_stack = stack_trace is not None
            exc_details = TelemetryExceptionDetails(
                type_name=str(exc_type)[:1024],
                message=str(exc_message)[:32768],
                has_full_stack=has_full_stack,
                stack=str(stack_trace)[:32768],
            )
            data = TelemetryExceptionData(
                properties=properties,
                exceptions=[exc_details],
            )
            envelope.data = MonitorBase(base_data=data, base_type="ExceptionData")
        else:
            envelope.name = _MESSAGE_ENVELOPE_NAME
            data = MessageData(  # type: ignore
                message=str(event.name)[:32768],
                properties=properties,
            )
            envelope.data = MonitorBase(base_data=data, base_type="MessageData")

        envelopes.append(envelope)

    return envelopes
def _check_instrumentation_span(span: ReadableSpan) -> None:
    """Record which instrumentation produced *span* in the feature bitmap."""
    # Spans emitted by azure-sdk libraries carry the az.namespace attribute
    # and are attributed to the Azure SDK tracing integration.
    if span.attributes and _AZURE_SDK_NAMESPACE_NAME in span.attributes:
        _utils.add_instrumentation(_AZURE_SDK_OPENTELEMETRY_NAME)
        return
    scope = span.instrumentation_scope
    if scope is None:
        return
    # All OpenTelemetry instrumentations share this scope-name prefix;
    # the remainder identifies the individual instrumentation.
    prefix = "opentelemetry.instrumentation."
    if scope.name.startswith(prefix):
        _utils.add_instrumentation(scope.name.split(prefix, 1)[1])


def _is_standard_attribute(key: str) -> bool:
    """Return True when *key* belongs to a standard OpenTelemetry or Azure
    Monitor namespace and should be excluded from custom properties."""
    if any(key.startswith(prefix) for prefix in _STANDARD_OPENTELEMETRY_ATTRIBUTE_PREFIXES):
        return True
    return key in _STANDARD_AZURE_MONITOR_ATTRIBUTES or key in _STANDARD_OPENTELEMETRY_HTTP_ATTRIBUTES


def _get_trace_export_result(result: ExportResult) -> SpanExportResult:
    """Map the transport-level ExportResult onto the SDK's SpanExportResult."""
    return SpanExportResult.SUCCESS if result == ExportResult.SUCCESS else SpanExportResult.FAILURE
class ApplicationInsightsSampler(Sampler):
    """Sampler that implements the same probability sampling algorithm as the ApplicationInsights SDKs."""

    # sampling_ratio must take a value in the range [0,1]
    def __init__(self, sampling_ratio: float = 1.0):
        if not 0.0 <= sampling_ratio <= 1.0:
            raise ValueError("sampling_ratio must be in the range [0,1]")
        self._ratio = sampling_ratio
        # Ingestion expects the rate as a percentage (0-100).
        self._sample_rate = sampling_ratio * 100

    # pylint:disable=C0301
    # See https://github.com/microsoft/Telemetry-Collection-Spec/blob/main/OpenTelemetry/trace/ApplicationInsightsSampler.md
    def should_sample(
        self,
        parent_context: Optional[Context],
        trace_id: int,
        name: str,
        kind: Optional[SpanKind] = None,
        attributes: Attributes = None,
        links: Optional[Sequence["Link"]] = None,
        trace_state: Optional["TraceState"] = None,
    ) -> "SamplingResult":
        # Fast paths: 0% always drops, 100% always samples; only the
        # in-between case needs the trace-id hash.
        if self._sample_rate == 0:
            decision = Decision.DROP
        elif self._sample_rate == 100.0:
            decision = Decision.RECORD_AND_SAMPLE
        else:
            # Determine if should sample from ratio and traceId
            sample_score = self._get_DJB2_sample_score(format_trace_id(trace_id).lower())
            if sample_score < self._ratio:
                decision = Decision.RECORD_AND_SAMPLE
            else:
                decision = Decision.DROP
        # Add sample rate as span attribute
        # (read by the exporter so ingestion can up-count sampled items)
        if attributes is None:
            attributes = {}
        attributes[_SAMPLE_RATE_KEY] = self._sample_rate  # type: ignore
        return SamplingResult(
            decision,
            attributes,
            _get_parent_trace_state(parent_context),  # type: ignore
        )

    def _get_DJB2_sample_score(self, trace_id_hex: str) -> float:
        # DJB2 string hash over the hex trace id, replicated with 32-bit
        # wraparound (fixedint.Int32) so scores match the other AI SDKs.
        # This algorithm uses 32bit integers
        hash_value = Int32(_HASH)
        for char in trace_id_hex:
            hash_value = ((hash_value << 5) + hash_value) + ord(char)

        if hash_value == _INTEGER_MIN:
            # abs(INT32_MIN) would overflow a 32-bit int; pin to INT32_MAX.
            hash_value = int(_INTEGER_MAX)
        else:
            hash_value = abs(hash_value)

        # divide by _INTEGER_MAX for value between 0 and 1 for sampling score
        return float(hash_value) / _INTEGER_MAX

    def get_description(self) -> str:
        return "ApplicationInsightsSampler{}".format(self._ratio)


def azure_monitor_opentelemetry_sampler_factory(sampler_argument):  # pylint: disable=name-too-long
    # Factory used by OTEL_TRACES_SAMPLER_ARG; any unparsable argument
    # falls back to the default (sample everything).
    try:
        rate = float(sampler_argument)
        return ApplicationInsightsSampler(rate)
    except (ValueError, TypeError):
        return ApplicationInsightsSampler()
# pylint:disable=too-many-return-statements
def _get_default_port_db(db_system: str) -> int:
    """Return the conventional default port for *db_system*, or 0 if unknown."""
    default_ports = {
        DbSystemValues.POSTGRESQL.value: 5432,
        DbSystemValues.CASSANDRA.value: 9042,
        DbSystemValues.MARIADB.value: 3306,
        DbSystemValues.MYSQL.value: 3306,
        DbSystemValues.MSSQL.value: 1433,
        # TODO: Add in memcached
        "memcached": 11211,
        DbSystemValues.DB2.value: 50000,
        DbSystemValues.ORACLE.value: 1521,
        DbSystemValues.H2.value: 8082,
        DbSystemValues.DERBY.value: 1527,
        DbSystemValues.REDIS.value: 6379,
    }
    return default_ports.get(db_system, 0)


def _get_default_port_http(attributes: Attributes) -> int:
    """Return the default port implied by the HTTP scheme, or 0 if unknown."""
    return {"http": 80, "https": 443}.get(_get_http_scheme(attributes), 0)


def _is_sql_db(db_system: str) -> bool:
    """Whether *db_system* identifies a SQL engine (mapped to type "SQL")."""
    sql_systems = {
        DbSystemValues.DB2.value,
        DbSystemValues.DERBY.value,
        DbSystemValues.MARIADB.value,
        DbSystemValues.MSSQL.value,
        DbSystemValues.ORACLE.value,
        DbSystemValues.SQLITE.value,
        DbSystemValues.OTHER_SQL.value,
        # spell-checker:ignore HSQLDB
        DbSystemValues.HSQLDB.value,
        DbSystemValues.H2.value,
    }
    return db_system in sql_systems


def _get_azure_sdk_target_source(attributes: Attributes) -> Optional[str]:
    """Build "<peer>/<destination>" for azure-sdk messaging spans."""
    # Currently logic only works for ServiceBus and EventHub
    if not attributes:
        return None
    # New semconv attributes: https://github.com/Azure/azure-sdk-for-python/pull/29203
    # TODO: Keep track of when azure-sdk supports stable semconv for these fields
    peer_address = attributes.get("net.peer.name") or attributes.get("peer.address")
    destination = attributes.get("messaging.destination.name") or attributes.get("message_bus.destination")
    if peer_address and destination:
        return str(peer_address) + "/" + str(destination)
    return None


def _get_http_scheme(attributes: Attributes) -> Optional[str]:
    """Return the URL scheme from stable or legacy semconv, if present."""
    if not attributes:
        return None
    scheme = attributes.get(url_attributes.URL_SCHEME) or \
        attributes.get(SpanAttributes.HTTP_SCHEME)
    return str(scheme) if scheme else None
"{}://{}:{}{}".format( + scheme, + peer_name, + peer_port, + http_target, + ) + elif SpanAttributes.NET_PEER_IP in attributes: + peer_ip = attributes[SpanAttributes.NET_PEER_IP] + url = "{}://{}:{}{}".format( + scheme, + peer_ip, + peer_port, + http_target, + ) + return url + + +@no_type_check +def _get_target_for_dependency_from_peer(attributes: Attributes) -> Optional[str]: + target = "" + if attributes: + if SpanAttributes.PEER_SERVICE in attributes: + target = attributes[SpanAttributes.PEER_SERVICE] + else: + if SpanAttributes.NET_PEER_NAME in attributes: + target = attributes[SpanAttributes.NET_PEER_NAME] + elif SpanAttributes.NET_PEER_IP in attributes: + target = attributes[SpanAttributes.NET_PEER_IP] + if SpanAttributes.NET_PEER_PORT in attributes: + port = attributes[SpanAttributes.NET_PEER_PORT] + # TODO: check default port for rpc + # This logic assumes default ports never conflict across dependency types + if port != _get_default_port_http(attributes) and \ + port != _get_default_port_db(str(attributes.get(SpanAttributes.DB_SYSTEM))): + target = "{}:{}".format(target, port) + return target + + +@no_type_check +def _get_target_and_path_for_http_dependency( + attributes: Attributes, + url: Optional[str] = "", # Usually populated by _get_url_for_http_dependency() +) -> Tuple[Optional[str], str]: + parsed_url = None + target = "" + path = "/" + default_port = _get_default_port_http(attributes) + # Find path from url + if not url: + url = _get_url_for_http_dependency(attributes) + try: + parsed_url = urlparse(url) + if parsed_url.path: + path = parsed_url.path + except Exception: # pylint: disable=broad-except + pass + # Derive target + if attributes: + # Target from server.* + if server_attributes.SERVER_ADDRESS in attributes: + target = attributes[server_attributes.SERVER_ADDRESS] + server_port = attributes.get(server_attributes.SERVER_PORT) + # if not default port, include port in target + if server_port != default_port: + target = "{}:{}".format(target, 
server_port) + # Target from peer.service + elif SpanAttributes.PEER_SERVICE in attributes: + target = attributes[SpanAttributes.PEER_SERVICE] + # Target from http.host + elif SpanAttributes.HTTP_HOST in attributes: + host = attributes[SpanAttributes.HTTP_HOST] + try: + # urlparse insists on absolute URLs starting with "//" + # This logic assumes host does not include a "//" + host_name = urlparse("//" + str(host)) + # Ignore port from target if default port + if host_name.port == default_port: + target = host_name.hostname + else: + # Else include the whole host as the target + target = str(host) + except Exception: # pylint: disable=broad-except + pass + elif parsed_url: + # Target from httpUrl + if parsed_url.port and parsed_url.port == default_port: + if parsed_url.hostname: + target = parsed_url.hostname + elif parsed_url.netloc: + target = parsed_url.netloc + if not target: + # Get target from peer.* attributes that are NOT peer.service + target = _get_target_for_dependency_from_peer(attributes) + return (target, path) + + +@no_type_check +def _get_target_for_db_dependency( + target: Optional[str], + db_system: Optional[str], + attributes: Attributes, +) -> Optional[str]: + if attributes: + db_name = attributes.get(SpanAttributes.DB_NAME) + if db_name: + if not target: + target = str(db_name) + else: + target = "{}|{}".format(target, db_name) + elif not target: + target = db_system + return target + + +@no_type_check +def _get_target_for_messaging_dependency(target: Optional[str], attributes: Attributes) -> Optional[str]: + if attributes: + if not target: + if SpanAttributes.MESSAGING_DESTINATION in attributes: + target = str(attributes[SpanAttributes.MESSAGING_DESTINATION]) + elif SpanAttributes.MESSAGING_SYSTEM in attributes: + target = str(attributes[SpanAttributes.MESSAGING_SYSTEM]) + return target + + +@no_type_check +def _get_target_for_rpc_dependency(target: Optional[str], attributes: Attributes) -> Optional[str]: + if attributes: + if not target: + if 
SpanAttributes.RPC_SYSTEM in attributes: + target = str(attributes[SpanAttributes.RPC_SYSTEM]) + return target + + +# Request + +@no_type_check +def _get_location_ip(attributes: Attributes) -> Optional[str]: + return attributes.get(client_attributes.CLIENT_ADDRESS) or \ + attributes.get(SpanAttributes.HTTP_CLIENT_IP) or \ + attributes.get(SpanAttributes.NET_PEER_IP) # We assume non-http spans don't have http related attributes + + +@no_type_check +def _get_user_agent(attributes: Attributes) -> Optional[str]: + return attributes.get(user_agent_attributes.USER_AGENT_ORIGINAL) or \ + attributes.get(SpanAttributes.HTTP_USER_AGENT) + + +@no_type_check +def _get_url_for_http_request(attributes: Attributes) -> Optional[str]: + url = "" + if attributes: + # Url + if url_attributes.URL_FULL in attributes: + return attributes[url_attributes.URL_FULL] + if SpanAttributes.HTTP_URL in attributes: + return attributes[SpanAttributes.HTTP_URL] + # Scheme + scheme = _get_http_scheme(attributes) + # Target + http_target = "" + if url_attributes.URL_PATH in attributes: + http_target = attributes.get(url_attributes.URL_PATH, "") + if http_target and url_attributes.URL_QUERY in attributes: + http_target = "{}?{}".format( + http_target, + attributes.get(url_attributes.URL_QUERY, "") + ) + elif SpanAttributes.HTTP_TARGET in attributes: + http_target = attributes.get(SpanAttributes.HTTP_TARGET) + if scheme and http_target: + # Host + http_host = "" + if server_attributes.SERVER_ADDRESS in attributes: + http_host = attributes.get(server_attributes.SERVER_ADDRESS, "") + if http_host and server_attributes.SERVER_PORT in attributes: + http_host = "{}:{}".format( + http_host, + attributes.get(server_attributes.SERVER_PORT, "") + ) + elif SpanAttributes.HTTP_HOST in attributes: + http_host = attributes.get(SpanAttributes.HTTP_HOST, "") + if http_host: + url = "{}://{}{}".format( + scheme, + http_host, + http_target, + ) + elif SpanAttributes.HTTP_SERVER_NAME in attributes: + server_name = 
attributes[SpanAttributes.HTTP_SERVER_NAME] + host_port = attributes.get(SpanAttributes.NET_HOST_PORT, "") + url = "{}://{}:{}{}".format( + scheme, + server_name, + host_port, + http_target, + ) + elif SpanAttributes.NET_HOST_NAME in attributes: + host_name = attributes[SpanAttributes.NET_HOST_NAME] + host_port = attributes.get(SpanAttributes.NET_HOST_PORT, "") + url = "{}://{}:{}{}".format( + scheme, + host_name, + host_port, + http_target, + ) + return url |