path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/job.py
author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/job.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/job.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/job.py  363
1 file changed, 363 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/job.py
new file mode 100644
index 00000000..b181636e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/job.py
@@ -0,0 +1,363 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import json
+import logging
+import traceback
+from abc import abstractmethod
+from collections import OrderedDict
+from os import PathLike
+from pathlib import Path
+from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Type, Union
+
+from azure.ai.ml._restclient.runhistory.models import Run
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, JobService
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobType as RestJobType
+from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401
+from azure.ai.ml._restclient.v2024_01_01_preview.models import JobType as RestJobType_20240101Preview
+from azure.ai.ml._utils._html_utils import make_link, to_html
+from azure.ai.ml._utils.utils import dump_yaml_to_file
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, CommonYamlFields
+from azure.ai.ml.constants._compute import ComputeType
+from azure.ai.ml.constants._job.job import JobServices, JobType
+from azure.ai.ml.entities._mixins import TelemetryMixin
+from azure.ai.ml.entities._resource import Resource
+from azure.ai.ml.entities._util import find_type_in_override
+from azure.ai.ml.exceptions import (
+    ErrorCategory,
+    ErrorTarget,
+    JobException,
+    JobParsingError,
+    PipelineChildJobError,
+    ValidationErrorType,
+    ValidationException,
+)
+
+from ._studio_url_from_job_id import studio_url_from_job_id
+from .pipeline._component_translatable import ComponentTranslatableMixin
+
+module_logger = logging.getLogger(__name__)
+
+
+def _is_pipeline_child_job(job: JobBase) -> bool:
+    # A pipeline child job has no properties, so we can detect one by checking job.properties.
+    # If the backend spec changes, this method needs to be updated.
+    return job.properties is None
+
+
+class Job(Resource, ComponentTranslatableMixin, TelemetryMixin):
+    """Base class for jobs.
+
+    This class should not be instantiated directly. Instead, use one of its subclasses.
+
+    :param name: The name of the job.
+    :type name: Optional[str]
+    :param display_name: The display name of the job.
+    :type display_name: Optional[str]
+    :param description: The description of the job.
+    :type description: Optional[str]
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: Optional[dict[str, str]]
+    :param properties: The job property dictionary.
+    :type properties: Optional[dict[str, str]]
+    :param experiment_name: The name of the experiment the job will be created under. Defaults to the name of the
+        current directory.
+    :type experiment_name: Optional[str]
+    :param services: Information on services associated with the job.
+    :type services: Optional[dict[str, ~azure.ai.ml.entities.JobService]]
+    :param compute: Information about the compute resources associated with the job.
+    :type compute: Optional[str]
+    """
+
+    def __init__(
+        self,
+        name: Optional[str] = None,
+        display_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        experiment_name: Optional[str] = None,
+        compute: Optional[str] = None,
+        services: Optional[Dict[str, JobService]] = None,
+        **kwargs: Any,
+    ) -> None:
+        self._type: Optional[str] = kwargs.pop("type", JobType.COMMAND)
+        self._status: Optional[str] = kwargs.pop("status", None)
+        self._log_files: Optional[Dict] = kwargs.pop("log_files", None)
+
+        super().__init__(
+            name=name,
+            description=description,
+            tags=tags,
+            properties=properties,
+            **kwargs,
+        )
+
+        self.display_name = display_name
+        self.experiment_name = experiment_name
+        self.compute: Any = compute
+        self.services = services
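+
+    # Usage sketch: Job is abstract, so a concrete job is typically built with one of the
+    # public builders and submitted through MLClient. The workspace details, compute name,
+    # and environment below are placeholders.
+    #
+    #     from azure.identity import DefaultAzureCredential
+    #     from azure.ai.ml import MLClient, command
+    #
+    #     ml_client = MLClient(
+    #         DefaultAzureCredential(), "<subscription-id>", "<resource-group>", "<workspace-name>"
+    #     )
+    #     job = command(
+    #         command="echo hello",
+    #         environment="<environment-name>@latest",
+    #         compute="<compute-name>",
+    #         experiment_name="demo-experiment",
+    #     )
+    #     returned_job = ml_client.jobs.create_or_update(job)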
+
+    @property
+    def type(self) -> Optional[str]:
+        """The type of the job.
+
+        :return: The type of the job.
+        :rtype: Optional[str]
+        """
+        return self._type
+
+    @property
+    def status(self) -> Optional[str]:
+        """The status of the job.
+
+        Common values returned include "Running", "Completed", and "Failed". All possible values are:
+
+            * NotStarted - This is a temporary state that client-side Run objects are in before cloud submission.
+            * Starting - The Run has started being processed in the cloud. The caller has a run ID at this point.
+            * Provisioning - On-demand compute is being created for a given job submission.
+            * Preparing - The run environment is being prepared and is in one of two stages:
+                * Docker image build
+                * conda environment setup
+            * Queued - The job is queued on the compute target. For example, in BatchAI, the job is in a queued state
+                while waiting for all the requested nodes to be ready.
+            * Running - The job has started to run on the compute target.
+            * Finalizing - User code execution has completed, and the run is in post-processing stages.
+            * CancelRequested - Cancellation has been requested for the job.
+            * Completed - The run has completed successfully. This includes both the user code execution and run
+                post-processing stages.
+            * Failed - The run failed. Usually the Error property on a run will provide details as to why.
+            * Canceled - Follows a cancellation request and indicates that the run is now successfully canceled.
+            * NotResponding - For runs that have Heartbeats enabled, no heartbeat has been recently sent.
+
+        :return: Status of the job.
+        :rtype: Optional[str]
+        """
+        return self._status
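+
+    # Polling sketch: the status of a submitted job is usually refreshed by re-fetching it
+    # through MLClient (ml_client as constructed in the earlier sketch); "my-job-name" is a
+    # placeholder for the name assigned at submission.
+    #
+    #     import time
+    #
+    #     job = ml_client.jobs.get("my-job-name")
+    #     while job.status not in ("Completed", "Failed", "Canceled"):
+    #         time.sleep(30)
+    #         job = ml_client.jobs.get("my-job-name")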
+
+    @property
+    def log_files(self) -> Optional[Dict[str, str]]:
+        """Job output files.
+
+        :return: The dictionary of log names and URLs.
+        :rtype: Optional[Dict[str, str]]
+        """
+        return self._log_files
+
+    @property
+    def studio_url(self) -> Optional[str]:
+        """Azure ML studio endpoint.
+
+        :return: The URL to the job details page.
+        :rtype: Optional[str]
+        """
+        if self.services and (JobServices.STUDIO in self.services.keys()):
+            res: Optional[str] = self.services[JobServices.STUDIO].endpoint
+            return res
+
+        return studio_url_from_job_id(self.id) if self.id else None
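+
+    # The studio URL can be surfaced directly, for example to point a user at the run page
+    # (returned_job as in the earlier sketch):
+    #
+    #     print(f"Monitor the job at: {returned_job.studio_url}")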
+
+    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+        """Dumps the job content into a file in YAML format.
+
+        :param dest: The local path or file stream to write the YAML content to.
+            If dest is a file path, a new file will be created.
+            If dest is an open file, the file will be written to directly.
+        :type dest: Union[PathLike, str, IO[AnyStr]]
+        :raises FileExistsError: Raised if dest is a file path and the file already exists.
+        :raises IOError: Raised if dest is an open file and the file is not writable.
+        """
+        path = kwargs.pop("path", None)
+        yaml_serialized = self._to_dict()
+        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
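+
+    # Dump sketch: round-tripping a YAML spec through load_job and dump. The paths are
+    # placeholders; per the docstring above, dumping to an existing file raises FileExistsError.
+    #
+    #     from azure.ai.ml import load_job
+    #
+    #     job = load_job("./job.yml")
+    #     job.dump("./job_copy.yml")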
+
+    def _get_base_info_dict(self) -> OrderedDict:
+        return OrderedDict(
+            [
+                ("Experiment", self.experiment_name),
+                ("Name", self.name),
+                ("Type", self._type),
+                ("Status", self._status),
+            ]
+        )
+
+    def _repr_html_(self) -> str:
+        info = self._get_base_info_dict()
+        if self.studio_url:
+            info.update(
+                [
+                    (
+                        "Details Page",
+                        make_link(self.studio_url, "Link to Azure Machine Learning studio"),
+                    ),
+                ]
+            )
+        res: str = to_html(info)
+        return res
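+
+    # Notebook sketch: in Jupyter, evaluating a job object as the last expression in a cell
+    # renders this HTML summary (experiment, name, type, status, and a studio link) via
+    # _repr_html_ (ml_client as in the earlier sketch).
+    #
+    #     returned_job = ml_client.jobs.get("my-job-name")
+    #     returned_job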
+
+    @abstractmethod
+    def _to_dict(self) -> Dict:
+        pass
+
+    @classmethod
+    def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple:
+        from azure.ai.ml.entities._builders.command import Command
+        from azure.ai.ml.entities._builders.spark import Spark
+        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+        from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
+        from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
+        from azure.ai.ml.entities._job.import_job import ImportJob
+        from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+        from azure.ai.ml.entities._job.sweep.sweep_job import SweepJob
+
+        job_type: Optional[Type["Job"]] = None
+        type_in_override = find_type_in_override(params_override)
+        type_str = type_in_override or data.get(CommonYamlFields.TYPE, JobType.COMMAND)  # the override takes priority
+        if type_str == JobType.COMMAND:
+            job_type = Command
+        elif type_str == JobType.SPARK:
+            job_type = Spark
+        elif type_str == JobType.IMPORT:
+            job_type = ImportJob
+        elif type_str == JobType.SWEEP:
+            job_type = SweepJob
+        elif type_str == JobType.AUTOML:
+            job_type = AutoMLJob
+        elif type_str == JobType.PIPELINE:
+            job_type = PipelineJob
+        elif type_str == JobType.FINE_TUNING:
+            job_type = FineTuningJob
+        elif type_str == JobType.DISTILLATION:
+            job_type = DistillationJob
+        else:
+            msg = f"Unsupported job type: {type_str}."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.JOB,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+        return job_type, type_str
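+
+    # Resolution sketch (internal helper): the YAML "type" field selects the concrete class,
+    # and a value supplied in params_override takes priority over the data dictionary.
+    #
+    #     job_cls, type_str = Job._resolve_cls_and_type({"type": "pipeline"})
+    #     # job_cls is PipelineJob, type_str is "pipeline"
+    #     job_cls, type_str = Job._resolve_cls_and_type({}, params_override=[{"type": "sweep"}])
+    #     # the override wins: job_cls is SweepJob, type_str is "sweep"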
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "Job":
+        """Load a job object from a yaml file.
+
+        :param cls: Indicates that this is a class method.
+        :type cls: class
+        :param data: Data Dictionary, defaults to None
+        :type data: Dict
+        :param yaml_path: YAML Path, defaults to None
+        :type yaml_path: Union[PathLike, str]
+        :param params_override: Fields to overwrite on top of the yaml file.
+            Format is [{"field1": "value1"}, {"field2": "value2"}], defaults to None
+        :type params_override: List[Dict]
+    :raises ValidationException: Raised if the job type specified in the data or override parameters is unsupported.
+        :return: Loaded job object.
+        :rtype: Job
+        """
+        data = data or {}
+        params_override = params_override or []
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        job_type, type_str = cls._resolve_cls_and_type(data, params_override)
+        job: Job = job_type._load_from_dict(
+            data=data,
+            context=context,
+            additional_message=f"If you are trying to configure a job that is not of type {type_str}, please specify "
+            f"the correct job type in the 'type' property.",
+            **kwargs,
+        )
+        if yaml_path:
+            job._source_path = yaml_path
+        return job
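+
+    # Load sketch: _load backs the public load_job helper, which resolves the concrete class
+    # from the YAML "type" field. The file path and override value are placeholders.
+    #
+    #     from azure.ai.ml import load_job
+    #
+    #     job = load_job("./job.yml", params_override=[{"display_name": "overridden-name"}])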
+
+    @classmethod
+    def _from_rest_object(  # pylint: disable=too-many-return-statements
+        cls, obj: Union[JobBase, JobBase_2401, Run]
+    ) -> "Job":
+        from azure.ai.ml.entities import PipelineJob
+        from azure.ai.ml.entities._builders.command import Command
+        from azure.ai.ml.entities._builders.spark import Spark
+        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+        from azure.ai.ml.entities._job.base_job import _BaseJob
+        from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
+        from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
+        from azure.ai.ml.entities._job.import_job import ImportJob
+        from azure.ai.ml.entities._job.sweep.sweep_job import SweepJob
+
+        try:
+            if isinstance(obj, Run):
+                # special handling for child jobs
+                return _BaseJob._load_from_rest(obj)
+            if _is_pipeline_child_job(obj):
+                raise PipelineChildJobError(job_id=obj.id)
+            if obj.properties.job_type == RestJobType.COMMAND:
+                # PrP only until new import job type is ready on MFE in PuP
+                # compute type 'DataFactory' is reserved compute name for 'clusterless' ADF jobs
+                if obj.properties.compute_id and obj.properties.compute_id.endswith("/" + ComputeType.ADF):
+                    return ImportJob._load_from_rest(obj)
+
+                res_command: Job = Command._load_from_rest_job(obj)
+                if hasattr(obj, "name"):
+                    res_command._name = obj.name  # type: ignore[attr-defined]
+                return res_command
+            if obj.properties.job_type == RestJobType.SPARK:
+                res_spark: Job = Spark._load_from_rest_job(obj)
+                if hasattr(obj, "name"):
+                    res_spark._name = obj.name  # type: ignore[attr-defined]
+                return res_spark
+            if obj.properties.job_type == RestJobType.SWEEP:
+                return SweepJob._load_from_rest(obj)
+            if obj.properties.job_type == RestJobType.AUTO_ML:
+                return AutoMLJob._load_from_rest(obj)
+            if obj.properties.job_type == RestJobType_20240101Preview.FINE_TUNING:
+                if obj.properties.properties.get("azureml.enable_distillation", False):
+                    return DistillationJob._load_from_rest(obj)
+                return FineTuningJob._load_from_rest(obj)
+            if obj.properties.job_type == RestJobType.PIPELINE:
+                res_pipeline: Job = PipelineJob._load_from_rest(obj)
+                return res_pipeline
+        except PipelineChildJobError as ex:
+            raise ex
+        except Exception as ex:
+            error_message = json.dumps(obj.as_dict(), indent=2) if obj else None
+            module_logger.info(
+                "Exception: %s.\n%s\nUnable to parse the job resource: %s.\n",
+                ex,
+                traceback.format_exc(),
+                error_message,
+            )
+            raise JobParsingError(
+                message=str(ex),
+                no_personal_data_message=f"Unable to parse a job resource of type:{type(obj).__name__}",
+                error_category=ErrorCategory.SYSTEM_ERROR,
+            ) from ex
+        msg = f"Unsupported job type {obj.properties.job_type}"
+        raise JobException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.JOB,
+            error_category=ErrorCategory.SYSTEM_ERROR,
+        )
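+
+    # Round-trip sketch: service calls such as MLClient.jobs.get return REST-layer JobBase
+    # objects that are converted back into entities here, so the concrete type of the result
+    # follows properties.job_type (Command, Spark, SweepJob, AutoMLJob, PipelineJob, etc.).
+    # (ml_client as in the earlier sketch.)
+    #
+    #     job = ml_client.jobs.get("my-job-name")
+    #     print(type(job).__name__, job.status)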
+
+    def _get_telemetry_values(self) -> Dict:  # pylint: disable=arguments-differ
+        telemetry_values = {"type": self.type}
+        return telemetry_values
+
+    @classmethod
+    @abstractmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
+        pass