diff options
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py')
| -rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py | 225 |
1 file changed, 225 insertions, 0 deletions
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast

from marshmallow import Schema

from azure.ai.ml.entities._component.component import Component, NodeType
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._job.job import Job
from azure.ai.ml.entities._validation import MutableValidationResult

from ..._schema import PathAwareSchema
from .._job.pipeline.pipeline_job_settings import PipelineJobSettings
from .._util import convert_ordered_dict_to_dict, copy_output_setting, validate_attribute_type
from .base_node import BaseNode

if TYPE_CHECKING:
    # Import only for type checking to avoid a runtime circular import
    # (PipelineJob itself builds Pipeline nodes).
    from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob

module_logger = logging.getLogger(__name__)


class Pipeline(BaseNode):
    """Base class for pipeline node, used for pipeline component version consumption. You should not instantiate this
    class directly. Instead, you should use @pipeline decorator to create a pipeline node.

    :param component: Id or instance of the pipeline component/job to be run for the step.
    :type component: Union[~azure.ai.ml.entities.Component, str]
    :param inputs: Inputs of the pipeline node.
    :type inputs: Optional[Dict[str, Union[
        ~azure.ai.ml.entities.Input,
        str, bool, int, float, Enum, "Input"]]].
    :param outputs: Outputs of the pipeline node.
    :type outputs: Optional[Dict[str, Union[str, ~azure.ai.ml.entities.Output, "Output"]]]
    :param settings: Setting of pipeline node, only taking effect for root pipeline job.
    :type settings: Optional[~azure.ai.ml.entities._job.pipeline.pipeline_job_settings.PipelineJobSettings]
    """

    def __init__(
        self,
        *,
        component: Union[Component, str],
        inputs: Optional[
            Dict[
                str,
                Union[
                    Input,
                    str,
                    bool,
                    int,
                    float,
                    Enum,
                    "Input",
                ],
            ]
        ] = None,
        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
        settings: Optional[PipelineJobSettings] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize a pipeline node wrapping the given pipeline component.

        Raises via ``validate_attribute_type`` if ``component`` is not a str or
        PipelineComponent (see :meth:`_attr_type_map`).
        """
        # validate init params are valid type
        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
        # "type" is fixed to NodeType.PIPELINE below; drop any caller-supplied value
        # so it cannot collide with the explicit keyword argument.
        kwargs.pop("type", None)

        BaseNode.__init__(
            self,
            type=NodeType.PIPELINE,
            component=component,
            inputs=inputs,
            outputs=outputs,
            **kwargs,
        )
        # copy pipeline component output's setting to node level
        self._copy_pipeline_component_out_setting_to_node()
        # Assign through the property setter so dict values are coerced to
        # PipelineJobSettings and invalid types raise TypeError.
        self._settings: Optional[PipelineJobSettings] = None
        self.settings = settings

    @property
    def component(self) -> Optional[Union[str, Component]]:
        """Id or instance of the pipeline component/job to be run for the step.

        :return: Id or instance of the pipeline component/job.
        :rtype: Union[str, ~azure.ai.ml.entities.Component]
        """
        res: Union[str, Component] = self._component
        return res

    @property
    def settings(self) -> Optional[PipelineJobSettings]:
        """Settings of the pipeline.

        Note: settings is available only when create node as a job.
        i.e. ml_client.jobs.create_or_update(node).

        Lazily creates an empty PipelineJobSettings on first access, so this
        getter never returns None in practice.

        :return: Settings of the pipeline.
        :rtype: ~azure.ai.ml.entities.PipelineJobSettings
        """
        if self._settings is None:
            self._settings = PipelineJobSettings()
        return self._settings

    @settings.setter
    def settings(self, value: Union[PipelineJobSettings, Dict]) -> None:
        """Set the settings of the pipeline.

        :param value: The settings of the pipeline.
        :type value: Union[~azure.ai.ml.entities.PipelineJobSettings, dict]
        :raises TypeError: If the value is not an instance of PipelineJobSettings or a dict.
        """
        if value is not None:
            if isinstance(value, PipelineJobSettings):
                # since PipelineJobSettings inherit _AttrDict, we need add this branch to distinguish with dict
                pass
            elif isinstance(value, dict):
                value = PipelineJobSettings(**value)
            else:
                raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value)))
        self._settings = value

    @classmethod
    def _get_supported_inputs_types(cls) -> None:
        # Return None here to skip validation,
        # as input could be custom class object(parameter group).
        return None

    @property
    def _skip_required_compute_missing_validation(self) -> bool:
        # Pipeline nodes delegate compute resolution to their inner nodes,
        # so a missing compute on the node itself is not an error.
        return True

    @classmethod
    def _get_skip_fields_in_schema_validation(cls) -> List[str]:
        # pipeline component must be a file reference when loading from yaml,
        # so the created object can't pass schema validation.
        return ["component"]

    @classmethod
    def _attr_type_map(cls) -> dict:
        """Map of attribute name -> allowed types, consumed by validate_attribute_type in __init__."""
        # Use local import to avoid recursive reference as BaseNode is imported in PipelineComponent.
        from azure.ai.ml.entities import PipelineComponent

        return {
            "component": (str, PipelineComponent),
        }

    def _to_job(self) -> "PipelineJob":
        """Convert this node into a standalone PipelineJob carrying the node's
        identity, inputs/outputs, component and settings."""
        from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob

        return PipelineJob(
            name=self.name,
            display_name=self.display_name,
            description=self.description,
            tags=self.tags,
            properties=self.properties,
            # Filter None out to avoid case below failed with conflict keys check:
            # group: None (user not specified)
            # group.xx: 1 (user specified
            # NOTE(review): `if v` drops ALL falsy values (0, False, ""), not only
            # None as the comment above says — confirm that is intended.
            inputs={k: v for k, v in self._job_inputs.items() if v},
            outputs=self._job_outputs,
            component=self.component,
            settings=self.settings,
        )

    def _customized_validate(self) -> MutableValidationResult:
        """Check unsupported settings when use as a node.

        :return: The validation result
        :rtype: MutableValidationResult
        """
        # Note: settings is not supported on node,
        # jobs.create_or_update(node) will call node._to_job() at first,
        # thus won't reach here.
        # pylint: disable=protected-access
        from azure.ai.ml.entities import PipelineComponent

        validation_result = super(Pipeline, self)._customized_validate()
        # Warn (not fail) about node-level keys that only take effect on a root job.
        ignored_keys = PipelineComponent._check_ignored_keys(self)
        if ignored_keys:
            validation_result.append_warning(message=f"{ignored_keys} ignored on node {self.name!r}.")
        # When the component is inlined (not an arm id string), validate it recursively.
        if isinstance(self.component, PipelineComponent):
            validation_result.merge_with(self.component._customized_validate())
        return validation_result

    def _to_rest_object(self, **kwargs: Any) -> dict:
        """Serialize the node for the REST API; the component reference becomes a `componentId`."""
        rest_obj: Dict = super()._to_rest_object(**kwargs)
        rest_obj.update(
            convert_ordered_dict_to_dict(
                {
                    "componentId": self._get_component_id(),
                }
            )
        )
        return rest_obj

    def _build_inputs(self) -> Dict:
        """Build node inputs, dropping entries whose value is None (unspecified)."""
        inputs = super(Pipeline, self)._build_inputs()
        built_inputs = {}
        # Validate and remove non-specified inputs
        for key, value in inputs.items():
            if value is not None:
                built_inputs[key] = value
        return built_inputs

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        """Return the marshmallow schema used to validate this node."""
        from azure.ai.ml._schema.pipeline.pipeline_component import PipelineSchema

        return PipelineSchema(context=context)

    def _copy_pipeline_component_out_setting_to_node(self) -> None:
        """Copy pipeline component output's setting to node level."""
        from azure.ai.ml.entities import PipelineComponent
        from azure.ai.ml.entities._job.pipeline._io import NodeOutput

        # Only an inlined PipelineComponent carries output settings to propagate;
        # a string (arm id) component has nothing to copy.
        if not isinstance(self.component, PipelineComponent):
            return
        for key, val in self.component.outputs.items():
            node_output = cast(NodeOutput, self.outputs.get(key))
            copy_output_setting(source=val, target=node_output)

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
        # Pipeline nodes are never loaded directly from a job dict; loading goes
        # through the pipeline job/component schemas instead.
        raise NotImplementedError()
