path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast

from marshmallow import Schema

from azure.ai.ml.entities._component.component import Component, NodeType
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._job.job import Job
from azure.ai.ml.entities._validation import MutableValidationResult

from ..._schema import PathAwareSchema
from .._job.pipeline.pipeline_job_settings import PipelineJobSettings
from .._util import convert_ordered_dict_to_dict, copy_output_setting, validate_attribute_type
from .base_node import BaseNode

if TYPE_CHECKING:
    from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob

module_logger = logging.getLogger(__name__)


class Pipeline(BaseNode):
    """Base class for pipeline node, used for pipeline component version consumption. You should not instantiate this
    class directly. Instead, you should use @pipeline decorator to create a pipeline node.

    :param component: Id or instance of the pipeline component/job to be run for the step.
    :type component: Union[~azure.ai.ml.entities.Component, str]
    :param inputs: Inputs of the pipeline node.
    :type inputs: Optional[Dict[str, Union[
                                    ~azure.ai.ml.entities.Input,
                                    str, bool, int, float, Enum, "Input"]]].
    :param outputs: Outputs of the pipeline node.
    :type outputs: Optional[Dict[str, Union[str, ~azure.ai.ml.entities.Output, "Output"]]]
    :param settings: Setting of pipeline node, only taking effect for root pipeline job.
    :type settings: Optional[~azure.ai.ml.entities._job.pipeline.pipeline_job_settings.PipelineJobSettings]
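
    Example (a minimal sketch; the component path, input/output names, and compute target below are illustrative):

    .. code-block:: python

        from azure.ai.ml import load_component
        from azure.ai.ml.dsl import pipeline

        # Load a pipeline component from a local YAML definition.
        train_pipeline = load_component(source="./train_pipeline_component.yml")

        @pipeline(default_compute="cpu-cluster")
        def my_pipeline(training_data):
            # Calling the loaded pipeline component creates a Pipeline node.
            train_node = train_pipeline(input_data=training_data)
            return {"trained_model": train_node.outputs.trained_model}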
    """

    def __init__(
        self,
        *,
        component: Union[Component, str],
        inputs: Optional[
            Dict[
                str,
                Union[
                    Input,
                    str,
                    bool,
                    int,
                    float,
                    Enum,
                    "Input",
                ],
            ]
        ] = None,
        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
        settings: Optional[PipelineJobSettings] = None,
        **kwargs: Any,
    ) -> None:
        # Validate that init params are of valid types.
        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
        kwargs.pop("type", None)

        BaseNode.__init__(
            self,
            type=NodeType.PIPELINE,
            component=component,
            inputs=inputs,
            outputs=outputs,
            **kwargs,
        )
        # Copy the pipeline component's output settings to the node level.
        self._copy_pipeline_component_out_setting_to_node()
        self._settings: Optional[PipelineJobSettings] = None
        self.settings = settings

    @property
    def component(self) -> Optional[Union[str, Component]]:
        """Id or instance of the pipeline component/job to be run for the step.

        :return: Id or instance of the pipeline component/job.
        :rtype: Union[str, ~azure.ai.ml.entities.Component]
        """
        res: Union[str, Component] = self._component
        return res

    @property
    def settings(self) -> Optional[PipelineJobSettings]:
        """Settings of the pipeline.

        Note: settings only takes effect when the node is created as a job,
            i.e. via ml_client.jobs.create_or_update(node).

        :return: Settings of the pipeline.
        :rtype: ~azure.ai.ml.entities.PipelineJobSettings
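
        Example (a sketch; ``node`` is assumed to be a Pipeline node and the compute target name is illustrative):

        .. code-block:: python

            # Configure settings on the node, then submit it as a root pipeline job:
            node.settings.default_compute = "cpu-cluster"
            node.settings.continue_on_step_failure = True
            # ml_client.jobs.create_or_update(node)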
        """
        if self._settings is None:
            self._settings = PipelineJobSettings()
        return self._settings

    @settings.setter
    def settings(self, value: Union[PipelineJobSettings, Dict]) -> None:
        """Set the settings of the pipeline.

        :param value: The settings of the pipeline.
        :type value: Union[~azure.ai.ml.entities.PipelineJobSettings, dict]
        :raises TypeError: If the value is not an instance of PipelineJobSettings or a dict.
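
        Example (a sketch; ``node`` is assumed to be a Pipeline node and the compute target name is illustrative):

        .. code-block:: python

            # A plain dict is converted to PipelineJobSettings internally.
            node.settings = {"default_compute": "cpu-cluster", "continue_on_step_failure": True}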
        """
        if value is not None:
            if isinstance(value, PipelineJobSettings):
                # Since PipelineJobSettings inherits from _AttrDict, we need this branch to distinguish it from a plain dict.
                pass
            elif isinstance(value, dict):
                value = PipelineJobSettings(**value)
            else:
                raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value)))
        self._settings = value

    @classmethod
    def _get_supported_inputs_types(cls) -> None:
        # Return None here to skip validation,
        # as the input could be a custom class object (a parameter group).
        return None

    @property
    def _skip_required_compute_missing_validation(self) -> bool:
        return True

    @classmethod
    def _get_skip_fields_in_schema_validation(cls) -> List[str]:
        # A pipeline component must be a file reference when loading from YAML,
        # so the created object can't pass schema validation.
        return ["component"]

    @classmethod
    def _attr_type_map(cls) -> dict:
        # Use a local import to avoid a circular import, as BaseNode is imported in PipelineComponent.
        from azure.ai.ml.entities import PipelineComponent

        return {
            "component": (str, PipelineComponent),
        }

    def _to_job(self) -> "PipelineJob":
        from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob

        return PipelineJob(
            name=self.name,
            display_name=self.display_name,
            description=self.description,
            tags=self.tags,
            properties=self.properties,
            # Filter out None values to avoid the conflicting-keys check failing on cases like:
            #   group: None (not specified by the user)
            #   group.xx: 1 (specified by the user)
            inputs={k: v for k, v in self._job_inputs.items() if v},
            outputs=self._job_outputs,
            component=self.component,
            settings=self.settings,
        )

    def _customized_validate(self) -> MutableValidationResult:
        """Check unsupported settings when use as a node.

        :return: The validation result
        :rtype: MutableValidationResult
        """
        # Note: settings is not supported on a node;
        # jobs.create_or_update(node) calls node._to_job() first,
        # so a node submitted as a job won't reach here.
        # pylint: disable=protected-access
        from azure.ai.ml.entities import PipelineComponent

        validation_result = super(Pipeline, self)._customized_validate()
        ignored_keys = PipelineComponent._check_ignored_keys(self)
        if ignored_keys:
            validation_result.append_warning(message=f"{ignored_keys} ignored on node {self.name!r}.")
        if isinstance(self.component, PipelineComponent):
            validation_result.merge_with(self.component._customized_validate())
        return validation_result

    def _to_rest_object(self, **kwargs: Any) -> dict:
        rest_obj: Dict = super()._to_rest_object(**kwargs)
        rest_obj.update(
            convert_ordered_dict_to_dict(
                {
                    "componentId": self._get_component_id(),
                }
            )
        )
        return rest_obj

    def _build_inputs(self) -> Dict:
        inputs = super(Pipeline, self)._build_inputs()
        built_inputs = {}
        # Validate and remove non-specified inputs
        for key, value in inputs.items():
            if value is not None:
                built_inputs[key] = value
        return built_inputs

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        from azure.ai.ml._schema.pipeline.pipeline_component import PipelineSchema

        return PipelineSchema(context=context)

    def _copy_pipeline_component_out_setting_to_node(self) -> None:
        """Copy pipeline component output's setting to node level."""
        from azure.ai.ml.entities import PipelineComponent
        from azure.ai.ml.entities._job.pipeline._io import NodeOutput

        if not isinstance(self.component, PipelineComponent):
            return
        for key, val in self.component.outputs.items():
            node_output = cast(NodeOutput, self.outputs.get(key))
            copy_output_setting(source=val, target=node_output)

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
        raise NotImplementedError()