diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule')
3 files changed, 808 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/__init__.py new file mode 100644 index 00000000..fdf8caba --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/__init__.py @@ -0,0 +1,5 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py new file mode 100644 index 00000000..93867a9e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py @@ -0,0 +1,513 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access +import logging +import typing +from os import PathLike +from pathlib import Path +from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Union + +from typing_extensions import Literal + +from azure.ai.ml._restclient.v2023_06_01_preview.models import JobBase as RestJobBase +from azure.ai.ml._restclient.v2023_06_01_preview.models import JobScheduleAction +from azure.ai.ml._restclient.v2023_06_01_preview.models import PipelineJob as RestPipelineJob +from azure.ai.ml._restclient.v2023_06_01_preview.models import Schedule as RestSchedule +from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleActionType as RestScheduleActionType +from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleProperties +from azure.ai.ml._restclient.v2024_01_01_preview.models import TriggerRunSubmissionDto as RestTriggerRunSubmissionDto +from azure.ai.ml._schema.schedule.schedule import JobScheduleSchema +from azure.ai.ml._utils.utils import camel_to_snake, dump_yaml_to_file, is_private_preview_enabled +from azure.ai.ml.constants import JobType +from azure.ai.ml.constants._common import ARM_ID_PREFIX, BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, ScheduleType +from azure.ai.ml.entities._job.command_job import CommandJob +from azure.ai.ml.entities._job.job import Job +from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob +from azure.ai.ml.entities._job.spark_job import SparkJob +from azure.ai.ml.entities._mixins import RestTranslatableMixin, TelemetryMixin, YamlTranslatableMixin +from azure.ai.ml.entities._resource import Resource +from azure.ai.ml.entities._system_data import SystemData +from azure.ai.ml.entities._util import load_from_dict +from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin + +from ...exceptions import ErrorCategory, ErrorTarget, ScheduleException, ValidationException +from .._builders import BaseNode +from .trigger import CronTrigger, RecurrenceTrigger, TriggerBase + +module_logger = logging.getLogger(__name__) + + +class Schedule(YamlTranslatableMixin, PathAwareSchemaValidatableMixin, Resource): + """Schedule object used to create and manage schedules. + + This class should not be instantiated directly. Instead, please use the subclasses. + + :keyword name: The name of the schedule. + :paramtype name: str + :keyword trigger: The schedule trigger configuration. + :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger] + :keyword display_name: The display name of the schedule. + :paramtype display_name: Optional[str] + :keyword description: The description of the schedule. + :paramtype description: Optional[str] + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. + :paramtype tags: Optional[dict]] + :keyword properties: A dictionary of properties to associate with the schedule. + :paramtype properties: Optional[dict[str, str]] + :keyword kwargs: Additional keyword arguments passed to the Resource constructor. + :paramtype kwargs: dict + """ + + def __init__( + self, + *, + name: str, + trigger: Optional[Union[CronTrigger, RecurrenceTrigger]], + display_name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + properties: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + is_enabled = kwargs.pop("is_enabled", None) + provisioning_state = kwargs.pop("provisioning_state", None) + super().__init__(name=name, description=description, tags=tags, properties=properties, **kwargs) + self.trigger = trigger + self.display_name = display_name + self._is_enabled: bool = is_enabled + self._provisioning_state: str = provisioning_state + self._type: Any = None + + def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None: + """Dump the schedule content into a file in YAML format. + + :param dest: The local path or file stream to write the YAML content to. + If dest is a file path, a new file will be created. + If dest is an open file, the file will be written to directly. + :type dest: Union[PathLike, str, IO[AnyStr]] + :raises FileExistsError: Raised if dest is a file path and the file already exists. + :raises IOError: Raised if dest is an open file and the file is not writable. + """ + path = kwargs.pop("path", None) + yaml_serialized = self._to_dict() + dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs) + + @classmethod + def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException: + return ValidationException( + message=message, + no_personal_data_message=no_personal_data_message, + target=ErrorTarget.SCHEDULE, + ) + + @classmethod + def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple: + from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule + from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule + + if "create_monitor" in data: + return MonitorSchedule, None + if "import_data" in data: + return ImportDataSchedule, None + return JobSchedule, None + + @property + def create_job(self) -> Any: # pylint: disable=useless-return + """The create_job entity associated with the schedule if exists.""" + module_logger.warning("create_job is not a valid property of %s", str(type(self))) + # return None here just to be explicit + return None + + @create_job.setter + def create_job(self, value: Any) -> None: # pylint: disable=unused-argument + """Set the create_job entity associated with the schedule if exists. + + :param value: The create_job entity associated with the schedule if exists. + :type value: Any + """ + module_logger.warning("create_job is not a valid property of %s", str(type(self))) + + @property + def is_enabled(self) -> bool: + """Specifies if the schedule is enabled or not. + + :return: True if the schedule is enabled, False otherwise. + :rtype: bool + """ + return self._is_enabled + + @property + def provisioning_state(self) -> str: + """Returns the schedule's provisioning state. The possible values include + "Creating", "Updating", "Deleting", "Succeeded", "Failed", "Canceled". + + :return: The schedule's provisioning state. + :rtype: str + """ + return self._provisioning_state + + @property + def type(self) -> Optional[str]: + """The schedule type. Accepted values are 'job' and 'monitor'. + + :return: The schedule type. + :rtype: str + """ + return self._type + + def _to_dict(self) -> Dict: + res: dict = self._dump_for_validation() + return res + + @classmethod + def _from_rest_object(cls, obj: RestSchedule) -> "Schedule": + from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule + from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule + + if obj.properties.action.action_type == RestScheduleActionType.CREATE_JOB: + return JobSchedule._from_rest_object(obj) + if obj.properties.action.action_type == RestScheduleActionType.CREATE_MONITOR: + res_monitor_schedule: Schedule = MonitorSchedule._from_rest_object(obj) + return res_monitor_schedule + if obj.properties.action.action_type == RestScheduleActionType.IMPORT_DATA: + res_data_schedule: Schedule = ImportDataSchedule._from_rest_object(obj) + return res_data_schedule + msg = f"Unsupported schedule type {obj.properties.action.action_type}" + raise ScheduleException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.SCHEDULE, + error_category=ErrorCategory.SYSTEM_ERROR, + ) + + +class JobSchedule(RestTranslatableMixin, Schedule, TelemetryMixin): + """Class for managing job schedules. + + :keyword name: The name of the schedule. + :paramtype name: str + :keyword trigger: The trigger configuration for the schedule. + :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger] + :keyword create_job: The job definition or an existing job name. + :paramtype create_job: Union[~azure.ai.ml.entities.Job, str] + :keyword display_name: The display name of the schedule. + :paramtype display_name: Optional[str] + :keyword description: The description of the schedule. + :paramtype description: Optional[str] + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. + :paramtype tags: Optional[dict[str, str]] + :keyword properties: A dictionary of properties to associate with the schedule. + :paramtype properties: Optional[dict[str, str]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START job_schedule_configuration] + :end-before: [END job_schedule_configuration] + :language: python + :dedent: 8 + :caption: Configuring a JobSchedule. + """ + + def __init__( + self, + *, + name: str, + trigger: Optional[Union[CronTrigger, RecurrenceTrigger]], + create_job: Union[Job, str], + display_name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + properties: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + super().__init__( + name=name, + trigger=trigger, + display_name=display_name, + description=description, + tags=tags, + properties=properties, + **kwargs, + ) + self._create_job = create_job + self._type = ScheduleType.JOB + + @property + def create_job(self) -> Union[Job, str]: + """Return the job associated with the schedule. + + :return: The job definition or an existing job name. + :rtype: Union[~azure.ai.ml.entities.Job, str] + """ + return self._create_job + + @create_job.setter + def create_job(self, value: Union[Job, str]) -> None: + """Sets the job that will be run when the schedule is triggered. + + :param value: The job definition or an existing job name. + :type value: Union[~azure.ai.ml.entities.Job, str] + """ + self._create_job = value + + @classmethod + def _load( + cls, + data: Optional[Dict] = None, + yaml_path: Optional[Union[PathLike, str]] = None, + params_override: Optional[list] = None, + **kwargs: Any, + ) -> "JobSchedule": + data = data or {} + params_override = params_override or [] + context = { + BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"), + PARAMS_OVERRIDE_KEY: params_override, + } + return JobSchedule( + base_path=context[BASE_PATH_CONTEXT_KEY], + **load_from_dict(JobScheduleSchema, data, context, **kwargs), + ) + + @classmethod + def _load_from_rest_dict( + cls, + data: Optional[Dict] = None, + yaml_path: Optional[Union[PathLike, str]] = None, + params_override: Optional[list] = None, + **kwargs: Any, + ) -> "JobSchedule": + """ + Load job schedule from rest object dict. + + This function is added because the user-faced schema is different from the rest one. + + For example: + + user yaml create_job is a file reference with updates(not a job definition): + + .. code-block:: yaml + + create_job: + job: ./job.yaml + inputs: + input: 10 + + while what we get from rest will be a complete job definition: + + .. code-block:: yaml + + create_job: + name: xx + jobs: + node1: ... + inputs: + input: .. + + :param data: The REST object to convert + :type data: Optional[Dict] + :param yaml_path: The yaml path + :type yaml_path: Optional[Union[PathLike str]] + :param params_override: A list of parameter overrides + :type params_override: Optional[list] + :return: The job schedule + :rtype: JobSchedule + """ + data = data or {} + params_override = params_override or [] + context = { + BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"), + PARAMS_OVERRIDE_KEY: params_override, + } + create_job_key = "create_job" + if create_job_key not in data: + msg = "Job definition for schedule '{}' can not be None." + raise ScheduleException( + message=msg.format(data["name"]), + no_personal_data_message=msg.format("[name]"), + target=ErrorTarget.JOB, + error_category=ErrorCategory.SYSTEM_ERROR, + ) + # Load the job definition separately + create_job_data = data.pop(create_job_key) + # Save the id for remote job reference before load job, as data dict will be changed + job_id = create_job_data.get("id") + if isinstance(job_id, str) and job_id.startswith(ARM_ID_PREFIX): + job_id = job_id[len(ARM_ID_PREFIX) :] + create_job = Job._load( + data=create_job_data, + **kwargs, + ) + # Set id manually as it is a dump only field in schema + create_job._id = job_id + schedule = JobSchedule( + base_path=context[BASE_PATH_CONTEXT_KEY], + **load_from_dict(JobScheduleSchema, data, context, **kwargs), + ) + schedule.create_job = create_job + return schedule + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> JobScheduleSchema: + return JobScheduleSchema(context=context) + + def _customized_validate(self) -> MutableValidationResult: + """Validate the resource with customized logic. + + :return: The validation result + :rtype: MutableValidationResult + """ + if isinstance(self.create_job, PipelineJob): + return self.create_job._validate() + return self._create_empty_validation_result() + + @classmethod + def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]: + """Get the fields that should be skipped in schema validation. + + Override this method to add customized validation logic. + + :return: The list of fields to skip in schema validation + :rtype: typing.List[str] + """ + return ["create_job"] + + @classmethod + def _from_rest_object(cls, obj: RestSchedule) -> "JobSchedule": + properties = obj.properties + action: JobScheduleAction = properties.action + if action.job_definition is None: + msg = "Job definition for schedule '{}' can not be None." + raise ScheduleException( + message=msg.format(obj.name), + no_personal_data_message=msg.format("[name]"), + target=ErrorTarget.JOB, + error_category=ErrorCategory.SYSTEM_ERROR, + ) + if camel_to_snake(action.job_definition.job_type) not in [JobType.PIPELINE, JobType.COMMAND, JobType.SPARK]: + msg = f"Unsupported job type {action.job_definition.job_type} for schedule '{{}}'." + raise ScheduleException( + message=msg.format(obj.name), + no_personal_data_message=msg.format("[name]"), + target=ErrorTarget.JOB, + # Classified as user_error as we may support other type afterwards. + error_category=ErrorCategory.USER_ERROR, + ) + # Wrap job definition with JobBase for Job._from_rest_object call. + create_job = RestJobBase(properties=action.job_definition) + # id is a readonly field so set it after init. + # TODO: Add this support after source job id move to JobBaseProperties + if hasattr(action.job_definition, "source_job_id"): + create_job.id = action.job_definition.source_job_id + create_job = Job._from_rest_object(create_job) + return cls( + trigger=TriggerBase._from_rest_object(properties.trigger), + create_job=create_job, + name=obj.name, + display_name=properties.display_name, + description=properties.description, + tags=properties.tags, + properties=properties.properties, + provisioning_state=properties.provisioning_state, + is_enabled=properties.is_enabled, + creation_context=SystemData._from_rest_object(obj.system_data), + ) + + def _to_rest_object(self) -> RestSchedule: + """Build current parameterized schedule instance to a schedule object before submission. + + :return: Rest schedule. + :rtype: RestSchedule + """ + if isinstance(self.create_job, BaseNode): + self.create_job = self.create_job._to_job() + private_enabled = is_private_preview_enabled() + if isinstance(self.create_job, PipelineJob): + job_definition = self.create_job._to_rest_object().properties + # Set the source job id, as it is used only for schedule scenario. + job_definition.source_job_id = self.create_job.id + elif private_enabled and isinstance(self.create_job, (CommandJob, SparkJob)): + job_definition = self.create_job._to_rest_object().properties + # TODO: Merge this branch with PipelineJob after source job id move to JobBaseProperties + # job_definition.source_job_id = self.create_job.id + elif isinstance(self.create_job, str): # arm id reference + # TODO: Update this after source job id move to JobBaseProperties + # Rest pipeline job will hold a 'Default' as experiment_name, + # MFE will add default if None, so pass an empty string here. + job_definition = RestPipelineJob(source_job_id=self.create_job, experiment_name="") + else: + msg = "Unsupported job type '{}' in schedule {}." + raise ValidationException( + message=msg.format(type(self.create_job).__name__, self.name), + no_personal_data_message=msg.format("[type]", "[name]"), + target=ErrorTarget.SCHEDULE, + error_category=ErrorCategory.USER_ERROR, + ) + return RestSchedule( + properties=ScheduleProperties( + description=self.description, + properties=self.properties, + tags=self.tags, + action=JobScheduleAction(job_definition=job_definition), + display_name=self.display_name, + is_enabled=self._is_enabled, + trigger=self.trigger._to_rest_object() if self.trigger is not None else None, + ) + ) + + def __str__(self) -> str: + try: + res_yaml: str = self._to_yaml() + return res_yaml + except BaseException: # pylint: disable=W0718 + res_jobSchedule: str = super(JobSchedule, self).__str__() + return res_jobSchedule + + # pylint: disable-next=docstring-missing-param + def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict[Literal["trigger_type"], str]: + """Return the telemetry values of schedule. + + :return: A dictionary with telemetry values + :rtype: Dict[Literal["trigger_type"], str] + """ + return {"trigger_type": type(self.trigger).__name__} + + +class ScheduleTriggerResult: + """Schedule trigger result returned by trigger an enabled schedule once. + + This class shouldn't be instantiated directly. Instead, it is used as the return type of schedule trigger. + + :ivar str job_name: + :ivar str schedule_action_type: + """ + + def __init__(self, **kwargs): + self.job_name = kwargs.get("job_name", None) + self.schedule_action_type = kwargs.get("schedule_action_type", None) + + @classmethod + def _from_rest_object(cls, obj: RestTriggerRunSubmissionDto) -> "ScheduleTriggerResult": + """Construct a ScheduleJob from a rest object. + + :param obj: The rest object to construct from. + :type obj: ~azure.ai.ml._restclient.v2024_01_01_preview.models.TriggerRunSubmissionDto + :return: The constructed ScheduleJob. + :rtype: ScheduleTriggerResult + """ + return cls( + schedule_action_type=obj.schedule_action_type, + job_name=obj.submission_id, + ) + + def _to_dict(self) -> dict: + """Convert the object to a dictionary. + :return: The dictionary representation of the object. + :rtype: dict + """ + return { + "job_name": self.job_name, + "schedule_action_type": self.schedule_action_type, + } diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/trigger.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/trigger.py new file mode 100644 index 00000000..855aac9e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/trigger.py @@ -0,0 +1,290 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access +import logging +from abc import ABC +from datetime import datetime +from typing import List, Optional, Union + +from azure.ai.ml._restclient.v2023_04_01_preview.models import CronTrigger as RestCronTrigger +from azure.ai.ml._restclient.v2023_04_01_preview.models import RecurrenceSchedule as RestRecurrencePattern +from azure.ai.ml._restclient.v2023_04_01_preview.models import RecurrenceTrigger as RestRecurrenceTrigger +from azure.ai.ml._restclient.v2023_04_01_preview.models import TriggerBase as RestTriggerBase +from azure.ai.ml._restclient.v2023_04_01_preview.models import TriggerType as RestTriggerType +from azure.ai.ml._utils.utils import camel_to_snake, snake_to_camel +from azure.ai.ml.constants import TimeZone +from azure.ai.ml.entities._mixins import RestTranslatableMixin + +module_logger = logging.getLogger(__name__) + + +class TriggerBase(RestTranslatableMixin, ABC): + """Base class of Trigger. + + This class should not be instantiated directly. Instead, use one of its subclasses. + + :keyword type: The type of trigger. + :paramtype type: str + :keyword start_time: Specifies the start time of the schedule in ISO 8601 format. + :paramtype start_time: Optional[Union[str, datetime]] + :keyword end_time: Specifies the end time of the schedule in ISO 8601 format. + Note that end_time is not supported for compute schedules. + :paramtype end_time: Optional[Union[str, datetime]] + :keyword time_zone: The time zone where the schedule will run. Defaults to UTC(+00:00). + Note that this applies to the start_time and end_time. + :paramtype time_zone: ~azure.ai.ml.constants.TimeZone + """ + + def __init__( + self, + *, + type: str, # pylint: disable=redefined-builtin + start_time: Optional[Union[str, datetime]] = None, + end_time: Optional[Union[str, datetime]] = None, + time_zone: Union[str, TimeZone] = TimeZone.UTC, + ) -> None: + super().__init__() + self.type = type + self.start_time = start_time + self.end_time = end_time + self.time_zone = time_zone + + @classmethod + def _from_rest_object(cls, obj: RestTriggerBase) -> Optional[Union["CronTrigger", "RecurrenceTrigger"]]: + if obj.trigger_type == RestTriggerType.RECURRENCE: + return RecurrenceTrigger._from_rest_object(obj) + if obj.trigger_type == RestTriggerType.CRON: + return CronTrigger._from_rest_object(obj) + + return None + + +class RecurrencePattern(RestTranslatableMixin): + """Recurrence pattern for a job schedule. + + :keyword hours: The number of hours for the recurrence schedule pattern. + :paramtype hours: Union[int, List[int]] + :keyword minutes: The number of minutes for the recurrence schedule pattern. + :paramtype minutes: Union[int, List[int]] + :keyword week_days: A list of days of the week for the recurrence schedule pattern. + Acceptable values include: "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday" + :type week_days: Optional[Union[str, List[str]]] + :keyword month_days: A list of days of the month for the recurrence schedule pattern. + :paramtype month_days: Optional[Union[int, List[int]]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START job_schedule_configuration] + :end-before: [END job_schedule_configuration] + :language: python + :dedent: 8 + :caption: Configuring a JobSchedule to use a RecurrencePattern. + """ + + def __init__( + self, + *, + hours: Union[int, List[int]], + minutes: Union[int, List[int]], + week_days: Optional[Union[str, List[str]]] = None, + month_days: Optional[Union[int, List[int]]] = None, + ) -> None: + self.hours = hours + self.minutes = minutes + self.week_days = week_days + self.month_days = month_days + + def _to_rest_object(self) -> RestRecurrencePattern: + return RestRecurrencePattern( + hours=[self.hours] if not isinstance(self.hours, list) else self.hours, + minutes=[self.minutes] if not isinstance(self.minutes, list) else self.minutes, + week_days=[self.week_days] if self.week_days and not isinstance(self.week_days, list) else self.week_days, + month_days=( + [self.month_days] if self.month_days and not isinstance(self.month_days, list) else self.month_days + ), + ) + + def _to_rest_compute_pattern_object(self) -> RestRecurrencePattern: + # This function is added because we can't make compute trigger to use same class + # with schedule from service side. + if self.month_days: + module_logger.warning("'month_days' is ignored for not supported on compute recurrence schedule.") + return RestRecurrencePattern( + hours=[self.hours] if not isinstance(self.hours, list) else self.hours, + minutes=[self.minutes] if not isinstance(self.minutes, list) else self.minutes, + week_days=[self.week_days] if self.week_days and not isinstance(self.week_days, list) else self.week_days, + ) + + @classmethod + def _from_rest_object(cls, obj: RestRecurrencePattern) -> "RecurrencePattern": + return cls( + hours=obj.hours, + minutes=obj.minutes, + week_days=obj.week_days, + month_days=obj.month_days if hasattr(obj, "month_days") else None, + ) + + +class CronTrigger(TriggerBase): + """Cron Trigger for a job schedule. + + :keyword expression: The cron expression of schedule, following NCronTab format. + :paramtype expression: str + :keyword start_time: The start time for the trigger. If using a datetime object, leave the tzinfo as None and use + the ``time_zone`` parameter to specify a time zone if needed. If using a string, use the format + YYYY-MM-DDThh:mm:ss. Defaults to running the first workload instantly and continuing future workloads + based on the schedule. If the start time is in the past, the first workload is run at the next calculated run + time. + :paramtype start_time: Optional[Union[str, datetime]] + :keyword end_time: The start time for the trigger. If using a datetime object, leave the tzinfo as None and use + the ``time_zone`` parameter to specify a time zone if needed. If using a string, use the format + YYYY-MM-DDThh:mm:ss. Note that end_time is not supported for compute schedules. + :paramtype end_time: Optional[Union[str, datetime]] + :keyword time_zone: The time zone where the schedule will run. Defaults to UTC(+00:00). + Note that this applies to the start_time and end_time. + :paramtype time_zone: Union[str, ~azure.ai.ml.constants.TimeZone] + :raises Exception: Raised if end_time is in the past. + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START cron_trigger_configuration] + :end-before: [END cron_trigger_configuration] + :language: python + :dedent: 8 + :caption: Configuring a CronTrigger. + """ + + def __init__( + self, + *, + expression: str, + start_time: Optional[Union[str, datetime]] = None, + end_time: Optional[Union[str, datetime]] = None, + time_zone: Union[str, TimeZone] = TimeZone.UTC, + ) -> None: + super().__init__( + type=RestTriggerType.CRON, + start_time=start_time, + end_time=end_time, + time_zone=time_zone, + ) + self.expression = expression + + def _to_rest_object(self) -> RestCronTrigger: # v2022_12_01.models.CronTrigger + return RestCronTrigger( + trigger_type=self.type, + expression=self.expression, + start_time=self.start_time, + end_time=self.end_time, + time_zone=self.time_zone, + ) + + def _to_rest_compute_cron_object(self) -> RestCronTrigger: # v2022_12_01_preview.models.CronTrigger + # This function is added because we can't make compute trigger to use same class + # with schedule from service side. + if self.end_time: + module_logger.warning("'end_time' is ignored for not supported on compute schedule.") + return RestCronTrigger( + expression=self.expression, + start_time=self.start_time, + time_zone=self.time_zone, + ) + + @classmethod + def _from_rest_object(cls, obj: RestCronTrigger) -> "CronTrigger": + return cls( + expression=obj.expression, + start_time=obj.start_time, + end_time=obj.end_time, + time_zone=obj.time_zone, + ) + + +class RecurrenceTrigger(TriggerBase): + """Recurrence trigger for a job schedule. + + :keyword start_time: Specifies the start time of the schedule in ISO 8601 format. + :paramtype start_time: Optional[Union[str, datetime]] + :keyword end_time: Specifies the end time of the schedule in ISO 8601 format. + Note that end_time is not supported for compute schedules. + :paramtype end_time: Optional[Union[str, datetime]] + :keyword time_zone: The time zone where the schedule will run. Defaults to UTC(+00:00). + Note that this applies to the start_time and end_time. + :paramtype time_zone: Union[str, ~azure.ai.ml.constants.TimeZone] + :keyword frequency: Specifies the frequency that the schedule should be triggered with. + Possible values include: "minute", "hour", "day", "week", "month". + :type frequency: str + :keyword interval: Specifies the interval in conjunction with the frequency that the schedule should be triggered + with. + :paramtype interval: int + :keyword schedule: Specifies the recurrence pattern. + :paramtype schedule: Optional[~azure.ai.ml.entities.RecurrencePattern] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START job_schedule_configuration] + :end-before: [END job_schedule_configuration] + :language: python + :dedent: 8 + :caption: Configuring a JobSchedule to trigger recurrence every 4 weeks. + """ + + def __init__( + self, + *, + frequency: str, + interval: int, + schedule: Optional[RecurrencePattern] = None, + start_time: Optional[Union[str, datetime]] = None, + end_time: Optional[Union[str, datetime]] = None, + time_zone: Union[str, TimeZone] = TimeZone.UTC, + ) -> None: + super().__init__( + type=RestTriggerType.RECURRENCE, + start_time=start_time, + end_time=end_time, + time_zone=time_zone, + ) + # Create empty pattern as schedule is required in rest model + self.schedule = schedule if schedule else RecurrencePattern(hours=[], minutes=[]) + self.frequency = frequency + self.interval = interval + + def _to_rest_object(self) -> RestRecurrenceTrigger: # v2022_12_01.models.RecurrenceTrigger + return RestRecurrenceTrigger( + frequency=snake_to_camel(self.frequency), + interval=self.interval, + schedule=self.schedule._to_rest_object(), + start_time=self.start_time, + end_time=self.end_time, + time_zone=self.time_zone, + ) + + def _to_rest_compute_recurrence_object(self) -> RestRecurrenceTrigger: + # v2022_12_01_preview.models.RecurrenceTrigger + # This function is added because we can't make compute trigger to use same class + # with schedule from service side. + if self.end_time: + module_logger.warning("'end_time' is ignored for not supported on compute schedule.") + return RestRecurrenceTrigger( + frequency=snake_to_camel(self.frequency), + interval=self.interval, + schedule=self.schedule._to_rest_compute_pattern_object(), + start_time=self.start_time, + time_zone=self.time_zone, + ) + + @classmethod + def _from_rest_object(cls, obj: RestRecurrenceTrigger) -> "RecurrenceTrigger": + return cls( + frequency=camel_to_snake(obj.frequency), + interval=obj.interval, + schedule=RecurrencePattern._from_rest_object(obj.schedule) if obj.schedule else None, + start_time=obj.start_time, + end_time=obj.end_time, + time_zone=obj.time_zone, + ) |
