diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py new file mode 100644 index 00000000..169fa5ef --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py @@ -0,0 +1,379 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +"""Defines functionality for collecting telemetry about code. + +An activity is a logical block of code that consumers want to monitor. +To monitor an activity, either wrap the logical block of code with the +``log_activity()`` method or use the ``@monitor_with_activity`` +decorator. +""" +import contextlib +import functools +import inspect +import logging +import os +import uuid +from datetime import datetime +from typing import Dict, Tuple +from uuid import uuid4 + +from marshmallow import ValidationError + +from azure.ai.ml._utils._logger_utils import OpsLogger +from azure.ai.ml._utils.utils import _is_user_error_from_exception_type, _is_user_error_from_status_code, _str_to_bool +from azure.ai.ml.exceptions import ErrorCategory, MlException +from azure.core.exceptions import HttpResponseError + +# Get environment variable IS_IN_CI_PIPELINE to decide whether it's in CI test +IS_IN_CI_PIPELINE = _str_to_bool(os.environ.get("IS_IN_CI_PIPELINE", "False")) + +ACTIVITY_SPAN = "activity_span" + + +class ActivityType(object): + """The type of activity (code) monitored. + + The default type is "PublicAPI". + """ + + PUBLICAPI = "PublicApi" # incoming public API call (default) + INTERNALCALL = "InternalCall" # internal (function) call + CLIENTPROXY = "ClientProxy" # an outgoing service API call + + +class ActivityCompletionStatus(object): + """The activity (code) completion status, success, or failure.""" + + SUCCESS = "Success" + FAILURE = "Failure" + + +class ActivityLoggerAdapter(logging.LoggerAdapter): + """An adapter for loggers to keep activity contextual information in + logging output. + + :param logger: The activity logger adapter. + :type logger: logging.LoggerAdapter + :param activity_info: The info to write to the logger. + :type activity_info: str + """ + + def __init__(self, logger: logging.Logger, activity_info: Dict): + """Initialize a new instance of the class. + + :param logger: The activity logger. + :type logger: logging.Logger + :param activity_info: The info to write to the logger. + :type activity_info: Dict + """ + self._activity_info = activity_info + super(ActivityLoggerAdapter, self).__init__(logger, None) # type: ignore[arg-type] + + @property + def activity_info(self) -> Dict: + """Return current activity info. + + :return: The info to write to the logger + :rtype: Dict + """ + return self._activity_info + + def process(self, msg: str, kwargs: Dict) -> Tuple[str, Dict]: # type: ignore[override] + """Process the log message. + + :param msg: The log message. + :type msg: str + :param kwargs: The arguments with properties. + :type kwargs: dict + :return: A tuple of the processed msg and kwargs + :rtype: Tuple[str, Dict] + """ + if "extra" not in kwargs: + kwargs["extra"] = {} + + kwargs["extra"].update(self._activity_info) + + return msg, kwargs + + +def error_preprocess(activityLogger, exception): + """Try to update activityLogger if exception is HttpResponseError or + builtin error (such as KeyError, TypeError) + + For HttpResponseError, will log exception message and classify error category according to http response status + code if have, otherwise classify as Unknown. For builtin error, will log exception message and classify error + category as Unknown. + + :param activityLogger: The logger adapter. + :type activityLogger: ActivityLoggerAdapter + :param exception: The raised exception to be preprocessed. + :type exception: BaseException + :return: The provided exception + :rtype: Exception + """ + + if isinstance(exception, HttpResponseError): + activityLogger.activity_info["errorMessage"] = exception.message + if exception.response: + http_status_code = exception.response.status_code + error_category = ( + ErrorCategory.USER_ERROR + if _is_user_error_from_status_code(http_status_code) + else ErrorCategory.SYSTEM_ERROR + ) + else: + error_category = ErrorCategory.UNKNOWN + activityLogger.activity_info["errorCategory"] = error_category + if exception.inner_exception: + activityLogger.activity_info["innerException"] = type(exception.inner_exception).__name__ + elif isinstance(exception, MlException): + # If exception is MlException, it will have error_category, message and target attributes and will log those + # information in log_activity, no need more actions here. + pass + elif isinstance(exception, ValidationError): + # Validation error should be user error + activityLogger.activity_info["errorMessage"] = str(exception) + activityLogger.activity_info["errorCategory"] = ErrorCategory.USER_ERROR + else: + activityLogger.activity_info["errorMessage"] = "Got error {0}: '{1}' while calling {2}".format( + exception.__class__.__name__, + exception, + activityLogger.activity_info["activity_name"], + ) + if _is_user_error_from_exception_type(exception) or _is_user_error_from_exception_type(exception.__cause__): + activityLogger.activity_info["errorCategory"] = ErrorCategory.USER_ERROR + else: + # Todo: should check KeyError, TypeError, ValueError caused by user before request or raise in code directly + activityLogger.activity_info["errorCategory"] = ErrorCategory.UNKNOWN + return exception + + +@contextlib.contextmanager +def log_activity( + logger, + activity_name, + activity_type=ActivityType.INTERNALCALL, + custom_dimensions=None, +): + """Log an activity. + + An activity is a logical block of code that consumers want to monitor. + To monitor, wrap the logical block of code with the ``log_activity()`` method. As an alternative, you can + also use the ``@monitor_with_activity`` decorator. + + :param logger: The logger adapter. + :type logger: logging.LoggerAdapter + :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block. + :type activity_name: str + :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call, + an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used. + :type activity_type: str + :param custom_dimensions: The custom properties of the activity. + :type custom_dimensions: dict + :return: An activity logger + :rtype: Iterable[ActivityLoggerAdapter] + """ + activity_info = { + "activity_id": str(uuid.uuid4()), + "activity_name": activity_name, + "activity_type": activity_type, + } + custom_dimensions = custom_dimensions or {} + custom_dimensions.update({"client_request_id": str(uuid4())}) + activity_info.update(custom_dimensions) + + start_time = datetime.utcnow() + completion_status = ActivityCompletionStatus.SUCCESS + + message = "ActivityStarted, {}".format(activity_name) + activityLogger = ActivityLoggerAdapter(logger, activity_info) # type: ignore[arg-type] + activityLogger.info(message) + exception = None + + try: + yield activityLogger + except BaseException as e: + exception = error_preprocess(activityLogger, e) + completion_status = ActivityCompletionStatus.FAILURE + # All the system and unknown errors except for NotImplementedError will be wrapped with a new exception. + if IS_IN_CI_PIPELINE and not isinstance(e, NotImplementedError): + if ( + isinstance(exception, MlException) + and exception.error_category # pylint: disable=no-member + in [ErrorCategory.SYSTEM_ERROR, ErrorCategory.UNKNOWN] + ) or ( + "errorCategory" in activityLogger.activity_info + and activityLogger.activity_info["errorCategory"] # type: ignore[index] + in [ErrorCategory.SYSTEM_ERROR, ErrorCategory.UNKNOWN] + ): + # pylint: disable=W0719 + raise Exception("Got InternalSDKError", e) from e + raise + raise + finally: + try: + end_time = datetime.utcnow() + duration_ms = round((end_time - start_time).total_seconds() * 1000, 2) + + activityLogger.activity_info["completionStatus"] = completion_status # type: ignore[index] + activityLogger.activity_info["durationMs"] = duration_ms # type: ignore[index] + message = "ActivityCompleted: Activity={}, HowEnded={}, Duration={} [ms]".format( + activity_name, completion_status, duration_ms + ) + if exception: + activityLogger.activity_info["exception"] = type(exception).__name__ # type: ignore[index] + if isinstance(exception, MlException): + activityLogger.activity_info[ # type: ignore[index] + "errorMessage" + ] = exception.no_personal_data_message # pylint: disable=no-member + # pylint: disable=no-member + activityLogger.activity_info["errorTarget"] = exception.target # type: ignore[index] + activityLogger.activity_info[ # type: ignore[index] + "errorCategory" + ] = exception.error_category # pylint: disable=no-member + if exception.inner_exception: # pylint: disable=no-member + # pylint: disable=no-member + activityLogger.activity_info["innerException"] = type( # type: ignore[index] + exception.inner_exception + ).__name__ + message += ", Exception={}".format(activityLogger.activity_info["exception"]) + message += ", ErrorCategory={}".format(activityLogger.activity_info["errorCategory"]) + message += ", ErrorMessage={}".format(activityLogger.activity_info["errorMessage"]) + + activityLogger.error(message) + else: + activityLogger.info(message) + except Exception: # pylint: disable=W0718 + pass + + +# pylint: disable-next=docstring-missing-rtype +def monitor_with_activity( + logger, + activity_name, + activity_type=ActivityType.INTERNALCALL, + custom_dimensions=None, +): + """Add a wrapper for monitoring an activity (code). + + An activity is a logical block of code that consumers want to monitor. + To monitor, use the ``@monitor_with_activity`` decorator. As an alternative, you can also wrap the + logical block of code with the ``log_activity()`` method. + + :param logger: The operations logging class, containing loggers and tracer for the package and module + :type logger: ~azure.ai.ml._utils._logger_utils.OpsLogger + :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block. + :type activity_name: str + :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call, + an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used. + :type activity_type: str + :param custom_dimensions: The custom properties of the activity. + :type custom_dimensions: dict + :return: + """ + + def monitor(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + tracer = logger.package_tracer if isinstance(logger, OpsLogger) else None + if tracer: + with tracer.start_as_current_span(ACTIVITY_SPAN): + with log_activity( + logger.package_logger, activity_name or f.__name__, activity_type, custom_dimensions + ): + return f(*args, **kwargs) + elif hasattr(logger, "package_logger"): + with log_activity(logger.package_logger, activity_name or f.__name__, activity_type, custom_dimensions): + return f(*args, **kwargs) + else: + with log_activity(logger, activity_name or f.__name__, activity_type, custom_dimensions): + return f(*args, **kwargs) + + return wrapper + + return monitor + + +# pylint: disable-next=docstring-missing-rtype +def monitor_with_telemetry_mixin( + logger, + activity_name, + activity_type=ActivityType.INTERNALCALL, + custom_dimensions=None, + extra_keys=None, +): + """Add a wrapper for monitoring an activity (code) with telemetry mixin. + + An activity is a logical block of code that consumers want to monitor. + Object telemetry values will be added into custom dimensions if activity parameter or return object is + an instance of TelemetryMixin, which means log dimension will include: 1. telemetry collected from + parameter object. 2. custom dimensions passed in. 3.(optional) if no telemetry found in parameter, + will collect from return value. + To monitor, use the ``@monitor_with_telemetry_mixin`` decorator. + + :param logger: The operations logging class, containing loggers and tracer for the package and module + :type logger: logging.LoggerAdapter + :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block. + :type activity_name: str + :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call, + an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used. + :type activity_type: str + :param custom_dimensions: The custom properties of the activity. + :type custom_dimensions: dict + :param extra_keys: Extra keys from parameter to be tracked. The key in extra_keys will always be added into + dimensions, the value of it will be str(obj) if it exists in parameters, or 'None' if not. + :type extra_keys: list[str] + :return: + """ + + logger = logger.package_logger if isinstance(logger, OpsLogger) else logger + + def monitor(f): + def _collect_from_parameters(f, args, kwargs, extra_keys): + dimensions = {} + named_args = dict(zip(inspect.signature(f).parameters.keys(), args)) + parameters = {**named_args, **kwargs} + from azure.ai.ml.entities._mixins import TelemetryMixin + + # extract mixin dimensions from arguments + for key, obj in parameters.items(): + try: + if isinstance(obj, TelemetryMixin): + dimensions.update(obj._get_telemetry_values()) + elif extra_keys and key in extra_keys: + dimensions[key] = str(obj) + except Exception: # pylint: disable=W0718 + pass + # add left keys with None + if extra_keys: + for key in extra_keys: + if key not in parameters: + dimensions[key] = "None" + return dimensions + + def _collect_from_return_value(value): + from azure.ai.ml.entities._mixins import TelemetryMixin + + try: + return value._get_telemetry_values() if isinstance(value, TelemetryMixin) else {} + except Exception: # pylint: disable=W0718 + return {} + + @functools.wraps(f) + def wrapper(*args, **kwargs): + parameter_dimensions = _collect_from_parameters(f, args, kwargs, extra_keys) + dimensions = {**parameter_dimensions, **(custom_dimensions or {})} + with log_activity(logger, activity_name or f.__name__, activity_type, dimensions) as activityLogger: + return_value = f(*args, **kwargs) + if not parameter_dimensions: + # collect from return if no dimensions from parameter + activityLogger.activity_info.update(_collect_from_return_value(return_value)) + return return_value + + return wrapper + + return monitor |