about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py513
1 files changed, 513 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
new file mode 100644
index 00000000..93867a9e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
@@ -0,0 +1,513 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+import logging
+import typing
+from os import PathLike
+from pathlib import Path
+from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Union
+
+from typing_extensions import Literal
+
+from azure.ai.ml._restclient.v2023_06_01_preview.models import JobBase as RestJobBase
+from azure.ai.ml._restclient.v2023_06_01_preview.models import JobScheduleAction
+from azure.ai.ml._restclient.v2023_06_01_preview.models import PipelineJob as RestPipelineJob
+from azure.ai.ml._restclient.v2023_06_01_preview.models import Schedule as RestSchedule
+from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleActionType as RestScheduleActionType
+from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleProperties
+from azure.ai.ml._restclient.v2024_01_01_preview.models import TriggerRunSubmissionDto as RestTriggerRunSubmissionDto
+from azure.ai.ml._schema.schedule.schedule import JobScheduleSchema
+from azure.ai.ml._utils.utils import camel_to_snake, dump_yaml_to_file, is_private_preview_enabled
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import ARM_ID_PREFIX, BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, ScheduleType
+from azure.ai.ml.entities._job.command_job import CommandJob
+from azure.ai.ml.entities._job.job import Job
+from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+from azure.ai.ml.entities._job.spark_job import SparkJob
+from azure.ai.ml.entities._mixins import RestTranslatableMixin, TelemetryMixin, YamlTranslatableMixin
+from azure.ai.ml.entities._resource import Resource
+from azure.ai.ml.entities._system_data import SystemData
+from azure.ai.ml.entities._util import load_from_dict
+from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin
+
+from ...exceptions import ErrorCategory, ErrorTarget, ScheduleException, ValidationException
+from .._builders import BaseNode
+from .trigger import CronTrigger, RecurrenceTrigger, TriggerBase
+
+module_logger = logging.getLogger(__name__)
+
+
+class Schedule(YamlTranslatableMixin, PathAwareSchemaValidatableMixin, Resource):
+    """Schedule object used to create and manage schedules.
+
+    This class should not be instantiated directly. Instead, please use the subclasses.
+
+    :keyword name: The name of the schedule.
+    :paramtype name: str
+    :keyword trigger: The schedule trigger configuration.
+    :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
+    :keyword display_name: The display name of the schedule.
+    :paramtype display_name: Optional[str]
+    :keyword description: The description of the schedule.
+    :paramtype description: Optional[str]
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+    :paramtype tags: Optional[dict]]
+    :keyword properties: A dictionary of properties to associate with the schedule.
+    :paramtype properties: Optional[dict[str, str]]
+    :keyword kwargs: Additional keyword arguments passed to the Resource constructor.
+    :paramtype kwargs: dict
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
+        display_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        is_enabled = kwargs.pop("is_enabled", None)
+        provisioning_state = kwargs.pop("provisioning_state", None)
+        super().__init__(name=name, description=description, tags=tags, properties=properties, **kwargs)
+        self.trigger = trigger
+        self.display_name = display_name
+        self._is_enabled: bool = is_enabled
+        self._provisioning_state: str = provisioning_state
+        self._type: Any = None
+
+    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+        """Dump the schedule content into a file in YAML format.
+
+        :param dest: The local path or file stream to write the YAML content to.
+            If dest is a file path, a new file will be created.
+            If dest is an open file, the file will be written to directly.
+        :type dest: Union[PathLike, str, IO[AnyStr]]
+        :raises FileExistsError: Raised if dest is a file path and the file already exists.
+        :raises IOError: Raised if dest is an open file and the file is not writable.
+        """
+        path = kwargs.pop("path", None)
+        yaml_serialized = self._to_dict()
+        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
+    @classmethod
+    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+        return ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.SCHEDULE,
+        )
+
+    @classmethod
+    def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple:
+        from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
+        from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
+
+        if "create_monitor" in data:
+            return MonitorSchedule, None
+        if "import_data" in data:
+            return ImportDataSchedule, None
+        return JobSchedule, None
+
+    @property
+    def create_job(self) -> Any:  # pylint: disable=useless-return
+        """The create_job entity associated with the schedule if exists."""
+        module_logger.warning("create_job is not a valid property of %s", str(type(self)))
+        # return None here just to be explicit
+        return None
+
+    @create_job.setter
+    def create_job(self, value: Any) -> None:  # pylint: disable=unused-argument
+        """Set the create_job entity associated with the schedule if exists.
+
+        :param value: The create_job entity associated with the schedule if exists.
+        :type value: Any
+        """
+        module_logger.warning("create_job is not a valid property of %s", str(type(self)))
+
+    @property
+    def is_enabled(self) -> bool:
+        """Specifies if the schedule is enabled or not.
+
+        :return: True if the schedule is enabled, False otherwise.
+        :rtype: bool
+        """
+        return self._is_enabled
+
+    @property
+    def provisioning_state(self) -> str:
+        """Returns the schedule's provisioning state. The possible values include
+        "Creating", "Updating", "Deleting", "Succeeded", "Failed", "Canceled".
+
+        :return: The schedule's provisioning state.
+        :rtype: str
+        """
+        return self._provisioning_state
+
+    @property
+    def type(self) -> Optional[str]:
+        """The schedule type. Accepted values are 'job' and 'monitor'.
+
+        :return: The schedule type.
+        :rtype: str
+        """
+        return self._type
+
+    def _to_dict(self) -> Dict:
+        res: dict = self._dump_for_validation()
+        return res
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestSchedule) -> "Schedule":
+        from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
+        from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
+
+        if obj.properties.action.action_type == RestScheduleActionType.CREATE_JOB:
+            return JobSchedule._from_rest_object(obj)
+        if obj.properties.action.action_type == RestScheduleActionType.CREATE_MONITOR:
+            res_monitor_schedule: Schedule = MonitorSchedule._from_rest_object(obj)
+            return res_monitor_schedule
+        if obj.properties.action.action_type == RestScheduleActionType.IMPORT_DATA:
+            res_data_schedule: Schedule = ImportDataSchedule._from_rest_object(obj)
+            return res_data_schedule
+        msg = f"Unsupported schedule type {obj.properties.action.action_type}"
+        raise ScheduleException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.SCHEDULE,
+            error_category=ErrorCategory.SYSTEM_ERROR,
+        )
+
+
+class JobSchedule(RestTranslatableMixin, Schedule, TelemetryMixin):
+    """Class for managing job schedules.
+
+    :keyword name: The name of the schedule.
+    :paramtype name: str
+    :keyword trigger: The trigger configuration for the schedule.
+    :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
+    :keyword create_job: The job definition or an existing job name.
+    :paramtype create_job: Union[~azure.ai.ml.entities.Job, str]
+    :keyword display_name: The display name of the schedule.
+    :paramtype display_name: Optional[str]
+    :keyword description: The description of the schedule.
+    :paramtype description: Optional[str]
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+    :paramtype tags: Optional[dict[str, str]]
+    :keyword properties: A dictionary of properties to associate with the schedule.
+    :paramtype properties: Optional[dict[str, str]]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START job_schedule_configuration]
+            :end-before: [END job_schedule_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a JobSchedule.
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
+        create_job: Union[Job, str],
+        display_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(
+            name=name,
+            trigger=trigger,
+            display_name=display_name,
+            description=description,
+            tags=tags,
+            properties=properties,
+            **kwargs,
+        )
+        self._create_job = create_job
+        self._type = ScheduleType.JOB
+
+    @property
+    def create_job(self) -> Union[Job, str]:
+        """Return the job associated with the schedule.
+
+        :return: The job definition or an existing job name.
+        :rtype: Union[~azure.ai.ml.entities.Job, str]
+        """
+        return self._create_job
+
+    @create_job.setter
+    def create_job(self, value: Union[Job, str]) -> None:
+        """Sets the job that will be run when the schedule is triggered.
+
+        :param value: The job definition or an existing job name.
+        :type value: Union[~azure.ai.ml.entities.Job, str]
+        """
+        self._create_job = value
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "JobSchedule":
+        data = data or {}
+        params_override = params_override or []
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        return JobSchedule(
+            base_path=context[BASE_PATH_CONTEXT_KEY],
+            **load_from_dict(JobScheduleSchema, data, context, **kwargs),
+        )
+
+    @classmethod
+    def _load_from_rest_dict(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "JobSchedule":
+        """
+        Load job schedule from rest object dict.
+
+        This function is added because the user-faced schema is different from the rest one.
+
+        For example:
+
+        user yaml create_job is a file reference with updates(not a job definition):
+
+        .. code-block:: yaml
+
+            create_job:
+                job: ./job.yaml
+                inputs:
+                    input: 10
+
+        while what we get from rest will be a complete job definition:
+
+        .. code-block:: yaml
+
+            create_job:
+                name: xx
+                jobs:
+                    node1: ...
+                inputs:
+                    input: ..
+
+        :param data: The REST object to convert
+        :type data: Optional[Dict]
+        :param yaml_path: The yaml path
+        :type yaml_path: Optional[Union[PathLike str]]
+        :param params_override: A list of parameter overrides
+        :type params_override: Optional[list]
+        :return: The job schedule
+        :rtype: JobSchedule
+        """
+        data = data or {}
+        params_override = params_override or []
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        create_job_key = "create_job"
+        if create_job_key not in data:
+            msg = "Job definition for schedule '{}' can not be None."
+            raise ScheduleException(
+                message=msg.format(data["name"]),
+                no_personal_data_message=msg.format("[name]"),
+                target=ErrorTarget.JOB,
+                error_category=ErrorCategory.SYSTEM_ERROR,
+            )
+        # Load the job definition separately
+        create_job_data = data.pop(create_job_key)
+        # Save the id for remote job reference before load job, as data dict will be changed
+        job_id = create_job_data.get("id")
+        if isinstance(job_id, str) and job_id.startswith(ARM_ID_PREFIX):
+            job_id = job_id[len(ARM_ID_PREFIX) :]
+        create_job = Job._load(
+            data=create_job_data,
+            **kwargs,
+        )
+        # Set id manually as it is a dump only field in schema
+        create_job._id = job_id
+        schedule = JobSchedule(
+            base_path=context[BASE_PATH_CONTEXT_KEY],
+            **load_from_dict(JobScheduleSchema, data, context, **kwargs),
+        )
+        schedule.create_job = create_job
+        return schedule
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> JobScheduleSchema:
+        return JobScheduleSchema(context=context)
+
+    def _customized_validate(self) -> MutableValidationResult:
+        """Validate the resource with customized logic.
+
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        if isinstance(self.create_job, PipelineJob):
+            return self.create_job._validate()
+        return self._create_empty_validation_result()
+
+    @classmethod
+    def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
+        """Get the fields that should be skipped in schema validation.
+
+        Override this method to add customized validation logic.
+
+        :return: The list of fields to skip in schema validation
+        :rtype: typing.List[str]
+        """
+        return ["create_job"]
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestSchedule) -> "JobSchedule":
+        properties = obj.properties
+        action: JobScheduleAction = properties.action
+        if action.job_definition is None:
+            msg = "Job definition for schedule '{}' can not be None."
+            raise ScheduleException(
+                message=msg.format(obj.name),
+                no_personal_data_message=msg.format("[name]"),
+                target=ErrorTarget.JOB,
+                error_category=ErrorCategory.SYSTEM_ERROR,
+            )
+        if camel_to_snake(action.job_definition.job_type) not in [JobType.PIPELINE, JobType.COMMAND, JobType.SPARK]:
+            msg = f"Unsupported job type {action.job_definition.job_type} for schedule '{{}}'."
+            raise ScheduleException(
+                message=msg.format(obj.name),
+                no_personal_data_message=msg.format("[name]"),
+                target=ErrorTarget.JOB,
+                # Classified as user_error as we may support other type afterwards.
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        # Wrap job definition with JobBase for Job._from_rest_object call.
+        create_job = RestJobBase(properties=action.job_definition)
+        # id is a readonly field so set it after init.
+        # TODO: Add this support after source job id move to JobBaseProperties
+        if hasattr(action.job_definition, "source_job_id"):
+            create_job.id = action.job_definition.source_job_id
+        create_job = Job._from_rest_object(create_job)
+        return cls(
+            trigger=TriggerBase._from_rest_object(properties.trigger),
+            create_job=create_job,
+            name=obj.name,
+            display_name=properties.display_name,
+            description=properties.description,
+            tags=properties.tags,
+            properties=properties.properties,
+            provisioning_state=properties.provisioning_state,
+            is_enabled=properties.is_enabled,
+            creation_context=SystemData._from_rest_object(obj.system_data),
+        )
+
+    def _to_rest_object(self) -> RestSchedule:
+        """Build current parameterized schedule instance to a schedule object before submission.
+
+        :return: Rest schedule.
+        :rtype: RestSchedule
+        """
+        if isinstance(self.create_job, BaseNode):
+            self.create_job = self.create_job._to_job()
+        private_enabled = is_private_preview_enabled()
+        if isinstance(self.create_job, PipelineJob):
+            job_definition = self.create_job._to_rest_object().properties
+            # Set the source job id, as it is used only for schedule scenario.
+            job_definition.source_job_id = self.create_job.id
+        elif private_enabled and isinstance(self.create_job, (CommandJob, SparkJob)):
+            job_definition = self.create_job._to_rest_object().properties
+            # TODO: Merge this branch with PipelineJob after source job id move to JobBaseProperties
+            # job_definition.source_job_id = self.create_job.id
+        elif isinstance(self.create_job, str):  # arm id reference
+            # TODO: Update this after source job id move to JobBaseProperties
+            # Rest pipeline job will hold a 'Default' as experiment_name,
+            # MFE will add default if None, so pass an empty string here.
+            job_definition = RestPipelineJob(source_job_id=self.create_job, experiment_name="")
+        else:
+            msg = "Unsupported job type '{}' in schedule {}."
+            raise ValidationException(
+                message=msg.format(type(self.create_job).__name__, self.name),
+                no_personal_data_message=msg.format("[type]", "[name]"),
+                target=ErrorTarget.SCHEDULE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        return RestSchedule(
+            properties=ScheduleProperties(
+                description=self.description,
+                properties=self.properties,
+                tags=self.tags,
+                action=JobScheduleAction(job_definition=job_definition),
+                display_name=self.display_name,
+                is_enabled=self._is_enabled,
+                trigger=self.trigger._to_rest_object() if self.trigger is not None else None,
+            )
+        )
+
+    def __str__(self) -> str:
+        try:
+            res_yaml: str = self._to_yaml()
+            return res_yaml
+        except BaseException:  # pylint: disable=W0718
+            res_jobSchedule: str = super(JobSchedule, self).__str__()
+            return res_jobSchedule
+
+    # pylint: disable-next=docstring-missing-param
+    def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict[Literal["trigger_type"], str]:
+        """Return the telemetry values of schedule.
+
+        :return: A dictionary with telemetry values
+        :rtype: Dict[Literal["trigger_type"], str]
+        """
+        return {"trigger_type": type(self.trigger).__name__}
+
+
+class ScheduleTriggerResult:
+    """Schedule trigger result returned by trigger an enabled schedule once.
+
+    This class shouldn't be instantiated directly. Instead, it is used as the return type of schedule trigger.
+
+    :ivar str job_name:
+    :ivar str schedule_action_type:
+    """
+
+    def __init__(self, **kwargs):
+        self.job_name = kwargs.get("job_name", None)
+        self.schedule_action_type = kwargs.get("schedule_action_type", None)
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestTriggerRunSubmissionDto) -> "ScheduleTriggerResult":
+        """Construct a ScheduleJob from a rest object.
+
+        :param obj: The rest object to construct from.
+        :type obj: ~azure.ai.ml._restclient.v2024_01_01_preview.models.TriggerRunSubmissionDto
+        :return: The constructed ScheduleJob.
+        :rtype: ScheduleTriggerResult
+        """
+        return cls(
+            schedule_action_type=obj.schedule_action_type,
+            job_name=obj.submission_id,
+        )
+
+    def _to_dict(self) -> dict:
+        """Convert the object to a dictionary.
+        :return: The dictionary representation of the object.
+        :rtype: dict
+        """
+        return {
+            "job_name": self.job_name,
+            "schedule_action_type": self.schedule_action_type,
+        }