about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py379
1 files changed, 379 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py
new file mode 100644
index 00000000..169fa5ef
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py
@@ -0,0 +1,379 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+"""Defines functionality for collecting telemetry about code.
+
+An activity is a logical block of code that consumers want to monitor.
+To monitor an activity, either wrap the logical block of code with the
+``log_activity()`` method or use the ``@monitor_with_activity``
+decorator.
+"""
+import contextlib
+import functools
+import inspect
+import logging
+import os
+import uuid
+from datetime import datetime
+from typing import Dict, Tuple
+from uuid import uuid4
+
+from marshmallow import ValidationError
+
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils.utils import _is_user_error_from_exception_type, _is_user_error_from_status_code, _str_to_bool
+from azure.ai.ml.exceptions import ErrorCategory, MlException
+from azure.core.exceptions import HttpResponseError
+
+# Get environment variable IS_IN_CI_PIPELINE to decide whether it's in CI test
+IS_IN_CI_PIPELINE = _str_to_bool(os.environ.get("IS_IN_CI_PIPELINE", "False"))
+
+ACTIVITY_SPAN = "activity_span"
+
+
+class ActivityType(object):
+    """The type of activity (code) monitored.
+
+    The default type is "PublicAPI".
+    """
+
+    PUBLICAPI = "PublicApi"  # incoming public API call (default)
+    INTERNALCALL = "InternalCall"  # internal (function) call
+    CLIENTPROXY = "ClientProxy"  # an outgoing service API call
+
+
+class ActivityCompletionStatus(object):
+    """The activity (code) completion status, success, or failure."""
+
+    SUCCESS = "Success"
+    FAILURE = "Failure"
+
+
+class ActivityLoggerAdapter(logging.LoggerAdapter):
+    """An adapter for loggers to keep activity contextual information in
+    logging output.
+
+    :param logger: The activity logger adapter.
+    :type logger: logging.LoggerAdapter
+    :param activity_info: The info to write to the logger.
+    :type activity_info: str
+    """
+
+    def __init__(self, logger: logging.Logger, activity_info: Dict):
+        """Initialize a new instance of the class.
+
+        :param logger: The activity logger.
+        :type logger: logging.Logger
+        :param activity_info: The info to write to the logger.
+        :type activity_info: Dict
+        """
+        self._activity_info = activity_info
+        super(ActivityLoggerAdapter, self).__init__(logger, None)  # type: ignore[arg-type]
+
+    @property
+    def activity_info(self) -> Dict:
+        """Return current activity info.
+
+        :return: The info to write to the logger
+        :rtype: Dict
+        """
+        return self._activity_info
+
+    def process(self, msg: str, kwargs: Dict) -> Tuple[str, Dict]:  # type: ignore[override]
+        """Process the log message.
+
+        :param msg: The log message.
+        :type msg: str
+        :param kwargs: The arguments with properties.
+        :type kwargs: dict
+        :return: A tuple of the processed msg and kwargs
+        :rtype: Tuple[str, Dict]
+        """
+        if "extra" not in kwargs:
+            kwargs["extra"] = {}
+
+        kwargs["extra"].update(self._activity_info)
+
+        return msg, kwargs
+
+
+def error_preprocess(activityLogger, exception):
+    """Try to update activityLogger if exception is HttpResponseError or
+    builtin error (such as KeyError, TypeError)
+
+    For HttpResponseError, will log exception message and classify error category according to http response status
+    code if have, otherwise classify as Unknown. For builtin error,  will log exception message and classify error
+    category as Unknown.
+
+    :param activityLogger: The logger adapter.
+    :type activityLogger: ActivityLoggerAdapter
+    :param exception: The raised exception to be preprocessed.
+    :type exception: BaseException
+    :return: The provided exception
+    :rtype: Exception
+    """
+
+    if isinstance(exception, HttpResponseError):
+        activityLogger.activity_info["errorMessage"] = exception.message
+        if exception.response:
+            http_status_code = exception.response.status_code
+            error_category = (
+                ErrorCategory.USER_ERROR
+                if _is_user_error_from_status_code(http_status_code)
+                else ErrorCategory.SYSTEM_ERROR
+            )
+        else:
+            error_category = ErrorCategory.UNKNOWN
+        activityLogger.activity_info["errorCategory"] = error_category
+        if exception.inner_exception:
+            activityLogger.activity_info["innerException"] = type(exception.inner_exception).__name__
+    elif isinstance(exception, MlException):
+        # If exception is MlException, it will have error_category, message and target attributes and will log those
+        # information in log_activity, no need more actions here.
+        pass
+    elif isinstance(exception, ValidationError):
+        # Validation error should be user error
+        activityLogger.activity_info["errorMessage"] = str(exception)
+        activityLogger.activity_info["errorCategory"] = ErrorCategory.USER_ERROR
+    else:
+        activityLogger.activity_info["errorMessage"] = "Got error {0}: '{1}' while calling {2}".format(
+            exception.__class__.__name__,
+            exception,
+            activityLogger.activity_info["activity_name"],
+        )
+        if _is_user_error_from_exception_type(exception) or _is_user_error_from_exception_type(exception.__cause__):
+            activityLogger.activity_info["errorCategory"] = ErrorCategory.USER_ERROR
+        else:
+            # Todo: should check KeyError, TypeError, ValueError caused by user before request or raise in code directly
+            activityLogger.activity_info["errorCategory"] = ErrorCategory.UNKNOWN
+    return exception
+
+
+@contextlib.contextmanager
+def log_activity(
+    logger,
+    activity_name,
+    activity_type=ActivityType.INTERNALCALL,
+    custom_dimensions=None,
+):
+    """Log an activity.
+
+    An activity is a logical block of code that consumers want to monitor.
+    To monitor, wrap the logical block of code with the ``log_activity()`` method. As an alternative, you can
+    also use the ``@monitor_with_activity`` decorator.
+
+    :param logger: The logger adapter.
+    :type logger: logging.LoggerAdapter
+    :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block.
+    :type activity_name: str
+    :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call,
+        an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used.
+    :type activity_type: str
+    :param custom_dimensions: The custom properties of the activity.
+    :type custom_dimensions: dict
+    :return: An activity logger
+    :rtype: Iterable[ActivityLoggerAdapter]
+    """
+    activity_info = {
+        "activity_id": str(uuid.uuid4()),
+        "activity_name": activity_name,
+        "activity_type": activity_type,
+    }
+    custom_dimensions = custom_dimensions or {}
+    custom_dimensions.update({"client_request_id": str(uuid4())})
+    activity_info.update(custom_dimensions)
+
+    start_time = datetime.utcnow()
+    completion_status = ActivityCompletionStatus.SUCCESS
+
+    message = "ActivityStarted, {}".format(activity_name)
+    activityLogger = ActivityLoggerAdapter(logger, activity_info)  # type: ignore[arg-type]
+    activityLogger.info(message)
+    exception = None
+
+    try:
+        yield activityLogger
+    except BaseException as e:
+        exception = error_preprocess(activityLogger, e)
+        completion_status = ActivityCompletionStatus.FAILURE
+        # All the system and unknown errors except for NotImplementedError will be wrapped with a new exception.
+        if IS_IN_CI_PIPELINE and not isinstance(e, NotImplementedError):
+            if (
+                isinstance(exception, MlException)
+                and exception.error_category  # pylint: disable=no-member
+                in [ErrorCategory.SYSTEM_ERROR, ErrorCategory.UNKNOWN]
+            ) or (
+                "errorCategory" in activityLogger.activity_info
+                and activityLogger.activity_info["errorCategory"]  # type: ignore[index]
+                in [ErrorCategory.SYSTEM_ERROR, ErrorCategory.UNKNOWN]
+            ):
+                # pylint: disable=W0719
+                raise Exception("Got InternalSDKError", e) from e
+            raise
+        raise
+    finally:
+        try:
+            end_time = datetime.utcnow()
+            duration_ms = round((end_time - start_time).total_seconds() * 1000, 2)
+
+            activityLogger.activity_info["completionStatus"] = completion_status  # type: ignore[index]
+            activityLogger.activity_info["durationMs"] = duration_ms  # type: ignore[index]
+            message = "ActivityCompleted: Activity={}, HowEnded={}, Duration={} [ms]".format(
+                activity_name, completion_status, duration_ms
+            )
+            if exception:
+                activityLogger.activity_info["exception"] = type(exception).__name__  # type: ignore[index]
+                if isinstance(exception, MlException):
+                    activityLogger.activity_info[  # type: ignore[index]
+                        "errorMessage"
+                    ] = exception.no_personal_data_message  # pylint: disable=no-member
+                    # pylint: disable=no-member
+                    activityLogger.activity_info["errorTarget"] = exception.target  # type: ignore[index]
+                    activityLogger.activity_info[  # type: ignore[index]
+                        "errorCategory"
+                    ] = exception.error_category  # pylint: disable=no-member
+                    if exception.inner_exception:  # pylint: disable=no-member
+                        # pylint: disable=no-member
+                        activityLogger.activity_info["innerException"] = type(  # type: ignore[index]
+                            exception.inner_exception
+                        ).__name__
+                message += ", Exception={}".format(activityLogger.activity_info["exception"])
+                message += ", ErrorCategory={}".format(activityLogger.activity_info["errorCategory"])
+                message += ", ErrorMessage={}".format(activityLogger.activity_info["errorMessage"])
+
+                activityLogger.error(message)
+            else:
+                activityLogger.info(message)
+        except Exception:  # pylint: disable=W0718
+            pass
+
+
+# pylint: disable-next=docstring-missing-rtype
+def monitor_with_activity(
+    logger,
+    activity_name,
+    activity_type=ActivityType.INTERNALCALL,
+    custom_dimensions=None,
+):
+    """Add a wrapper for monitoring an activity (code).
+
+    An activity is a logical block of code that consumers want to monitor.
+    To monitor, use the ``@monitor_with_activity`` decorator. As an alternative, you can also wrap the
+    logical block of code with the ``log_activity()`` method.
+
+    :param logger: The operations logging class, containing loggers and tracer for the package and module
+    :type logger: ~azure.ai.ml._utils._logger_utils.OpsLogger
+    :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block.
+    :type activity_name: str
+    :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call,
+        an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used.
+    :type activity_type: str
+    :param custom_dimensions: The custom properties of the activity.
+    :type custom_dimensions: dict
+    :return:
+    """
+
+    def monitor(f):
+        @functools.wraps(f)
+        def wrapper(*args, **kwargs):
+            tracer = logger.package_tracer if isinstance(logger, OpsLogger) else None
+            if tracer:
+                with tracer.start_as_current_span(ACTIVITY_SPAN):
+                    with log_activity(
+                        logger.package_logger, activity_name or f.__name__, activity_type, custom_dimensions
+                    ):
+                        return f(*args, **kwargs)
+            elif hasattr(logger, "package_logger"):
+                with log_activity(logger.package_logger, activity_name or f.__name__, activity_type, custom_dimensions):
+                    return f(*args, **kwargs)
+            else:
+                with log_activity(logger, activity_name or f.__name__, activity_type, custom_dimensions):
+                    return f(*args, **kwargs)
+
+        return wrapper
+
+    return monitor
+
+
+# pylint: disable-next=docstring-missing-rtype
+def monitor_with_telemetry_mixin(
+    logger,
+    activity_name,
+    activity_type=ActivityType.INTERNALCALL,
+    custom_dimensions=None,
+    extra_keys=None,
+):
+    """Add a wrapper for monitoring an activity (code) with telemetry mixin.
+
+    An activity is a logical block of code that consumers want to monitor.
+    Object telemetry values will be added into custom dimensions if activity parameter or return object is
+    an instance of TelemetryMixin, which means log dimension will include: 1. telemetry collected from
+    parameter object. 2. custom dimensions passed in. 3.(optional) if no telemetry found in parameter,
+    will collect from return value.
+    To monitor, use the ``@monitor_with_telemetry_mixin`` decorator.
+
+    :param logger: The operations logging class, containing loggers and tracer for the package and module
+    :type logger: logging.LoggerAdapter
+    :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block.
+    :type activity_name: str
+    :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call,
+        an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used.
+    :type activity_type: str
+    :param custom_dimensions: The custom properties of the activity.
+    :type custom_dimensions: dict
+    :param extra_keys: Extra keys from parameter to be tracked. The key in extra_keys will always be added into
+        dimensions, the value of it will be str(obj) if it exists in parameters, or 'None' if not.
+    :type extra_keys: list[str]
+    :return:
+    """
+
+    logger = logger.package_logger if isinstance(logger, OpsLogger) else logger
+
+    def monitor(f):
+        def _collect_from_parameters(f, args, kwargs, extra_keys):
+            dimensions = {}
+            named_args = dict(zip(inspect.signature(f).parameters.keys(), args))
+            parameters = {**named_args, **kwargs}
+            from azure.ai.ml.entities._mixins import TelemetryMixin
+
+            # extract mixin dimensions from arguments
+            for key, obj in parameters.items():
+                try:
+                    if isinstance(obj, TelemetryMixin):
+                        dimensions.update(obj._get_telemetry_values())
+                    elif extra_keys and key in extra_keys:
+                        dimensions[key] = str(obj)
+                except Exception:  # pylint: disable=W0718
+                    pass
+            # add left keys with None
+            if extra_keys:
+                for key in extra_keys:
+                    if key not in parameters:
+                        dimensions[key] = "None"
+            return dimensions
+
+        def _collect_from_return_value(value):
+            from azure.ai.ml.entities._mixins import TelemetryMixin
+
+            try:
+                return value._get_telemetry_values() if isinstance(value, TelemetryMixin) else {}
+            except Exception:  # pylint: disable=W0718
+                return {}
+
+        @functools.wraps(f)
+        def wrapper(*args, **kwargs):
+            parameter_dimensions = _collect_from_parameters(f, args, kwargs, extra_keys)
+            dimensions = {**parameter_dimensions, **(custom_dimensions or {})}
+            with log_activity(logger, activity_name or f.__name__, activity_type, dimensions) as activityLogger:
+                return_value = f(*args, **kwargs)
+                if not parameter_dimensions:
+                    # collect from return if no dimensions from parameter
+                    activityLogger.activity_info.update(_collect_from_return_value(return_value))
+                return return_value
+
+        return wrapper
+
+    return monitor