about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/__init__.py5
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py513
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/trigger.py290
3 files changed, 808 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/__init__.py
new file mode 100644
index 00000000..fdf8caba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/__init__.py
@@ -0,0 +1,5 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
new file mode 100644
index 00000000..93867a9e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/schedule.py
@@ -0,0 +1,513 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+import logging
+import typing
+from os import PathLike
+from pathlib import Path
+from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Union
+
+from typing_extensions import Literal
+
+from azure.ai.ml._restclient.v2023_06_01_preview.models import JobBase as RestJobBase
+from azure.ai.ml._restclient.v2023_06_01_preview.models import JobScheduleAction
+from azure.ai.ml._restclient.v2023_06_01_preview.models import PipelineJob as RestPipelineJob
+from azure.ai.ml._restclient.v2023_06_01_preview.models import Schedule as RestSchedule
+from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleActionType as RestScheduleActionType
+from azure.ai.ml._restclient.v2023_06_01_preview.models import ScheduleProperties
+from azure.ai.ml._restclient.v2024_01_01_preview.models import TriggerRunSubmissionDto as RestTriggerRunSubmissionDto
+from azure.ai.ml._schema.schedule.schedule import JobScheduleSchema
+from azure.ai.ml._utils.utils import camel_to_snake, dump_yaml_to_file, is_private_preview_enabled
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import ARM_ID_PREFIX, BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, ScheduleType
+from azure.ai.ml.entities._job.command_job import CommandJob
+from azure.ai.ml.entities._job.job import Job
+from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+from azure.ai.ml.entities._job.spark_job import SparkJob
+from azure.ai.ml.entities._mixins import RestTranslatableMixin, TelemetryMixin, YamlTranslatableMixin
+from azure.ai.ml.entities._resource import Resource
+from azure.ai.ml.entities._system_data import SystemData
+from azure.ai.ml.entities._util import load_from_dict
+from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin
+
+from ...exceptions import ErrorCategory, ErrorTarget, ScheduleException, ValidationException
+from .._builders import BaseNode
+from .trigger import CronTrigger, RecurrenceTrigger, TriggerBase
+
+module_logger = logging.getLogger(__name__)
+
+
+class Schedule(YamlTranslatableMixin, PathAwareSchemaValidatableMixin, Resource):
+    """Schedule object used to create and manage schedules.
+
+    This class should not be instantiated directly. Instead, please use the subclasses.
+
+    :keyword name: The name of the schedule.
+    :paramtype name: str
+    :keyword trigger: The schedule trigger configuration.
+    :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
+    :keyword display_name: The display name of the schedule.
+    :paramtype display_name: Optional[str]
+    :keyword description: The description of the schedule.
+    :paramtype description: Optional[str]
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+    :paramtype tags: Optional[dict]]
+    :keyword properties: A dictionary of properties to associate with the schedule.
+    :paramtype properties: Optional[dict[str, str]]
+    :keyword kwargs: Additional keyword arguments passed to the Resource constructor.
+    :paramtype kwargs: dict
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
+        display_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        is_enabled = kwargs.pop("is_enabled", None)
+        provisioning_state = kwargs.pop("provisioning_state", None)
+        super().__init__(name=name, description=description, tags=tags, properties=properties, **kwargs)
+        self.trigger = trigger
+        self.display_name = display_name
+        self._is_enabled: bool = is_enabled
+        self._provisioning_state: str = provisioning_state
+        self._type: Any = None
+
+    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+        """Dump the schedule content into a file in YAML format.
+
+        :param dest: The local path or file stream to write the YAML content to.
+            If dest is a file path, a new file will be created.
+            If dest is an open file, the file will be written to directly.
+        :type dest: Union[PathLike, str, IO[AnyStr]]
+        :raises FileExistsError: Raised if dest is a file path and the file already exists.
+        :raises IOError: Raised if dest is an open file and the file is not writable.
+        """
+        path = kwargs.pop("path", None)
+        yaml_serialized = self._to_dict()
+        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
+    @classmethod
+    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+        return ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.SCHEDULE,
+        )
+
+    @classmethod
+    def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple:
+        from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
+        from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
+
+        if "create_monitor" in data:
+            return MonitorSchedule, None
+        if "import_data" in data:
+            return ImportDataSchedule, None
+        return JobSchedule, None
+
+    @property
+    def create_job(self) -> Any:  # pylint: disable=useless-return
+        """The create_job entity associated with the schedule if exists."""
+        module_logger.warning("create_job is not a valid property of %s", str(type(self)))
+        # return None here just to be explicit
+        return None
+
+    @create_job.setter
+    def create_job(self, value: Any) -> None:  # pylint: disable=unused-argument
+        """Set the create_job entity associated with the schedule if exists.
+
+        :param value: The create_job entity associated with the schedule if exists.
+        :type value: Any
+        """
+        module_logger.warning("create_job is not a valid property of %s", str(type(self)))
+
+    @property
+    def is_enabled(self) -> bool:
+        """Specifies if the schedule is enabled or not.
+
+        :return: True if the schedule is enabled, False otherwise.
+        :rtype: bool
+        """
+        return self._is_enabled
+
+    @property
+    def provisioning_state(self) -> str:
+        """Returns the schedule's provisioning state. The possible values include
+        "Creating", "Updating", "Deleting", "Succeeded", "Failed", "Canceled".
+
+        :return: The schedule's provisioning state.
+        :rtype: str
+        """
+        return self._provisioning_state
+
+    @property
+    def type(self) -> Optional[str]:
+        """The schedule type. Accepted values are 'job' and 'monitor'.
+
+        :return: The schedule type.
+        :rtype: str
+        """
+        return self._type
+
+    def _to_dict(self) -> Dict:
+        res: dict = self._dump_for_validation()
+        return res
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestSchedule) -> "Schedule":
+        from azure.ai.ml.entities._data_import.schedule import ImportDataSchedule
+        from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
+
+        if obj.properties.action.action_type == RestScheduleActionType.CREATE_JOB:
+            return JobSchedule._from_rest_object(obj)
+        if obj.properties.action.action_type == RestScheduleActionType.CREATE_MONITOR:
+            res_monitor_schedule: Schedule = MonitorSchedule._from_rest_object(obj)
+            return res_monitor_schedule
+        if obj.properties.action.action_type == RestScheduleActionType.IMPORT_DATA:
+            res_data_schedule: Schedule = ImportDataSchedule._from_rest_object(obj)
+            return res_data_schedule
+        msg = f"Unsupported schedule type {obj.properties.action.action_type}"
+        raise ScheduleException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.SCHEDULE,
+            error_category=ErrorCategory.SYSTEM_ERROR,
+        )
+
+
+class JobSchedule(RestTranslatableMixin, Schedule, TelemetryMixin):
+    """Class for managing job schedules.
+
+    :keyword name: The name of the schedule.
+    :paramtype name: str
+    :keyword trigger: The trigger configuration for the schedule.
+    :paramtype trigger: Union[~azure.ai.ml.entities.CronTrigger, ~azure.ai.ml.entities.RecurrenceTrigger]
+    :keyword create_job: The job definition or an existing job name.
+    :paramtype create_job: Union[~azure.ai.ml.entities.Job, str]
+    :keyword display_name: The display name of the schedule.
+    :paramtype display_name: Optional[str]
+    :keyword description: The description of the schedule.
+    :paramtype description: Optional[str]
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+    :paramtype tags: Optional[dict[str, str]]
+    :keyword properties: A dictionary of properties to associate with the schedule.
+    :paramtype properties: Optional[dict[str, str]]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START job_schedule_configuration]
+            :end-before: [END job_schedule_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a JobSchedule.
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        trigger: Optional[Union[CronTrigger, RecurrenceTrigger]],
+        create_job: Union[Job, str],
+        display_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(
+            name=name,
+            trigger=trigger,
+            display_name=display_name,
+            description=description,
+            tags=tags,
+            properties=properties,
+            **kwargs,
+        )
+        self._create_job = create_job
+        self._type = ScheduleType.JOB
+
+    @property
+    def create_job(self) -> Union[Job, str]:
+        """Return the job associated with the schedule.
+
+        :return: The job definition or an existing job name.
+        :rtype: Union[~azure.ai.ml.entities.Job, str]
+        """
+        return self._create_job
+
+    @create_job.setter
+    def create_job(self, value: Union[Job, str]) -> None:
+        """Sets the job that will be run when the schedule is triggered.
+
+        :param value: The job definition or an existing job name.
+        :type value: Union[~azure.ai.ml.entities.Job, str]
+        """
+        self._create_job = value
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "JobSchedule":
+        data = data or {}
+        params_override = params_override or []
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        return JobSchedule(
+            base_path=context[BASE_PATH_CONTEXT_KEY],
+            **load_from_dict(JobScheduleSchema, data, context, **kwargs),
+        )
+
+    @classmethod
+    def _load_from_rest_dict(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "JobSchedule":
+        """
+        Load job schedule from rest object dict.
+
+        This function is added because the user-faced schema is different from the rest one.
+
+        For example:
+
+        user yaml create_job is a file reference with updates(not a job definition):
+
+        .. code-block:: yaml
+
+            create_job:
+                job: ./job.yaml
+                inputs:
+                    input: 10
+
+        while what we get from rest will be a complete job definition:
+
+        .. code-block:: yaml
+
+            create_job:
+                name: xx
+                jobs:
+                    node1: ...
+                inputs:
+                    input: ..
+
+        :param data: The REST object to convert
+        :type data: Optional[Dict]
+        :param yaml_path: The yaml path
+        :type yaml_path: Optional[Union[PathLike str]]
+        :param params_override: A list of parameter overrides
+        :type params_override: Optional[list]
+        :return: The job schedule
+        :rtype: JobSchedule
+        """
+        data = data or {}
+        params_override = params_override or []
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        create_job_key = "create_job"
+        if create_job_key not in data:
+            msg = "Job definition for schedule '{}' can not be None."
+            raise ScheduleException(
+                message=msg.format(data["name"]),
+                no_personal_data_message=msg.format("[name]"),
+                target=ErrorTarget.JOB,
+                error_category=ErrorCategory.SYSTEM_ERROR,
+            )
+        # Load the job definition separately
+        create_job_data = data.pop(create_job_key)
+        # Save the id for remote job reference before load job, as data dict will be changed
+        job_id = create_job_data.get("id")
+        if isinstance(job_id, str) and job_id.startswith(ARM_ID_PREFIX):
+            job_id = job_id[len(ARM_ID_PREFIX) :]
+        create_job = Job._load(
+            data=create_job_data,
+            **kwargs,
+        )
+        # Set id manually as it is a dump only field in schema
+        create_job._id = job_id
+        schedule = JobSchedule(
+            base_path=context[BASE_PATH_CONTEXT_KEY],
+            **load_from_dict(JobScheduleSchema, data, context, **kwargs),
+        )
+        schedule.create_job = create_job
+        return schedule
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> JobScheduleSchema:
+        return JobScheduleSchema(context=context)
+
+    def _customized_validate(self) -> MutableValidationResult:
+        """Validate the resource with customized logic.
+
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        if isinstance(self.create_job, PipelineJob):
+            return self.create_job._validate()
+        return self._create_empty_validation_result()
+
+    @classmethod
+    def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
+        """Get the fields that should be skipped in schema validation.
+
+        Override this method to add customized validation logic.
+
+        :return: The list of fields to skip in schema validation
+        :rtype: typing.List[str]
+        """
+        return ["create_job"]
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestSchedule) -> "JobSchedule":
+        properties = obj.properties
+        action: JobScheduleAction = properties.action
+        if action.job_definition is None:
+            msg = "Job definition for schedule '{}' can not be None."
+            raise ScheduleException(
+                message=msg.format(obj.name),
+                no_personal_data_message=msg.format("[name]"),
+                target=ErrorTarget.JOB,
+                error_category=ErrorCategory.SYSTEM_ERROR,
+            )
+        if camel_to_snake(action.job_definition.job_type) not in [JobType.PIPELINE, JobType.COMMAND, JobType.SPARK]:
+            msg = f"Unsupported job type {action.job_definition.job_type} for schedule '{{}}'."
+            raise ScheduleException(
+                message=msg.format(obj.name),
+                no_personal_data_message=msg.format("[name]"),
+                target=ErrorTarget.JOB,
+                # Classified as user_error as we may support other type afterwards.
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        # Wrap job definition with JobBase for Job._from_rest_object call.
+        create_job = RestJobBase(properties=action.job_definition)
+        # id is a readonly field so set it after init.
+        # TODO: Add this support after source job id move to JobBaseProperties
+        if hasattr(action.job_definition, "source_job_id"):
+            create_job.id = action.job_definition.source_job_id
+        create_job = Job._from_rest_object(create_job)
+        return cls(
+            trigger=TriggerBase._from_rest_object(properties.trigger),
+            create_job=create_job,
+            name=obj.name,
+            display_name=properties.display_name,
+            description=properties.description,
+            tags=properties.tags,
+            properties=properties.properties,
+            provisioning_state=properties.provisioning_state,
+            is_enabled=properties.is_enabled,
+            creation_context=SystemData._from_rest_object(obj.system_data),
+        )
+
+    def _to_rest_object(self) -> RestSchedule:
+        """Build current parameterized schedule instance to a schedule object before submission.
+
+        :return: Rest schedule.
+        :rtype: RestSchedule
+        """
+        if isinstance(self.create_job, BaseNode):
+            self.create_job = self.create_job._to_job()
+        private_enabled = is_private_preview_enabled()
+        if isinstance(self.create_job, PipelineJob):
+            job_definition = self.create_job._to_rest_object().properties
+            # Set the source job id, as it is used only for schedule scenario.
+            job_definition.source_job_id = self.create_job.id
+        elif private_enabled and isinstance(self.create_job, (CommandJob, SparkJob)):
+            job_definition = self.create_job._to_rest_object().properties
+            # TODO: Merge this branch with PipelineJob after source job id move to JobBaseProperties
+            # job_definition.source_job_id = self.create_job.id
+        elif isinstance(self.create_job, str):  # arm id reference
+            # TODO: Update this after source job id move to JobBaseProperties
+            # Rest pipeline job will hold a 'Default' as experiment_name,
+            # MFE will add default if None, so pass an empty string here.
+            job_definition = RestPipelineJob(source_job_id=self.create_job, experiment_name="")
+        else:
+            msg = "Unsupported job type '{}' in schedule {}."
+            raise ValidationException(
+                message=msg.format(type(self.create_job).__name__, self.name),
+                no_personal_data_message=msg.format("[type]", "[name]"),
+                target=ErrorTarget.SCHEDULE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        return RestSchedule(
+            properties=ScheduleProperties(
+                description=self.description,
+                properties=self.properties,
+                tags=self.tags,
+                action=JobScheduleAction(job_definition=job_definition),
+                display_name=self.display_name,
+                is_enabled=self._is_enabled,
+                trigger=self.trigger._to_rest_object() if self.trigger is not None else None,
+            )
+        )
+
+    def __str__(self) -> str:
+        try:
+            res_yaml: str = self._to_yaml()
+            return res_yaml
+        except BaseException:  # pylint: disable=W0718
+            res_jobSchedule: str = super(JobSchedule, self).__str__()
+            return res_jobSchedule
+
+    # pylint: disable-next=docstring-missing-param
+    def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict[Literal["trigger_type"], str]:
+        """Return the telemetry values of schedule.
+
+        :return: A dictionary with telemetry values
+        :rtype: Dict[Literal["trigger_type"], str]
+        """
+        return {"trigger_type": type(self.trigger).__name__}
+
+
+class ScheduleTriggerResult:
+    """Schedule trigger result returned by trigger an enabled schedule once.
+
+    This class shouldn't be instantiated directly. Instead, it is used as the return type of schedule trigger.
+
+    :ivar str job_name:
+    :ivar str schedule_action_type:
+    """
+
+    def __init__(self, **kwargs):
+        self.job_name = kwargs.get("job_name", None)
+        self.schedule_action_type = kwargs.get("schedule_action_type", None)
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestTriggerRunSubmissionDto) -> "ScheduleTriggerResult":
+        """Construct a ScheduleJob from a rest object.
+
+        :param obj: The rest object to construct from.
+        :type obj: ~azure.ai.ml._restclient.v2024_01_01_preview.models.TriggerRunSubmissionDto
+        :return: The constructed ScheduleJob.
+        :rtype: ScheduleTriggerResult
+        """
+        return cls(
+            schedule_action_type=obj.schedule_action_type,
+            job_name=obj.submission_id,
+        )
+
+    def _to_dict(self) -> dict:
+        """Convert the object to a dictionary.
+        :return: The dictionary representation of the object.
+        :rtype: dict
+        """
+        return {
+            "job_name": self.job_name,
+            "schedule_action_type": self.schedule_action_type,
+        }
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/trigger.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/trigger.py
new file mode 100644
index 00000000..855aac9e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_schedule/trigger.py
@@ -0,0 +1,290 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+import logging
+from abc import ABC
+from datetime import datetime
+from typing import List, Optional, Union
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import CronTrigger as RestCronTrigger
+from azure.ai.ml._restclient.v2023_04_01_preview.models import RecurrenceSchedule as RestRecurrencePattern
+from azure.ai.ml._restclient.v2023_04_01_preview.models import RecurrenceTrigger as RestRecurrenceTrigger
+from azure.ai.ml._restclient.v2023_04_01_preview.models import TriggerBase as RestTriggerBase
+from azure.ai.ml._restclient.v2023_04_01_preview.models import TriggerType as RestTriggerType
+from azure.ai.ml._utils.utils import camel_to_snake, snake_to_camel
+from azure.ai.ml.constants import TimeZone
+from azure.ai.ml.entities._mixins import RestTranslatableMixin
+
+module_logger = logging.getLogger(__name__)
+
+
+class TriggerBase(RestTranslatableMixin, ABC):
+    """Base class of Trigger.
+
+    This class should not be instantiated directly. Instead, use one of its subclasses.
+
+    :keyword type: The type of trigger.
+    :paramtype type: str
+    :keyword start_time: Specifies the start time of the schedule in ISO 8601 format.
+    :paramtype start_time: Optional[Union[str, datetime]]
+    :keyword end_time: Specifies the end time of the schedule in ISO 8601 format.
+        Note that end_time is not supported for compute schedules.
+    :paramtype end_time: Optional[Union[str, datetime]]
+    :keyword time_zone: The time zone where the schedule will run. Defaults to UTC(+00:00).
+        Note that this applies to the start_time and end_time.
+    :paramtype time_zone: ~azure.ai.ml.constants.TimeZone
+    """
+
+    def __init__(
+        self,
+        *,
+        type: str,  # pylint: disable=redefined-builtin
+        start_time: Optional[Union[str, datetime]] = None,
+        end_time: Optional[Union[str, datetime]] = None,
+        time_zone: Union[str, TimeZone] = TimeZone.UTC,
+    ) -> None:
+        super().__init__()
+        self.type = type
+        self.start_time = start_time
+        self.end_time = end_time
+        self.time_zone = time_zone
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestTriggerBase) -> Optional[Union["CronTrigger", "RecurrenceTrigger"]]:
+        if obj.trigger_type == RestTriggerType.RECURRENCE:
+            return RecurrenceTrigger._from_rest_object(obj)
+        if obj.trigger_type == RestTriggerType.CRON:
+            return CronTrigger._from_rest_object(obj)
+
+        return None
+
+
+class RecurrencePattern(RestTranslatableMixin):
+    """Recurrence pattern for a job schedule.
+
+    :keyword hours: The number of hours for the recurrence schedule pattern.
+    :paramtype hours: Union[int, List[int]]
+    :keyword minutes: The number of minutes for the recurrence schedule pattern.
+    :paramtype minutes: Union[int, List[int]]
+    :keyword week_days: A list of days of the week for the recurrence schedule pattern.
+        Acceptable values include: "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"
+    :type week_days: Optional[Union[str, List[str]]]
+    :keyword month_days: A list of days of the month for the recurrence schedule pattern.
+    :paramtype month_days: Optional[Union[int, List[int]]]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START job_schedule_configuration]
+            :end-before: [END job_schedule_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a JobSchedule to use a RecurrencePattern.
+    """
+
+    def __init__(
+        self,
+        *,
+        hours: Union[int, List[int]],
+        minutes: Union[int, List[int]],
+        week_days: Optional[Union[str, List[str]]] = None,
+        month_days: Optional[Union[int, List[int]]] = None,
+    ) -> None:
+        self.hours = hours
+        self.minutes = minutes
+        self.week_days = week_days
+        self.month_days = month_days
+
+    def _to_rest_object(self) -> RestRecurrencePattern:
+        return RestRecurrencePattern(
+            hours=[self.hours] if not isinstance(self.hours, list) else self.hours,
+            minutes=[self.minutes] if not isinstance(self.minutes, list) else self.minutes,
+            week_days=[self.week_days] if self.week_days and not isinstance(self.week_days, list) else self.week_days,
+            month_days=(
+                [self.month_days] if self.month_days and not isinstance(self.month_days, list) else self.month_days
+            ),
+        )
+
+    def _to_rest_compute_pattern_object(self) -> RestRecurrencePattern:
+        # This function is added because we can't make compute trigger to use same class
+        # with schedule from service side.
+        if self.month_days:
+            module_logger.warning("'month_days' is ignored for not supported on compute recurrence schedule.")
+        return RestRecurrencePattern(
+            hours=[self.hours] if not isinstance(self.hours, list) else self.hours,
+            minutes=[self.minutes] if not isinstance(self.minutes, list) else self.minutes,
+            week_days=[self.week_days] if self.week_days and not isinstance(self.week_days, list) else self.week_days,
+        )
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestRecurrencePattern) -> "RecurrencePattern":
+        return cls(
+            hours=obj.hours,
+            minutes=obj.minutes,
+            week_days=obj.week_days,
+            month_days=obj.month_days if hasattr(obj, "month_days") else None,
+        )
+
+
+class CronTrigger(TriggerBase):
+    """Cron Trigger for a job schedule.
+
+    :keyword expression: The cron expression of schedule, following NCronTab format.
+    :paramtype expression: str
+    :keyword start_time: The start time for the trigger. If using a datetime object, leave the tzinfo as None and use
+        the ``time_zone`` parameter to specify a time zone if needed. If using a string, use the format
+        YYYY-MM-DDThh:mm:ss. Defaults to running the first workload instantly and continuing future workloads
+        based on the schedule. If the start time is in the past, the first workload is run at the next calculated run
+        time.
+    :paramtype start_time: Optional[Union[str, datetime]]
+    :keyword end_time: The start time for the trigger. If using a datetime object, leave the tzinfo as None and use
+        the ``time_zone`` parameter to specify a time zone if needed. If using a string, use the format
+        YYYY-MM-DDThh:mm:ss. Note that end_time is not supported for compute schedules.
+    :paramtype end_time: Optional[Union[str, datetime]]
+    :keyword time_zone: The time zone where the schedule will run. Defaults to UTC(+00:00).
+        Note that this applies to the start_time and end_time.
+    :paramtype time_zone: Union[str, ~azure.ai.ml.constants.TimeZone]
+    :raises Exception: Raised if end_time is in the past.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START cron_trigger_configuration]
+            :end-before: [END cron_trigger_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a CronTrigger.
+    """
+
+    def __init__(
+        self,
+        *,
+        expression: str,
+        start_time: Optional[Union[str, datetime]] = None,
+        end_time: Optional[Union[str, datetime]] = None,
+        time_zone: Union[str, TimeZone] = TimeZone.UTC,
+    ) -> None:
+        super().__init__(
+            type=RestTriggerType.CRON,
+            start_time=start_time,
+            end_time=end_time,
+            time_zone=time_zone,
+        )
+        self.expression = expression
+
+    def _to_rest_object(self) -> RestCronTrigger:  # v2022_12_01.models.CronTrigger
+        return RestCronTrigger(
+            trigger_type=self.type,
+            expression=self.expression,
+            start_time=self.start_time,
+            end_time=self.end_time,
+            time_zone=self.time_zone,
+        )
+
+    def _to_rest_compute_cron_object(self) -> RestCronTrigger:  # v2022_12_01_preview.models.CronTrigger
+        # This function is added because we can't make compute trigger to use same class
+        # with schedule from service side.
+        if self.end_time:
+            module_logger.warning("'end_time' is ignored for not supported on compute schedule.")
+        return RestCronTrigger(
+            expression=self.expression,
+            start_time=self.start_time,
+            time_zone=self.time_zone,
+        )
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestCronTrigger) -> "CronTrigger":
+        return cls(
+            expression=obj.expression,
+            start_time=obj.start_time,
+            end_time=obj.end_time,
+            time_zone=obj.time_zone,
+        )
+
+
+class RecurrenceTrigger(TriggerBase):
+    """Recurrence trigger for a job schedule.
+
+    :keyword start_time: Specifies the start time of the schedule in ISO 8601 format.
+    :paramtype start_time: Optional[Union[str, datetime]]
+    :keyword end_time: Specifies the end time of the schedule in ISO 8601 format.
+        Note that end_time is not supported for compute schedules.
+    :paramtype end_time: Optional[Union[str, datetime]]
+    :keyword time_zone: The time zone where the schedule will run. Defaults to UTC(+00:00).
+        Note that this applies to the start_time and end_time.
+    :paramtype time_zone: Union[str, ~azure.ai.ml.constants.TimeZone]
+    :keyword frequency: Specifies the frequency that the schedule should be triggered with.
+     Possible values include: "minute", "hour", "day", "week", "month".
+    :type frequency: str
+    :keyword interval: Specifies the interval in conjunction with the frequency that the schedule should be triggered
+        with.
+    :paramtype interval: int
+    :keyword schedule: Specifies the recurrence pattern.
+    :paramtype schedule: Optional[~azure.ai.ml.entities.RecurrencePattern]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START job_schedule_configuration]
+            :end-before: [END job_schedule_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a JobSchedule to trigger recurrence every 4 weeks.
+    """
+
+    def __init__(
+        self,
+        *,
+        frequency: str,
+        interval: int,
+        schedule: Optional[RecurrencePattern] = None,
+        start_time: Optional[Union[str, datetime]] = None,
+        end_time: Optional[Union[str, datetime]] = None,
+        time_zone: Union[str, TimeZone] = TimeZone.UTC,
+    ) -> None:
+        super().__init__(
+            type=RestTriggerType.RECURRENCE,
+            start_time=start_time,
+            end_time=end_time,
+            time_zone=time_zone,
+        )
+        # Create empty pattern as schedule is required in rest model
+        self.schedule = schedule if schedule else RecurrencePattern(hours=[], minutes=[])
+        self.frequency = frequency
+        self.interval = interval
+
+    def _to_rest_object(self) -> RestRecurrenceTrigger:  # v2022_12_01.models.RecurrenceTrigger
+        return RestRecurrenceTrigger(
+            frequency=snake_to_camel(self.frequency),
+            interval=self.interval,
+            schedule=self.schedule._to_rest_object(),
+            start_time=self.start_time,
+            end_time=self.end_time,
+            time_zone=self.time_zone,
+        )
+
+    def _to_rest_compute_recurrence_object(self) -> RestRecurrenceTrigger:
+        # v2022_12_01_preview.models.RecurrenceTrigger
+        # This function is added because we can't make compute trigger to use same class
+        # with schedule from service side.
+        if self.end_time:
+            module_logger.warning("'end_time' is ignored for not supported on compute schedule.")
+        return RestRecurrenceTrigger(
+            frequency=snake_to_camel(self.frequency),
+            interval=self.interval,
+            schedule=self.schedule._to_rest_compute_pattern_object(),
+            start_time=self.start_time,
+            time_zone=self.time_zone,
+        )
+
+    @classmethod
+    def _from_rest_object(cls, obj: RestRecurrenceTrigger) -> "RecurrenceTrigger":
+        return cls(
+            frequency=camel_to_snake(obj.frequency),
+            interval=obj.interval,
+            schedule=RecurrencePattern._from_rest_object(obj.schedule) if obj.schedule else None,
+            start_time=obj.start_time,
+            end_time=obj.end_time,
+            time_zone=obj.time_zone,
+        )