aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py379
1 files changed, 379 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py
new file mode 100644
index 00000000..169fa5ef
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_telemetry/activity.py
@@ -0,0 +1,379 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+"""Defines functionality for collecting telemetry about code.
+
+An activity is a logical block of code that consumers want to monitor.
+To monitor an activity, either wrap the logical block of code with the
+``log_activity()`` method or use the ``@monitor_with_activity``
+decorator.
+"""
+import contextlib
+import functools
+import inspect
+import logging
+import os
+import uuid
+from datetime import datetime
+from typing import Dict, Tuple
+from uuid import uuid4
+
+from marshmallow import ValidationError
+
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils.utils import _is_user_error_from_exception_type, _is_user_error_from_status_code, _str_to_bool
+from azure.ai.ml.exceptions import ErrorCategory, MlException
+from azure.core.exceptions import HttpResponseError
+
+# Get environment variable IS_IN_CI_PIPELINE to decide whether it's in CI test
+IS_IN_CI_PIPELINE = _str_to_bool(os.environ.get("IS_IN_CI_PIPELINE", "False"))
+
+ACTIVITY_SPAN = "activity_span"
+
+
+class ActivityType(object):
+ """The type of activity (code) monitored.
+
+ The default type is "PublicAPI".
+ """
+
+ PUBLICAPI = "PublicApi" # incoming public API call (default)
+ INTERNALCALL = "InternalCall" # internal (function) call
+ CLIENTPROXY = "ClientProxy" # an outgoing service API call
+
+
+class ActivityCompletionStatus(object):
+ """The activity (code) completion status, success, or failure."""
+
+ SUCCESS = "Success"
+ FAILURE = "Failure"
+
+
+class ActivityLoggerAdapter(logging.LoggerAdapter):
+ """An adapter for loggers to keep activity contextual information in
+ logging output.
+
+ :param logger: The activity logger adapter.
+ :type logger: logging.LoggerAdapter
+ :param activity_info: The info to write to the logger.
+ :type activity_info: str
+ """
+
+ def __init__(self, logger: logging.Logger, activity_info: Dict):
+ """Initialize a new instance of the class.
+
+ :param logger: The activity logger.
+ :type logger: logging.Logger
+ :param activity_info: The info to write to the logger.
+ :type activity_info: Dict
+ """
+ self._activity_info = activity_info
+ super(ActivityLoggerAdapter, self).__init__(logger, None) # type: ignore[arg-type]
+
+ @property
+ def activity_info(self) -> Dict:
+ """Return current activity info.
+
+ :return: The info to write to the logger
+ :rtype: Dict
+ """
+ return self._activity_info
+
+ def process(self, msg: str, kwargs: Dict) -> Tuple[str, Dict]: # type: ignore[override]
+ """Process the log message.
+
+ :param msg: The log message.
+ :type msg: str
+ :param kwargs: The arguments with properties.
+ :type kwargs: dict
+ :return: A tuple of the processed msg and kwargs
+ :rtype: Tuple[str, Dict]
+ """
+ if "extra" not in kwargs:
+ kwargs["extra"] = {}
+
+ kwargs["extra"].update(self._activity_info)
+
+ return msg, kwargs
+
+
+def error_preprocess(activityLogger, exception):
+ """Try to update activityLogger if exception is HttpResponseError or
+ builtin error (such as KeyError, TypeError)
+
+ For HttpResponseError, will log exception message and classify error category according to http response status
+ code if have, otherwise classify as Unknown. For builtin error, will log exception message and classify error
+ category as Unknown.
+
+ :param activityLogger: The logger adapter.
+ :type activityLogger: ActivityLoggerAdapter
+ :param exception: The raised exception to be preprocessed.
+ :type exception: BaseException
+ :return: The provided exception
+ :rtype: Exception
+ """
+
+ if isinstance(exception, HttpResponseError):
+ activityLogger.activity_info["errorMessage"] = exception.message
+ if exception.response:
+ http_status_code = exception.response.status_code
+ error_category = (
+ ErrorCategory.USER_ERROR
+ if _is_user_error_from_status_code(http_status_code)
+ else ErrorCategory.SYSTEM_ERROR
+ )
+ else:
+ error_category = ErrorCategory.UNKNOWN
+ activityLogger.activity_info["errorCategory"] = error_category
+ if exception.inner_exception:
+ activityLogger.activity_info["innerException"] = type(exception.inner_exception).__name__
+ elif isinstance(exception, MlException):
+ # If exception is MlException, it will have error_category, message and target attributes and will log those
+ # information in log_activity, no need more actions here.
+ pass
+ elif isinstance(exception, ValidationError):
+ # Validation error should be user error
+ activityLogger.activity_info["errorMessage"] = str(exception)
+ activityLogger.activity_info["errorCategory"] = ErrorCategory.USER_ERROR
+ else:
+ activityLogger.activity_info["errorMessage"] = "Got error {0}: '{1}' while calling {2}".format(
+ exception.__class__.__name__,
+ exception,
+ activityLogger.activity_info["activity_name"],
+ )
+ if _is_user_error_from_exception_type(exception) or _is_user_error_from_exception_type(exception.__cause__):
+ activityLogger.activity_info["errorCategory"] = ErrorCategory.USER_ERROR
+ else:
+ # Todo: should check KeyError, TypeError, ValueError caused by user before request or raise in code directly
+ activityLogger.activity_info["errorCategory"] = ErrorCategory.UNKNOWN
+ return exception
+
+
+@contextlib.contextmanager
+def log_activity(
+ logger,
+ activity_name,
+ activity_type=ActivityType.INTERNALCALL,
+ custom_dimensions=None,
+):
+ """Log an activity.
+
+ An activity is a logical block of code that consumers want to monitor.
+ To monitor, wrap the logical block of code with the ``log_activity()`` method. As an alternative, you can
+ also use the ``@monitor_with_activity`` decorator.
+
+ :param logger: The logger adapter.
+ :type logger: logging.LoggerAdapter
+ :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block.
+ :type activity_name: str
+ :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call,
+ an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used.
+ :type activity_type: str
+ :param custom_dimensions: The custom properties of the activity.
+ :type custom_dimensions: dict
+ :return: An activity logger
+ :rtype: Iterable[ActivityLoggerAdapter]
+ """
+ activity_info = {
+ "activity_id": str(uuid.uuid4()),
+ "activity_name": activity_name,
+ "activity_type": activity_type,
+ }
+ custom_dimensions = custom_dimensions or {}
+ custom_dimensions.update({"client_request_id": str(uuid4())})
+ activity_info.update(custom_dimensions)
+
+ start_time = datetime.utcnow()
+ completion_status = ActivityCompletionStatus.SUCCESS
+
+ message = "ActivityStarted, {}".format(activity_name)
+ activityLogger = ActivityLoggerAdapter(logger, activity_info) # type: ignore[arg-type]
+ activityLogger.info(message)
+ exception = None
+
+ try:
+ yield activityLogger
+ except BaseException as e:
+ exception = error_preprocess(activityLogger, e)
+ completion_status = ActivityCompletionStatus.FAILURE
+ # All the system and unknown errors except for NotImplementedError will be wrapped with a new exception.
+ if IS_IN_CI_PIPELINE and not isinstance(e, NotImplementedError):
+ if (
+ isinstance(exception, MlException)
+ and exception.error_category # pylint: disable=no-member
+ in [ErrorCategory.SYSTEM_ERROR, ErrorCategory.UNKNOWN]
+ ) or (
+ "errorCategory" in activityLogger.activity_info
+ and activityLogger.activity_info["errorCategory"] # type: ignore[index]
+ in [ErrorCategory.SYSTEM_ERROR, ErrorCategory.UNKNOWN]
+ ):
+ # pylint: disable=W0719
+ raise Exception("Got InternalSDKError", e) from e
+ raise
+ raise
+ finally:
+ try:
+ end_time = datetime.utcnow()
+ duration_ms = round((end_time - start_time).total_seconds() * 1000, 2)
+
+ activityLogger.activity_info["completionStatus"] = completion_status # type: ignore[index]
+ activityLogger.activity_info["durationMs"] = duration_ms # type: ignore[index]
+ message = "ActivityCompleted: Activity={}, HowEnded={}, Duration={} [ms]".format(
+ activity_name, completion_status, duration_ms
+ )
+ if exception:
+ activityLogger.activity_info["exception"] = type(exception).__name__ # type: ignore[index]
+ if isinstance(exception, MlException):
+ activityLogger.activity_info[ # type: ignore[index]
+ "errorMessage"
+ ] = exception.no_personal_data_message # pylint: disable=no-member
+ # pylint: disable=no-member
+ activityLogger.activity_info["errorTarget"] = exception.target # type: ignore[index]
+ activityLogger.activity_info[ # type: ignore[index]
+ "errorCategory"
+ ] = exception.error_category # pylint: disable=no-member
+ if exception.inner_exception: # pylint: disable=no-member
+ # pylint: disable=no-member
+ activityLogger.activity_info["innerException"] = type( # type: ignore[index]
+ exception.inner_exception
+ ).__name__
+ message += ", Exception={}".format(activityLogger.activity_info["exception"])
+ message += ", ErrorCategory={}".format(activityLogger.activity_info["errorCategory"])
+ message += ", ErrorMessage={}".format(activityLogger.activity_info["errorMessage"])
+
+ activityLogger.error(message)
+ else:
+ activityLogger.info(message)
+ except Exception: # pylint: disable=W0718
+ pass
+
+
+# pylint: disable-next=docstring-missing-rtype
+def monitor_with_activity(
+ logger,
+ activity_name,
+ activity_type=ActivityType.INTERNALCALL,
+ custom_dimensions=None,
+):
+ """Add a wrapper for monitoring an activity (code).
+
+ An activity is a logical block of code that consumers want to monitor.
+ To monitor, use the ``@monitor_with_activity`` decorator. As an alternative, you can also wrap the
+ logical block of code with the ``log_activity()`` method.
+
+ :param logger: The operations logging class, containing loggers and tracer for the package and module
+ :type logger: ~azure.ai.ml._utils._logger_utils.OpsLogger
+ :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block.
+ :type activity_name: str
+ :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call,
+ an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used.
+ :type activity_type: str
+ :param custom_dimensions: The custom properties of the activity.
+ :type custom_dimensions: dict
+ :return:
+ """
+
+ def monitor(f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ tracer = logger.package_tracer if isinstance(logger, OpsLogger) else None
+ if tracer:
+ with tracer.start_as_current_span(ACTIVITY_SPAN):
+ with log_activity(
+ logger.package_logger, activity_name or f.__name__, activity_type, custom_dimensions
+ ):
+ return f(*args, **kwargs)
+ elif hasattr(logger, "package_logger"):
+ with log_activity(logger.package_logger, activity_name or f.__name__, activity_type, custom_dimensions):
+ return f(*args, **kwargs)
+ else:
+ with log_activity(logger, activity_name or f.__name__, activity_type, custom_dimensions):
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ return monitor
+
+
+# pylint: disable-next=docstring-missing-rtype
+def monitor_with_telemetry_mixin(
+ logger,
+ activity_name,
+ activity_type=ActivityType.INTERNALCALL,
+ custom_dimensions=None,
+ extra_keys=None,
+):
+ """Add a wrapper for monitoring an activity (code) with telemetry mixin.
+
+ An activity is a logical block of code that consumers want to monitor.
+ Object telemetry values will be added into custom dimensions if activity parameter or return object is
+ an instance of TelemetryMixin, which means log dimension will include: 1. telemetry collected from
+ parameter object. 2. custom dimensions passed in. 3.(optional) if no telemetry found in parameter,
+ will collect from return value.
+ To monitor, use the ``@monitor_with_telemetry_mixin`` decorator.
+
+ :param logger: The operations logging class, containing loggers and tracer for the package and module
+ :type logger: logging.LoggerAdapter
+ :param activity_name: The name of the activity. The name should be unique per the wrapped logical code block.
+ :type activity_name: str
+ :param activity_type: One of PUBLICAPI, INTERNALCALL, or CLIENTPROXY which represent an incoming API call,
+ an internal (function) call, or an outgoing API call. If not specified, INTERNALCALL is used.
+ :type activity_type: str
+ :param custom_dimensions: The custom properties of the activity.
+ :type custom_dimensions: dict
+ :param extra_keys: Extra keys from parameter to be tracked. The key in extra_keys will always be added into
+ dimensions, the value of it will be str(obj) if it exists in parameters, or 'None' if not.
+ :type extra_keys: list[str]
+ :return:
+ """
+
+ logger = logger.package_logger if isinstance(logger, OpsLogger) else logger
+
+ def monitor(f):
+ def _collect_from_parameters(f, args, kwargs, extra_keys):
+ dimensions = {}
+ named_args = dict(zip(inspect.signature(f).parameters.keys(), args))
+ parameters = {**named_args, **kwargs}
+ from azure.ai.ml.entities._mixins import TelemetryMixin
+
+ # extract mixin dimensions from arguments
+ for key, obj in parameters.items():
+ try:
+ if isinstance(obj, TelemetryMixin):
+ dimensions.update(obj._get_telemetry_values())
+ elif extra_keys and key in extra_keys:
+ dimensions[key] = str(obj)
+ except Exception: # pylint: disable=W0718
+ pass
+ # add left keys with None
+ if extra_keys:
+ for key in extra_keys:
+ if key not in parameters:
+ dimensions[key] = "None"
+ return dimensions
+
+ def _collect_from_return_value(value):
+ from azure.ai.ml.entities._mixins import TelemetryMixin
+
+ try:
+ return value._get_telemetry_values() if isinstance(value, TelemetryMixin) else {}
+ except Exception: # pylint: disable=W0718
+ return {}
+
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ parameter_dimensions = _collect_from_parameters(f, args, kwargs, extra_keys)
+ dimensions = {**parameter_dimensions, **(custom_dimensions or {})}
+ with log_activity(logger, activity_name or f.__name__, activity_type, dimensions) as activityLogger:
+ return_value = f(*args, **kwargs)
+ if not parameter_dimensions:
+ # collect from return if no dimensions from parameter
+ activityLogger.activity_info.update(_collect_from_return_value(return_value))
+ return return_value
+
+ return wrapper
+
+ return monitor