# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access

import json
import logging
import traceback
from abc import abstractmethod
from collections import OrderedDict
from os import PathLike
from pathlib import Path
from typing import IO, Any, AnyStr, Dict, List, Optional, Tuple, Type, Union

from azure.ai.ml._restclient.runhistory.models import Run
from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, JobService
from azure.ai.ml._restclient.v2023_04_01_preview.models import JobType as RestJobType
from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401
from azure.ai.ml._restclient.v2024_01_01_preview.models import JobType as RestJobType_20240101Preview
from azure.ai.ml._utils._html_utils import make_link, to_html
from azure.ai.ml._utils.utils import dump_yaml_to_file
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, CommonYamlFields
from azure.ai.ml.constants._compute import ComputeType
from azure.ai.ml.constants._job.job import JobServices, JobType
from azure.ai.ml.entities._mixins import TelemetryMixin
from azure.ai.ml.entities._resource import Resource
from azure.ai.ml.entities._util import find_type_in_override
from azure.ai.ml.exceptions import (
    ErrorCategory,
    ErrorTarget,
    JobException,
    JobParsingError,
    PipelineChildJobError,
    ValidationErrorType,
    ValidationException,
)

from ._studio_url_from_job_id import studio_url_from_job_id
from .pipeline._component_translatable import ComponentTranslatableMixin

module_logger = logging.getLogger(__name__)


def _is_pipeline_child_job(job: JobBase) -> bool:
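    """Check whether a JobBase REST object represents a pipeline child job."""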
    # A pipeline child job has no properties, so we detect one by checking job.properties.
    # If the backend spec changes, this method needs to be updated accordingly.
    return job.properties is None


class Job(Resource, ComponentTranslatableMixin, TelemetryMixin):
    """Base class for jobs.

    This class should not be instantiated directly. Instead, use one of its subclasses.

    :param name: The name of the job.
    :type name: Optional[str]
    :param display_name: The display name of the job.
    :type display_name: Optional[str]
    :param description: The description of the job.
    :type description: Optional[str]
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: Optional[dict[str, str]]
    :param properties: The job property dictionary.
    :type properties: Optional[dict[str, str]]
    :param experiment_name: The name of the experiment the job will be created under. Defaults to the name of the
        current directory.
    :type experiment_name: Optional[str]
    :param services: Information on services associated with the job.
    :type services: Optional[dict[str, ~azure.ai.ml.entities.JobService]]
    :param compute: Information about the compute resources associated with the job.
    :type compute: Optional[str]
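
    A minimal construction sketch through a subclass (hedged; the ``command``
    builder is part of this SDK, but the environment and compute names below
    are illustrative)::

        from azure.ai.ml import command

        # Builds a Command job entity; submitting it still requires an MLClient.
        job = command(
            command="echo hello",
            environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
            compute="cpu-cluster",
        )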
    """

    def __init__(
        self,
        name: Optional[str] = None,
        display_name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Dict] = None,
        properties: Optional[Dict] = None,
        experiment_name: Optional[str] = None,
        compute: Optional[str] = None,
        services: Optional[Dict[str, JobService]] = None,
        **kwargs: Any,
    ) -> None:
        self._type: Optional[str] = kwargs.pop("type", JobType.COMMAND)
        self._status: Optional[str] = kwargs.pop("status", None)
        self._log_files: Optional[Dict] = kwargs.pop("log_files", None)

        super().__init__(
            name=name,
            description=description,
            tags=tags,
            properties=properties,
            **kwargs,
        )

        self.display_name = display_name
        self.experiment_name = experiment_name
        self.compute: Any = compute
        self.services = services

    @property
    def type(self) -> Optional[str]:
        """The type of the job.

        :return: The type of the job.
        :rtype: Optional[str]
        """
        return self._type

    @property
    def status(self) -> Optional[str]:
        """The status of the job.

        Common values returned include "Running", "Completed", and "Failed". All possible values are:

            * NotStarted - This is a temporary state that client-side Run objects are in before cloud submission.
            * Starting - The Run has started being processed in the cloud. The caller has a run ID at this point.
            * Provisioning - On-demand compute is being created for a given job submission.
            * Preparing - The run environment is being prepared and is in one of two stages:
                * Docker image build
                * conda environment setup
            * Queued - The job is queued on the compute target. For example, in BatchAI, the job is in a queued state
                while waiting for all the requested nodes to be ready.
            * Running - The job has started to run on the compute target.
            * Finalizing - User code execution has completed, and the run is in post-processing stages.
            * CancelRequested - Cancellation has been requested for the job.
            * Completed - The run has completed successfully. This includes both the user code execution and run
                post-processing stages.
            * Failed - The run failed. Usually the Error property on a run will provide details as to why.
            * Canceled - Follows a cancellation request and indicates that the run is now successfully canceled.
            * NotResponding - For runs that have Heartbeats enabled, no heartbeat has been recently sent.

        :return: Status of the job.
        :rtype: Optional[str]
        """
        return self._status

    @property
    def log_files(self) -> Optional[Dict[str, str]]:
        """Job output files.

        :return: The dictionary of log names and URLs.
        :rtype: Optional[Dict[str, str]]
        """
        return self._log_files

    @property
    def studio_url(self) -> Optional[str]:
        """Azure ML studio endpoint.

        :return: The URL to the job details page.
        :rtype: Optional[str]
        """
        if self.services and JobServices.STUDIO in self.services:
            res: Optional[str] = self.services[JobServices.STUDIO].endpoint
            return res

        return studio_url_from_job_id(self.id) if self.id else None

    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
        """Dumps the job content into a file in YAML format.

        :param dest: The local path or file stream to write the YAML content to.
            If dest is a file path, a new file will be created.
            If dest is an open file, the file will be written to directly.
        :type dest: Union[PathLike, str, IO[AnyStr]]
        :raises FileExistsError: Raised if dest is a file path and the file already exists.
        :raises IOError: Raised if dest is an open file and the file is not writable.
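
        A minimal usage sketch (the destination path is illustrative)::

            job.dump("./my_job_spec.yml")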
        """
        path = kwargs.pop("path", None)
        yaml_serialized = self._to_dict()
        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)

    def _get_base_info_dict(self) -> OrderedDict:
        return OrderedDict(
            [
                ("Experiment", self.experiment_name),
                ("Name", self.name),
                ("Type", self._type),
                ("Status", self._status),
            ]
        )

    def _repr_html_(self) -> str:
        info = self._get_base_info_dict()
        if self.studio_url:
            info.update(
                [
                    (
                        "Details Page",
                        make_link(self.studio_url, "Link to Azure Machine Learning studio"),
                    ),
                ]
            )
        res: str = to_html(info)
        return res

    @abstractmethod
    def _to_dict(self) -> Dict:
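        """Serialize the job content into a dictionary; implemented by each Job subclass."""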
        pass

    @classmethod
    def _resolve_cls_and_type(cls, data: Dict, params_override: Optional[List[Dict]] = None) -> Tuple:
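        """Resolve the Job subclass and job type string from loaded YAML data.

        The ``type`` value in ``params_override`` takes precedence over the one in
        ``data``; both default to ``JobType.COMMAND`` when absent.

        :param data: Data dictionary loaded from the YAML file.
        :type data: Dict
        :param params_override: Fields to overwrite on top of the YAML file.
        :type params_override: Optional[List[Dict]]
        :raises ValidationException: Raised if the job type is unsupported.
        :return: A tuple of the resolved Job subclass and the job type string.
        :rtype: Tuple
        """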
        from azure.ai.ml.entities._builders.command import Command
        from azure.ai.ml.entities._builders.spark import Spark
        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
        from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
        from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
        from azure.ai.ml.entities._job.import_job import ImportJob
        from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
        from azure.ai.ml.entities._job.sweep.sweep_job import SweepJob

        job_type: Optional[Type["Job"]] = None
        type_in_override = find_type_in_override(params_override)
        type_str = type_in_override or data.get(CommonYamlFields.TYPE, JobType.COMMAND)  # the override takes priority
        if type_str == JobType.COMMAND:
            job_type = Command
        elif type_str == JobType.SPARK:
            job_type = Spark
        elif type_str == JobType.IMPORT:
            job_type = ImportJob
        elif type_str == JobType.SWEEP:
            job_type = SweepJob
        elif type_str == JobType.AUTOML:
            job_type = AutoMLJob
        elif type_str == JobType.PIPELINE:
            job_type = PipelineJob
        elif type_str == JobType.FINE_TUNING:
            job_type = FineTuningJob
        elif type_str == JobType.DISTILLATION:
            job_type = DistillationJob
        else:
            msg = f"Unsupported job type: {type_str}."
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
                error_type=ValidationErrorType.INVALID_VALUE,
            )
        return job_type, type_str

    @classmethod
    def _load(
        cls,
        data: Optional[Dict] = None,
        yaml_path: Optional[Union[PathLike, str]] = None,
        params_override: Optional[list] = None,
        **kwargs: Any,
    ) -> "Job":
        """Load a job object from a yaml file.

        :param cls: Indicates that this is a class method.
        :type cls: class
        :param data: Data Dictionary, defaults to None
        :type data: Dict
        :param yaml_path: YAML Path, defaults to None
        :type yaml_path: Union[PathLike, str]
        :param params_override: Fields to overwrite on top of the yaml file.
            Format is [{"field1": "value1"}, {"field2": "value2"}], defaults to None
        :type params_override: List[Dict]
        :raises Exception: An exception
        :return: Loaded job object.
        :rtype: Job
        """
        data = data or {}
        params_override = params_override or []
        context = {
            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
            PARAMS_OVERRIDE_KEY: params_override,
        }
        job_type, type_str = cls._resolve_cls_and_type(data, params_override)
        job: Job = job_type._load_from_dict(
            data=data,
            context=context,
            additional_message=f"If you are trying to configure a job that is not of type {type_str}, please specify "
            f"the correct job type in the 'type' property.",
            **kwargs,
        )
        if yaml_path:
            job._source_path = yaml_path
        return job

    @classmethod
    def _from_rest_object(  # pylint: disable=too-many-return-statements
        cls, obj: Union[JobBase, JobBase_2401, Run]
    ) -> "Job":
        from azure.ai.ml.entities import PipelineJob
        from azure.ai.ml.entities._builders.command import Command
        from azure.ai.ml.entities._builders.spark import Spark
        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
        from azure.ai.ml.entities._job.base_job import _BaseJob
        from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
        from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
        from azure.ai.ml.entities._job.import_job import ImportJob
        from azure.ai.ml.entities._job.sweep.sweep_job import SweepJob

        try:
            if isinstance(obj, Run):
                # special handling for child jobs
                return _BaseJob._load_from_rest(obj)
            if _is_pipeline_child_job(obj):
                raise PipelineChildJobError(job_id=obj.id)
            if obj.properties.job_type == RestJobType.COMMAND:
                # Private preview (PrP) only, until the new import job type is ready on MFE in public preview (PuP).
                # Compute type 'DataFactory' is the reserved compute name for 'clusterless' ADF jobs.
                if obj.properties.compute_id and obj.properties.compute_id.endswith("/" + ComputeType.ADF):
                    return ImportJob._load_from_rest(obj)

                res_command: Job = Command._load_from_rest_job(obj)
                if hasattr(obj, "name"):
                    res_command._name = obj.name  # type: ignore[attr-defined]
                return res_command
            if obj.properties.job_type == RestJobType.SPARK:
                res_spark: Job = Spark._load_from_rest_job(obj)
                if hasattr(obj, "name"):
                    res_spark._name = obj.name  # type: ignore[attr-defined]
                return res_spark
            if obj.properties.job_type == RestJobType.SWEEP:
                return SweepJob._load_from_rest(obj)
            if obj.properties.job_type == RestJobType.AUTO_ML:
                return AutoMLJob._load_from_rest(obj)
            if obj.properties.job_type == RestJobType_20240101Preview.FINE_TUNING:
                if obj.properties.properties.get("azureml.enable_distillation", False):
                    return DistillationJob._load_from_rest(obj)
                return FineTuningJob._load_from_rest(obj)
            if obj.properties.job_type == RestJobType.PIPELINE:
                res_pipeline: Job = PipelineJob._load_from_rest(obj)
                return res_pipeline
        except PipelineChildJobError as ex:
            raise ex
        except Exception as ex:
            error_message = json.dumps(obj.as_dict(), indent=2) if obj else None
            module_logger.info(
                "Exception: %s.\n%s\nUnable to parse the job resource: %s.\n",
                ex,
                traceback.format_exc(),
                error_message,
            )
            raise JobParsingError(
                message=str(ex),
                no_personal_data_message=f"Unable to parse a job resource of type:{type(obj).__name__}",
                error_category=ErrorCategory.SYSTEM_ERROR,
            ) from ex
        msg = f"Unsupported job type {obj.properties.job_type}"
        raise JobException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.JOB,
            error_category=ErrorCategory.SYSTEM_ERROR,
        )

    def _get_telemetry_values(self) -> Dict:  # pylint: disable=arguments-differ
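        """Return the telemetry values of the job (currently just its type).

        :return: A dictionary of telemetry values.
        :rtype: Dict
        """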
        telemetry_values = {"type": self.type}
        return telemetry_values

    @classmethod
    @abstractmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
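        """Load a job entity from a data dictionary; implemented by each Job subclass.

        :param data: Data dictionary loaded from the YAML file.
        :type data: Dict
        :param context: Loading context containing the base path and parameter overrides.
        :type context: Dict
        :param additional_message: Extra message appended to loading/validation errors.
        :type additional_message: str
        :return: Loaded job object.
        :rtype: Job
        """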
        pass