# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access
import logging
import typing
from os import PathLike
from pathlib import Path
from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Union

from typing_extensions import Literal

from azure.ai.ml._restclient.v2023_06_01_preview.models import JobBase as RestJobBase
from azure.ai.ml._restclient.v2023_06_01_preview.models import JobScheduleAction
from azure.ai.ml._restclient.v2023_06_01_preview.models import PipelineJob as RestPipelineJob
from azure.ai.ml._restclient.v2023_06_01_preview.models import Schedule as RestSchedule
from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleActionType as RestScheduleActionType
from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleProperties
from azure.ai.ml._restclient.v2024_01_01_preview.models import TriggerRunSubmissionDto as RestTriggerRunSubmissionDto
from azure.ai.ml._schema.schedule.schedule import JobScheduleSchema
from azure.ai.ml._utils.utils import camel_to_snake, dump_yaml_to_file, is_private_preview_enabled
from azure.ai.ml.constants import JobType
from azure.ai.ml.constants._common import ARM_ID_PREFIX, BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, ScheduleType
from azure.ai.ml.entities._job.command_job import CommandJob
from azure.ai.ml.entities._job.job import Job
from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
from azure.ai.ml.entities._job.spark_job import SparkJob
from azure.ai.ml.entities._mixins import RestTranslatableMixin, TelemetryMixin, YamlTranslatableMixin
from azure.ai.ml.entities._resource import Resource
from azure.ai.ml.entities._system_data import SystemData
from azure.ai.ml.entities._util import load_from_dict
from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin

from ...exceptions import ErrorCategory, ErrorTarget, ScheduleException, ValidationException
from .._builders import BaseNode
from .trigger import CronTrigger, RecurrenceTrigger, TriggerBase

module_logger = logging.getLogger(__name__)


class Schedule(YamlTranslatableMixin, PathAwareSchemaValidatableMixin, Resource):
    """Schedule object used to create and manage schedules.

    This class should not be instantiated directly. Instead, please use the subclasses.

    :keyword name: The name of the schedule.
    :paramtype name: str
    :keyword trigger: The schedule trigger configuration.
    :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
    :keyword display_name: The display name of the schedule.
    :paramtype display_name: Optional[str]
    :keyword description: The description of the schedule.
    :paramtype description: Optional[str]
    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
    :paramtype tags: Optional[dict[str, str]]
    :keyword properties: A dictionary of properties to associate with the schedule.
    :paramtype properties: Optional[dict[str, str]]
    :keyword kwargs: Additional keyword arguments passed to the Resource constructor.
    :paramtype kwargs: dict
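
    .. admonition:: Example:

        A minimal sketch of obtaining a concrete schedule entity from a YAML specification via
        :func:`azure.ai.ml.load_schedule`; the file path ``./schedule.yaml`` is illustrative only:

        .. code-block:: python

            from azure.ai.ml import load_schedule

            # Resolves to the matching subclass (e.g. JobSchedule) based on the YAML content.
            schedule = load_schedule("./schedule.yaml")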
    """

    def __init__(
        self,
        *,
        name: str,
        trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
        display_name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Dict] = None,
        properties: Optional[Dict] = None,
        **kwargs: Any,
    ) -> None:
        is_enabled = kwargs.pop("is_enabled", None)
        provisioning_state = kwargs.pop("provisioning_state", None)
        super().__init__(name=name, description=description, tags=tags, properties=properties, **kwargs)
        self.trigger = trigger
        self.display_name = display_name
        self._is_enabled: bool = is_enabled
        self._provisioning_state: str = provisioning_state
        self._type: Any = None

    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
        """Dump the schedule content into a file in YAML format.

        :param dest: The local path or file stream to write the YAML content to.
            If dest is a file path, a new file will be created.
            If dest is an open file, the file will be written to directly.
        :type dest: Union[PathLike, str, IO[AnyStr]]
        :raises FileExistsError: Raised if dest is a file path and the file already exists.
        :raises IOError: Raised if dest is an open file and the file is not writable.
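
        A minimal usage sketch, assuming ``schedule`` is an existing schedule entity; the output
        path ``./schedule.yaml`` is illustrative only:

        .. code-block:: python

            # Serialize the schedule definition to a new YAML file.
            schedule.dump("./schedule.yaml")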
        """
        path = kwargs.pop("path", None)
        yaml_serialized = self._to_dict()
        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)

    @classmethod
    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
        return ValidationException(
            message=message,
            no_personal_data_message=no_personal_data_message,
            target=ErrorTarget.SCHEDULE,
        )

    @classmethod
    def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple:
        from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
        from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule

        if "create_monitor" in data:
            return MonitorSchedule, None
        if "import_data" in data:
            return ImportDataSchedule, None
        return JobSchedule, None

    @property
    def create_job(self) -> Any:  # pylint: disable=useless-return
        """The create_job entity associated with the schedule, if one exists."""
        module_logger.warning("create_job is not a valid property of %s", str(type(self)))
        # return None here just to be explicit
        return None

    @create_job.setter
    def create_job(self, value: Any) -> None:  # pylint: disable=unused-argument
        """Set the create_job entity associated with the schedule, if one exists.

        :param value: The create_job entity associated with the schedule, if one exists.
        :type value: Any
        """
        module_logger.warning("create_job is not a valid property of %s", str(type(self)))

    @property
    def is_enabled(self) -> bool:
        """Specifies if the schedule is enabled or not.

        :return: True if the schedule is enabled, False otherwise.
        :rtype: bool
        """
        return self._is_enabled

    @property
    def provisioning_state(self) -> str:
        """Returns the schedule's provisioning state. The possible values include
        "Creating", "Updating", "Deleting", "Succeeded", "Failed", "Canceled".

        :return: The schedule's provisioning state.
        :rtype: str
        """
        return self._provisioning_state

    @property
    def type(self) -> Optional[str]:
        """The schedule type. Accepted values are 'job' and 'monitor'.

        :return: The schedule type.
        :rtype: Optional[str]
        """
        return self._type

    def _to_dict(self) -> Dict:
        res: dict = self._dump_for_validation()
        return res

    @classmethod
    def _from_rest_object(cls, obj: RestSchedule) -> "Schedule":
        from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
        from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule

        if obj.properties.action.action_type == RestScheduleActionType.CREATE_JOB:
            return JobSchedule._from_rest_object(obj)
        if obj.properties.action.action_type == RestScheduleActionType.CREATE_MONITOR:
            res_monitor_schedule: Schedule = MonitorSchedule._from_rest_object(obj)
            return res_monitor_schedule
        if obj.properties.action.action_type == RestScheduleActionType.IMPORT_DATA:
            res_data_schedule: Schedule = ImportDataSchedule._from_rest_object(obj)
            return res_data_schedule
        msg = f"Unsupported schedule type {obj.properties.action.action_type}"
        raise ScheduleException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.SCHEDULE,
            error_category=ErrorCategory.SYSTEM_ERROR,
        )


class JobSchedule(RestTranslatableMixin, Schedule, TelemetryMixin):
    """Class for managing job schedules.

    :keyword name: The name of the schedule.
    :paramtype name: str
    :keyword trigger: The trigger configuration for the schedule.
    :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
    :keyword create_job: The job definition or an existing job name.
    :paramtype create_job: Union[~azure.ai.ml.entities.Job, str]
    :keyword display_name: The display name of the schedule.
    :paramtype display_name: Optional[str]
    :keyword description: The description of the schedule.
    :paramtype description: Optional[str]
    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
    :paramtype tags: Optional[dict[str, str]]
    :keyword properties: A dictionary of properties to associate with the schedule.
    :paramtype properties: Optional[dict[str, str]]

    .. admonition:: Example:

        .. literalinclude:: ../samples/ml_samples_misc.py
            :start-after: [START job_schedule_configuration]
            :end-before: [END job_schedule_configuration]
            :language: python
            :dedent: 8
            :caption: Configuring a JobSchedule.
    """

    def __init__(
        self,
        *,
        name: str,
        trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
        create_job: Union[Job, str],
        display_name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Dict] = None,
        properties: Optional[Dict] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            trigger=trigger,
            display_name=display_name,
            description=description,
            tags=tags,
            properties=properties,
            **kwargs,
        )
        self._create_job = create_job
        self._type = ScheduleType.JOB

    @property
    def create_job(self) -> Union[Job, str]:
        """Return the job associated with the schedule.

        :return: The job definition or an existing job name.
        :rtype: Union[~azure.ai.ml.entities.Job, str]
        """
        return self._create_job

    @create_job.setter
    def create_job(self, value: Union[Job, str]) -> None:
        """Sets the job that will be run when the schedule is triggered.

        :param value: The job definition or an existing job name.
        :type value: Union[~azure.ai.ml.entities.Job, str]
        """
        self._create_job = value

    @classmethod
    def _load(
        cls,
        data: Optional[Dict] = None,
        yaml_path: Optional[Union[PathLike, str]] = None,
        params_override: Optional[list] = None,
        **kwargs: Any,
    ) -> "JobSchedule":
        data = data or {}
        params_override = params_override or []
        context = {
            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
            PARAMS_OVERRIDE_KEY: params_override,
        }
        return JobSchedule(
            base_path=context[BASE_PATH_CONTEXT_KEY],
            **load_from_dict(JobScheduleSchema, data, context, **kwargs),
        )

    @classmethod
    def _load_from_rest_dict(
        cls,
        data: Optional[Dict] = None,
        yaml_path: Optional[Union[PathLike, str]] = None,
        params_override: Optional[list] = None,
        **kwargs: Any,
    ) -> "JobSchedule":
        """
        Load job schedule from rest object dict.

        This function is added because the user-facing schema is different from the REST one.

        For example: the user yaml create_job is a file reference with updates (not a job definition):

        .. code-block:: yaml

            create_job:
                job: ./job.yaml
                inputs:
                    input: 10

        while what we get from rest will be a complete job definition:

        .. code-block:: yaml

            create_job:
                name: xx
                jobs:
                    node1: ...
                inputs:
                    input: ..

        :param data: The REST object to convert
        :type data: Optional[Dict]
        :param yaml_path: The yaml path
        :type yaml_path: Optional[Union[PathLike, str]]
        :param params_override: A list of parameter overrides
        :type params_override: Optional[list]
        :return: The job schedule
        :rtype: JobSchedule
        """
        data = data or {}
        params_override = params_override or []
        context = {
            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
            PARAMS_OVERRIDE_KEY: params_override,
        }
        create_job_key = "create_job"
        if create_job_key not in data:
            msg = "Job definition for schedule '{}' can not be None."
            raise ScheduleException(
                message=msg.format(data["name"]),
                no_personal_data_message=msg.format("[name]"),
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.SYSTEM_ERROR,
            )
        # Load the job definition separately
        create_job_data = data.pop(create_job_key)
        # Save the id for remote job reference before load job, as data dict will be changed
        job_id = create_job_data.get("id")
        if isinstance(job_id, str) and job_id.startswith(ARM_ID_PREFIX):
            job_id = job_id[len(ARM_ID_PREFIX) :]
        create_job = Job._load(
            data=create_job_data,
            **kwargs,
        )
        # Set id manually as it is a dump only field in schema
        create_job._id = job_id
        schedule = JobSchedule(
            base_path=context[BASE_PATH_CONTEXT_KEY],
            **load_from_dict(JobScheduleSchema, data, context, **kwargs),
        )
        schedule.create_job = create_job
        return schedule

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> JobScheduleSchema:
        return JobScheduleSchema(context=context)

    def _customized_validate(self) -> MutableValidationResult:
        """Validate the resource with customized logic.

        :return: The validation result
        :rtype: MutableValidationResult
        """
        if isinstance(self.create_job, PipelineJob):
            return self.create_job._validate()
        return self._create_empty_validation_result()

    @classmethod
    def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
        """Get the fields that should be skipped in schema validation.

        Override this method to add customized validation logic.
        :return: The list of fields to skip in schema validation
        :rtype: typing.List[str]
        """
        return ["create_job"]

    @classmethod
    def _from_rest_object(cls, obj: RestSchedule) -> "JobSchedule":
        properties = obj.properties
        action: JobScheduleAction = properties.action
        if action.job_definition is None:
            msg = "Job definition for schedule '{}' can not be None."
            raise ScheduleException(
                message=msg.format(obj.name),
                no_personal_data_message=msg.format("[name]"),
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.SYSTEM_ERROR,
            )
        if camel_to_snake(action.job_definition.job_type) not in [JobType.PIPELINE, JobType.COMMAND, JobType.SPARK]:
            msg = f"Unsupported job type {action.job_definition.job_type} for schedule '{{}}'."
            raise ScheduleException(
                message=msg.format(obj.name),
                no_personal_data_message=msg.format("[name]"),
                target=ErrorTarget.JOB,
                # Classified as user_error as we may support other job types in the future.
                error_category=ErrorCategory.USER_ERROR,
            )
        # Wrap job definition with JobBase for Job._from_rest_object call.
        create_job = RestJobBase(properties=action.job_definition)
        # id is a readonly field so set it after init.
        # TODO: Add this support after source job id move to JobBaseProperties
        if hasattr(action.job_definition, "source_job_id"):
            create_job.id = action.job_definition.source_job_id
        create_job = Job._from_rest_object(create_job)
        return cls(
            trigger=TriggerBase._from_rest_object(properties.trigger),
            create_job=create_job,
            name=obj.name,
            display_name=properties.display_name,
            description=properties.description,
            tags=properties.tags,
            properties=properties.properties,
            provisioning_state=properties.provisioning_state,
            is_enabled=properties.is_enabled,
            creation_context=SystemData._from_rest_object(obj.system_data),
        )

    def _to_rest_object(self) -> RestSchedule:
        """Build current parameterized schedule instance to a schedule object before submission.

        :return: Rest schedule.
        :rtype: RestSchedule
        """
        if isinstance(self.create_job, BaseNode):
            self.create_job = self.create_job._to_job()
        private_enabled = is_private_preview_enabled()
        if isinstance(self.create_job, PipelineJob):
            job_definition = self.create_job._to_rest_object().properties
            # Set the source job id, as it is used only for schedule scenario.
            job_definition.source_job_id = self.create_job.id
        elif private_enabled and isinstance(self.create_job, (CommandJob, SparkJob)):
            job_definition = self.create_job._to_rest_object().properties
            # TODO: Merge this branch with PipelineJob after source job id move to JobBaseProperties
            # job_definition.source_job_id = self.create_job.id
        elif isinstance(self.create_job, str):  # arm id reference
            # TODO: Update this after source job id move to JobBaseProperties
            # Rest pipeline job will hold a 'Default' as experiment_name,
            # MFE will add default if None, so pass an empty string here.
            job_definition = RestPipelineJob(source_job_id=self.create_job, experiment_name="")
        else:
            msg = "Unsupported job type '{}' in schedule {}."
            raise ValidationException(
                message=msg.format(type(self.create_job).__name__, self.name),
                no_personal_data_message=msg.format("[type]", "[name]"),
                target=ErrorTarget.SCHEDULE,
                error_category=ErrorCategory.USER_ERROR,
            )
        return RestSchedule(
            properties=ScheduleProperties(
                description=self.description,
                properties=self.properties,
                tags=self.tags,
                action=JobScheduleAction(job_definition=job_definition),
                display_name=self.display_name,
                is_enabled=self._is_enabled,
                trigger=self.trigger._to_rest_object() if self.trigger is not None else None,
            )
        )

    def __str__(self) -> str:
        try:
            res_yaml: str = self._to_yaml()
            return res_yaml
        except BaseException:  # pylint: disable=W0718
            res_job_schedule: str = super(JobSchedule, self).__str__()
            return res_job_schedule

    # pylint: disable-next=docstring-missing-param
    def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict[Literal["trigger_type"], str]:
        """Return the telemetry values of schedule.

        :return: A dictionary with telemetry values
        :rtype: Dict[Literal["trigger_type"], str]
        """
        return {"trigger_type": type(self.trigger).__name__}


class ScheduleTriggerResult:
    """Schedule trigger result returned by triggering an enabled schedule once.

    This class shouldn't be instantiated directly. Instead, it is used as the return type of a schedule
    trigger operation.

    :ivar str job_name: The name of the job submitted by the trigger run.
    :ivar str schedule_action_type: The action type of the triggered schedule.
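
    A minimal sketch of how an instance is typically obtained, assuming the schedule operations on
    ``MLClient`` expose a ``trigger`` method; the schedule name is illustrative only:

    .. code-block:: python

        # Trigger an enabled schedule once and inspect the submitted run.
        result = ml_client.schedules.trigger(name="my-schedule")
        print(result.job_name, result.schedule_action_type)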
    """

    def __init__(self, **kwargs):
        self.job_name = kwargs.get("job_name", None)
        self.schedule_action_type = kwargs.get("schedule_action_type", None)

    @classmethod
    def _from_rest_object(cls, obj: RestTriggerRunSubmissionDto) -> "ScheduleTriggerResult":
        """Construct a ScheduleTriggerResult from a rest object.

        :param obj: The rest object to construct from.
        :type obj: ~azure.ai.ml._restclient.v2024_01_01_preview.models.TriggerRunSubmissionDto
        :return: The constructed ScheduleTriggerResult.
        :rtype: ScheduleTriggerResult
        """
        return cls(
            schedule_action_type=obj.schedule_action_type,
            job_name=obj.submission_id,
        )

    def _to_dict(self) -> dict:
        """Convert the object to a dictionary.

        :return: The dictionary representation of the object.
        :rtype: dict
        """
        return {
            "job_name": self.job_name,
            "schedule_action_type": self.schedule_action_type,
        }