aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/monitor/opentelemetry/_configure.py
blob: c9910392fc1917342b0d82dc29362f400e4bf391 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License in the project root for
# license information.
# --------------------------------------------------------------------------
from functools import cached_property
from logging import getLogger
from typing import Dict, List, cast

from opentelemetry._events import _set_event_logger_provider
from opentelemetry._logs import set_logger_provider
from opentelemetry.instrumentation.dependencies import (
    get_dist_dependency_conflicts,
)
from opentelemetry.instrumentation.instrumentor import (  # type: ignore
    BaseInstrumentor,
)
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider
from opentelemetry.util._importlib_metadata import (
    EntryPoint,
    distributions,
    entry_points,
)

from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
from azure.monitor.opentelemetry._constants import (
    _ALL_SUPPORTED_INSTRUMENTED_LIBRARIES,
    _AZURE_SDK_INSTRUMENTATION_NAME,
    DISABLE_LOGGING_ARG,
    DISABLE_METRICS_ARG,
    DISABLE_TRACING_ARG,
    ENABLE_LIVE_METRICS_ARG,
    LOGGER_NAME_ARG,
    RESOURCE_ARG,
    SAMPLING_RATIO_ARG,
    SPAN_PROCESSORS_ARG,
    VIEWS_ARG,
)
from azure.monitor.opentelemetry._types import ConfigurationValue
from azure.monitor.opentelemetry.exporter._quickpulse import (  # pylint: disable=import-error,no-name-in-module
    enable_live_metrics,
)
from azure.monitor.opentelemetry.exporter._quickpulse._processor import (  # pylint: disable=import-error,no-name-in-module
    _QuickpulseLogRecordProcessor,
    _QuickpulseSpanProcessor,
)
from azure.monitor.opentelemetry.exporter import (  # pylint: disable=import-error,no-name-in-module
    ApplicationInsightsSampler,
    AzureMonitorLogExporter,
    AzureMonitorMetricExporter,
    AzureMonitorTraceExporter,
)
from azure.monitor.opentelemetry.exporter._utils import (  # pylint: disable=import-error,no-name-in-module
    _is_attach_enabled,
    _is_on_functions,
)
from azure.monitor.opentelemetry._diagnostics.diagnostic_logging import (
    _DISTRO_DETECTS_ATTACH,
    AzureDiagnosticLogging,
)
from azure.monitor.opentelemetry._utils.configurations import (
    _get_configurations,
    _is_instrumentation_enabled,
)

_logger = getLogger(__name__)


def configure_azure_monitor(**kwargs) -> None:  # pylint: disable=C4758
    """This function works as a configuration layer that allows the
    end user to configure OpenTelemetry and Azure monitor components. The
    configuration can be done via arguments passed to this function.

    :keyword str connection_string: Connection string for your Application Insights resource.
    :keyword credential: Token credential, such as `ManagedIdentityCredential` or `ClientSecretCredential`,
     used for Azure Active Directory (AAD) authentication. Defaults to `None`.
    :paramtype credential: ~azure.core.credentials.TokenCredential or None
    :keyword bool disable_offline_storage: Boolean value to determine whether to disable storing failed
     telemetry records for retry. Defaults to `False`.
    :keyword str logger_name: The name of the Python logger that telemetry will be collected.
    :keyword dict instrumentation_options: A nested dictionary that determines which instrumentations
     to enable or disable.  Instrumentations are referred to by their Library Names. For example,
     `{"azure_sdk": {"enabled": False}, "flask": {"enabled": False}, "django": {"enabled": True}}`
     will disable Azure Core Tracing and the Flask instrumentation but leave Django and the other default
     instrumentations enabled.
    :keyword ~opentelemetry.sdk.resources.Resource resource: OpenTelemetry Resource object. Passed in Resource
     Attributes take priority over default attributes and those from Resource Detectors.
    :keyword list[~opentelemetry.sdk.trace.SpanProcessor] span_processors: List of `SpanProcessor` objects
     to process every span prior to exporting. Will be run sequentially.
    :keyword bool enable_live_metrics: Boolean value to determine whether to enable live metrics feature.
     Defaults to `False`.
    :keyword str storage_directory: Storage directory in which to store retry files. Defaults to
     `<tempfile.gettempdir()>/Microsoft/AzureMonitor/opentelemetry-python-<your-instrumentation-key>`.
    :keyword list[~opentelemetry.sdk.metrics.view.View] views: List of `View` objects to configure and filter
     metric output.
    :rtype: None
    """

    _send_attach_warning()

    configurations = _get_configurations(**kwargs)

    disable_tracing = configurations[DISABLE_TRACING_ARG]
    disable_logging = configurations[DISABLE_LOGGING_ARG]
    disable_metrics = configurations[DISABLE_METRICS_ARG]
    enable_live_metrics_config = configurations[ENABLE_LIVE_METRICS_ARG]

    # Setup live metrics
    if enable_live_metrics_config:
        _setup_live_metrics(configurations)

    # Setup tracing pipeline
    if not disable_tracing:
        _setup_tracing(configurations)

    # Setup logging pipeline
    if not disable_logging:
        _setup_logging(configurations)

    # Setup metrics pipeline
    if not disable_metrics:
        _setup_metrics(configurations)

    # Setup instrumentations
    # Instrumentations need to be setup last so to use the global providers
    # instanstiated in the other setup steps
    _setup_instrumentations(configurations)


def _setup_tracing(configurations: Dict[str, ConfigurationValue]):
    resource: Resource = configurations[RESOURCE_ARG]  # type: ignore
    sampling_ratio = configurations[SAMPLING_RATIO_ARG]
    tracer_provider = TracerProvider(
        sampler=ApplicationInsightsSampler(sampling_ratio=cast(float, sampling_ratio)), resource=resource
    )
    for span_processor in configurations[SPAN_PROCESSORS_ARG]:  # type: ignore
        tracer_provider.add_span_processor(span_processor)  # type: ignore
    if configurations.get(ENABLE_LIVE_METRICS_ARG):
        qsp = _QuickpulseSpanProcessor()
        tracer_provider.add_span_processor(qsp)
    trace_exporter = AzureMonitorTraceExporter(**configurations)
    bsp = BatchSpanProcessor(
        trace_exporter,
    )
    tracer_provider.add_span_processor(bsp)
    set_tracer_provider(tracer_provider)
    if _is_instrumentation_enabled(configurations, _AZURE_SDK_INSTRUMENTATION_NAME):
        settings.tracing_implementation = OpenTelemetrySpan


def _setup_logging(configurations: Dict[str, ConfigurationValue]):
    resource: Resource = configurations[RESOURCE_ARG]  # type: ignore
    logger_provider = LoggerProvider(resource=resource)
    if configurations.get(ENABLE_LIVE_METRICS_ARG):
        qlp = _QuickpulseLogRecordProcessor()
        logger_provider.add_log_record_processor(qlp)
    log_exporter = AzureMonitorLogExporter(**configurations)
    log_record_processor = BatchLogRecordProcessor(
        log_exporter,
    )
    logger_provider.add_log_record_processor(log_record_processor)
    set_logger_provider(logger_provider)
    logger_name: str = configurations[LOGGER_NAME_ARG]  # type: ignore
    logger = getLogger(logger_name)
    # Only add OpenTelemetry LoggingHandler if logger does not already have the handler
    # This is to prevent most duplicate logging telemetry
    if not any(isinstance(handler, LoggingHandler) for handler in logger.handlers):
        handler = LoggingHandler(logger_provider=logger_provider)
        logger.addHandler(handler)

    # Setup EventLoggerProvider
    event_provider = EventLoggerProvider(logger_provider)
    _set_event_logger_provider(event_provider, False)


def _setup_metrics(configurations: Dict[str, ConfigurationValue]):
    resource: Resource = configurations[RESOURCE_ARG]  # type: ignore
    views: List[View] = configurations[VIEWS_ARG]  # type: ignore
    metric_exporter = AzureMonitorMetricExporter(**configurations)
    reader = PeriodicExportingMetricReader(metric_exporter)
    meter_provider = MeterProvider(
        metric_readers=[reader],
        resource=resource,
        views=views,
    )
    set_meter_provider(meter_provider)


def _setup_live_metrics(configurations):
    enable_live_metrics(**configurations)


class _EntryPointDistFinder:
    @cached_property
    def _mapping(self):
        return {self._key_for(ep): dist for dist in distributions() for ep in dist.entry_points}

    def dist_for(self, entry_point: EntryPoint):
        dist = getattr(entry_point, "dist", None)
        if dist:
            return dist

        return self._mapping.get(self._key_for(entry_point))

    @staticmethod
    def _key_for(entry_point: EntryPoint):
        return f"{entry_point.group}:{entry_point.name}:{entry_point.value}"


def _setup_instrumentations(configurations: Dict[str, ConfigurationValue]):
    entry_point_finder = _EntryPointDistFinder()
    # use pkg_resources for now until https://github.com/open-telemetry/opentelemetry-python/pull/3168 is merged
    for entry_point in entry_points(group="opentelemetry_instrumentor"):
        lib_name = entry_point.name
        if lib_name not in _ALL_SUPPORTED_INSTRUMENTED_LIBRARIES:
            continue
        if not _is_instrumentation_enabled(configurations, lib_name):
            _logger.debug("Instrumentation skipped for library %s", entry_point.name)
            continue
        try:
            # Check if dependent libraries/version are installed
            entry_point_dist = entry_point_finder.dist_for(entry_point)  # type: ignore
            conflict = get_dist_dependency_conflicts(entry_point_dist)  # type: ignore
            if conflict:
                _logger.debug(
                    "Skipping instrumentation %s: %s",
                    entry_point.name,
                    conflict,
                )
                continue
            # Load the instrumentor via entrypoint
            instrumentor: BaseInstrumentor = entry_point.load()
            # tell instrumentation to not run dep checks again as we already did it above
            instrumentor().instrument(skip_dep_check=True)
        except Exception as ex:  # pylint: disable=broad-except
            _logger.warning(
                "Exception occurred when instrumenting: %s.",
                lib_name,
                exc_info=ex,
            )
    _setup_additional_azure_sdk_instrumentations(configurations)


def _send_attach_warning():
    if _is_attach_enabled() and not _is_on_functions():
        AzureDiagnosticLogging.warning(
            "Distro detected that automatic attach may have occurred. Check your data to ensure "
            "that telemetry is not being duplicated. This may impact your cost.",
            _DISTRO_DETECTS_ATTACH,
        )


def _setup_additional_azure_sdk_instrumentations(configurations: Dict[str, ConfigurationValue]):
    if _AZURE_SDK_INSTRUMENTATION_NAME not in _ALL_SUPPORTED_INSTRUMENTED_LIBRARIES:
        return

    if not _is_instrumentation_enabled(configurations, _AZURE_SDK_INSTRUMENTATION_NAME):
        _logger.debug("Instrumentation skipped for library azure_sdk")
        return

    try:
        from azure.ai.inference.tracing import AIInferenceInstrumentor  # pylint: disable=import-error,no-name-in-module
    except Exception as ex:  # pylint: disable=broad-except
        _logger.debug(
            "Failed to import AIInferenceInstrumentor from azure-ai-inference",
            exc_info=ex,
        )
        return

    try:
        AIInferenceInstrumentor().instrument()
    except Exception as ex:  # pylint: disable=broad-except
        _logger.warning(
            "Exception occurred when instrumenting: %s.",
            "azure-ai-inference",
            exc_info=ex,
        )