# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access

import json
import logging
import traceback
from abc import abstractmethod
from collections import OrderedDict
from os import PathLike
from pathlib import Path
from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Type, Union

from azure.ai.ml._restclient.runhistory.models import Run
from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, JobService
from azure.ai.ml._restclient.v2023_04_01_preview.models import JobType as RestJobType
from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401
from azure.ai.ml._restclient.v2024_01_01_preview.models import JobType as RestJobType_20240101Preview
from azure.ai.ml._utils._html_utils import make_link, to_html
from azure.ai.ml._utils.utils import dump_yaml_to_file
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, CommonYamlFields
from azure.ai.ml.constants._compute import ComputeType
from azure.ai.ml.constants._job.job import JobServices, JobType
from azure.ai.ml.entities._mixins import TelemetryMixin
from azure.ai.ml.entities._resource import Resource
from azure.ai.ml.entities._util import find_type_in_override
from azure.ai.ml.exceptions import (
    ErrorCategory,
    ErrorTarget,
    JobException,
    JobParsingError,
    PipelineChildJobError,
    ValidationErrorType,
    ValidationException,
)

from ._studio_url_from_job_id import studio_url_from_job_id
from .pipeline._component_translatable import ComponentTranslatableMixin

module_logger = logging.getLogger(__name__)


def _is_pipeline_child_job(job: JobBase) -> bool:
    # Pipeline child jobs have no properties, so detect them by checking job.properties;
    # if the backend spec changes, this method needs to be updated.
    return job.properties is None


class Job(Resource, ComponentTranslatableMixin, TelemetryMixin):
    """Base class for jobs.

    This class should not be instantiated directly. Instead, use one of its subclasses.

    :param name: The name of the job.
    :type name: Optional[str]
    :param display_name: The display name of the job.
    :type display_name: Optional[str]
    :param description: The description of the job.
    :type description: Optional[str]
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: Optional[dict[str, str]]
    :param properties: The job property dictionary.
    :type properties: Optional[dict[str, str]]
    :param experiment_name: The name of the experiment the job will be created under.
        Defaults to the name of the current directory.
    :type experiment_name: Optional[str]
    :param services: Information on services associated with the job.
    :type services: Optional[dict[str, ~azure.ai.ml.entities.JobService]]
    :param compute: Information about the compute resources associated with the job.
    :type compute: Optional[str]
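
    .. admonition:: Example:

        A minimal sketch of typical usage, assuming a local "job.yml" job
        specification and placeholder workspace details: a concrete job is
        loaded via the public ``load_job`` helper and submitted through an
        ``MLClient``.

        .. code-block:: python

            from azure.ai.ml import MLClient, load_job
            from azure.identity import DefaultAzureCredential

            # The "type" field in the YAML file selects the concrete Job
            # subclass (command, sweep, pipeline, ...).
            job = load_job("./job.yml")

            # Placeholder workspace details; replace with your own.
            ml_client = MLClient(
                DefaultAzureCredential(),
                subscription_id="<subscription-id>",
                resource_group_name="<resource-group>",
                workspace_name="<workspace-name>",
            )
            created = ml_client.jobs.create_or_update(job)
            print(created.status, created.studio_url)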
    """

    def __init__(
        self,
        name: Optional[str] = None,
        display_name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Dict] = None,
        properties: Optional[Dict] = None,
        experiment_name: Optional[str] = None,
        compute: Optional[str] = None,
        services: Optional[Dict[str, JobService]] = None,
        **kwargs: Any,
    ) -> None:
        self._type: Optional[str] = kwargs.pop("type", JobType.COMMAND)
        self._status: Optional[str] = kwargs.pop("status", None)
        self._log_files: Optional[Dict] = kwargs.pop("log_files", None)

        super().__init__(
            name=name,
            description=description,
            tags=tags,
            properties=properties,
            **kwargs,
        )

        self.display_name = display_name
        self.experiment_name = experiment_name
        self.compute: Any = compute
        self.services = services

    @property
    def type(self) -> Optional[str]:
        """The type of the job.

        :return: The type of the job.
        :rtype: Optional[str]
        """
        return self._type

    @property
    def status(self) -> Optional[str]:
        """The status of the job.

        Common values returned include "Running", "Completed", and "Failed". All possible values are:

        * NotStarted - This is a temporary state that client-side Run objects are in before cloud submission.
        * Starting - The Run has started being processed in the cloud. The caller has a run ID at this point.
        * Provisioning - On-demand compute is being created for a given job submission.
        * Preparing - The run environment is being prepared and is in one of two stages:

          * Docker image build
          * conda environment setup

        * Queued - The job is queued on the compute target. For example, in BatchAI, the job is in a queued state
          while waiting for all the requested nodes to be ready.
        * Running - The job has started to run on the compute target.
        * Finalizing - User code execution has completed, and the run is in post-processing stages.
        * CancelRequested - Cancellation has been requested for the job.
        * Completed - The run has completed successfully. This includes both the user code execution and run
          post-processing stages.
        * Failed - The run failed. Usually the Error property on a run will provide details as to why.
        * Canceled - Follows a cancellation request and indicates that the run is now successfully cancelled.
        * NotResponding - For runs that have Heartbeats enabled, no heartbeat has been recently sent.

        :return: Status of the job.
        :rtype: Optional[str]
        """
        return self._status

    @property
    def log_files(self) -> Optional[Dict[str, str]]:
        """Job output files.

        :return: The dictionary of log names and URLs.
        :rtype: Optional[Dict[str, str]]
        """
        return self._log_files

    @property
    def studio_url(self) -> Optional[str]:
        """Azure ML studio endpoint.

        :return: The URL to the job details page.
        :rtype: Optional[str]
        """
        if self.services and (JobServices.STUDIO in self.services.keys()):
            res: Optional[str] = self.services[JobServices.STUDIO].endpoint
            return res

        return studio_url_from_job_id(self.id) if self.id else None

    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
        """Dumps the job content into a file in YAML format.

        :param dest: The local path or file stream to write the YAML content to.
            If dest is a file path, a new file will be created.
            If dest is an open file, the file will be written to directly.
        :type dest: Union[PathLike, str, IO[AnyStr]]
        :raises FileExistsError: Raised if dest is a file path and the file already exists.
        :raises IOError: Raised if dest is an open file and the file is not writable.
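
        A minimal sketch of usage ("./dumped-job.yml" is an illustrative path):

        .. code-block:: python

            # Serialize a loaded or in-memory job back to YAML on disk.
            job.dump("./dumped-job.yml")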
""" path = kwargs.pop("path", None) yaml_serialized = self._to_dict() dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs) def _get_base_info_dict(self) -> OrderedDict: return OrderedDict( [ ("Experiment", self.experiment_name), ("Name", self.name), ("Type", self._type), ("Status", self._status), ] ) def _repr_html_(self) -> str: info = self._get_base_info_dict() if self.studio_url: info.update( [ ( "Details Page", make_link(self.studio_url, "Link to Azure Machine Learning studio"), ), ] ) res: str = to_html(info) return res @abstractmethod def _to_dict(self) -> Dict: pass @classmethod def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple: from azure.ai.ml.entities._builders.command import Command from azure.ai.ml.entities._builders.spark import Spark from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob from azure.ai.ml.entities._job.import_job import ImportJob from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob from azure.ai.ml.entities._job.sweep.sweep_job import SweepJob job_type: Optional[Type["Job"]] = None type_in_override = find_type_in_override(params_override) type_str = type_in_override or data.get(CommonYamlFields.TYPE, JobType.COMMAND) # override takes the priority if type_str == JobType.COMMAND: job_type = Command elif type_str == JobType.SPARK: job_type = Spark elif type_str == JobType.IMPORT: job_type = ImportJob elif type_str == JobType.SWEEP: job_type = SweepJob elif type_str == JobType.AUTOML: job_type = AutoMLJob elif type_str == JobType.PIPELINE: job_type = PipelineJob elif type_str == JobType.FINE_TUNING: job_type = FineTuningJob elif type_str == JobType.DISTILLATION: job_type = DistillationJob else: msg = f"Unsupported job type: {type_str}." raise ValidationException( message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB, error_category=ErrorCategory.USER_ERROR, error_type=ValidationErrorType.INVALID_VALUE, ) return job_type, type_str @classmethod def _load( cls, data: Optional[Dict] = None, yaml_path: Optional[Union[PathLike, str]] = None, params_override: Optional[list] = None, **kwargs: Any, ) -> "Job": """Load a job object from a yaml file. :param cls: Indicates that this is a class method. :type cls: class :param data: Data Dictionary, defaults to None :type data: Dict :param yaml_path: YAML Path, defaults to None :type yaml_path: Union[PathLike, str] :param params_override: Fields to overwrite on top of the yaml file. Format is [{"field1": "value1"}, {"field2": "value2"}], defaults to None :type params_override: List[Dict] :raises Exception: An exception :return: Loaded job object. 
        """
        data = data or {}
        params_override = params_override or []
        context = {
            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
            PARAMS_OVERRIDE_KEY: params_override,
        }

        job_type, type_str = cls._resolve_cls_and_type(data, params_override)
        job: Job = job_type._load_from_dict(
            data=data,
            context=context,
            additional_message=f"If you are trying to configure a job that is not of type {type_str}, please specify "
            f"the correct job type in the 'type' property.",
            **kwargs,
        )
        if yaml_path:
            job._source_path = yaml_path
        return job

    @classmethod
    def _from_rest_object(  # pylint: disable=too-many-return-statements
        cls, obj: Union[JobBase, JobBase_2401, Run]
    ) -> "Job":
        from azure.ai.ml.entities import PipelineJob
        from azure.ai.ml.entities._builders.command import Command
        from azure.ai.ml.entities._builders.spark import Spark
        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
        from azure.ai.ml.entities._job.base_job import _BaseJob
        from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
        from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
        from azure.ai.ml.entities._job.import_job import ImportJob
        from azure.ai.ml.entities._job.sweep.sweep_job import SweepJob

        try:
            if isinstance(obj, Run):
                # special handling for child jobs
                return _BaseJob._load_from_rest(obj)
            if _is_pipeline_child_job(obj):
                raise PipelineChildJobError(job_id=obj.id)
            if obj.properties.job_type == RestJobType.COMMAND:
                # PrP only until new import job type is ready on MFE in PuP
                # compute type 'DataFactory' is reserved compute name for 'clusterless' ADF jobs
                if obj.properties.compute_id and obj.properties.compute_id.endswith("/" + ComputeType.ADF):
                    return ImportJob._load_from_rest(obj)

                res_command: Job = Command._load_from_rest_job(obj)
                if hasattr(obj, "name"):
                    res_command._name = obj.name  # type: ignore[attr-defined]
                return res_command
            if obj.properties.job_type == RestJobType.SPARK:
                res_spark: Job = Spark._load_from_rest_job(obj)
                if hasattr(obj, "name"):
                    res_spark._name = obj.name  # type: ignore[attr-defined]
                return res_spark
            if obj.properties.job_type == RestJobType.SWEEP:
                return SweepJob._load_from_rest(obj)
            if obj.properties.job_type == RestJobType.AUTO_ML:
                return AutoMLJob._load_from_rest(obj)
            if obj.properties.job_type == RestJobType_20240101Preview.FINE_TUNING:
                if obj.properties.properties.get("azureml.enable_distillation", False):
                    return DistillationJob._load_from_rest(obj)
                return FineTuningJob._load_from_rest(obj)
            if obj.properties.job_type == RestJobType.PIPELINE:
                res_pipeline: Job = PipelineJob._load_from_rest(obj)
                return res_pipeline
        except PipelineChildJobError as ex:
            raise ex
        except Exception as ex:
            error_message = json.dumps(obj.as_dict(), indent=2) if obj else None
            module_logger.info(
                "Exception: %s.\n%s\nUnable to parse the job resource: %s.\n",
                ex,
                traceback.format_exc(),
                error_message,
            )
            raise JobParsingError(
                message=str(ex),
                no_personal_data_message=f"Unable to parse a job resource of type:{type(obj).__name__}",
                error_category=ErrorCategory.SYSTEM_ERROR,
            ) from ex

        msg = f"Unsupported job type {obj.properties.job_type}"
        raise JobException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.JOB,
            error_category=ErrorCategory.SYSTEM_ERROR,
        )

    def _get_telemetry_values(self) -> Dict:  # pylint: disable=arguments-differ
        telemetry_values = {"type": self.type}
        return telemetry_values

    @classmethod
    @abstractmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
        pass