aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
blob: e54c19066d5fd682fb6f269a6698eb484917936c (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access, redefined-builtin
# disable redefined-builtin to use id/type as argument name
import os
from contextlib import contextmanager
from os import PathLike
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Union
from uuid import UUID

import yaml  # type: ignore[import]
from marshmallow import Schema

from ... import Input, Output
from ..._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties
from ..._schema import PathAwareSchema
from ..._utils._arm_id_utils import parse_name_label
from ..._utils._asset_utils import IgnoreFile
from ...constants._common import DefaultOpenEncoding
from ...entities import Component
from ...entities._assets import Code
from ...entities._component._additional_includes import AdditionalIncludes, AdditionalIncludesMixin
from ...entities._component.code import ComponentIgnoreFile
from ...entities._job.distribution import DistributionConfiguration
from ...entities._system_data import SystemData
from ...entities._util import convert_ordered_dict_to_dict
from ...entities._validation import MutableValidationResult
from .._schema.component import InternalComponentSchema
from ._input_outputs import InternalInput, InternalOutput
from ._merkle_tree import create_merkletree
from .code import InternalCode
from .environment import InternalEnvironment
from .node import InternalBaseNode

# Top-level key that holds the include list when the additional includes file is written as a YAML mapping.
_ADDITIONAL_INCLUDES_CONFIG_KEY = "additional_includes"
# Suffix that replaces the component spec file's own suffix to locate its additional includes file.
_ADDITIONAL_INCLUDES_SUFFIX = ".additional_includes"


class InternalComponent(Component, AdditionalIncludesMixin):
    # pylint: disable=too-many-instance-attributes, too-many-locals
    """Base class for internal component version, used to define an internal component. Recommended to create instance
    with component_factory.

    :param name: Name of the resource.
    :type name: str
    :param version: Version of the resource.
    :type version: str
    :param id:  Global id of the resource, Azure Resource Manager ID.
    :type id: str
    :param type:  Type of the command, supported is 'command'. The value is split by ``parse_name_label`` into the
        type forwarded to the base class and a label kept on ``_type_label``.
    :type type: str
    :param description: Description of the resource.
    :type description: str
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: dict
    :param properties: Internal use only.
    :type properties: dict
    :param display_name: Display name of the component.
    :type display_name: str
    :param is_deterministic: Whether the component is deterministic.
    :type is_deterministic: bool
    :param successful_return_code: Stored on the instance unchanged.
    :type successful_return_code: str
    :param inputs: Inputs of the component.
    :type inputs: dict
    :param outputs: Outputs of the component.
    :type outputs: dict
    :param code: Origin code value of the component; stored on the instance unchanged.
    :type code: Union[str, os.PathLike]
    :param environment: Environment of the component. A dict is converted to an InternalEnvironment; any other
        value is stored as-is.
    :type environment: dict
    :param environment_variables: Stored on the instance unchanged.
    :type environment_variables: dict
    :param command: Stored on the instance unchanged.
    :type command: str
    :param yaml_str: The yaml string of the component.
    :type yaml_str: str
    :param _schema: Schema of the component.
    :type _schema: str
    :param creation_context: Creation metadata of the component.
    :type creation_context: ~azure.ai.ml.entities.SystemData
    :param scope: Type-specific section; stored on the instance unchanged.
    :type scope: dict
    :param hemera: Type-specific section; stored on the instance unchanged.
    :type hemera: dict
    :param hdinsight: Type-specific section; stored on the instance unchanged.
    :type hdinsight: dict
    :param parallel: Type-specific section; stored on the instance unchanged.
    :type parallel: dict
    :param starlite: Type-specific section; stored on the instance unchanged.
    :type starlite: dict
    :param ae365exepool: Type-specific section; stored on the instance unchanged.
    :type ae365exepool: dict
    :param launcher: Type-specific section; stored on the instance unchanged.
    :type launcher: dict
    :param datatransfer: Type-specific section; stored on the instance unchanged.
    :type datatransfer: dict
    :param aether: Type-specific section; stored on the instance unchanged.
    :type aether: dict
    """

    def __init__(
        self,
        *,
        _schema: Optional[str] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
        display_name: Optional[str] = None,
        type: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Dict] = None,
        is_deterministic: Optional[bool] = None,
        successful_return_code: Optional[str] = None,
        inputs: Optional[Dict] = None,
        outputs: Optional[Dict] = None,
        code: Optional[Union[str, os.PathLike]] = None,
        environment: Optional[Dict] = None,
        environment_variables: Optional[Dict] = None,
        command: Optional[str] = None,
        id: Optional[str] = None,
        properties: Optional[Dict] = None,
        yaml_str: Optional[str] = None,
        creation_context: Optional[SystemData] = None,
        scope: Optional[Dict] = None,
        hemera: Optional[Dict] = None,
        hdinsight: Optional[Dict] = None,
        parallel: Optional[Dict] = None,
        starlite: Optional[Dict] = None,
        ae365exepool: Optional[Dict] = None,
        launcher: Optional[Dict] = None,
        datatransfer: Optional[Dict] = None,
        aether: Optional[Dict] = None,
        **kwargs,
    ):
        # the incoming type string is split into the type handed to the base class and a label kept locally
        _type, self._type_label = parse_name_label(type)
        super().__init__(
            name=name,
            version=version,
            id=id,
            type=_type,
            description=description,
            tags=tags,
            properties=properties,
            display_name=display_name,
            is_deterministic=is_deterministic,  # type: ignore[arg-type]
            inputs=inputs,
            outputs=outputs,
            yaml_str=yaml_str,
            _schema=_schema,
            creation_context=creation_context,
            **kwargs,
        )
        # Store original yaml
        self._yaml_str = yaml_str
        # keep the extra keyword arguments that were also forwarded to the base class
        self._other_parameter = kwargs

        self.successful_return_code = successful_return_code
        self.code = code
        # a plain dict is promoted to InternalEnvironment; anything else (including None) is kept as given
        self.environment = InternalEnvironment(**environment) if isinstance(environment, dict) else environment
        self.environment_variables = environment_variables
        # TODO: remove these to keep it a general component class
        self.command = command
        self.scope = scope
        self.hemera = hemera
        self.hdinsight = hdinsight
        self.parallel = parallel
        self.starlite = starlite
        self.ae365exepool = ae365exepool
        self.launcher = launcher
        self.datatransfer = datatransfer
        self.aether = aether

    @classmethod
    def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool) -> Dict:
        """Convert each port of an inputs/outputs mapping to its internal counterpart.

        :param io_dict: Mapping from port name to port definition; iterated through ``items()``.
        :type io_dict: Union[Dict, Input, Output]
        :param is_input: True to build InternalInput objects, False to build InternalOutput objects.
        :type is_input: bool
        :return: A new dict with the same keys and the converted ports as values.
        :rtype: Dict
        """
        convert = InternalInput._from_base if is_input else InternalOutput._from_base
        return {name: convert(port) for name, port in io_dict.items()}

    # region AdditionalIncludesMixin

    @classmethod
    def _read_additional_include_configs(cls, yaml_path: Path) -> List[str]:
        """Read additional include configs from the additional includes file.
        The name of the file is the same as the component spec file, with a suffix of ".additional_includes".
        It can be either a yaml file or a text file:
        1. If it is a yaml file, yaml format of additional_includes looks like below:
        ```
        additional_includes:
         - your/local/path
         - type: artifact
           organization: devops_organization
           project: devops_project
           feed: artifacts_feed_name
           name: universal_package_name
           version: package_version
           scope: scope_type
        ```
        2. If it is a text file, each line is a path to include. Note that artifact config is not supported
        in this format.

        An empty list is returned when the file does not exist, or when it is a yaml mapping whose
        "additional_includes" key is missing or has an empty value.

        :param yaml_path: The yaml path
        :type yaml_path: Path
        :return: The list of additional includes
        :rtype: List[str]
        """
        additional_includes_config_path = yaml_path.with_suffix(_ADDITIONAL_INCLUDES_SUFFIX)
        if not additional_includes_config_path.is_file():
            return []

        with open(additional_includes_config_path, encoding=DefaultOpenEncoding.READ) as f:
            file_content = f.read()

        try:
            configs = yaml.safe_load(file_content)
        except Exception:  # pylint: disable=W0718
            # Deliberately broad: content that cannot be loaded as yaml is treated as the plain text format.
            # TODO: check if we should catch yaml.YamlError instead here
            configs = None

        if isinstance(configs, dict):
            # `or []` covers a key that is present but empty (e.g. a bare "additional_includes:" line),
            # which would otherwise leak None to callers expecting a list
            return configs.get(_ADDITIONAL_INCLUDES_CONFIG_KEY) or []

        # text format: one include per non-blank line, surrounding whitespace removed
        return [entry for entry in map(str.strip, file_content.splitlines()) if entry]

    @classmethod
    def _get_additional_includes_field_name(cls) -> str:
        """Return the field name used for additional includes diagnostics.

        :return: Always "*".
        :rtype: str
        """
        # additional includes for internal components are configured by a file, which is not a field in the yaml
        # return '*' as diagnostics yaml paths and override _get_all_additional_includes_configs.
        return "*"

    def _get_all_additional_includes_configs(self) -> List:
        """Read the additional includes configured next to this component's source file.

        :return: The list of additional includes
        :rtype: List
        """
        # internal components must have a source path
        # TODO: Bug 2881943
        return self._read_additional_include_configs(Path(self._source_path))  # type: ignore[arg-type]

    def _get_base_path_for_code(self) -> Path:
        """Return the directory containing this component's source file.

        :return: Parent directory of the source path
        :rtype: Path
        """
        # internal components must have a source path
        # TODO: Bug 2881943
        return Path(self._source_path).parent  # type: ignore[arg-type]

    def _get_origin_code_value(self) -> Union[str, PathLike, None]:
        """Return the origin code value, defaulting to "." when the base implementation gives a falsy value.

        :return: The origin code value
        :rtype: Union[str, PathLike, None]
        """
        return super()._get_origin_code_value() or "."

    # endregion

    def _to_ordered_dict_for_yaml_dump(self) -> Dict:
        """Build the ordered dict used for yaml dump, keeping the original value of ``code``.

        :return: The ordered dict
        :rtype: Dict
        """

        obj = super()._to_ordered_dict_for_yaml_dump()
        # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value
        if "code" in obj:
            if self.code:
                obj["code"] = self.code
            else:
                # no original code value was given, so the entry is dropped entirely
                del obj["code"]
        return obj

    @property
    def _additional_includes(self) -> AdditionalIncludes:
        """This property is kept for compatibility with old mldesigner sdk.

        :return: The additional includes
        :rtype: AdditionalIncludes
        """
        obj = self._generate_additional_includes_obj()
        from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes

        # re-tag the generated object as the internal subclass in place, without constructing a new instance
        obj.__class__ = InternalAdditionalIncludes
        return obj

    # region SchemaValidatableMixin
    @classmethod
    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
        """Create the schema used to validate internal components.

        :param context: Context passed through to the schema constructor.
        :return: An InternalComponentSchema bound to the given context
        :rtype: Union[PathAwareSchema, Schema]
        """
        return InternalComponentSchema(context=context)

    def _customized_validate(self) -> MutableValidationResult:
        """Run base validation, then merge in validation of the internal environment.

        :return: The merged validation result
        :rtype: MutableValidationResult
        """
        validation_result = super()._customized_validate()
        skip_path_validation = not self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
            validation_result
        )
        # resolving additional includes & update self._base_path can be dangerous,
        # so we just skip path validation if additional includes is provided.
        # note that there will still be client-side error on job submission (after code is resolved)
        # if paths in environment are invalid
        if isinstance(self.environment, InternalEnvironment):
            validation_result.merge_with(
                self.environment.validate(
                    self._base_path,
                    skip_path_validation=skip_path_validation,
                ),
                field_name="environment",
            )
        return validation_result

    # endregion

    @classmethod
    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
        """Build constructor keyword arguments from a REST object.

        Note that "distribution" is removed from ``obj.properties.component_spec`` in place before the base
        implementation runs.

        :param obj: The REST component version
        :type obj: ComponentVersion
        :return: The init kwargs
        :rtype: Dict
        """
        # put it here as distribution is shared by some components, e.g. command
        distribution = obj.properties.component_spec.pop("distribution", None)
        init_kwargs = super()._from_rest_object_to_init_params(obj)
        if distribution:
            init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution)
        return init_kwargs

    def _to_rest_object(self) -> ComponentVersion:
        """Convert this component to its REST representation.

        :return: The REST component version, named after this component
        :rtype: ComponentVersion
        """
        component: Union[Dict[Any, Any], List[Any]] = convert_ordered_dict_to_dict(self._to_dict())
        # TODO: 2883063
        component["_source"] = self._source  # type: ignore[call-overload]

        properties = ComponentVersionProperties(
            component_spec=component,
            description=self.description,
            is_anonymous=self._is_anonymous,
            properties=self.properties,
            tags=self.tags,
        )
        result = ComponentVersion(properties=properties)
        result.name = self.name
        return result

    @classmethod
    def _get_snapshot_id(
        cls,
        code_path: Union[str, PathLike],
        ignore_file: IgnoreFile,
    ) -> str:
        """Get the snapshot id of a component with specific working directory in ml-components. Use this as the name of
        code asset to reuse steps in a pipeline job from ml-components runs.

        :param code_path: The path of the working directory.
        :type code_path: str
        :param ignore_file: The ignore file of the snapshot.
        :type ignore_file: IgnoreFile
        :return: The snapshot id of a component in ml-components with code_path as its working directory.
        :rtype: str
        """
        curr_root = create_merkletree(code_path, ignore_file.is_file_excluded)
        # every 4th character of the root hash is used as the UUID's hex string
        # NOTE(review): presumably the hash is 128 hex characters so the slice yields the 32 a UUID needs - confirm
        snapshot_id = str(UUID(curr_root.hexdigest_hash[::4]))
        return snapshot_id

    @contextmanager  # type: ignore[arg-type]
    def _try_build_local_code(self) -> Iterable[Code]:
        """Build final code when origin code is a local code.
        Will merge code path with additional includes into a temp folder if additional includes is specified.
        For internal components, file dependencies in environment will be resolved based on the final code.

        :return: The code instance
        :rtype: Iterable[Code]
        """

        tmp_code_dir: Path
        # origin code value of internal component will never be None. check _get_origin_code_value for details
        with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir:
            # use absolute path in case temp folder & work dir are in different drive
            tmp_code_dir = tmp_code_dir.absolute()

            # file dependency in code will be read during internal environment resolution
            # for example, docker file of the environment may be in additional includes;
            # and it will be read then insert to the environment object during resolution.
            # so we need to resolve environment based on the temporary code path
            if isinstance(self.environment, InternalEnvironment):
                self.environment.resolve(base_path=tmp_code_dir)

            # additional includes config file itself should be ignored
            # TODO: Bug 2881943
            additional_includes_file_name = (
                Path(self._source_path).with_suffix(_ADDITIONAL_INCLUDES_SUFFIX).name  # type: ignore[arg-type]
            )
            rebased_ignore_file = ComponentIgnoreFile(
                tmp_code_dir,
                additional_includes_file_name=additional_includes_file_name,
            )

            # Use the snapshot id in ml-components as code name to enable anonymous
            # component reuse from ml-component runs.
            # calculate snapshot id here instead of inside InternalCode to ensure that
            # snapshot id is calculated based on the built code path
            yield InternalCode(
                name=self._get_snapshot_id(
                    # use absolute path in case temp folder & work dir are in different drive
                    tmp_code_dir,
                    # this ignore-file should be rebased to the built code path
                    rebased_ignore_file,
                ),
                version="1",
                base_path=self._base_path,
                path=tmp_code_dir,
                is_anonymous=True,
                ignore_file=rebased_ignore_file,
            )

    def __call__(self, *args, **kwargs) -> InternalBaseNode:
        """Delegate to the base implementation; overridden to narrow the return annotation to InternalBaseNode.

        :return: The node returned by the base implementation
        :rtype: InternalBaseNode
        """
        return super().__call__(*args, **kwargs)