aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py513
1 files changed, 513 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
new file mode 100644
index 00000000..93867a9e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
@@ -0,0 +1,513 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+import logging
+import typing
+from os import PathLike
+from pathlib import Path
+from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Union
+
+from typing_extensions import Literal
+
+from azure.ai.ml._restclient.v2023_06_01_preview.models import JobBase as RestJobBase
+from azure.ai.ml._restclient.v2023_06_01_preview.models import JobScheduleAction
+from azure.ai.ml._restclient.v2023_06_01_preview.models import PipelineJob as RestPipelineJob
+from azure.ai.ml._restclient.v2023_06_01_preview.models import Schedule as RestSchedule
+from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleActionType as RestScheduleActionType
+from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleProperties
+from azure.ai.ml._restclient.v2024_01_01_preview.models import TriggerRunSubmissionDto as RestTriggerRunSubmissionDto
+from azure.ai.ml._schema.schedule.schedule import JobScheduleSchema
+from azure.ai.ml._utils.utils import camel_to_snake, dump_yaml_to_file, is_private_preview_enabled
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import ARM_ID_PREFIX, BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, ScheduleType
+from azure.ai.ml.entities._job.command_job import CommandJob
+from azure.ai.ml.entities._job.job import Job
+from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+from azure.ai.ml.entities._job.spark_job import SparkJob
+from azure.ai.ml.entities._mixins import RestTranslatableMixin, TelemetryMixin, YamlTranslatableMixin
+from azure.ai.ml.entities._resource import Resource
+from azure.ai.ml.entities._system_data import SystemData
+from azure.ai.ml.entities._util import load_from_dict
+from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin
+
+from ...exceptions import ErrorCategory, ErrorTarget, ScheduleException, ValidationException
+from .._builders import BaseNode
+from .trigger import CronTrigger, RecurrenceTrigger, TriggerBase
+
+module_logger = logging.getLogger(__name__)
+
+
+class Schedule(YamlTranslatableMixin, PathAwareSchemaValidatableMixin, Resource):
+ """Schedule object used to create and manage schedules.
+
+ This class should not be instantiated directly. Instead, please use the subclasses.
+
+ :keyword name: The name of the schedule.
+ :paramtype name: str
+ :keyword trigger: The schedule trigger configuration.
+ :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
+ :keyword display_name: The display name of the schedule.
+ :paramtype display_name: Optional[str]
+ :keyword description: The description of the schedule.
+ :paramtype description: Optional[str]
+ :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+ :paramtype tags: Optional[dict]]
+ :keyword properties: A dictionary of properties to associate with the schedule.
+ :paramtype properties: Optional[dict[str, str]]
+ :keyword kwargs: Additional keyword arguments passed to the Resource constructor.
+ :paramtype kwargs: dict
+ """
+
+ def __init__(
+ self,
+ *,
+ name: str,
+ trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
+ display_name: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ properties: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ is_enabled = kwargs.pop("is_enabled", None)
+ provisioning_state = kwargs.pop("provisioning_state", None)
+ super().__init__(name=name, description=description, tags=tags, properties=properties, **kwargs)
+ self.trigger = trigger
+ self.display_name = display_name
+ self._is_enabled: bool = is_enabled
+ self._provisioning_state: str = provisioning_state
+ self._type: Any = None
+
+ def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+ """Dump the schedule content into a file in YAML format.
+
+ :param dest: The local path or file stream to write the YAML content to.
+ If dest is a file path, a new file will be created.
+ If dest is an open file, the file will be written to directly.
+ :type dest: Union[PathLike, str, IO[AnyStr]]
+ :raises FileExistsError: Raised if dest is a file path and the file already exists.
+ :raises IOError: Raised if dest is an open file and the file is not writable.
+ """
+ path = kwargs.pop("path", None)
+ yaml_serialized = self._to_dict()
+ dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
+ @classmethod
+ def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+ return ValidationException(
+ message=message,
+ no_personal_data_message=no_personal_data_message,
+ target=ErrorTarget.SCHEDULE,
+ )
+
+ @classmethod
+ def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple:
+ from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
+ from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
+
+ if "create_monitor" in data:
+ return MonitorSchedule, None
+ if "import_data" in data:
+ return ImportDataSchedule, None
+ return JobSchedule, None
+
+ @property
+ def create_job(self) -> Any: # pylint: disable=useless-return
+ """The create_job entity associated with the schedule if exists."""
+ module_logger.warning("create_job is not a valid property of %s", str(type(self)))
+ # return None here just to be explicit
+ return None
+
+ @create_job.setter
+ def create_job(self, value: Any) -> None: # pylint: disable=unused-argument
+ """Set the create_job entity associated with the schedule if exists.
+
+ :param value: The create_job entity associated with the schedule if exists.
+ :type value: Any
+ """
+ module_logger.warning("create_job is not a valid property of %s", str(type(self)))
+
+ @property
+ def is_enabled(self) -> bool:
+ """Specifies if the schedule is enabled or not.
+
+ :return: True if the schedule is enabled, False otherwise.
+ :rtype: bool
+ """
+ return self._is_enabled
+
+ @property
+ def provisioning_state(self) -> str:
+ """Returns the schedule's provisioning state. The possible values include
+ "Creating", "Updating", "Deleting", "Succeeded", "Failed", "Canceled".
+
+ :return: The schedule's provisioning state.
+ :rtype: str
+ """
+ return self._provisioning_state
+
+ @property
+ def type(self) -> Optional[str]:
+ """The schedule type. Accepted values are 'job' and 'monitor'.
+
+ :return: The schedule type.
+ :rtype: str
+ """
+ return self._type
+
+ def _to_dict(self) -> Dict:
+ res: dict = self._dump_for_validation()
+ return res
+
+ @classmethod
+ def _from_rest_object(cls, obj: RestSchedule) -> "Schedule":
+ from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
+ from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
+
+ if obj.properties.action.action_type == RestScheduleActionType.CREATE_JOB:
+ return JobSchedule._from_rest_object(obj)
+ if obj.properties.action.action_type == RestScheduleActionType.CREATE_MONITOR:
+ res_monitor_schedule: Schedule = MonitorSchedule._from_rest_object(obj)
+ return res_monitor_schedule
+ if obj.properties.action.action_type == RestScheduleActionType.IMPORT_DATA:
+ res_data_schedule: Schedule = ImportDataSchedule._from_rest_object(obj)
+ return res_data_schedule
+ msg = f"Unsupported schedule type {obj.properties.action.action_type}"
+ raise ScheduleException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.SCHEDULE,
+ error_category=ErrorCategory.SYSTEM_ERROR,
+ )
+
+
+class JobSchedule(RestTranslatableMixin, Schedule, TelemetryMixin):
+ """Class for managing job schedules.
+
+ :keyword name: The name of the schedule.
+ :paramtype name: str
+ :keyword trigger: The trigger configuration for the schedule.
+ :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
+ :keyword create_job: The job definition or an existing job name.
+ :paramtype create_job: Union[~azure.ai.ml.entities.Job, str]
+ :keyword display_name: The display name of the schedule.
+ :paramtype display_name: Optional[str]
+ :keyword description: The description of the schedule.
+ :paramtype description: Optional[str]
+ :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+ :paramtype tags: Optional[dict[str, str]]
+ :keyword properties: A dictionary of properties to associate with the schedule.
+ :paramtype properties: Optional[dict[str, str]]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_schedule_configuration]
+ :end-before: [END job_schedule_configuration]
+ :language: python
+ :dedent: 8
+ :caption: Configuring a JobSchedule.
+ """
+
+ def __init__(
+ self,
+ *,
+ name: str,
+ trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
+ create_job: Union[Job, str],
+ display_name: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ properties: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(
+ name=name,
+ trigger=trigger,
+ display_name=display_name,
+ description=description,
+ tags=tags,
+ properties=properties,
+ **kwargs,
+ )
+ self._create_job = create_job
+ self._type = ScheduleType.JOB
+
+ @property
+ def create_job(self) -> Union[Job, str]:
+ """Return the job associated with the schedule.
+
+ :return: The job definition or an existing job name.
+ :rtype: Union[~azure.ai.ml.entities.Job, str]
+ """
+ return self._create_job
+
+ @create_job.setter
+ def create_job(self, value: Union[Job, str]) -> None:
+ """Sets the job that will be run when the schedule is triggered.
+
+ :param value: The job definition or an existing job name.
+ :type value: Union[~azure.ai.ml.entities.Job, str]
+ """
+ self._create_job = value
+
+ @classmethod
+ def _load(
+ cls,
+ data: Optional[Dict] = None,
+ yaml_path: Optional[Union[PathLike, str]] = None,
+ params_override: Optional[list] = None,
+ **kwargs: Any,
+ ) -> "JobSchedule":
+ data = data or {}
+ params_override = params_override or []
+ context = {
+ BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+ PARAMS_OVERRIDE_KEY: params_override,
+ }
+ return JobSchedule(
+ base_path=context[BASE_PATH_CONTEXT_KEY],
+ **load_from_dict(JobScheduleSchema, data, context, **kwargs),
+ )
+
+ @classmethod
+ def _load_from_rest_dict(
+ cls,
+ data: Optional[Dict] = None,
+ yaml_path: Optional[Union[PathLike, str]] = None,
+ params_override: Optional[list] = None,
+ **kwargs: Any,
+ ) -> "JobSchedule":
+ """
+ Load job schedule from rest object dict.
+
+ This function is added because the user-faced schema is different from the rest one.
+
+ For example:
+
+ user yaml create_job is a file reference with updates(not a job definition):
+
+ .. code-block:: yaml
+
+ create_job:
+ job: ./job.yaml
+ inputs:
+ input: 10
+
+ while what we get from rest will be a complete job definition:
+
+ .. code-block:: yaml
+
+ create_job:
+ name: xx
+ jobs:
+ node1: ...
+ inputs:
+ input: ..
+
+ :param data: The REST object to convert
+ :type data: Optional[Dict]
+ :param yaml_path: The yaml path
+ :type yaml_path: Optional[Union[PathLike str]]
+ :param params_override: A list of parameter overrides
+ :type params_override: Optional[list]
+ :return: The job schedule
+ :rtype: JobSchedule
+ """
+ data = data or {}
+ params_override = params_override or []
+ context = {
+ BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+ PARAMS_OVERRIDE_KEY: params_override,
+ }
+ create_job_key = "create_job"
+ if create_job_key not in data:
+ msg = "Job definition for schedule '{}' can not be None."
+ raise ScheduleException(
+ message=msg.format(data["name"]),
+ no_personal_data_message=msg.format("[name]"),
+ target=ErrorTarget.JOB,
+ error_category=ErrorCategory.SYSTEM_ERROR,
+ )
+ # Load the job definition separately
+ create_job_data = data.pop(create_job_key)
+ # Save the id for remote job reference before load job, as data dict will be changed
+ job_id = create_job_data.get("id")
+ if isinstance(job_id, str) and job_id.startswith(ARM_ID_PREFIX):
+ job_id = job_id[len(ARM_ID_PREFIX) :]
+ create_job = Job._load(
+ data=create_job_data,
+ **kwargs,
+ )
+ # Set id manually as it is a dump only field in schema
+ create_job._id = job_id
+ schedule = JobSchedule(
+ base_path=context[BASE_PATH_CONTEXT_KEY],
+ **load_from_dict(JobScheduleSchema, data, context, **kwargs),
+ )
+ schedule.create_job = create_job
+ return schedule
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> JobScheduleSchema:
+ return JobScheduleSchema(context=context)
+
+ def _customized_validate(self) -> MutableValidationResult:
+ """Validate the resource with customized logic.
+
+ :return: The validation result
+ :rtype: MutableValidationResult
+ """
+ if isinstance(self.create_job, PipelineJob):
+ return self.create_job._validate()
+ return self._create_empty_validation_result()
+
+ @classmethod
+ def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
+ """Get the fields that should be skipped in schema validation.
+
+ Override this method to add customized validation logic.
+
+ :return: The list of fields to skip in schema validation
+ :rtype: typing.List[str]
+ """
+ return ["create_job"]
+
+ @classmethod
+ def _from_rest_object(cls, obj: RestSchedule) -> "JobSchedule":
+ properties = obj.properties
+ action: JobScheduleAction = properties.action
+ if action.job_definition is None:
+ msg = "Job definition for schedule '{}' can not be None."
+ raise ScheduleException(
+ message=msg.format(obj.name),
+ no_personal_data_message=msg.format("[name]"),
+ target=ErrorTarget.JOB,
+ error_category=ErrorCategory.SYSTEM_ERROR,
+ )
+ if camel_to_snake(action.job_definition.job_type) not in [JobType.PIPELINE, JobType.COMMAND, JobType.SPARK]:
+ msg = f"Unsupported job type {action.job_definition.job_type} for schedule '{{}}'."
+ raise ScheduleException(
+ message=msg.format(obj.name),
+ no_personal_data_message=msg.format("[name]"),
+ target=ErrorTarget.JOB,
+ # Classified as user_error as we may support other type afterwards.
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ # Wrap job definition with JobBase for Job._from_rest_object call.
+ create_job = RestJobBase(properties=action.job_definition)
+ # id is a readonly field so set it after init.
+ # TODO: Add this support after source job id move to JobBaseProperties
+ if hasattr(action.job_definition, "source_job_id"):
+ create_job.id = action.job_definition.source_job_id
+ create_job = Job._from_rest_object(create_job)
+ return cls(
+ trigger=TriggerBase._from_rest_object(properties.trigger),
+ create_job=create_job,
+ name=obj.name,
+ display_name=properties.display_name,
+ description=properties.description,
+ tags=properties.tags,
+ properties=properties.properties,
+ provisioning_state=properties.provisioning_state,
+ is_enabled=properties.is_enabled,
+ creation_context=SystemData._from_rest_object(obj.system_data),
+ )
+
+ def _to_rest_object(self) -> RestSchedule:
+ """Build current parameterized schedule instance to a schedule object before submission.
+
+ :return: Rest schedule.
+ :rtype: RestSchedule
+ """
+ if isinstance(self.create_job, BaseNode):
+ self.create_job = self.create_job._to_job()
+ private_enabled = is_private_preview_enabled()
+ if isinstance(self.create_job, PipelineJob):
+ job_definition = self.create_job._to_rest_object().properties
+ # Set the source job id, as it is used only for schedule scenario.
+ job_definition.source_job_id = self.create_job.id
+ elif private_enabled and isinstance(self.create_job, (CommandJob, SparkJob)):
+ job_definition = self.create_job._to_rest_object().properties
+ # TODO: Merge this branch with PipelineJob after source job id move to JobBaseProperties
+ # job_definition.source_job_id = self.create_job.id
+ elif isinstance(self.create_job, str): # arm id reference
+ # TODO: Update this after source job id move to JobBaseProperties
+ # Rest pipeline job will hold a 'Default' as experiment_name,
+ # MFE will add default if None, so pass an empty string here.
+ job_definition = RestPipelineJob(source_job_id=self.create_job, experiment_name="")
+ else:
+ msg = "Unsupported job type '{}' in schedule {}."
+ raise ValidationException(
+ message=msg.format(type(self.create_job).__name__, self.name),
+ no_personal_data_message=msg.format("[type]", "[name]"),
+ target=ErrorTarget.SCHEDULE,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ return RestSchedule(
+ properties=ScheduleProperties(
+ description=self.description,
+ properties=self.properties,
+ tags=self.tags,
+ action=JobScheduleAction(job_definition=job_definition),
+ display_name=self.display_name,
+ is_enabled=self._is_enabled,
+ trigger=self.trigger._to_rest_object() if self.trigger is not None else None,
+ )
+ )
+
+ def __str__(self) -> str:
+ try:
+ res_yaml: str = self._to_yaml()
+ return res_yaml
+ except BaseException: # pylint: disable=W0718
+ res_jobSchedule: str = super(JobSchedule, self).__str__()
+ return res_jobSchedule
+
+ # pylint: disable-next=docstring-missing-param
+ def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict[Literal["trigger_type"], str]:
+ """Return the telemetry values of schedule.
+
+ :return: A dictionary with telemetry values
+ :rtype: Dict[Literal["trigger_type"], str]
+ """
+ return {"trigger_type": type(self.trigger).__name__}
+
+
+class ScheduleTriggerResult:
+ """Schedule trigger result returned by trigger an enabled schedule once.
+
+ This class shouldn't be instantiated directly. Instead, it is used as the return type of schedule trigger.
+
+ :ivar str job_name:
+ :ivar str schedule_action_type:
+ """
+
+ def __init__(self, **kwargs):
+ self.job_name = kwargs.get("job_name", None)
+ self.schedule_action_type = kwargs.get("schedule_action_type", None)
+
+ @classmethod
+ def _from_rest_object(cls, obj: RestTriggerRunSubmissionDto) -> "ScheduleTriggerResult":
+ """Construct a ScheduleJob from a rest object.
+
+ :param obj: The rest object to construct from.
+ :type obj: ~azure.ai.ml._restclient.v2024_01_01_preview.models.TriggerRunSubmissionDto
+ :return: The constructed ScheduleJob.
+ :rtype: ScheduleTriggerResult
+ """
+ return cls(
+ schedule_action_type=obj.schedule_action_type,
+ job_name=obj.submission_id,
+ )
+
+ def _to_dict(self) -> dict:
+ """Convert the object to a dictionary.
+ :return: The dictionary representation of the object.
+ :rtype: dict
+ """
+ return {
+ "job_name": self.job_name,
+ "schedule_action_type": self.schedule_action_type,
+ }