Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring')
9 files changed, 3004 insertions, 0 deletions
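Before the file-by-file diff, here is a minimal usage sketch of how the entities added below compose into a monitoring schedule. It is not part of the diff: it assumes these classes are re-exported from azure.ai.ml.entities (as the docstring samples suggest), that an MLClient already exists, and the endpoint/deployment ID, instance type, and e-mail address are placeholders.

# Minimal sketch, assuming the monitoring entities in this diff are importable
# from azure.ai.ml.entities; identifiers marked <...> are placeholders.
from azure.ai.ml.entities import (
    AlertNotification,
    MonitorDefinition,
    MonitoringTarget,
    MonitorSchedule,
    RecurrenceTrigger,
    ServerlessSparkCompute,
)

# Serverless Spark compute; _validate() in compute.py only accepts runtime version "3.4".
compute = ServerlessSparkCompute(runtime_version="3.4", instance_type="standard_e4s_v3")

# The deployment being monitored (placeholder ARM ID); keyword names follow
# MonitorDefinition._from_rest_object in definition.py.
target = MonitoringTarget(
    ml_task="classification",
    endpoint_deployment_id="azureml:/subscriptions/<sub>/resourceGroups/<rg>/providers/"
    "Microsoft.MachineLearningServices/workspaces/<ws>/onlineEndpoints/<endpoint>/deployments/<deployment>",
)

# monitoring_signals is left unset here; definition.py's
# _populate_default_signal_information can supply default data drift,
# prediction drift, and data quality signals when none are given.
definition = MonitorDefinition(
    compute=compute,
    monitoring_target=target,
    alert_notification=AlertNotification(emails=["alerts@example.com"]),
)

schedule = MonitorSchedule(
    name="credit-default-monitor",
    trigger=RecurrenceTrigger(frequency="day", interval=1),
    create_monitor=definition,
)

# With an MLClient instance, the schedule would be submitted roughly as:
# ml_client.schedules.begin_create_or_update(schedule)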
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/__init__.py new file mode 100644 index 00000000..fdf8caba --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/__init__.py @@ -0,0 +1,5 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/alert_notification.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/alert_notification.py new file mode 100644 index 00000000..2df0d055 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/alert_notification.py @@ -0,0 +1,54 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import List, Optional + +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + EmailMonitoringAlertNotificationSettings, + EmailNotificationEnableType, + NotificationSetting, +) +from azure.ai.ml.entities._mixins import RestTranslatableMixin + + +class AlertNotification(RestTranslatableMixin): + """Alert notification configuration for monitoring jobs + + :keyword emails: A list of email addresses that will receive notifications for monitoring alerts. + Defaults to None. + :paramtype emails: Optional[List[str]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_spark_configurations.py + :start-after: [START spark_monitor_definition] + :end-before: [END spark_monitor_definition] + :language: python + :dedent: 8 + :caption: Configuring alert notifications for a monitored job. + """ + + def __init__( + self, + *, + emails: Optional[List[str]] = None, + ) -> None: + self.emails = emails + + def _to_rest_object( + self, + ) -> EmailMonitoringAlertNotificationSettings: + return EmailMonitoringAlertNotificationSettings( + email_notification_setting=NotificationSetting( + emails=self.emails, + email_on=[ + EmailNotificationEnableType.JOB_FAILED, + EmailNotificationEnableType.JOB_COMPLETED, + ], + ) + ) + + @classmethod + def _from_rest_object(cls, obj: EmailMonitoringAlertNotificationSettings) -> "AlertNotification": + return cls(emails=obj.email_notification_setting.emails) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/compute.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/compute.py new file mode 100644 index 00000000..ff91a814 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/compute.py @@ -0,0 +1,55 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from azure.ai.ml._exception_helper import log_and_raise_error +from azure.ai.ml._restclient.v2023_06_01_preview.models import AmlTokenComputeIdentity, MonitorServerlessSparkCompute +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException + + +class ServerlessSparkCompute: + """Serverless Spark compute. + + :param runtime_version: The runtime version of the compute. 
+ :type runtime_version: str + :param instance_type: The instance type of the compute. + :type instance_type: str + """ + + def __init__( + self, + *, + runtime_version: str, + instance_type: str, + ): + self.runtime_version = runtime_version + self.instance_type = instance_type + + def _to_rest_object(self) -> MonitorServerlessSparkCompute: + self._validate() + return MonitorServerlessSparkCompute( + runtime_version=self.runtime_version, + instance_type=self.instance_type, + compute_identity=AmlTokenComputeIdentity( + compute_identity_type="AmlToken", + ), + ) + + @classmethod + def _from_rest_object(cls, obj: MonitorServerlessSparkCompute) -> "ServerlessSparkCompute": + return cls( + runtime_version=obj.runtime_version, + instance_type=obj.instance_type, + ) + + def _validate(self) -> None: + if self.runtime_version != "3.4": + msg = "Compute runtime version must be 3.4" + err = ValidationException( + message=msg, + target=ErrorTarget.MODEL_MONITORING, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.MISSING_FIELD, + ) + log_and_raise_error(err) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/definition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/definition.py new file mode 100644 index 00000000..3b81be1e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/definition.py @@ -0,0 +1,162 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +from typing import Any, Dict, Optional, Union + +from typing_extensions import Literal + +from azure.ai.ml._restclient.v2023_06_01_preview.models import AzMonMonitoringAlertNotificationSettings +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitorDefinition as RestMonitorDefinition +from azure.ai.ml.constants._monitoring import ( + AZMONITORING, + DEFAULT_DATA_DRIFT_SIGNAL_NAME, + DEFAULT_DATA_QUALITY_SIGNAL_NAME, + DEFAULT_PREDICTION_DRIFT_SIGNAL_NAME, + DEFAULT_TOKEN_USAGE_SIGNAL_NAME, + MonitorTargetTasks, +) +from azure.ai.ml.entities._mixins import RestTranslatableMixin +from azure.ai.ml.entities._monitoring.alert_notification import AlertNotification +from azure.ai.ml.entities._monitoring.compute import ServerlessSparkCompute +from azure.ai.ml.entities._monitoring.signals import ( + CustomMonitoringSignal, + DataDriftSignal, + DataQualitySignal, + FeatureAttributionDriftSignal, + GenerationSafetyQualitySignal, + GenerationTokenStatisticsSignal, + MonitoringSignal, + PredictionDriftSignal, +) +from azure.ai.ml.entities._monitoring.target import MonitoringTarget + + +class MonitorDefinition(RestTranslatableMixin): + """Monitor definition + + :keyword compute: The Spark resource configuration to be associated with the monitor + :paramtype compute: ~azure.ai.ml.entities.SparkResourceConfiguration + :keyword monitoring_target: The ARM ID object associated with the model or deployment that is being monitored. + :paramtype monitoring_target: Optional[~azure.ai.ml.entities.MonitoringTarget] + :keyword monitoring_signals: The dictionary of signals to monitor. The key is the name of the signal and the value + is the DataSignal object. Accepted values for the DataSignal objects are DataDriftSignal, DataQualitySignal, + PredictionDriftSignal, FeatureAttributionDriftSignal, and CustomMonitoringSignal. 
+ :paramtype monitoring_signals: Optional[Dict[str, Union[~azure.ai.ml.entities.DataDriftSignal + , ~azure.ai.ml.entities.DataQualitySignal, ~azure.ai.ml.entities.PredictionDriftSignal + , ~azure.ai.ml.entities.FeatureAttributionDriftSignal + , ~azure.ai.ml.entities.CustomMonitoringSignal + , ~azure.ai.ml.entities.GenerationSafetyQualitySignal + , ~azure.ai.ml.entities.GenerationTokenStatisticsSignal + , ~azure.ai.ml.entities.ModelPerformanceSignal]]] + :keyword alert_notification: The alert configuration for the monitor. + :paramtype alert_notification: Optional[Union[Literal['azmonitoring'], ~azure.ai.ml.entities.AlertNotification]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_spark_configurations.py + :start-after: [START spark_monitor_definition] + :end-before: [END spark_monitor_definition] + :language: python + :dedent: 8 + :caption: Creating Monitor definition. + """ + + def __init__( + self, + *, + compute: ServerlessSparkCompute, + monitoring_target: Optional[MonitoringTarget] = None, + monitoring_signals: Dict[ + str, + Union[ + DataDriftSignal, + DataQualitySignal, + PredictionDriftSignal, + FeatureAttributionDriftSignal, + CustomMonitoringSignal, + GenerationSafetyQualitySignal, + GenerationTokenStatisticsSignal, + ], + ] = None, # type: ignore[assignment] + alert_notification: Optional[Union[Literal["azmonitoring"], AlertNotification]] = None, + ) -> None: + self.compute = compute + self.monitoring_target = monitoring_target + self.monitoring_signals = monitoring_signals + self.alert_notification = alert_notification + + def _to_rest_object(self, **kwargs: Any) -> RestMonitorDefinition: + default_data_window_size = kwargs.get("default_data_window_size") + ref_data_window_size = kwargs.get("ref_data_window_size") + rest_alert_notification = None + if self.alert_notification: + if isinstance(self.alert_notification, str) and self.alert_notification.lower() == AZMONITORING: + rest_alert_notification = AzMonMonitoringAlertNotificationSettings() + else: + if not isinstance(self.alert_notification, str): + rest_alert_notification = self.alert_notification._to_rest_object() + + if self.monitoring_signals is not None: + _signals = { + signal_name: signal._to_rest_object( + default_data_window_size=default_data_window_size, + ref_data_window_size=ref_data_window_size, + ) + for signal_name, signal in self.monitoring_signals.items() + } + return RestMonitorDefinition( + compute_configuration=self.compute._to_rest_object(), + monitoring_target=self.monitoring_target._to_rest_object() if self.monitoring_target else None, + signals=_signals, # pylint: disable=possibly-used-before-assignment + alert_notification_setting=rest_alert_notification, + ) + + @classmethod + def _from_rest_object( + cls, # pylint: disable=unused-argument + obj: RestMonitorDefinition, + **kwargs: Any, + ) -> "MonitorDefinition": + from_rest_alert_notification: Any = None + if obj.alert_notification_setting: + if isinstance(obj.alert_notification_setting, AzMonMonitoringAlertNotificationSettings): + from_rest_alert_notification = AZMONITORING + else: + from_rest_alert_notification = AlertNotification._from_rest_object(obj.alert_notification_setting) + + _monitoring_signals = {} + for signal_name, signal in obj.signals.items(): + _monitoring_signals[signal_name] = MonitoringSignal._from_rest_object(signal) + + return cls( + compute=ServerlessSparkCompute._from_rest_object(obj.compute_configuration), + monitoring_target=( + MonitoringTarget( + 
endpoint_deployment_id=obj.monitoring_target.deployment_id, ml_task=obj.monitoring_target.task_type + ) + if obj.monitoring_target + else None + ), + monitoring_signals=_monitoring_signals, # type: ignore[arg-type] + alert_notification=from_rest_alert_notification, + ) + + def _populate_default_signal_information(self) -> None: + if ( + isinstance(self.monitoring_target, MonitoringTarget) + and self.monitoring_target.ml_task is not None + and self.monitoring_target.ml_task.lower() + == MonitorTargetTasks.QUESTION_ANSWERING.lower() # type: ignore[union-attr] + ): + self.monitoring_signals = { + DEFAULT_TOKEN_USAGE_SIGNAL_NAME: GenerationTokenStatisticsSignal._get_default_token_statistics_signal(), + } + else: + self.monitoring_signals = { + DEFAULT_DATA_DRIFT_SIGNAL_NAME: DataDriftSignal._get_default_data_drift_signal(), + DEFAULT_PREDICTION_DRIFT_SIGNAL_NAME: PredictionDriftSignal._get_default_prediction_drift_signal(), + DEFAULT_DATA_QUALITY_SIGNAL_NAME: DataQualitySignal._get_default_data_quality_signal(), + } diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/input_data.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/input_data.py new file mode 100644 index 00000000..10d80531 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/input_data.py @@ -0,0 +1,206 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import datetime +from typing import Dict, Optional + +import isodate + +from azure.ai.ml._restclient.v2023_06_01_preview.models import FixedInputData as RestFixedInputData +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitoringInputDataBase as RestMonitorInputBase +from azure.ai.ml._restclient.v2023_06_01_preview.models import StaticInputData as RestStaticInputData +from azure.ai.ml._restclient.v2023_06_01_preview.models import TrailingInputData as RestTrailingInputData +from azure.ai.ml._utils.utils import camel_to_snake, snake_to_camel +from azure.ai.ml.constants._monitoring import MonitorDatasetContext, MonitorInputDataType +from azure.ai.ml.entities._mixins import RestTranslatableMixin + + +class MonitorInputData(RestTranslatableMixin): + """Monitor input data. + + :keyword type: Specifies the type of monitoring input data. + :paramtype type: MonitorInputDataType + :keyword input_dataset: Input data used by the monitor + :paramtype input_dataset: Optional[~azure.ai.ml.Input] + :keyword dataset_context: The context of the input dataset. Accepted values are "model_inputs", + "model_outputs", "training", "test", "validation", and "ground_truth". + :paramtype dataset_context: Optional[Union[str, ~azure.ai.ml.constants.MonitorDatasetContext]] + :keyword target_column_name: The target column in the given input dataset. + :paramtype target_column_name: Optional[str] + :keyword pre_processing_component: The ARM (Azure Resource Manager) resource ID of the component resource used to + preprocess the data. 
+ :paramtype pre_processing_component: Optional[str] + """ + + def __init__( + self, + *, + type: Optional[MonitorInputDataType] = None, + data_context: Optional[MonitorDatasetContext] = None, + target_columns: Optional[Dict] = None, + job_type: Optional[str] = None, + uri: Optional[str] = None, + ): + self.type = type + self.data_context = data_context + self.target_columns = target_columns + self.job_type = job_type + self.uri = uri + + @classmethod + def _from_rest_object(cls, obj: RestMonitorInputBase) -> Optional["MonitorInputData"]: + if obj.input_data_type == MonitorInputDataType.FIXED: + return FixedInputData._from_rest_object(obj) + if obj.input_data_type == MonitorInputDataType.TRAILING: + return TrailingInputData._from_rest_object(obj) + if obj.input_data_type == MonitorInputDataType.STATIC: + return StaticInputData._from_rest_object(obj) + + return None + + +class FixedInputData(MonitorInputData): + """ + :ivar type: Specifies the type of monitoring input data. Set automatically to "Fixed" for this class. + :var type: MonitorInputDataType + """ + + def __init__( + self, + *, + data_context: Optional[MonitorDatasetContext] = None, + target_columns: Optional[Dict] = None, + job_type: Optional[str] = None, + uri: Optional[str] = None, + ): + super().__init__( + type=MonitorInputDataType.FIXED, + data_context=data_context, + target_columns=target_columns, + job_type=job_type, + uri=uri, + ) + + def _to_rest_object(self) -> RestFixedInputData: + return RestFixedInputData( + data_context=camel_to_snake(self.data_context), + columns=self.target_columns, + job_input_type=self.job_type, + uri=self.uri, + ) + + @classmethod + def _from_rest_object(cls, obj: RestFixedInputData) -> "FixedInputData": + return cls( + data_context=camel_to_snake(obj.data_context), + target_columns=obj.columns, + job_type=obj.job_input_type, + uri=obj.uri, + ) + + +class TrailingInputData(MonitorInputData): + """ + :ivar type: Specifies the type of monitoring input data. Set automatically to "Trailing" for this class. 
+ :var type: MonitorInputDataType + """ + + def __init__( + self, + *, + data_context: Optional[MonitorDatasetContext] = None, + target_columns: Optional[Dict] = None, + job_type: Optional[str] = None, + uri: Optional[str] = None, + window_size: Optional[str] = None, + window_offset: Optional[str] = None, + pre_processing_component_id: Optional[str] = None, + ): + super().__init__( + type=MonitorInputDataType.TRAILING, + data_context=data_context, + target_columns=target_columns, + job_type=job_type, + uri=uri, + ) + self.window_size = window_size + self.window_offset = window_offset + self.pre_processing_component_id = pre_processing_component_id + + def _to_rest_object(self) -> RestTrailingInputData: + return RestTrailingInputData( + data_context=camel_to_snake(self.data_context), + columns=self.target_columns, + job_input_type=self.job_type, + uri=self.uri, + window_size=self.window_size, + window_offset=self.window_offset, + preprocessing_component_id=self.pre_processing_component_id, + ) + + @classmethod + def _from_rest_object(cls, obj: RestTrailingInputData) -> "TrailingInputData": + return cls( + data_context=snake_to_camel(obj.data_context), + target_columns=obj.columns, + job_type=obj.job_input_type, + uri=obj.uri, + window_size=str(isodate.duration_isoformat(obj.window_size)), + window_offset=str(isodate.duration_isoformat(obj.window_offset)), + pre_processing_component_id=obj.preprocessing_component_id, + ) + + +class StaticInputData(MonitorInputData): + """ + :ivar type: Specifies the type of monitoring input data. Set automatically to "Static" for this class. + :var type: MonitorInputDataType + """ + + def __init__( + self, + *, + data_context: Optional[MonitorDatasetContext] = None, + target_columns: Optional[Dict] = None, + job_type: Optional[str] = None, + uri: Optional[str] = None, + pre_processing_component_id: Optional[str] = None, + window_start: Optional[str] = None, + window_end: Optional[str] = None, + ): + super().__init__( + type=MonitorInputDataType.STATIC, + data_context=data_context, + target_columns=target_columns, + job_type=job_type, + uri=uri, + ) + self.pre_processing_component_id = pre_processing_component_id + self.window_start = window_start + self.window_end = window_end + + def _to_rest_object(self) -> RestStaticInputData: + return RestStaticInputData( + data_context=camel_to_snake(self.data_context), + columns=self.target_columns, + job_input_type=self.job_type, + uri=self.uri, + preprocessing_component_id=self.pre_processing_component_id, + window_start=datetime.datetime.strptime(str(self.window_start), "%Y-%m-%d"), + window_end=datetime.datetime.strptime(str(self.window_end), "%Y-%m-%d"), + ) + + @classmethod + def _from_rest_object(cls, obj: RestStaticInputData) -> "StaticInputData": + return cls( + data_context=snake_to_camel(obj.data_context), + target_columns=obj.columns, + job_type=obj.job_input_type, + uri=obj.uri, + pre_processing_component_id=obj.preprocessing_component_id, + window_start=str(datetime.datetime.strftime(obj.window_start, "%Y-%m-%d")), + window_end=datetime.datetime.strftime(obj.window_end, "%Y-%m-%d"), + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/schedule.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/schedule.py new file mode 100644 index 00000000..f23c4e3e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/schedule.py @@ -0,0 +1,175 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft 
Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import logging +from os import PathLike +from pathlib import Path +from typing import IO, Any, AnyStr, Dict, Optional, Union, cast + +from azure.ai.ml._restclient.v2023_06_01_preview.models import CreateMonitorAction, RecurrenceFrequency +from azure.ai.ml._restclient.v2023_06_01_preview.models import Schedule as RestSchedule +from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleProperties +from azure.ai.ml._schema.monitoring.schedule import MonitorScheduleSchema +from azure.ai.ml._utils.utils import dump_yaml_to_file +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, ScheduleType +from azure.ai.ml.entities._mixins import RestTranslatableMixin +from azure.ai.ml.entities._monitoring.definition import MonitorDefinition +from azure.ai.ml.entities._schedule.schedule import Schedule +from azure.ai.ml.entities._schedule.trigger import CronTrigger, RecurrenceTrigger, TriggerBase +from azure.ai.ml.entities._system_data import SystemData +from azure.ai.ml.entities._util import load_from_dict + +module_logger = logging.getLogger(__name__) + + +class MonitorSchedule(Schedule, RestTranslatableMixin): + """Monitor schedule. + + :keyword name: The schedule name. + :paramtype name: str + :keyword trigger: The schedule trigger. + :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger] + :keyword create_monitor: The schedule action monitor definition. + :paramtype create_monitor: ~azure.ai.ml.entities.MonitorDefinition + :keyword display_name: The display name of the schedule. + :paramtype display_name: Optional[str] + :keyword description: A description of the schedule. + :paramtype description: Optional[str] + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. + :paramtype tags: Optional[dict[str, str]] + :keyword properties: The job property dictionary. 
+ :paramtype properties: Optional[dict[str, str]] + """ + + def __init__( + self, + *, + name: str, + trigger: Optional[Union[CronTrigger, RecurrenceTrigger]], + create_monitor: MonitorDefinition, + display_name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + properties: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + super().__init__( + name=name, + trigger=trigger, + display_name=display_name, + description=description, + tags=tags, + properties=properties, + **kwargs, + ) + self.create_monitor = create_monitor + self._type = ScheduleType.MONITOR + + @classmethod + def _load( + cls, + data: Optional[Dict] = None, + yaml_path: Optional[Union[PathLike, str]] = None, + params_override: Optional[list] = None, + **kwargs: Any, + ) -> "MonitorSchedule": + data = data or {} + params_override = params_override or [] + context = { + BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"), + PARAMS_OVERRIDE_KEY: params_override, + } + return cls( + base_path=cast(Dict, context[BASE_PATH_CONTEXT_KEY]), + **load_from_dict(MonitorScheduleSchema, data, context, **kwargs), + ) + + def _to_rest_object(self) -> RestSchedule: + if self.tags is not None: + tags = { + **self.tags, + } + # default data window size is calculated based on the trigger frequency + # by default 7 days if user provides incorrect recurrence frequency + # or a cron expression + default_data_window_size = "P7D" + ref_data_window_size = "P14D" + if isinstance(self.trigger, RecurrenceTrigger): + frequency = self.trigger.frequency.lower() + interval = self.trigger.interval + if frequency == RecurrenceFrequency.MINUTE.lower() or frequency == RecurrenceFrequency.HOUR.lower(): + default_data_window_size = "P1D" + ref_data_window_size = "P2D" + elif frequency == RecurrenceFrequency.DAY.lower(): + default_data_window_size = f"P{interval}D" + ref_data_window_size = f"P{interval * 2}D" + elif frequency == RecurrenceFrequency.WEEK.lower(): + default_data_window_size = f"P{interval * 7}D" + ref_data_window_size = f"P{(interval * 7) * 2}D" + elif frequency == RecurrenceFrequency.MONTH.lower(): + default_data_window_size = f"P{interval * 30}D" + ref_data_window_size = f"P{(interval * 30) * 2}D" + + return RestSchedule( + properties=ScheduleProperties( + description=self.description, + properties=self.properties, + tags=tags, # pylint: disable=possibly-used-before-assignment + action=CreateMonitorAction( + monitor_definition=self.create_monitor._to_rest_object( + default_data_window_size=default_data_window_size, ref_data_window_size=ref_data_window_size + ) + ), + display_name=self.display_name, + is_enabled=self._is_enabled, + trigger=self.trigger._to_rest_object() if self.trigger is not None else None, + ) + ) + + def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None: + """Dump the asset content into a file in YAML format. + + :param dest: The local path or file stream to write the YAML content to. + If dest is a file path, a new file will be created. + If dest is an open file, the file will be written to directly. + :type dest: Union[PathLike, str, IO[AnyStr]] + :raises FileExistsError: Raised if dest is a file path and the file already exists. + :raises IOError: Raised if dest is an open file and the file is not writable. 
+ """ + path = kwargs.pop("path", None) + yaml_serialized = self._to_dict() + dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs) + + def _to_dict(self) -> Dict: + res: dict = MonitorScheduleSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self) + return res + + @classmethod + def _from_rest_object(cls, obj: RestSchedule) -> "MonitorSchedule": + properties = obj.properties + return cls( + trigger=TriggerBase._from_rest_object(properties.trigger), + create_monitor=MonitorDefinition._from_rest_object( + properties.action.monitor_definition, tags=obj.properties.tags + ), + name=obj.name, + id=obj.id, + display_name=properties.display_name, + description=properties.description, + tags=properties.tags, + properties=properties.properties, + provisioning_state=properties.provisioning_state, + is_enabled=properties.is_enabled, + creation_context=SystemData._from_rest_object(obj.system_data) if obj.system_data else None, + ) + + def _create_default_monitor_definition(self) -> None: + self.create_monitor._populate_default_signal_information() + + def _set_baseline_data_trailing_tags_for_signal(self, signal_name: str) -> None: + if self.tags is not None: + self.tags[f"{signal_name}.baselinedata.datarange.type"] = "Trailing" + self.tags[f"{signal_name}.baselinedata.datarange.window_size"] = "P7D" diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/signals.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/signals.py new file mode 100644 index 00000000..5a9e1df7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/signals.py @@ -0,0 +1,1338 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=protected-access, too-many-lines + +import datetime +from typing import Any, Dict, List, Optional, Union + +import isodate +from typing_extensions import Literal + +from azure.ai.ml._exception_helper import log_and_raise_error +from azure.ai.ml._restclient.v2023_06_01_preview.models import AllFeatures as RestAllFeatures +from azure.ai.ml._restclient.v2023_06_01_preview.models import CustomMonitoringSignal as RestCustomMonitoringSignal +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + DataDriftMonitoringSignal as RestMonitoringDataDriftSignal, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + DataQualityMonitoringSignal as RestMonitoringDataQualitySignal, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + FeatureAttributionDriftMonitoringSignal as RestFeatureAttributionDriftMonitoringSignal, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import FeatureSubset as RestFeatureSubset +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + GenerationSafetyQualityMonitoringSignal as RestGenerationSafetyQualityMonitoringSignal, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + GenerationTokenStatisticsSignal as RestGenerationTokenStatisticsSignal, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import ModelPerformanceSignal as RestModelPerformanceSignal +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitoringDataSegment as RestMonitoringDataSegment +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + MonitoringFeatureFilterBase as RestMonitoringFeatureFilterBase, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitoringInputDataBase as RestMonitoringInputData +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitoringNotificationMode +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitoringSignalBase as RestMonitoringSignalBase +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitoringSignalType +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + MonitoringWorkspaceConnection as RestMonitoringWorkspaceConnection, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + PredictionDriftMonitoringSignal as RestPredictionDriftMonitoringSignal, +) +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + TopNFeaturesByAttribution as RestTopNFeaturesByAttribution, +) +from azure.ai.ml._utils._experimental import experimental +from azure.ai.ml.constants._monitoring import ( + ALL_FEATURES, + MonitorDatasetContext, + MonitorFeatureDataType, + MonitorSignalType, +) +from azure.ai.ml.entities._inputs_outputs import Input +from azure.ai.ml.entities._job._input_output_helpers import ( + from_rest_inputs_to_dataset_literal, + to_rest_dataset_literal_inputs, +) +from azure.ai.ml.entities._mixins import RestTranslatableMixin +from azure.ai.ml.entities._monitoring.input_data import FixedInputData, StaticInputData, TrailingInputData +from azure.ai.ml.entities._monitoring.thresholds import ( + CustomMonitoringMetricThreshold, + DataDriftMetricThreshold, + DataQualityMetricThreshold, + FeatureAttributionDriftMetricThreshold, + GenerationSafetyQualityMonitoringMetricThreshold, + GenerationTokenStatisticsMonitorMetricThreshold, + MetricThreshold, + ModelPerformanceMetricThreshold, + PredictionDriftMetricThreshold, +) +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, 
ValidationErrorType, ValidationException + + +class DataSegment(RestTranslatableMixin): + """Data segment for monitoring. + + :keyword feature_name: The feature to segment the data on. + :paramtype feature_name: str + :keyword feature_values: A list of values for the given segmented feature to filter. + :paramtype feature_values: List[str] + """ + + def __init__( + self, + *, + feature_name: Optional[str] = None, + feature_values: Optional[List[str]] = None, + ) -> None: + self.feature_name = feature_name + self.feature_values = feature_values + + def _to_rest_object(self) -> RestMonitoringDataSegment: + return RestMonitoringDataSegment(feature=self.feature_name, values=self.feature_values) + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringDataSegment) -> "DataSegment": + return cls( + feature_name=obj.feature, + feature_values=obj.values, + ) + + +class MonitorFeatureFilter(RestTranslatableMixin): + """Monitor feature filter + + :keyword top_n_feature_importance: The number of top features to include. Defaults to 10. + :paramtype top_n_feature_importance: int + """ + + def __init__( + self, + *, + top_n_feature_importance: int = 10, + ) -> None: + self.top_n_feature_importance = top_n_feature_importance + + def _to_rest_object(self) -> RestTopNFeaturesByAttribution: + return RestTopNFeaturesByAttribution( + top=self.top_n_feature_importance, + ) + + @classmethod + def _from_rest_object(cls, obj: RestTopNFeaturesByAttribution) -> "MonitorFeatureFilter": + return cls(top_n_feature_importance=obj.top) + + +class BaselineDataRange: + """Baseline data range for monitoring. + + This class is used when initializing a data_window for a ReferenceData object. + For trailing input, set lookback_window_size and lookback_window_offset to a desired value. + For static input, set window_start and window_end to a desired value. + """ + + def __init__( + self, + *, + window_start: Optional[str] = None, + window_end: Optional[str] = None, + lookback_window_size: Optional[str] = None, + lookback_window_offset: Optional[str] = None, + ): + self.window_start = window_start + self.window_end = window_end + self.lookback_window_size = lookback_window_size + self.lookback_window_offset = lookback_window_offset + + +class ProductionData(RestTranslatableMixin): + """Production Data + + :param input_data: The data for which drift will be calculated + :type Input: ~azure.ai.ml.entities._input_outputs + :param data_context: The context of the input dataset. Possible values + include: model_inputs, model_outputs, training, test, validation, ground_truth + :type MonitorDatasetContext: ~azure.ai.ml.constants.MonitorDatasetContext + :param pre_processing_component: ARM resource ID of the component resource used to + preprocess the data. + :type pre_processing_component: string + :param data_window: The number of days or a time frame that a singal monitor looks back over the target. 
+ :type data_window_size: BaselineDataRange + """ + + def __init__( + self, + *, + input_data: Input, + data_context: Optional[MonitorDatasetContext] = None, + pre_processing_component: Optional[str] = None, + data_window: Optional[BaselineDataRange] = None, + data_column_names: Optional[Dict[str, str]] = None, + ): + self.input_data = input_data + self.data_context = data_context + self.pre_processing_component = pre_processing_component + self.data_window = data_window + self.data_column_names = data_column_names + + def _to_rest_object(self, **kwargs: Any) -> RestMonitoringInputData: + self._validate() + default_data_window_size = kwargs.get("default_data_window_size") + if self.data_window is None: + self.data_window = BaselineDataRange( + lookback_window_size=default_data_window_size, lookback_window_offset="P0D" + ) + if self.data_window.lookback_window_size in ["default", None]: + self.data_window.lookback_window_size = default_data_window_size + uri = self.input_data.path + job_type = self.input_data.type + monitoring_input_data = TrailingInputData( + data_context=self.data_context, + target_columns=self.data_column_names, + job_type=job_type, + uri=uri, + pre_processing_component_id=self.pre_processing_component, + window_size=self.data_window.lookback_window_size, + window_offset=( + self.data_window.lookback_window_offset + if self.data_window.lookback_window_offset is not None + else "P0D" + ), + ) + return monitoring_input_data._to_rest_object() + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringInputData) -> "ProductionData": + data_window = BaselineDataRange( + lookback_window_size=isodate.duration_isoformat(obj.window_size), + lookback_window_offset=isodate.duration_isoformat(obj.window_offset), + ) + return cls( + input_data=Input( + path=obj.uri, + type=obj.job_input_type, + ), + data_context=obj.data_context, + pre_processing_component=obj.preprocessing_component_id, + data_window=data_window, + data_column_names=obj.columns, + ) + + def _validate(self) -> None: + if self.data_window: + if self.data_window.window_start or self.data_window.window_end: + msg = "ProductionData only accepts lookback_window_size and lookback_window_offset." + err = ValidationException( + message=msg, + target=ErrorTarget.MODEL_MONITORING, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.MISSING_FIELD, + ) + log_and_raise_error(err) + + +class ReferenceData(RestTranslatableMixin): + """Reference Data + + :param input_data: The data for which drift will be calculated + :type Input: ~azure.ai.ml.entities._input_outputs + :param data_context: The context of the input dataset. Possible values + include: model_inputs, model_outputs, training, test, validation, ground_truth + :type MonitorDatasetContext: ~azure.ai.ml.constants.MonitorDatasetContext + :param pre_processing_component: ARM resource ID of the component resource used to + preprocess the data. + :type pre_processing_component: string + :param target_column_name: The name of the target column in the dataset. + :type target_column_name: string + :param data_window: The number of days or a time frame that a single monitor looks back over the target. 
+ :type data_window_size: BaselineDataRange + """ + + def __init__( + self, + *, + input_data: Input, + data_context: Optional[MonitorDatasetContext] = None, + pre_processing_component: Optional[str] = None, + data_window: Optional[BaselineDataRange] = None, + data_column_names: Optional[Dict[str, str]] = None, + ): + self.input_data = input_data + self.data_context = data_context + self.pre_processing_component = pre_processing_component + self.data_window = data_window + self.data_column_names = data_column_names + + def _to_rest_object(self, **kwargs: Any) -> RestMonitoringInputData: + default_data_window = kwargs.get("default_data_window") + ref_data_window_size = kwargs.get("ref_data_window_size") + if self.data_window is not None: + if self.data_window.lookback_window_size is not None: + if self.data_window.lookback_window_size == "default": + self.data_window.lookback_window_size = ref_data_window_size + if self.data_window.lookback_window_offset == "default": + self.data_window.lookback_window_offset = default_data_window + return TrailingInputData( + data_context=self.data_context, + target_columns=self.data_column_names, + job_type=self.input_data.type, + uri=self.input_data.path, + pre_processing_component_id=self.pre_processing_component, + window_size=self.data_window.lookback_window_size, + window_offset=( + self.data_window.lookback_window_offset + if self.data_window.lookback_window_offset is not None + else "P0D" + ), + )._to_rest_object() + if self.data_window.window_start is not None and self.data_window.window_end is not None: + return StaticInputData( + data_context=self.data_context, + target_columns=self.data_column_names, + job_type=self.input_data.type, + uri=self.input_data.path, + pre_processing_component_id=self.pre_processing_component, + window_start=self.data_window.window_start, + window_end=self.data_window.window_end, + )._to_rest_object() + + return FixedInputData( + data_context=self.data_context, + target_columns=self.data_column_names, + job_type=self.input_data.type, + uri=self.input_data.path, + )._to_rest_object() + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringInputData) -> "ReferenceData": + data_window = None + if obj.input_data_type == "Static": + data_window = BaselineDataRange( + window_start=datetime.datetime.strftime(obj.window_start, "%Y-%m-%d"), + window_end=datetime.datetime.strftime(obj.window_end, "%Y-%m-%d"), + ) + if obj.input_data_type == "Trailing": + data_window = BaselineDataRange( + lookback_window_size=isodate.duration_isoformat(obj.window_size), + lookback_window_offset=isodate.duration_isoformat(obj.window_offset), + ) + + return cls( + input_data=Input( + path=obj.uri, + type=obj.job_input_type, + ), + data_context=obj.data_context, + pre_processing_component=obj.preprocessing_component_id if obj.input_data_type != "Fixed" else None, + data_window=data_window, + data_column_names=obj.columns, + ) + + +class MonitoringSignal(RestTranslatableMixin): + """ + Base class for monitoring signals. + + This class should not be instantiated directly. Instead, use one of its subclasses. + + :keyword baseline_dataset: The baseline dataset definition for monitor input. + :paramtype baseline_dataset: ~azure.ai.ml.entities.MonitorInputData + :keyword metric_thresholds: The metric thresholds for the signal. 
+ :paramtype metric_thresholds: Union[ + ~azure.ai.ml.entities.DataDriftMetricThreshold, + ~azure.ai.ml.entities.DataQualityMetricThreshold, + ~azure.ai.ml.entities.PredictionDriftMetricThreshold, + ~azure.ai.ml.entities.FeatureAttributionDriftMetricThreshold, + ~azure.ai.ml.entities.CustomMonitoringMetricThreshold, + ~azure.ai.ml.entities.GenerationSafetyQualityMonitoringMetricThreshold, + List[Union[ + ~azure.ai.ml.entities.DataDriftMetricThreshold, + ~azure.ai.ml.entities.DataQualityMetricThreshold, + ~azure.ai.ml.entities.PredictionDriftMetricThreshold, + ~azure.ai.ml.entities.FeatureAttributionDriftMetricThreshold, + ~azure.ai.ml.entities.CustomMonitoringMetricThreshold, + ~azure.ai.ml.entities.GenerationSafetyQualityMonitoringMetricThreshold, + + ]]] + :keyword alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + """ + + def __init__( + self, + *, + production_data: Optional[ProductionData] = None, + reference_data: Optional[ReferenceData] = None, + metric_thresholds: Optional[Union[MetricThreshold, List[MetricThreshold]]], + properties: Optional[Dict[str, str]] = None, + alert_enabled: bool = False, + ): + self.production_data = production_data + self.reference_data = reference_data + self.metric_thresholds = metric_thresholds + self.alert_enabled = alert_enabled + self.properties = properties + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringSignalBase) -> Optional[ # pylint: disable=too-many-return-statements + Union[ + "DataDriftSignal", + "DataQualitySignal", + "PredictionDriftSignal", + "ModelPerformanceSignal", + "FeatureAttributionDriftSignal", + "CustomMonitoringSignal", + "GenerationSafetyQualitySignal", + "GenerationTokenStatisticsSignal", + ] + ]: + if obj.signal_type == MonitoringSignalType.DATA_DRIFT: + return DataDriftSignal._from_rest_object(obj) + if obj.signal_type == MonitoringSignalType.DATA_QUALITY: + return DataQualitySignal._from_rest_object(obj) + if obj.signal_type == MonitoringSignalType.PREDICTION_DRIFT: + return PredictionDriftSignal._from_rest_object(obj) + if obj.signal_type == "ModelPerformanceSignalBase": + return ModelPerformanceSignal._from_rest_object(obj) + if obj.signal_type == MonitoringSignalType.FEATURE_ATTRIBUTION_DRIFT: + return FeatureAttributionDriftSignal._from_rest_object(obj) + if obj.signal_type == MonitoringSignalType.CUSTOM: + return CustomMonitoringSignal._from_rest_object(obj) + if obj.signal_type == MonitoringSignalType.GENERATION_SAFETY_QUALITY: + return GenerationSafetyQualitySignal._from_rest_object(obj) + if obj.signal_type == MonitoringSignalType.MODEL_PERFORMANCE: + return ModelPerformanceSignal._from_rest_object(obj) + if obj.signal_type == MonitoringSignalType.GENERATION_TOKEN_STATISTICS: + return GenerationTokenStatisticsSignal._from_rest_object(obj) + + return None + + +class DataSignal(MonitoringSignal): + """Base class for data signals. + + This class should not be instantiated directly. Instead, use one of its subclasses. + + :keyword baseline_dataset: The baseline dataset definition for monitor input. + :paramtype baseline_dataset: ~azure.ai.ml.entities.MonitorInputData + :keyword features: The features to include in the signal. + :paramtype features: Union[List[str], ~azure.ai.ml.entities.MonitorFeatureFilter, Literal[ALL_FEATURES]] + :keyword metric_thresholds: The metric thresholds for the signal. 
+ :paramtype metric_thresholds: List[Union[ + ~azure.ai.ml.entities.DataDriftMetricThreshold, + ~azure.ai.ml.entities.DataQualityMetricThreshold, + ~azure.ai.ml.entities.PredictionDriftMetricThreshold, + ~azure.ai.ml.entities.FeatureAttributionDriftMetricThreshold, + ~azure.ai.ml.entities.CustomMonitoringMetricThreshold, + ~azure.ai.ml.entities.GenerationSafetyQualityMonitoringMetricThreshold, + + ]] + :keyword alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + """ + + def __init__( + self, + *, + production_data: Optional[ProductionData] = None, + reference_data: Optional[ReferenceData] = None, + features: Optional[Union[List[str], MonitorFeatureFilter, Literal["all_features"]]] = None, + feature_type_override: Optional[Dict[str, Union[str, MonitorFeatureDataType]]] = None, + metric_thresholds: Optional[Union[MetricThreshold, List[MetricThreshold]]], + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + ): + super().__init__( + production_data=production_data, + reference_data=reference_data, + metric_thresholds=metric_thresholds, + alert_enabled=alert_enabled, + properties=properties, + ) + self.features = features + self.feature_type_override = feature_type_override + + +class DataDriftSignal(DataSignal): + """Data drift signal. + + :ivar type: The type of the signal, set to "data_drift" for this class. + :vartype type: str + :param production_data: The data for which drift will be calculated + :paramtype production_data: ~azure.ai.ml.entities.ProductionData + :param reference_data: The data to calculate drift against + :paramtype reference_data: ~azure.ai.ml.entities.ReferenceData + :param metric_thresholds: Metrics to calculate and their associated thresholds + :paramtype metric_thresholds: ~azure.ai.ml.entities.DataDriftMetricThreshold + :param alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + :param data_segment: The data segment used for scoping on a subset of the data population. + :paramtype data_segment: ~azure.ai.ml.entities.DataSegment + :keyword features: The feature filter identifying which feature(s) to calculate drift over. + :paramtype features: Union[List[str], ~azure.ai.ml.entities.MonitorFeatureFilter, Literal['all_features']] + :param feature_type_override: Dictionary of features and what they should be overridden to. + :paramtype feature_type_override: dict[str, str] + :param properties: Dictionary of additional properties. 
+ :paramtype properties: dict[str, str] + """ + + def __init__( + self, + *, + production_data: Optional[ProductionData] = None, + reference_data: Optional[ReferenceData] = None, + features: Optional[Union[List[str], MonitorFeatureFilter, Literal["all_features"]]] = None, + feature_type_override: Optional[Dict[str, Union[str, MonitorFeatureDataType]]] = None, + metric_thresholds: Optional[Union[DataDriftMetricThreshold, List[MetricThreshold]]] = None, + alert_enabled: bool = False, + data_segment: Optional[DataSegment] = None, + properties: Optional[Dict[str, str]] = None, + ): + super().__init__( + production_data=production_data, + reference_data=reference_data, + metric_thresholds=metric_thresholds, + features=features, + feature_type_override=feature_type_override, + alert_enabled=alert_enabled, + properties=properties, + ) + self.type = MonitorSignalType.DATA_DRIFT + self.data_segment = data_segment + + def _to_rest_object(self, **kwargs: Any) -> RestMonitoringDataDriftSignal: + default_data_window_size = kwargs.get("default_data_window_size") + ref_data_window_size = kwargs.get("ref_data_window_size") + if self.production_data is not None and self.production_data.data_window is None: + self.production_data.data_window = BaselineDataRange(lookback_window_size=default_data_window_size) + rest_features = _to_rest_features(self.features) if self.features else None + return RestMonitoringDataDriftSignal( + production_data=( + self.production_data._to_rest_object(default_data_window_size=default_data_window_size) + if self.production_data is not None + else None + ), + reference_data=( + self.reference_data._to_rest_object( + default_data_window=default_data_window_size, ref_data_window_size=ref_data_window_size + ) + if self.reference_data is not None + else None + ), + features=rest_features, + feature_data_type_override=self.feature_type_override, + metric_thresholds=( + self.metric_thresholds._to_rest_object() + if isinstance(self.metric_thresholds, MetricThreshold) + else None + ), + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + data_segment=self.data_segment._to_rest_object() if self.data_segment else None, + properties=self.properties, + ) + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringDataDriftSignal) -> "DataDriftSignal": + return cls( + production_data=ProductionData._from_rest_object(obj.production_data), + reference_data=ReferenceData._from_rest_object(obj.reference_data), + features=_from_rest_features(obj.features), + feature_type_override=obj.feature_data_type_override, + metric_thresholds=DataDriftMetricThreshold._from_rest_object(obj.metric_thresholds), + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + data_segment=DataSegment._from_rest_object(obj.data_segment) if obj.data_segment else None, + properties=obj.properties, + ) + + @classmethod + def _get_default_data_drift_signal(cls) -> "DataDriftSignal": + return cls( + features=ALL_FEATURES, # type: ignore[arg-type] + metric_thresholds=DataDriftMetricThreshold._get_default_thresholds(), + ) + + +class PredictionDriftSignal(MonitoringSignal): + """Prediction drift signal. + + :ivar type: The type of the signal, set to "prediction_drift" for this class. 
+ :vartype type: str + :param production_data: The data for which drift will be calculated + :paramtype production_data: ~azure.ai.ml.entities.ProductionData + :param reference_data: The data to calculate drift against + :paramtype reference_data: ~azure.ai.ml.entities.ReferenceData + :param metric_thresholds: Metrics to calculate and their associated thresholds + :paramtype metric_thresholds: ~azure.ai.ml.entities.DataDriftMetricThreshold + :param alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + :param properties: Dictionary of additional properties. + :paramtype properties: dict[str, str] + """ + + def __init__( + self, + *, + production_data: Optional[ProductionData] = None, + reference_data: Optional[ReferenceData] = None, + metric_thresholds: PredictionDriftMetricThreshold, + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + ): + super().__init__( + production_data=production_data, + reference_data=reference_data, + metric_thresholds=metric_thresholds, + alert_enabled=alert_enabled, + properties=properties, + ) + self.type = MonitorSignalType.PREDICTION_DRIFT + + def _to_rest_object(self, **kwargs: Any) -> RestPredictionDriftMonitoringSignal: + default_data_window_size = kwargs.get("default_data_window_size") + ref_data_window_size = kwargs.get("ref_data_window_size") + if self.production_data is not None and self.production_data.data_window is None: + self.production_data.data_window = BaselineDataRange(lookback_window_size=default_data_window_size) + return RestPredictionDriftMonitoringSignal( + production_data=( + self.production_data._to_rest_object(default_data_window_size=default_data_window_size) + if self.production_data is not None + else None + ), + reference_data=( + self.reference_data._to_rest_object( + default_data_window=default_data_window_size, ref_data_window_size=ref_data_window_size + ) + if self.reference_data is not None + else None + ), + metric_thresholds=( + self.metric_thresholds._to_rest_object() + if isinstance(self.metric_thresholds, MetricThreshold) + else None + ), + properties=self.properties, + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + model_type="classification", + ) + + @classmethod + def _from_rest_object(cls, obj: RestPredictionDriftMonitoringSignal) -> "PredictionDriftSignal": + return cls( + production_data=ProductionData._from_rest_object(obj.production_data), + reference_data=ReferenceData._from_rest_object(obj.reference_data), + metric_thresholds=PredictionDriftMetricThreshold._from_rest_object(obj.metric_thresholds), + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + properties=obj.properties, + ) + + @classmethod + def _get_default_prediction_drift_signal(cls) -> "PredictionDriftSignal": + return cls( + metric_thresholds=PredictionDriftMetricThreshold._get_default_thresholds(), + ) + + +class DataQualitySignal(DataSignal): + """Data quality signal + + :ivar type: The type of the signal. Set to "data_quality" for this class. 
+ :vartype type: str + :param production_data: The data for which drift will be calculated + :paramtype production_data: ~azure.ai.ml.entities.ProductionData + :param reference_data: The data to calculate drift against + :paramtype reference_data: ~azure.ai.ml.entities.ReferenceData + :param metric_thresholds: Metrics to calculate and their associated thresholds + :paramtype metric_thresholds: ~azure.ai.ml.entities.DataDriftMetricThreshold + :param alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + :keyword features: The feature filter identifying which feature(s) to calculate drift over. + :paramtype features: Union[List[str], ~azure.ai.ml.entities.MonitorFeatureFilter, Literal['all_features']] + :param feature_type_override: Dictionary of features and what they should be overridden to. + :paramtype feature_type_override: dict[str, str] + :param properties: Dictionary of additional properties. + :paramtype properties: dict[str, str] + """ + + def __init__( + self, + *, + production_data: Optional[ProductionData] = None, + reference_data: Optional[ReferenceData] = None, + features: Optional[Union[List[str], MonitorFeatureFilter, Literal["all_features"]]] = None, + feature_type_override: Optional[Dict[str, Union[str, MonitorFeatureDataType]]] = None, + metric_thresholds: Optional[Union[MetricThreshold, List[MetricThreshold]]] = None, + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + ): + super().__init__( + production_data=production_data, + reference_data=reference_data, + metric_thresholds=metric_thresholds, + features=features, + feature_type_override=feature_type_override, + alert_enabled=alert_enabled, + properties=properties, + ) + self.type = MonitorSignalType.DATA_QUALITY + + def _to_rest_object(self, **kwargs: Any) -> RestMonitoringDataQualitySignal: + default_data_window_size = kwargs.get("default_data_window_size") + ref_data_window_size = kwargs.get("ref_data_window_size") + if self.production_data is not None and self.production_data.data_window is None: + self.production_data.data_window = BaselineDataRange( + lookback_window_size=default_data_window_size, + ) + rest_features = _to_rest_features(self.features) if self.features else None + rest_metrics = ( + # TODO: Bug Item number: 2883365 + _to_rest_data_quality_metrics( + self.metric_thresholds.numerical, self.metric_thresholds.categorical # type: ignore + ) + if isinstance(self.metric_thresholds, MetricThreshold) + else None + ) + return RestMonitoringDataQualitySignal( + production_data=( + self.production_data._to_rest_object(default_data_window_size=default_data_window_size) + if self.production_data is not None + else None + ), + reference_data=( + self.reference_data._to_rest_object( + default_data_window=default_data_window_size, ref_data_window_size=ref_data_window_size + ) + if self.reference_data is not None + else None + ), + features=rest_features, + feature_data_type_override=self.feature_type_override, + metric_thresholds=rest_metrics, + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + properties=self.properties, + ) + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringDataQualitySignal) -> "DataQualitySignal": + return cls( + production_data=ProductionData._from_rest_object(obj.production_data), + reference_data=ReferenceData._from_rest_object(obj.reference_data), + features=_from_rest_features(obj.features), + 
feature_type_override=obj.feature_data_type_override, + metric_thresholds=DataQualityMetricThreshold._from_rest_object(obj.metric_thresholds), + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + properties=obj.properties, + ) + + @classmethod + def _get_default_data_quality_signal( + cls, + ) -> "DataQualitySignal": + return cls( + features=ALL_FEATURES, # type: ignore[arg-type] + metric_thresholds=DataQualityMetricThreshold._get_default_thresholds(), + ) + + +@experimental +class FADProductionData(RestTranslatableMixin): + """Feature Attribution Production Data + + :keyword input_data: Input data used by the monitor. + :paramtype input_data: ~azure.ai.ml.Input + :keyword data_context: The context of the input dataset. Accepted values are "model_inputs", + "model_outputs", "training", "test", "validation", and "ground_truth". + :paramtype data_context: ~azure.ai.ml.constants._monitoring + :keyword data_column_names: The names of the columns in the input data. + :paramtype data_column_names: Dict[str, str] + :keyword pre_processing_component: The ARM (Azure Resource Manager) resource ID of the component resource used to + preprocess the data. + :paramtype pre_processing_component: string + :param data_window: The number of days or a time frame that a singal monitor looks back over the target. + :type data_window: BaselineDataRange + """ + + def __init__( + self, + *, + input_data: Input, + data_context: Optional[MonitorDatasetContext] = None, + data_column_names: Optional[Dict[str, str]] = None, + pre_processing_component: Optional[str] = None, + data_window: Optional[BaselineDataRange] = None, + ): + self.input_data = input_data + self.data_context = data_context + self.data_column_names = data_column_names + self.pre_processing_component = pre_processing_component + self.data_window = data_window + + def _to_rest_object(self, **kwargs: Any) -> RestMonitoringInputData: + default_data_window_size = kwargs.get("default") + if self.data_window is None: + self.data_window = BaselineDataRange( + lookback_window_size=default_data_window_size, lookback_window_offset="P0D" + ) + if self.data_window.lookback_window_size == "default": + self.data_window.lookback_window_size = default_data_window_size + uri = self.input_data.path + job_type = self.input_data.type + monitoring_input_data = TrailingInputData( + data_context=self.data_context, + target_columns=self.data_column_names, + job_type=job_type, + uri=uri, + pre_processing_component_id=self.pre_processing_component, + window_size=self.data_window.lookback_window_size, + window_offset=( + self.data_window.lookback_window_offset + if self.data_window.lookback_window_offset is not None + else "P0D" + ), + ) + return monitoring_input_data._to_rest_object() + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringInputData) -> "FADProductionData": + data_window = BaselineDataRange( + lookback_window_size=isodate.duration_isoformat(obj.window_size), + lookback_window_offset=isodate.duration_isoformat(obj.window_offset), + ) + return cls( + input_data=Input( + path=obj.uri, + type=obj.job_input_type, + ), + data_context=obj.data_context, + data_column_names=obj.columns, + pre_processing_component=obj.preprocessing_component_id, + data_window=data_window, + ) + + +@experimental +class FeatureAttributionDriftSignal(RestTranslatableMixin): + """Feature attribution drift signal + + :ivar type: The type of the signal. 
Set to "feature_attribution_drift" for this class. + :vartype type: str + :keyword production_data: The data for which drift will be calculated. + :paratype production_data: ~azure.ai.ml.entities.FADProductionData + :keyword reference_data: The data to calculate drift against. + :paramtype reference_data: ~azure.ai.ml.entities.ReferenceData + :keyword metric_thresholds: Metrics to calculate and their + associated thresholds. + :paramtype metric_thresholds: ~azure.ai.ml.entities.FeatureAttributionDriftMetricThreshold + :keyword alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + """ + + def __init__( + self, + *, + production_data: Optional[List[FADProductionData]] = None, + reference_data: ReferenceData, + metric_thresholds: FeatureAttributionDriftMetricThreshold, + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + ): + self.production_data = production_data + self.reference_data = reference_data + self.metric_thresholds = metric_thresholds + self.alert_enabled = alert_enabled + self.properties = properties + self.type = MonitorSignalType.FEATURE_ATTRIBUTION_DRIFT + + def _to_rest_object(self, **kwargs: Any) -> RestFeatureAttributionDriftMonitoringSignal: + default_window_size = kwargs.get("default_data_window_size") + ref_data_window_size = kwargs.get("ref_data_window_size") + return RestFeatureAttributionDriftMonitoringSignal( + production_data=( + [data._to_rest_object(default=default_window_size) for data in self.production_data] + if self.production_data is not None + else None + ), + reference_data=self.reference_data._to_rest_object( + default_data_window=default_window_size, ref_data_window_size=ref_data_window_size + ), + metric_threshold=self.metric_thresholds._to_rest_object(), + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + properties=self.properties, + ) + + @classmethod + def _from_rest_object(cls, obj: RestFeatureAttributionDriftMonitoringSignal) -> "FeatureAttributionDriftSignal": + return cls( + production_data=[FADProductionData._from_rest_object(data) for data in obj.production_data], + reference_data=ReferenceData._from_rest_object(obj.reference_data), + metric_thresholds=FeatureAttributionDriftMetricThreshold._from_rest_object(obj.metric_threshold), + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + properties=obj.properties, + ) + + +@experimental +class ModelPerformanceSignal(RestTranslatableMixin): + """Model performance signal. + + :keyword baseline_dataset: The data to calculate performance against. + :paramtype baseline_dataset: ~azure.ai.ml.entities.MonitorInputData + :keyword metric_thresholds: A list of metrics to calculate and their + associated thresholds. + :paramtype metric_thresholds: ~azure.ai.ml.entities.ModelPerformanceMetricThreshold + :keyword model_type: The model type. + :paramtype model_type: ~azure.ai.ml.constants.MonitorModelType + :keyword data_segment: The data segment to calculate performance against. + :paramtype data_segment: ~azure.ai.ml.entities.DataSegment + :keyword alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. 
+ :paramtype alert_enabled: bool + """ + + def __init__( + self, + *, + production_data: ProductionData, + reference_data: ReferenceData, + metric_thresholds: ModelPerformanceMetricThreshold, + data_segment: Optional[DataSegment] = None, + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + ) -> None: + self.production_data = production_data + self.reference_data = reference_data + self.metric_thresholds = metric_thresholds + self.alert_enabled = alert_enabled + self.type = MonitorSignalType.MODEL_PERFORMANCE + self.data_segment = data_segment + self.properties = properties + + def _to_rest_object(self, **kwargs: Any) -> RestModelPerformanceSignal: + default_data_window_size = kwargs.get("default_data_window_size") + ref_data_window_size = kwargs.get("ref_data_window_size") + if self.properties is None: + self.properties = {} + self.properties["azureml.modelmonitor.model_performance_thresholds"] = self.metric_thresholds._to_str_object() + if self.production_data.data_window is None: + self.production_data.data_window = BaselineDataRange( + lookback_window_size=default_data_window_size, + ) + return RestModelPerformanceSignal( + production_data=[self.production_data._to_rest_object(default_data_window_size=default_data_window_size)], + reference_data=self.reference_data._to_rest_object( + default_data_window_size=default_data_window_size, ref_data_window_size=ref_data_window_size + ), + metric_threshold=self.metric_thresholds._to_rest_object(), + data_segment=self.data_segment._to_rest_object() if self.data_segment else None, + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + properties=self.properties, + ) + + @classmethod + def _from_rest_object(cls, obj: RestModelPerformanceSignal) -> "ModelPerformanceSignal": + return cls( + production_data=ProductionData._from_rest_object(obj.production_data[0]), + reference_data=ReferenceData._from_rest_object(obj.reference_data), + metric_thresholds=ModelPerformanceMetricThreshold._from_rest_object(obj.metric_threshold), + data_segment=DataSegment._from_rest_object(obj.data_segment) if obj.data_segment else None, + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + ) + + +@experimental +class Connection(RestTranslatableMixin): + """Monitoring Connection + + :param environment_variables: A dictionary of environment variables to set for the workspace. + :paramtype environment_variables: Optional[dict[str, str]] + :param secret_config: A dictionary of secrets to set for the workspace. + :paramtype secret_config: Optional[dict[str, str]] + """ + + def __init__( + self, + *, + environment_variables: Optional[Dict[str, str]] = None, + secret_config: Optional[Dict[str, str]] = None, + ): + self.environment_variables = environment_variables + self.secret_config = secret_config + + def _to_rest_object(self) -> RestMonitoringWorkspaceConnection: + return RestMonitoringWorkspaceConnection( + environment_variables=self.environment_variables, + secrets=self.secret_config, + ) + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringWorkspaceConnection) -> "Connection": + return cls( + environment_variables=obj.environment_variables, + secret_config=obj.secrets, + ) + + +@experimental +class CustomMonitoringSignal(RestTranslatableMixin): + """Custom monitoring signal. + + :ivar type: The type of the signal. Set to "custom" for this class. 
+ :vartype type: str + :keyword input_data: A dictionary of input datasets for monitoring. + Each key is the component input port name, and its value is the data asset. + :paramtype input_data: Optional[dict[str, ~azure.ai.ml.entities.ReferenceData]] + :keyword metric_thresholds: A list of metrics to calculate and their + associated thresholds. + :paramtype metric_thresholds: List[~azure.ai.ml.entities.CustomMonitoringMetricThreshold] + :keyword inputs: + :paramtype inputs: Optional[dict[str, ~azure.ai.ml.entities.Input]] + :keyword component_id: The ARM (Azure Resource Manager) ID of the component resource used to + calculate the custom metrics. + :paramtype component_id: str + :keyword connection: Specify connection with environment variables and secret configs. + :paramtype connection: Optional[~azure.ai.ml.entities.WorkspaceConnection] + :keyword alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + :keyword properties: A dictionary of custom properties for the signal. + :paramtype properties: Optional[dict[str, str]] + """ + + def __init__( + self, + *, + inputs: Optional[Dict[str, Input]] = None, + metric_thresholds: List[CustomMonitoringMetricThreshold], + component_id: str, + connection: Optional[Connection] = None, + input_data: Optional[Dict[str, ReferenceData]] = None, + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + ): + self.type = MonitorSignalType.CUSTOM + self.inputs = inputs + self.metric_thresholds = metric_thresholds + self.component_id = component_id + self.alert_enabled = alert_enabled + self.input_data = input_data + self.properties = properties + self.connection = connection + + def _to_rest_object(self, **kwargs: Any) -> RestCustomMonitoringSignal: # pylint:disable=unused-argument + if self.connection is None: + self.connection = Connection() + return RestCustomMonitoringSignal( + component_id=self.component_id, + metric_thresholds=[threshold._to_rest_object() for threshold in self.metric_thresholds], + inputs=to_rest_dataset_literal_inputs(self.inputs, job_type=None) if self.inputs else None, + input_assets=( + {asset_name: asset_value._to_rest_object() for asset_name, asset_value in self.input_data.items()} + if self.input_data + else None + ), + workspace_connection=self.connection._to_rest_object(), + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + properties=self.properties, + ) + + @classmethod + def _from_rest_object(cls, obj: RestCustomMonitoringSignal) -> "CustomMonitoringSignal": + return cls( + inputs=from_rest_inputs_to_dataset_literal(obj.inputs) if obj.inputs else None, + input_data={key: ReferenceData._from_rest_object(data) for key, data in obj.input_assets.items()}, + metric_thresholds=[ + CustomMonitoringMetricThreshold._from_rest_object(metric) for metric in obj.metric_thresholds + ], + component_id=obj.component_id, + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + properties=obj.properties, + connection=Connection._from_rest_object(obj.workspace_connection), + ) + + +@experimental +class LlmData(RestTranslatableMixin): + """LLM Request Response Data + + :param input_data: Input data used by the monitor. + :paramtype input_data: ~azure.ai.ml.entities.Input + :param data_column_names: The names of columns in the input data. 
+    :paramtype data_column_names: Dict[str, str]
+    :param data_window: The number of days or a time frame that a signal monitor looks back over the target.
+    :type data_window: BaselineDataRange
+    """
+
+    def __init__(
+        self,
+        *,
+        input_data: Input,
+        data_column_names: Optional[Dict[str, str]] = None,
+        data_window: Optional[BaselineDataRange] = None,
+    ):
+        self.input_data = input_data
+        self.data_column_names = data_column_names
+        self.data_window = data_window
+
+    def _to_rest_object(self, **kwargs: Any) -> RestMonitoringInputData:
+        if self.data_window is None:
+            self.data_window = BaselineDataRange(
+                lookback_window_size=kwargs.get("default"),
+            )
+        return TrailingInputData(
+            target_columns=self.data_column_names,
+            job_type=self.input_data.type,
+            uri=self.input_data.path,
+            window_size=self.data_window.lookback_window_size,
+            window_offset=(
+                self.data_window.lookback_window_offset
+                if self.data_window.lookback_window_offset is not None
+                else "P0D"
+            ),
+        )._to_rest_object()
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestMonitoringInputData) -> "LlmData":
+        data_window = BaselineDataRange(
+            lookback_window_size=isodate.duration_isoformat(obj.window_size),
+            lookback_window_offset=isodate.duration_isoformat(obj.window_offset),
+        )
+        return cls(
+            input_data=Input(
+                path=obj.uri,
+                type=obj.job_input_type,
+            ),
+            data_column_names=obj.columns,
+            data_window=data_window,
+        )
+
+
+@experimental
+class GenerationSafetyQualitySignal(RestTranslatableMixin):
+    """Generation Safety Quality monitoring signal.
+
+    :ivar type: The type of the signal. Set to "generationsafetyquality" for this class.
+    :vartype type: str
+    :keyword production_data: A list of input datasets for monitoring.
+    :paramtype production_data: Optional[List[~azure.ai.ml.entities.LlmData]]
+    :keyword metric_thresholds: Metrics to calculate and their associated thresholds.
+    :paramtype metric_thresholds: ~azure.ai.ml.entities.GenerationSafetyQualityMonitoringMetricThreshold
+    :keyword alert_enabled: Whether or not to enable alerts for the signal. Defaults to False.
+    :paramtype alert_enabled: bool
+    :keyword connection_id: Gets or sets the connection ID used to connect to the
+        content generation endpoint.
+    :paramtype connection_id: str
+    :keyword properties: The properties of the signal.
+    :paramtype properties: Dict[str, str]
+    :keyword sampling_rate: The sample rate of the target data, should be greater
+        than 0 and at most 1.
+ :paramtype sampling_rate: float + """ + + def __init__( + self, + *, + production_data: Optional[List[LlmData]] = None, + connection_id: Optional[str] = None, + metric_thresholds: GenerationSafetyQualityMonitoringMetricThreshold, + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + sampling_rate: Optional[float] = None, + ): + self.type = MonitorSignalType.GENERATION_SAFETY_QUALITY + self.production_data = production_data + self.connection_id = connection_id + self.metric_thresholds = metric_thresholds + self.alert_enabled = alert_enabled + self.properties = properties + self.sampling_rate = sampling_rate + + def _to_rest_object(self, **kwargs: Any) -> RestGenerationSafetyQualityMonitoringSignal: + data_window_size = kwargs.get("default_data_window_size") + return RestGenerationSafetyQualityMonitoringSignal( + production_data=( + [data._to_rest_object(default=data_window_size) for data in self.production_data] + if self.production_data is not None + else None + ), + workspace_connection_id=self.connection_id, + metric_thresholds=self.metric_thresholds._to_rest_object(), + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + properties=self.properties, + sampling_rate=self.sampling_rate, + ) + + @classmethod + def _from_rest_object(cls, obj: RestGenerationSafetyQualityMonitoringSignal) -> "GenerationSafetyQualitySignal": + return cls( + production_data=[LlmData._from_rest_object(data) for data in obj.production_data], + connection_id=obj.workspace_connection_id, + metric_thresholds=GenerationSafetyQualityMonitoringMetricThreshold._from_rest_object(obj.metric_thresholds), + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + properties=obj.properties, + sampling_rate=obj.sampling_rate, + ) + + +@experimental +class GenerationTokenStatisticsSignal(RestTranslatableMixin): + """Generation token statistics signal definition. + + :ivar type: The type of the signal. Set to "generationtokenstatisticssignal" for this class. + :vartype type: str + :keyword production_data: input dataset for monitoring. + :paramtype input_dataset: Optional[~azure.ai.ml.entities.LlmData] + :keyword metric_thresholds: Metrics to calculate and their associated thresholds. Defaults to App Traces + :paramtype metric_thresholds: Optional[~azure.ai.ml.entities.GenerationTokenStatisticsMonitorMetricThreshold] + :keyword alert_enabled: Whether or not to enable alerts for the signal. Defaults to False. + :paramtype alert_enabled: bool + :keyword properties: The properties of the signal + :paramtype properties: Optional[Dict[str, str]] + :keyword sampling_rate: The sample rate of the target data, should be greater + than 0 and at most 1. + :paramtype sampling_rate: float + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_genAI_monitors_configuration.py + :start-after: [START default_monitoring] + :end-before: [END default_monitoring] + :language: python + :dedent: 8 + :caption: Set Token Statistics Monitor. 
+ """ + + def __init__( + self, + *, + production_data: Optional[LlmData] = None, + metric_thresholds: Optional[GenerationTokenStatisticsMonitorMetricThreshold] = None, + alert_enabled: bool = False, + properties: Optional[Dict[str, str]] = None, + sampling_rate: Optional[float] = None, + ): + self.type = MonitorSignalType.GENERATION_TOKEN_STATISTICS + self.production_data = production_data + self.metric_thresholds = metric_thresholds + self.alert_enabled = alert_enabled + self.properties = properties + self.sampling_rate = sampling_rate + + def _to_rest_object(self, **kwargs: Any) -> RestGenerationTokenStatisticsSignal: + data_window_size = kwargs.get("default_data_window_size") + return RestGenerationTokenStatisticsSignal( + production_data=( + self.production_data._to_rest_object(default=data_window_size) + if self.production_data is not None + else None + ), + metric_thresholds=( + self.metric_thresholds._to_rest_object() + if self.metric_thresholds + else GenerationTokenStatisticsMonitorMetricThreshold._get_default_thresholds()._to_rest_object() + ), + mode=MonitoringNotificationMode.ENABLED if self.alert_enabled else MonitoringNotificationMode.DISABLED, + properties=self.properties, + sampling_rate=self.sampling_rate if self.sampling_rate else 0.1, + ) + + @classmethod + def _from_rest_object(cls, obj: RestGenerationTokenStatisticsSignal) -> "GenerationTokenStatisticsSignal": + return cls( + production_data=LlmData._from_rest_object(obj.production_data), + metric_thresholds=GenerationTokenStatisticsMonitorMetricThreshold._from_rest_object(obj.metric_thresholds), + alert_enabled=( + False + if not obj.mode or (obj.mode and obj.mode == MonitoringNotificationMode.DISABLED) + else MonitoringNotificationMode.ENABLED + ), + properties=obj.properties, + sampling_rate=obj.sampling_rate, + ) + + @classmethod + def _get_default_token_statistics_signal(cls) -> "GenerationTokenStatisticsSignal": + return cls( + metric_thresholds=GenerationTokenStatisticsMonitorMetricThreshold._get_default_thresholds(), + sampling_rate=0.1, + ) + + +def _from_rest_features( + obj: RestMonitoringFeatureFilterBase, +) -> Optional[Union[List[str], MonitorFeatureFilter, Literal["all_features"]]]: + if isinstance(obj, RestTopNFeaturesByAttribution): + return MonitorFeatureFilter(top_n_feature_importance=obj.top) + if isinstance(obj, RestFeatureSubset): + _restFeatureSubset: List[str] = obj.features + return _restFeatureSubset + if isinstance(obj, RestAllFeatures): + _restAllFeatures: Literal["all_features"] = ALL_FEATURES # type: ignore[assignment] + return _restAllFeatures + + return None + + +def _to_rest_features( + features: Union[List[str], MonitorFeatureFilter, Literal["all_features"]] +) -> RestMonitoringFeatureFilterBase: + rest_features = None + if isinstance(features, list): + rest_features = RestFeatureSubset(features=features) + elif isinstance(features, MonitorFeatureFilter): + rest_features = features._to_rest_object() + elif isinstance(features, str) and features == ALL_FEATURES: + rest_features = RestAllFeatures() + return rest_features + + +def _to_rest_num_cat_metrics(numerical_metrics: Any, categorical_metrics: Any) -> List: + metrics = [] + if numerical_metrics is not None: + metrics.append(numerical_metrics._to_rest_object()) + + if categorical_metrics is not None: + metrics.append(categorical_metrics._to_rest_object()) + + return metrics + + +def _to_rest_data_quality_metrics(numerical_metrics: Any, categorical_metrics: Any) -> List: + metric_thresholds: List = [] + if numerical_metrics is not 
None: + metric_thresholds = metric_thresholds + numerical_metrics._to_rest_object() + + if categorical_metrics is not None: + metric_thresholds = metric_thresholds + categorical_metrics._to_rest_object() + + return metric_thresholds diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/target.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/target.py new file mode 100644 index 00000000..73a11895 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/target.py @@ -0,0 +1,55 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import Optional, Union + +from azure.ai.ml._restclient.v2023_06_01_preview.models import MonitoringTarget as RestMonitoringTarget +from azure.ai.ml.constants._monitoring import MonitorTargetTasks + + +class MonitoringTarget: + """Monitoring target. + + :keyword ml_task: Type of task. Allowed values: Classification, Regression, and QuestionAnswering + :paramtype ml_task: Optional[Union[str, MonitorTargetTasks]] + :keyword endpoint_deployment_id: The ARM ID of the target deployment. Mutually exclusive with model_id. + :paramtype endpoint_deployment_id: Optional[str] + :keyword model_id: ARM ID of the target model ID. Mutually exclusive with endpoint_deployment_id. + :paramtype model_id: Optional[str] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_spark_configurations.py + :start-after: [START spark_monitor_definition] + :end-before: [END spark_monitor_definition] + :language: python + :dedent: 8 + :caption: Setting a monitoring target using endpoint_deployment_id. + """ + + def __init__( + self, + *, + ml_task: Optional[Union[str, MonitorTargetTasks]] = None, + endpoint_deployment_id: Optional[str] = None, + model_id: Optional[str] = None, + ): + self.endpoint_deployment_id = endpoint_deployment_id + self.model_id = model_id + self.ml_task = ml_task + + def _to_rest_object(self) -> RestMonitoringTarget: + return RestMonitoringTarget( + task_type=self.ml_task if self.ml_task else "classification", + deployment_id=self.endpoint_deployment_id, + model_id=self.model_id, + ) + + @classmethod + def _from_rest_object(cls, obj: RestMonitoringTarget) -> "MonitoringTarget": + return cls( + ml_task=obj.task_type, + endpoint_deployment_id=obj.endpoint_deployment_id, + model_id=obj.model_id, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/thresholds.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/thresholds.py new file mode 100644 index 00000000..3e1c33b5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_monitoring/thresholds.py @@ -0,0 +1,954 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=unused-argument, protected-access + +from typing import Any, Dict, List, Optional, Tuple + +from azure.ai.ml._restclient.v2023_06_01_preview.models import ( + CategoricalDataDriftMetricThreshold, + CategoricalDataQualityMetricThreshold, + CategoricalPredictionDriftMetricThreshold, + ClassificationModelPerformanceMetricThreshold, + CustomMetricThreshold, + DataDriftMetricThresholdBase, + DataQualityMetricThresholdBase, + FeatureAttributionMetricThreshold, + GenerationSafetyQualityMetricThreshold, + GenerationTokenStatisticsMetricThreshold, + ModelPerformanceMetricThresholdBase, + MonitoringThreshold, + NumericalDataDriftMetricThreshold, + NumericalDataQualityMetricThreshold, + NumericalPredictionDriftMetricThreshold, + PredictionDriftMetricThresholdBase, +) +from azure.ai.ml._utils._experimental import experimental +from azure.ai.ml._utils.utils import camel_to_snake, snake_to_camel +from azure.ai.ml.constants._monitoring import MonitorFeatureType, MonitorMetricName +from azure.ai.ml.entities._mixins import RestTranslatableMixin + + +class MetricThreshold(RestTranslatableMixin): + def __init__(self, *, threshold: Optional[float] = None): + self.data_type: Any = None + self.metric_name: Optional[str] = None + self.threshold = threshold + + +class NumericalDriftMetrics(RestTranslatableMixin): + """Numerical Drift Metrics + + :param jensen_shannon_distance: The Jensen-Shannon distance between the two distributions + :paramtype jensen_shannon_distance: float + :param normalized_wasserstein_distance: The normalized Wasserstein distance between the two distributions + :paramtype normalized_wasserstein_distance: float + :param population_stability_index: The population stability index between the two distributions + :paramtype population_stability_index: float + :param two_sample_kolmogorov_smirnov_test: The two sample Kolmogorov-Smirnov test between the two distributions + :paramtype two_sample_kolmogorov_smirnov_test: float + """ + + def __init__( + self, + *, + jensen_shannon_distance: Optional[float] = None, + normalized_wasserstein_distance: Optional[float] = None, + population_stability_index: Optional[float] = None, + two_sample_kolmogorov_smirnov_test: Optional[float] = None, + metric: Optional[str] = None, + metric_threshold: Optional[float] = None, + ): + self.jensen_shannon_distance = jensen_shannon_distance + self.normalized_wasserstein_distance = normalized_wasserstein_distance + self.population_stability_index = population_stability_index + self.two_sample_kolmogorov_smirnov_test = two_sample_kolmogorov_smirnov_test + self.metric = metric + self.metric_threshold = metric_threshold + + def _find_name_and_threshold(self) -> Tuple: + metric_name = None + threshold = None + if self.jensen_shannon_distance: + metric_name = MonitorMetricName.JENSEN_SHANNON_DISTANCE + threshold = MonitoringThreshold(value=self.jensen_shannon_distance) + elif self.normalized_wasserstein_distance: + metric_name = MonitorMetricName.NORMALIZED_WASSERSTEIN_DISTANCE + threshold = MonitoringThreshold(value=self.normalized_wasserstein_distance) + elif self.population_stability_index: + metric_name = MonitorMetricName.POPULATION_STABILITY_INDEX + threshold = MonitoringThreshold(value=self.population_stability_index) + elif self.two_sample_kolmogorov_smirnov_test: + metric_name = MonitorMetricName.TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST + threshold = MonitoringThreshold(value=self.two_sample_kolmogorov_smirnov_test) + + return metric_name, 
threshold + + @classmethod + # pylint: disable=arguments-differ + def _from_rest_object(cls, metric_name: str, threshold: Optional[float]) -> "NumericalDriftMetrics": # type: ignore + metric_name = camel_to_snake(metric_name) + if metric_name == MonitorMetricName.JENSEN_SHANNON_DISTANCE: + return cls(jensen_shannon_distance=threshold) + if metric_name == MonitorMetricName.NORMALIZED_WASSERSTEIN_DISTANCE: + return cls(normalized_wasserstein_distance=threshold) + if metric_name == MonitorMetricName.POPULATION_STABILITY_INDEX: + return cls(population_stability_index=threshold) + if metric_name == MonitorMetricName.TWO_SAMPLE_KOLMOGOROV_SMIRNOV_TEST: + return cls(two_sample_kolmogorov_smirnov_test=threshold) + return cls() + + @classmethod + def _get_default_thresholds(cls) -> "NumericalDriftMetrics": + return cls( + normalized_wasserstein_distance=0.1, + ) + + +class CategoricalDriftMetrics(RestTranslatableMixin): + """Categorical Drift Metrics + + :param jensen_shannon_distance: The Jensen-Shannon distance between the two distributions + :paramtype jensen_shannon_distance: float + :param population_stability_index: The population stability index between the two distributions + :paramtype population_stability_index: float + :param pearsons_chi_squared_test: The Pearson's Chi-Squared test between the two distributions + :paramtype pearsons_chi_squared_test: float + """ + + def __init__( + self, + *, + jensen_shannon_distance: Optional[float] = None, + population_stability_index: Optional[float] = None, + pearsons_chi_squared_test: Optional[float] = None, + ): + self.jensen_shannon_distance = jensen_shannon_distance + self.population_stability_index = population_stability_index + self.pearsons_chi_squared_test = pearsons_chi_squared_test + + def _find_name_and_threshold(self) -> Tuple: + metric_name = None + threshold = None + if self.jensen_shannon_distance: + metric_name = MonitorMetricName.JENSEN_SHANNON_DISTANCE + threshold = MonitoringThreshold(value=self.jensen_shannon_distance) + if self.population_stability_index and threshold is None: + metric_name = MonitorMetricName.POPULATION_STABILITY_INDEX + threshold = MonitoringThreshold(value=self.population_stability_index) + if self.pearsons_chi_squared_test and threshold is None: + metric_name = MonitorMetricName.PEARSONS_CHI_SQUARED_TEST + threshold = MonitoringThreshold(value=self.pearsons_chi_squared_test) + + return metric_name, threshold + + @classmethod + # pylint: disable=arguments-differ + def _from_rest_object( # type: ignore + cls, metric_name: str, threshold: Optional[float] + ) -> "CategoricalDriftMetrics": + metric_name = camel_to_snake(metric_name) + if metric_name == MonitorMetricName.JENSEN_SHANNON_DISTANCE: + return cls(jensen_shannon_distance=threshold) + if metric_name == MonitorMetricName.POPULATION_STABILITY_INDEX: + return cls(population_stability_index=threshold) + if metric_name == MonitorMetricName.PEARSONS_CHI_SQUARED_TEST: + return cls(pearsons_chi_squared_test=threshold) + return cls() + + @classmethod + def _get_default_thresholds(cls) -> "CategoricalDriftMetrics": + return cls( + jensen_shannon_distance=0.1, + ) + + +class DataDriftMetricThreshold(MetricThreshold): + """Data drift metric threshold + + :param numerical: Numerical drift metrics + :paramtype numerical: ~azure.ai.ml.entities.NumericalDriftMetrics + :param categorical: Categorical drift metrics + :paramtype categorical: ~azure.ai.ml.entities.CategoricalDriftMetrics + """ + + def __init__( + self, + *, + data_type: Optional[MonitorFeatureType] = None, 
+ threshold: Optional[float] = None, + metric: Optional[str] = None, + numerical: Optional[NumericalDriftMetrics] = None, + categorical: Optional[CategoricalDriftMetrics] = None, + ): + super().__init__(threshold=threshold) + self.data_type = data_type + self.metric = metric + self.numerical = numerical + self.categorical = categorical + + def _to_rest_object(self) -> DataDriftMetricThresholdBase: + thresholds = [] + if self.numerical: + num_metric_name, num_threshold = self.numerical._find_name_and_threshold() + thresholds.append( + NumericalDataDriftMetricThreshold( + metric=snake_to_camel(num_metric_name), + threshold=num_threshold, + ) + ) + if self.categorical: + cat_metric_name, cat_threshold = self.categorical._find_name_and_threshold() + thresholds.append( + CategoricalDataDriftMetricThreshold( + metric=snake_to_camel(cat_metric_name), + threshold=cat_threshold, + ) + ) + + return thresholds + + @classmethod + def _from_rest_object(cls, obj: DataDriftMetricThresholdBase) -> "DataDriftMetricThreshold": + num = None + cat = None + for threshold in obj: + if threshold.data_type == "Numerical": + num = NumericalDriftMetrics()._from_rest_object( # pylint: disable=protected-access + threshold.metric, threshold.threshold.value if threshold.threshold else None + ) + elif threshold.data_type == "Categorical": + cat = CategoricalDriftMetrics()._from_rest_object( # pylint: disable=protected-access + threshold.metric, threshold.threshold.value if threshold.threshold else None + ) + + return cls( + numerical=num, + categorical=cat, + ) + + @classmethod + def _get_default_thresholds(cls) -> "DataDriftMetricThreshold": + return cls( + numerical=NumericalDriftMetrics._get_default_thresholds(), + categorical=CategoricalDriftMetrics._get_default_thresholds(), + ) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, DataDriftMetricThreshold): + return NotImplemented + return self.numerical == other.numerical and self.categorical == other.categorical + + +class PredictionDriftMetricThreshold(MetricThreshold): + """Prediction drift metric threshold + + :param numerical: Numerical drift metrics + :paramtype numerical: ~azure.ai.ml.entities.NumericalDriftMetrics + :param categorical: Categorical drift metrics + :paramtype categorical: ~azure.ai.ml.entities.CategoricalDriftMetrics + """ + + def __init__( + self, + *, + data_type: Optional[MonitorFeatureType] = None, + threshold: Optional[float] = None, + numerical: Optional[NumericalDriftMetrics] = None, + categorical: Optional[CategoricalDriftMetrics] = None, + ): + super().__init__(threshold=threshold) + self.data_type = data_type + self.numerical = numerical + self.categorical = categorical + + def _to_rest_object(self) -> PredictionDriftMetricThresholdBase: + thresholds = [] + if self.numerical: + num_metric_name, num_threshold = self.numerical._find_name_and_threshold() + thresholds.append( + NumericalPredictionDriftMetricThreshold( + metric=snake_to_camel(num_metric_name), + threshold=num_threshold, + ) + ) + if self.categorical: + cat_metric_name, cat_threshold = self.categorical._find_name_and_threshold() + thresholds.append( + CategoricalPredictionDriftMetricThreshold( + metric=snake_to_camel(cat_metric_name), + threshold=cat_threshold, + ) + ) + + return thresholds + + @classmethod + def _from_rest_object(cls, obj: PredictionDriftMetricThresholdBase) -> "PredictionDriftMetricThreshold": + num = None + cat = None + for threshold in obj: + if threshold.data_type == "Numerical": + num = NumericalDriftMetrics()._from_rest_object( # 
pylint: disable=protected-access + threshold.metric, threshold.threshold.value if threshold.threshold else None + ) + elif threshold.data_type == "Categorical": + cat = CategoricalDriftMetrics()._from_rest_object( # pylint: disable=protected-access + threshold.metric, threshold.threshold.value if threshold.threshold else None + ) + + return cls( + numerical=num, + categorical=cat, + ) + + @classmethod + def _get_default_thresholds(cls) -> "PredictionDriftMetricThreshold": + return cls( + numerical=NumericalDriftMetrics._get_default_thresholds(), + categorical=CategoricalDriftMetrics._get_default_thresholds(), + ) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, PredictionDriftMetricThreshold): + return NotImplemented + return ( + self.data_type == other.data_type + and self.metric_name == other.metric_name + and self.threshold == other.threshold + ) + + +class DataQualityMetricsNumerical(RestTranslatableMixin): + """Data Quality Numerical Metrics + + :param null_value_rate: The null value rate + :paramtype null_value_rate: float + :param data_type_error_rate: The data type error rate + :paramtype data_type_error_rate: float + :param out_of_bounds_rate: The out of bounds rate + :paramtype out_of_bounds_rate: float + """ + + def __init__( + self, + *, + null_value_rate: Optional[float] = None, + data_type_error_rate: Optional[float] = None, + out_of_bounds_rate: Optional[float] = None, + ): + self.null_value_rate = null_value_rate + self.data_type_error_rate = data_type_error_rate + self.out_of_bounds_rate = out_of_bounds_rate + + def _to_rest_object(self) -> List[NumericalDataQualityMetricThreshold]: + metric_thresholds = [] + if self.null_value_rate is not None: + metric_name = MonitorMetricName.NULL_VALUE_RATE + threshold = MonitoringThreshold(value=self.null_value_rate) + metric_thresholds.append( + NumericalDataQualityMetricThreshold(metric=snake_to_camel(metric_name), threshold=threshold) + ) + if self.data_type_error_rate is not None: + metric_name = MonitorMetricName.DATA_TYPE_ERROR_RATE + threshold = MonitoringThreshold(value=self.data_type_error_rate) + metric_thresholds.append( + NumericalDataQualityMetricThreshold(metric=snake_to_camel(metric_name), threshold=threshold) + ) + if self.out_of_bounds_rate is not None: + metric_name = MonitorMetricName.OUT_OF_BOUND_RATE + threshold = MonitoringThreshold(value=self.out_of_bounds_rate) + metric_thresholds.append( + NumericalDataQualityMetricThreshold(metric=snake_to_camel(metric_name), threshold=threshold) + ) + + return metric_thresholds + + @classmethod + def _from_rest_object(cls, obj: List) -> "DataQualityMetricsNumerical": + null_value_rate_val = None + data_type_error_rate_val = None + out_of_bounds_rate_val = None + for thresholds in obj: + if thresholds.metric in ("NullValueRate" "nullValueRate"): + null_value_rate_val = thresholds.threshold.value + if thresholds.metric in ("DataTypeErrorRate", "dataTypeErrorRate"): + data_type_error_rate_val = thresholds.threshold.value + if thresholds.metric in ("OutOfBoundsRate", "outOfBoundsRate"): + out_of_bounds_rate_val = thresholds.threshold.value + return cls( + null_value_rate=null_value_rate_val, + data_type_error_rate=data_type_error_rate_val, + out_of_bounds_rate=out_of_bounds_rate_val, + ) + + @classmethod + def _get_default_thresholds(cls) -> "DataQualityMetricsNumerical": + return cls( + null_value_rate=0.0, + data_type_error_rate=0.0, + out_of_bounds_rate=0.0, + ) + + +class DataQualityMetricsCategorical(RestTranslatableMixin): + """Data Quality Categorical 
Metrics + + :param null_value_rate: The null value rate + :paramtype null_value_rate: float + :param data_type_error_rate: The data type error rate + :paramtype data_type_error_rate: float + :param out_of_bounds_rate: The out of bounds rate + :paramtype out_of_bounds_rate: float + """ + + def __init__( + self, + *, + null_value_rate: Optional[float] = None, + data_type_error_rate: Optional[float] = None, + out_of_bounds_rate: Optional[float] = None, + ): + self.null_value_rate = null_value_rate + self.data_type_error_rate = data_type_error_rate + self.out_of_bounds_rate = out_of_bounds_rate + + def _to_rest_object(self) -> List[CategoricalDataQualityMetricThreshold]: + metric_thresholds = [] + if self.null_value_rate is not None: + metric_name = MonitorMetricName.NULL_VALUE_RATE + threshold = MonitoringThreshold(value=self.null_value_rate) + metric_thresholds.append( + CategoricalDataQualityMetricThreshold(metric=snake_to_camel(metric_name), threshold=threshold) + ) + if self.data_type_error_rate is not None: + metric_name = MonitorMetricName.DATA_TYPE_ERROR_RATE + threshold = MonitoringThreshold(value=self.data_type_error_rate) + metric_thresholds.append( + CategoricalDataQualityMetricThreshold(metric=snake_to_camel(metric_name), threshold=threshold) + ) + if self.out_of_bounds_rate is not None: + metric_name = MonitorMetricName.OUT_OF_BOUND_RATE + threshold = MonitoringThreshold(value=self.out_of_bounds_rate) + metric_thresholds.append( + CategoricalDataQualityMetricThreshold(metric=snake_to_camel(metric_name), threshold=threshold) + ) + + return metric_thresholds + + @classmethod + def _from_rest_object(cls, obj: List) -> "DataQualityMetricsCategorical": + null_value_rate_val = None + data_type_error_rate_val = None + out_of_bounds_rate_val = None + for thresholds in obj: + if thresholds.metric in ("NullValueRate" "nullValueRate"): + null_value_rate_val = thresholds.threshold.value + if thresholds.metric in ("DataTypeErrorRate", "dataTypeErrorRate"): + data_type_error_rate_val = thresholds.threshold.value + if thresholds.metric in ("OutOfBoundsRate", "outOfBoundsRate"): + out_of_bounds_rate_val = thresholds.threshold.value + return cls( + null_value_rate=null_value_rate_val, + data_type_error_rate=data_type_error_rate_val, + out_of_bounds_rate=out_of_bounds_rate_val, + ) + + @classmethod + def _get_default_thresholds(cls) -> "DataQualityMetricsCategorical": + return cls( + null_value_rate=0.0, + data_type_error_rate=0.0, + out_of_bounds_rate=0.0, + ) + + +class DataQualityMetricThreshold(MetricThreshold): + """Data quality metric threshold + + :param numerical: Numerical data quality metrics + :paramtype numerical: ~azure.ai.ml.entities.DataQualityMetricsNumerical + :param categorical: Categorical data quality metrics + :paramtype categorical: ~azure.ai.ml.entities.DataQualityMetricsCategorical + """ + + def __init__( + self, + *, + data_type: Optional[MonitorFeatureType] = None, + threshold: Optional[float] = None, + metric_name: Optional[str] = None, + numerical: Optional[DataQualityMetricsNumerical] = None, + categorical: Optional[DataQualityMetricsCategorical] = None, + ): + super().__init__(threshold=threshold) + self.data_type = data_type + self.metric_name = metric_name + self.numerical = numerical + self.categorical = categorical + + def _to_rest_object(self) -> DataQualityMetricThresholdBase: + thresholds: list = [] + if self.numerical: + thresholds = thresholds + ( + DataQualityMetricsNumerical( # pylint: disable=protected-access + 
null_value_rate=self.numerical.null_value_rate,
+                    data_type_error_rate=self.numerical.data_type_error_rate,
+                    out_of_bounds_rate=self.numerical.out_of_bounds_rate,
+                )._to_rest_object()
+            )
+        if self.categorical:
+            thresholds = thresholds + (
+                DataQualityMetricsCategorical(  # pylint: disable=protected-access
+                    null_value_rate=self.categorical.null_value_rate,
+                    data_type_error_rate=self.categorical.data_type_error_rate,
+                    out_of_bounds_rate=self.categorical.out_of_bounds_rate,
+                )._to_rest_object()
+            )
+        return thresholds
+
+    @classmethod
+    def _from_rest_object(cls, obj: DataQualityMetricThresholdBase) -> "DataQualityMetricThreshold":
+        num = []
+        cat = []
+        for threshold in obj:
+            if threshold.data_type == "Numerical":
+                num.append(threshold)
+            elif threshold.data_type == "Categorical":
+                cat.append(threshold)
+
+        num_from_rest = DataQualityMetricsNumerical()._from_rest_object(num)  # pylint: disable=protected-access
+        cat_from_rest = DataQualityMetricsCategorical()._from_rest_object(cat)  # pylint: disable=protected-access
+        return cls(
+            numerical=num_from_rest,
+            categorical=cat_from_rest,
+        )
+
+    @classmethod
+    def _get_default_thresholds(cls) -> "DataQualityMetricThreshold":
+        return cls(
+            numerical=DataQualityMetricsNumerical()._get_default_thresholds(),  # pylint: disable=protected-access
+            categorical=DataQualityMetricsCategorical()._get_default_thresholds(),  # pylint: disable=protected-access
+        )
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, DataQualityMetricThreshold):
+            return NotImplemented
+        return (
+            self.data_type == other.data_type
+            and self.metric_name == other.metric_name
+            and self.threshold == other.threshold
+        )
+
+
+@experimental
+class FeatureAttributionDriftMetricThreshold(MetricThreshold):
+    """Feature attribution drift metric threshold
+
+    :param normalized_discounted_cumulative_gain: The threshold value for the metric.
+ :paramtype normalized_discounted_cumulative_gain: float + """ + + def __init__( + self, *, normalized_discounted_cumulative_gain: Optional[float] = None, threshold: Optional[float] = None + ): + super().__init__(threshold=threshold) + self.data_type = MonitorFeatureType.ALL_FEATURE_TYPES + self.metric_name = MonitorMetricName.NORMALIZED_DISCOUNTED_CUMULATIVE_GAIN + self.normalized_discounted_cumulative_gain = normalized_discounted_cumulative_gain + + def _to_rest_object(self) -> FeatureAttributionMetricThreshold: + return FeatureAttributionMetricThreshold( + metric=snake_to_camel(self.metric_name), + threshold=( + MonitoringThreshold(value=self.normalized_discounted_cumulative_gain) + if self.normalized_discounted_cumulative_gain + else None + ), + ) + + @classmethod + def _from_rest_object(cls, obj: FeatureAttributionMetricThreshold) -> "FeatureAttributionDriftMetricThreshold": + return cls(normalized_discounted_cumulative_gain=obj.threshold.value if obj.threshold else None) + + +@experimental +class ModelPerformanceClassificationThresholds(RestTranslatableMixin): + def __init__( + self, + *, + accuracy: Optional[float] = None, + precision: Optional[float] = None, + recall: Optional[float] = None, + ): + self.accuracy = accuracy + self.precision = precision + self.recall = recall + + def _to_str_object(self, **kwargs): + thresholds = [] + if self.accuracy: + thresholds.append( + '{"modelType":"classification","metric":"Accuracy","threshold":{"value":' + f"{self.accuracy}" + "}}" + ) + if self.precision: + thresholds.append( + '{"modelType":"classification","metric":"Precision","threshold":{"value":' + f"{self.precision}" + "}}" + ) + if self.recall: + thresholds.append( + '{"modelType":"classification","metric":"Recall","threshold":{"value":' + f"{self.recall}" + "}}" + ) + + if not thresholds: + return None + + return ", ".join(thresholds) + + @classmethod + def _from_rest_object(cls, obj) -> "ModelPerformanceClassificationThresholds": + return cls( + accuracy=obj.threshold.value if obj.threshold else None, + ) + + +@experimental +class ModelPerformanceRegressionThresholds(RestTranslatableMixin): + def __init__( + self, + *, + mean_absolute_error: Optional[float] = None, + mean_squared_error: Optional[float] = None, + root_mean_squared_error: Optional[float] = None, + ): + self.mean_absolute_error = mean_absolute_error + self.mean_squared_error = mean_squared_error + self.root_mean_squared_error = root_mean_squared_error + + def _to_str_object(self, **kwargs): + thresholds = [] + if self.mean_absolute_error: + thresholds.append( + '{"modelType":"regression","metric":"MeanAbsoluteError","threshold":{"value":' + + f"{self.mean_absolute_error}" + + "}}" + ) + if self.mean_squared_error: + thresholds.append( + '{"modelType":"regression","metric":"MeanSquaredError","threshold":{"value":' + + f"{self.mean_squared_error}" + + "}}" + ) + if self.root_mean_squared_error: + thresholds.append( + '{"modelType":"regression","metric":"RootMeanSquaredError","threshold":{"value":' + + f"{self.root_mean_squared_error}" + + "}}" + ) + + if not thresholds: + return None + + return ", ".join(thresholds) + + +@experimental +class ModelPerformanceMetricThreshold(RestTranslatableMixin): + def __init__( + self, + *, + classification: Optional[ModelPerformanceClassificationThresholds] = None, + regression: Optional[ModelPerformanceRegressionThresholds] = None, + ): + self.classification = classification + self.regression = regression + + def _to_str_object(self, **kwargs): + thresholds = [] + if 
self.classification: + thresholds.append(self.classification._to_str_object(**kwargs)) + if self.regression: + thresholds.append(self.regression._to_str_object(**kwargs)) + + if not thresholds: + return None + if len(thresholds) == 2: + result = "[" + ", ".join(thresholds) + "]" + else: + result = "[" + thresholds[0] + "]" + return result + + def _to_rest_object(self, **kwargs) -> ModelPerformanceMetricThresholdBase: + threshold = MonitoringThreshold(value=0.9) + return ClassificationModelPerformanceMetricThreshold( + metric="Accuracy", + threshold=threshold, + ) + + @classmethod + def _from_rest_object(cls, obj: ModelPerformanceMetricThresholdBase) -> "ModelPerformanceMetricThreshold": + return cls( + classification=ModelPerformanceClassificationThresholds._from_rest_object(obj), + regression=None, + ) + + +@experimental +class CustomMonitoringMetricThreshold(MetricThreshold): + """Feature attribution drift metric threshold + + :param metric_name: The metric to calculate + :type metric_name: str + :param threshold: The threshold value. If None, a default value will be set + depending on the selected metric. + :type threshold: float + """ + + def __init__( + self, + *, + metric_name: Optional[str], + threshold: Optional[float] = None, + ): + super().__init__(threshold=threshold) + self.metric_name = metric_name + + def _to_rest_object(self) -> CustomMetricThreshold: + return CustomMetricThreshold( + metric=self.metric_name, + threshold=MonitoringThreshold(value=self.threshold) if self.threshold is not None else None, + ) + + @classmethod + def _from_rest_object(cls, obj: CustomMetricThreshold) -> "CustomMonitoringMetricThreshold": + return cls(metric_name=obj.metric, threshold=obj.threshold.value if obj.threshold else None) + + +@experimental +class GenerationSafetyQualityMonitoringMetricThreshold(RestTranslatableMixin): # pylint: disable=name-too-long + """Generation safety quality metric threshold + + :param groundedness: The groundedness metric threshold + :paramtype groundedness: Dict[str, float] + :param relevance: The relevance metric threshold + :paramtype relevance: Dict[str, float] + :param coherence: The coherence metric threshold + :paramtype coherence: Dict[str, float] + :param fluency: The fluency metric threshold + :paramtype fluency: Dict[str, float] + :param similarity: The similarity metric threshold + :paramtype similarity: Dict[str, float] + """ + + def __init__( + self, + *, + groundedness: Optional[Dict[str, float]] = None, + relevance: Optional[Dict[str, float]] = None, + coherence: Optional[Dict[str, float]] = None, + fluency: Optional[Dict[str, float]] = None, + similarity: Optional[Dict[str, float]] = None, + ): + self.groundedness = groundedness + self.relevance = relevance + self.coherence = coherence + self.fluency = fluency + self.similarity = similarity + + def _to_rest_object(self) -> GenerationSafetyQualityMetricThreshold: + metric_thresholds = [] + if self.groundedness: + if "acceptable_groundedness_score_per_instance" in self.groundedness: + acceptable_threshold = MonitoringThreshold( + value=self.groundedness["acceptable_groundedness_score_per_instance"] + ) + else: + acceptable_threshold = MonitoringThreshold(value=3) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AcceptableGroundednessScorePerInstance", threshold=acceptable_threshold + ) + ) + aggregated_threshold = MonitoringThreshold(value=self.groundedness["aggregated_groundedness_pass_rate"]) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + 
metric="AggregatedGroundednessPassRate", threshold=aggregated_threshold + ) + ) + if self.relevance: + if "acceptable_relevance_score_per_instance" in self.relevance: + acceptable_threshold = MonitoringThreshold( + value=self.relevance["acceptable_relevance_score_per_instance"] + ) + else: + acceptable_threshold = MonitoringThreshold(value=3) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AcceptableRelevanceScorePerInstance", threshold=acceptable_threshold + ) + ) + aggregated_threshold = MonitoringThreshold(value=self.relevance["aggregated_relevance_pass_rate"]) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AggregatedRelevancePassRate", threshold=aggregated_threshold + ) + ) + if self.coherence: + if "acceptable_coherence_score_per_instance" in self.coherence: + acceptable_threshold = MonitoringThreshold( + value=self.coherence["acceptable_coherence_score_per_instance"] + ) + else: + acceptable_threshold = MonitoringThreshold(value=3) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AcceptableCoherenceScorePerInstance", threshold=acceptable_threshold + ) + ) + aggregated_threshold = MonitoringThreshold(value=self.coherence["aggregated_coherence_pass_rate"]) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AggregatedCoherencePassRate", threshold=aggregated_threshold + ) + ) + if self.fluency: + if "acceptable_fluency_score_per_instance" in self.fluency: + acceptable_threshold = MonitoringThreshold(value=self.fluency["acceptable_fluency_score_per_instance"]) + else: + acceptable_threshold = MonitoringThreshold(value=3) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AcceptableFluencyScorePerInstance", threshold=acceptable_threshold + ) + ) + aggregated_threshold = MonitoringThreshold(value=self.fluency["aggregated_fluency_pass_rate"]) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AggregatedFluencyPassRate", threshold=aggregated_threshold + ) + ) + if self.similarity: + if "acceptable_similarity_score_per_instance" in self.similarity: + acceptable_threshold = MonitoringThreshold( + value=self.similarity["acceptable_similarity_score_per_instance"] + ) + else: + acceptable_threshold = MonitoringThreshold(value=3) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AcceptableSimilarityScorePerInstance", threshold=acceptable_threshold + ) + ) + aggregated_threshold = MonitoringThreshold(value=self.similarity["aggregated_similarity_pass_rate"]) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="AggregatedSimilarityPassRate", threshold=aggregated_threshold + ) + ) + return metric_thresholds + + @classmethod + def _from_rest_object( + cls, obj: GenerationSafetyQualityMetricThreshold + ) -> "GenerationSafetyQualityMonitoringMetricThreshold": + groundedness = {} + relevance = {} + coherence = {} + fluency = {} + similarity = {} + + for threshold in obj: + if threshold.metric == "AcceptableGroundednessScorePerInstance": + groundedness["acceptable_groundedness_score_per_instance"] = threshold.threshold.value + if threshold.metric == "AcceptableRelevanceScorePerInstance": + relevance["acceptable_relevance_score_per_instance"] = threshold.threshold.value + if threshold.metric == "AcceptableCoherenceScorePerInstance": + coherence["acceptable_coherence_score_per_instance"] = threshold.threshold.value + if threshold.metric == 
"AcceptableFluencyScorePerInstance": + fluency["acceptable_fluency_score_per_instance"] = threshold.threshold.value + if threshold.metric == "AcceptableSimilarityScorePerInstance": + similarity["acceptable_similarity_score_per_instance"] = threshold.threshold.value + if threshold.metric == "AggregatedGroundednessPassRate": + groundedness["aggregated_groundedness_pass_rate"] = threshold.threshold.value + if threshold.metric == "AggregatedRelevancePassRate": + relevance["aggregated_relevance_pass_rate"] = threshold.threshold.value + if threshold.metric == "AggregatedCoherencePassRate": + coherence["aggregated_coherence_pass_rate"] = threshold.threshold.value + if threshold.metric == "AggregatedFluencyPassRate": + fluency["aggregated_fluency_pass_rate"] = threshold.threshold.value + if threshold.metric == "AggregatedSimilarityPassRate": + similarity["aggregated_similarity_pass_rate"] = threshold.threshold.value + + return cls( + groundedness=groundedness if groundedness else None, + relevance=relevance if relevance else None, + coherence=coherence if coherence else None, + fluency=fluency if fluency else None, + similarity=similarity if similarity else None, + ) + + +@experimental +class GenerationTokenStatisticsMonitorMetricThreshold(RestTranslatableMixin): # pylint: disable=name-too-long + """Generation token statistics metric threshold definition. + + All required parameters must be populated in order to send to Azure. + + :ivar metric: Required. [Required] Gets or sets the feature attribution metric to calculate. + Possible values include: "TotalTokenCount", "TotalTokenCountPerGroup". + :vartype metric: str or + ~azure.mgmt.machinelearningservices.models.GenerationTokenStatisticsMetric + :ivar threshold: Gets or sets the threshold value. + If null, a default value will be set depending on the selected metric. + :vartype threshold: ~azure.mgmt.machinelearningservices.models.MonitoringThreshold + """ + + def __init__( + self, + *, + totaltoken: Optional[Dict[str, float]] = None, + ): + self.totaltoken = totaltoken + + def _to_rest_object(self) -> GenerationSafetyQualityMetricThreshold: + metric_thresholds = [] + if self.totaltoken: + if "total_token_count" in self.totaltoken: + acceptable_threshold = MonitoringThreshold(value=self.totaltoken["total_token_count"]) + else: + acceptable_threshold = MonitoringThreshold(value=3) + metric_thresholds.append( + GenerationTokenStatisticsMetricThreshold(metric="TotalTokenCount", threshold=acceptable_threshold) + ) + acceptable_threshold_per_group = MonitoringThreshold(value=self.totaltoken["total_token_count_per_group"]) + metric_thresholds.append( + GenerationSafetyQualityMetricThreshold( + metric="TotalTokenCountPerGroup", threshold=acceptable_threshold_per_group + ) + ) + return metric_thresholds + + @classmethod + def _from_rest_object( + cls, obj: GenerationTokenStatisticsMetricThreshold + ) -> "GenerationTokenStatisticsMonitorMetricThreshold": + totaltoken = {} + for threshold in obj: + if threshold.metric == "TotalTokenCount": + totaltoken["total_token_count"] = threshold.threshold.value + if threshold.metric == "TotalTokenCountPerGroup": + totaltoken["total_token_count_per_group"] = threshold.threshold.value + + return cls( + totaltoken=totaltoken if totaltoken else None, + ) + + @classmethod + def _get_default_thresholds(cls) -> "GenerationTokenStatisticsMonitorMetricThreshold": + return cls(totaltoken={"total_token_count": 0, "total_token_count_per_group": 0}) |

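For orientation, a hedged end-to-end sketch of how the entities added in this diff fit together: a MonitoringTarget from target.py pointing at a deployment, plus default signals from the signals module shown earlier (filename signals.py is assumed from the file ordering in this diff). The deployment ID and dictionary keys are hypothetical, and a full monitor definition would also attach these to a schedule entity not shown in this section:

from azure.ai.ml.entities._monitoring.signals import (
    DataQualitySignal,
    GenerationTokenStatisticsSignal,
)
from azure.ai.ml.entities._monitoring.target import MonitoringTarget

# Target a deployed online endpoint (hypothetical ARM ID).
target = MonitoringTarget(
    ml_task="Classification",
    endpoint_deployment_id="azureml:/subscriptions/.../onlineEndpoints/my-endpoint/deployments/blue",
)

signals = {
    # All features, zeroed data quality thresholds by default.
    "data_quality": DataQualitySignal._get_default_data_quality_signal(),
    # TotalTokenCount thresholds with a 0.1 sampling rate by default.
    "token_statistics": GenerationTokenStatisticsSignal._get_default_token_statistics_signal(),
}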