Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline')
13 files changed, 4199 insertions, 0 deletions
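The files below add the pipeline-job plumbing for azure-ai-ml: the `_AttrDict` arbitrary-attribute dictionary, the `ComponentTranslatableMixin` that infers component input/output types from job bindings, and the `_io` package (`NodeInput`, `NodeOutput`, `PipelineInput`, `PipelineOutput` plus their attr-dict and mixin helpers). As a minimal orientation sketch — not part of the diff itself, and assuming the azure-ai-ml package is installed so the private module is importable — the snippet below mirrors the scenarios listed in the `_AttrDict` class docstring:

```python
# Minimal sketch (not part of the diff): arbitrary-attribute behaviour of _AttrDict,
# mirroring the scenarios in its class docstring. Assumes azure-ai-ml is installed.
from azure.ai.ml.entities._job.pipeline._attr_dict import _AttrDict, has_attr_safe

obj = _AttrDict()                               # allowed_keys=None -> any public key may be set
obj.target = "aml"                              # plain arbitrary attribute
obj.resource_layout.node_count = 2              # nested access auto-creates a child _AttrDict
obj.resource_layout.process_count_per_node = 2  # the same child object is reused

# Everything that was set comes back as a nested dict (empty child dicts are dropped).
assert obj._get_attrs() == {
    "target": "aml",
    "resource_layout": {"node_count": 2, "process_count_per_node": 2},
}

# With allowed_keys, only the listed keys behave as arbitrary attributes.
restricted = _AttrDict(allowed_keys={"resource_layout": {"node_count": None}})
assert has_attr_safe(restricted, "resource_layout") is False  # arbitrary attr, not yet set
```

For the `_io` classes, the binding strings they emit follow the formats in their `_data_binding` methods: `${{parent.inputs.<name>}}` for `PipelineInput`, `${{parent.jobs.<node>.outputs.<port>}}` for `NodeOutput`, and `${{parent.outputs.<port>}}` for `PipelineOutput`.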
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/__init__.py new file mode 100644 index 00000000..fdf8caba --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/__init__.py @@ -0,0 +1,5 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_attr_dict.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_attr_dict.py new file mode 100644 index 00000000..cf8d92be --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_attr_dict.py @@ -0,0 +1,161 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import logging +from abc import ABC +from typing import Any, Dict, Generic, List, Optional, TypeVar + +K = TypeVar("K") +V = TypeVar("V") + + +class _AttrDict(Generic[K, V], Dict, ABC): + """This class is used for accessing values with instance.some_key. It supports the following scenarios: + + 1. Setting arbitrary attribute, eg: obj.resource_layout.node_count = 2 + 1.1 Setting same nested filed twice will return same object, eg: + obj.resource_layout.node_count = 2 + obj.resource_layout.process_count_per_node = 2 + obj.resource_layout will be {"node_count": 2, "process_count_per_node": 2} + 1.2 Only public attribute is supported, eg: obj._resource_layout._node_count = 2 will raise AttributeError + 1.3 All set attribute can be recorded, eg: + obj.target = "aml" + obj.resource_layout.process_count_per_node = 2 + obj.get_attr() will return {"target": "aml", "resource_layout": {"process_count_per_node": 2}} + 2. Getting arbitrary attribute, getting non-exist attribute will return an empty dict. + 3. Calling arbitrary methods is not allowed, eg: obj.resource_layout() should raise AttributeError + """ + + def __init__(self, allowed_keys: Optional[Dict] = None, **kwargs: Any): + """Initialize a attribute dictionary. + + :param allowed_keys: A dictionary of keys that allowed to set as arbitrary attributes. None means all keys can + be set as arbitrary attributes. + + :type dict + :param kwargs: A dictionary of additional configuration parameters. + :type kwargs: dict + """ + super(_AttrDict, self).__init__(**kwargs) + if allowed_keys is None: + # None allowed_keys means no restriction on keys can be set for _AttrDict + self._allowed_keys = {} + self._key_restriction = False + else: + # Otherwise use allowed_keys to restrict keys can be set for _AttrDict + self._allowed_keys = dict(allowed_keys) + self._key_restriction = True + self._logger = logging.getLogger("attr_dict") + + def _initializing(self) -> bool: + # use this to indicate ongoing init process, sub class need to make sure this return True during init process. + return False + + def _get_attrs(self) -> dict: + """Get all arbitrary attributes which has been set, empty values are excluded. + + :return: A dict which contains all arbitrary attributes set by user. 
+ :rtype: dict + """ + + # TODO: check this + def remove_empty_values(data: Dict) -> Dict: + if not isinstance(data, dict): + return data + # skip empty dicts as default value of _AttrDict is empty dict + return {k: remove_empty_values(v) for k, v in data.items() if v or not isinstance(v, dict)} + + return remove_empty_values(self) + + def _is_arbitrary_attr(self, attr_name: str) -> bool: + """Checks if a given attribute name should be treat as arbitrary attribute. + + Attributes inside _AttrDict can be non-arbitrary attribute or arbitrary attribute. + Non-arbitrary attributes are normal attributes like other object which stores in self.__dict__. + Arbitrary attributes are attributes stored in the dictionary it self, what makes it special it it's value + can be an instance of _AttrDict + Take `obj = _AttrDict(allowed_keys={"resource_layout": {"node_count": None}})` as an example. + `obj.some_key` is accessing non-arbitrary attribute. + `obj.resource_layout` is accessing arbitrary attribute, user can use `obj.resource_layout.node_count = 1` to + assign value to it. + + :param attr_name: Attribute name + :type attr_name: str + :return: If the given attribute name should be treated as arbitrary attribute. + :rtype: bool + """ + # Internal attribute won't be set as arbitrary attribute. + if attr_name.startswith("_"): + return False + # All attributes set in __init__ won't be set as arbitrary attribute + if self._initializing(): + return False + # If there's key restriction, only keys in it can be set as arbitrary attribute. + if self._key_restriction and attr_name not in self._allowed_keys: + return False + # Attributes already in attribute dict will not be set as arbitrary attribute. + try: + self.__getattribute__(attr_name) + except AttributeError: + return True + return False + + def __getattr__(self, key: Any) -> Any: + if not self._is_arbitrary_attr(key): + return super().__getattribute__(key) + self._logger.debug("getting %s", key) + try: + return super().__getitem__(key) + except KeyError: + allowed_keys = self._allowed_keys.get(key, None) if self._key_restriction else None + result: Any = _AttrDict(allowed_keys=allowed_keys) + self.__setattr__(key, result) + return result + + def __setattr__(self, key: Any, value: V) -> None: + if not self._is_arbitrary_attr(key): + super().__setattr__(key, value) + else: + self._logger.debug("setting %s to %s", key, value) + super().__setitem__(key, value) + + def __setitem__(self, key: Any, value: V) -> None: + self.__setattr__(key, value) + + def __getitem__(self, item: V) -> Any: + # support attr_dict[item] since dumping it in marshmallow requires this. + return self.__getattr__(item) + + def __dir__(self) -> List: + # For Jupyter Notebook auto-completion + return list(super().__dir__()) + list(self.keys()) + + +def has_attr_safe(obj: Any, attr: Any) -> bool: + if isinstance(obj, _AttrDict): + has_attr = not obj._is_arbitrary_attr(attr) + elif isinstance(obj, dict): + return attr in obj + else: + has_attr = hasattr(obj, attr) + return has_attr + + +def try_get_non_arbitrary_attr(obj: Any, attr: str) -> Optional[Any]: + """Try to get non-arbitrary attribute for potential attribute dict. + + Will not create target attribute if it is an arbitrary attribute in _AttrDict. 
+ + :param obj: The obj + :type obj: Any + :param attr: The attribute name + :type attr: str + :return: obj.attr + :rtype: Any + """ + if has_attr_safe(obj, attr): + return obj[attr] if isinstance(obj, dict) else getattr(obj, attr) + return None diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_component_translatable.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_component_translatable.py new file mode 100644 index 00000000..22be939d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_component_translatable.py @@ -0,0 +1,412 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access, redefined-builtin +# disable redefined-builtin to use input as argument name +import re +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union + +from pydash import get + +from azure.ai.ml._utils.utils import is_data_binding_expression +from azure.ai.ml.constants._common import AssetTypes +from azure.ai.ml.constants._component import ComponentJobConstants +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job.pipeline._io import PipelineInput, PipelineOutput +from azure.ai.ml.entities._job.sweep.search_space import Choice, Randint, SweepDistribution +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, JobException + +# avoid circular import error +if TYPE_CHECKING: + from azure.ai.ml.entities._builders import BaseNode + from azure.ai.ml.entities._component.component import Component + + +class ComponentTranslatableMixin: + _PYTHON_SDK_TYPE_MAPPING = { + float: "number", + int: "integer", + bool: "boolean", + str: "string", + } + + @classmethod + def _find_source_from_parent_inputs(cls, input: str, pipeline_job_inputs: dict) -> Tuple[str, Optional[str]]: + """Find source type and mode of input/output from parent input. + + :param input: The input name + :type input: str + :param pipeline_job_inputs: The pipeline job inputs + :type pipeline_job_inputs: dict + :return: A 2-tuple of the type and the mode + :rtype: Tuple[str, Optional[str]] + """ + _input_name = input.split(".")[2][:-2] + if _input_name not in pipeline_job_inputs.keys(): + msg = "Failed to find top level definition for input binding {}." + raise JobException( + message=msg.format(input), + no_personal_data_message=msg.format("[input]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + input_data = pipeline_job_inputs[_input_name] + input_type = type(input_data) + if input_type in cls._PYTHON_SDK_TYPE_MAPPING: + return cls._PYTHON_SDK_TYPE_MAPPING[input_type], None + return getattr(input_data, "type", AssetTypes.URI_FOLDER), getattr(input_data, "mode", None) + + @classmethod + def _find_source_from_parent_outputs(cls, input: str, pipeline_job_outputs: dict) -> Tuple[str, Optional[str]]: + """Find source type and mode of input/output from parent output. + + :param input: The input name + :type input: str + :param pipeline_job_outputs: The pipeline job outputs + :type pipeline_job_outputs: dict + :return: A 2-tuple of the type and the mode + :rtype: Tuple[str, Optional[str]] + """ + _output_name = input.split(".")[2][:-2] + if _output_name not in pipeline_job_outputs.keys(): + msg = "Failed to find top level definition for output binding {}." 
+ raise JobException( + message=msg.format(input), + no_personal_data_message=msg.format("[input]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + output_data = pipeline_job_outputs[_output_name] + output_type = type(output_data) + if output_type in cls._PYTHON_SDK_TYPE_MAPPING: + return cls._PYTHON_SDK_TYPE_MAPPING[output_type], None + if isinstance(output_data, dict): + if "type" in output_data: + output_data_type = output_data["type"] + else: + output_data_type = AssetTypes.URI_FOLDER + if "mode" in output_data: + output_data_mode = output_data["mode"] + else: + output_data_mode = None + return output_data_type, output_data_mode + return getattr(output_data, "type", AssetTypes.URI_FOLDER), getattr(output_data, "mode", None) + + @classmethod + def _find_source_from_other_jobs( + cls, input: str, jobs_dict: dict, pipeline_job_dict: dict + ) -> Tuple[str, Optional[str]]: + """Find source type and mode of input/output from other job. + + :param input: The input name + :type input: str + :param jobs_dict: The job dict + :type jobs_dict: + :param pipeline_job_dict: The pipeline job dict + :type pipeline_job_dict: dict + :return: A 2-tuple of the type and the mode + :rtype: Tuple[str, Optional[str]] + """ + from azure.ai.ml.entities import CommandJob + from azure.ai.ml.entities._builders import BaseNode + from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob + from azure.ai.ml.parallel import ParallelJob + + _input_regex = r"\${{parent.jobs.([^.]+).([^.]+).([^.]+)}}" + m = re.match(_input_regex, input) + if m is None: + msg = "Failed to find top level definition for job binding {}." + raise JobException( + message=msg.format(input), + no_personal_data_message=msg.format("[input]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + _input_job_name, _io_type, _name = m.groups() + _input_job = jobs_dict[_input_job_name] + + # we only support input of one job is from output of another output, but input mode should be decoupled with + # output mode, so we always return None source_mode + source_mode = None + if isinstance(_input_job, BaseNode): + # If source is base node, get type from io builder + _source = _input_job[_io_type][_name] + try: + source_type = _source.type + # Todo: get component type for registered component, and no need following codes + # source_type is None means _input_job's component is registered component which results in its + # input/output type is None. + if source_type is None: + if _source._data is None: + # return default type if _input_job's output data is None + source_type = AssetTypes.URI_FOLDER + elif isinstance(_source._data, Output): + # if _input_job data is a Output object and we return its type. + source_type = _source._data.type + else: + # otherwise _input_job's input/output is bound to pipeline input/output, we continue + # infer the type according to _source._data. Will return corresponding pipeline + # input/output type because we didn't get the component. + source_type, _ = cls._find_source_input_output_type(_source._data, pipeline_job_dict) + return source_type, source_mode + except AttributeError as e: + msg = "Failed to get referenced component type {}." 
+ raise JobException( + message=msg.format(_input_regex), + no_personal_data_message=msg.format("[_input_regex]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) from e + if isinstance(_input_job, (CommandJob, ParallelJob)): + # If source has not parsed to Command yet, infer type + _source = get(_input_job, f"{_io_type}.{_name}") + if isinstance(_source, str): + source_type, _ = cls._find_source_input_output_type(_source, pipeline_job_dict) + return source_type, source_mode + return getattr(_source, "type", AssetTypes.URI_FOLDER), source_mode + if isinstance(_input_job, AutoMLJob): + # If source is AutoMLJob, only outputs is supported + if _io_type != "outputs": + msg = f"Only binding to AutoMLJob output is supported, currently got {_io_type}" + raise JobException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + # AutoMLJob's output type can only be MLTABLE + return AssetTypes.MLTABLE, source_mode + msg = f"Unknown referenced source job type: {type(_input_job)}." + raise JobException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + + @classmethod + def _find_source_input_output_type(cls, input: str, pipeline_job_dict: dict) -> Tuple[str, Optional[str]]: + """Find source type and mode of input/output. + + :param input: The input binding + :type input: str + :param pipeline_job_dict: The pipeline job dict + :type pipeline_job_dict: dict + :return: A 2-tuple of the type and the mode + :rtype: Tuple[str, Optional[str]] + """ + pipeline_job_inputs = pipeline_job_dict.get("inputs", {}) + pipeline_job_outputs = pipeline_job_dict.get("outputs", {}) + jobs_dict = pipeline_job_dict.get("jobs", {}) + if is_data_binding_expression(input, ["parent", "inputs"]): + return cls._find_source_from_parent_inputs(input, pipeline_job_inputs) + if is_data_binding_expression(input, ["parent", "outputs"]): + return cls._find_source_from_parent_outputs(input, pipeline_job_outputs) + if is_data_binding_expression(input, ["parent", "jobs"]): + try: + return cls._find_source_from_other_jobs(input, jobs_dict, pipeline_job_dict) + except JobException as e: + raise e + except Exception as e: + msg = "Failed to find referenced source for input binding {}" + raise JobException( + message=msg.format(input), + no_personal_data_message=msg.format("[input]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.SYSTEM_ERROR, + ) from e + else: + msg = "Job input in a pipeline can bind only to a job output or a pipeline input" + raise JobException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + + @classmethod + def _to_input( + cls, # pylint: disable=unused-argument + input: Union[Input, str, bool, int, float], + pipeline_job_dict: Optional[dict] = None, + **kwargs: Any, + ) -> Input: + """Convert a single job input value to component input. 
+ + :param input: The input + :type input: Union[Input, str, bool, int, float] + :param pipeline_job_dict: The pipeline job dict + :type pipeline_job_dict: Optional[dict] + :return: The Component Input + :rtype: Input + """ + pipeline_job_dict = pipeline_job_dict or {} + input_variable: Dict = {} + + if isinstance(input, str) and bool(re.search(ComponentJobConstants.INPUT_PATTERN, input)): + # handle input bindings + input_variable["type"], input_variable["mode"] = cls._find_source_input_output_type( + input, pipeline_job_dict + ) + + elif isinstance(input, Input): + input_variable = input._to_dict() + elif isinstance(input, SweepDistribution): + if isinstance(input, Choice): + if input.values is not None: + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input.values[0])] + elif isinstance(input, Randint): + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[int] + else: + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[float] + + input_variable["optional"] = False + elif type(input) in cls._PYTHON_SDK_TYPE_MAPPING: + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input)] + input_variable["default"] = input + elif isinstance(input, PipelineInput): + # Infer input type from input data + input_variable = input._to_input()._to_dict() + else: + msg = "'{}' is not supported as component input, supported types are '{}'.".format( + type(input), cls._PYTHON_SDK_TYPE_MAPPING.keys() + ) + raise JobException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + return Input(**input_variable) + + @classmethod + def _to_input_builder_function(cls, input: Union[Dict, SweepDistribution, Input, str, bool, int, float]) -> Input: + input_variable = {} + + if isinstance(input, Input): + input_variable = input._to_dict() + elif isinstance(input, SweepDistribution): + if isinstance(input, Choice): + if input.values is not None: + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input.values[0])] + elif isinstance(input, Randint): + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[int] + else: + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[float] + + input_variable["optional"] = False + else: + input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input)] + input_variable["default"] = input + return Input(**input_variable) + + @classmethod + def _to_output( + cls, # pylint: disable=unused-argument + output: Optional[Union[Output, Dict, str, bool, int, float]], + pipeline_job_dict: Optional[dict] = None, + **kwargs: Any, + ) -> Output: + """Translate output value to Output and infer component output type + from linked pipeline output, its original type or default type. 
+ + :param output: The output + :type output: Union[Output, str, bool, int, float] + :param pipeline_job_dict: The pipeline job dict + :type pipeline_job_dict: Optional[dict] + :return: The output object + :rtype: Output + """ + pipeline_job_dict = pipeline_job_dict or {} + output_type = None + if not pipeline_job_dict or output is None: + try: + output_type = output.type # type: ignore + except AttributeError: + # default to url_folder if failed to get type + output_type = AssetTypes.URI_FOLDER + output_variable = {"type": output_type} + return Output(**output_variable) + output_variable = {} + + if isinstance(output, str) and bool(re.search(ComponentJobConstants.OUTPUT_PATTERN, output)): + # handle output bindings + output_variable["type"], output_variable["mode"] = cls._find_source_input_output_type( + output, pipeline_job_dict + ) + + elif isinstance(output, Output): + output_variable = output._to_dict() + + elif isinstance(output, PipelineOutput): + output_variable = output._to_output()._to_dict() + + elif type(output) in cls._PYTHON_SDK_TYPE_MAPPING: + output_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(output)] + output_variable["default"] = output + else: + msg = "'{}' is not supported as component output, supported types are '{}'.".format( + type(output), cls._PYTHON_SDK_TYPE_MAPPING.keys() + ) + raise JobException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + return Output(**output_variable) + + def _to_inputs(self, inputs: Optional[Dict], **kwargs: Any) -> Dict: + """Translate inputs to Inputs. + + :param inputs: mapping from input name to input object. + :type inputs: Dict[str, Union[Input, str, bool, int, float]] + :return: mapping from input name to translated component input. + :rtype: Dict[str, Input] + """ + pipeline_job_dict = kwargs.get("pipeline_job_dict", {}) + translated_component_inputs = {} + if inputs is not None: + for io_name, io_value in inputs.items(): + translated_component_inputs[io_name] = self._to_input(io_value, pipeline_job_dict) + return translated_component_inputs + + def _to_outputs(self, outputs: Optional[Dict], **kwargs: Any) -> Dict: + """Translate outputs to Outputs. + + :param outputs: mapping from output name to output object. + :type outputs: Dict[str, Output] + :return: mapping from output name to translated component output. + :rtype: Dict[str, Output] + """ + # Translate outputs to Outputs. + pipeline_job_dict = kwargs.get("pipeline_job_dict", {}) + translated_component_outputs = {} + if outputs is not None: + for output_name, output_value in outputs.items(): + translated_component_outputs[output_name] = self._to_output(output_value, pipeline_job_dict) + return translated_component_outputs + + def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> Union["Component", str]: + """Translate to Component. + + :param context: The context + :type context: Optional[context] + :return: Translated Component. + :rtype: Component + """ + # Note: Source of translated component should be same with Job + # And should be set after called _to_component/_to_node as job has no _source now. + raise NotImplementedError() + + def _to_node(self, context: Optional[Dict] = None, **kwargs: Any) -> "BaseNode": + """Translate to pipeline node. + + :param context: The context + :type context: Optional[context] + :return: Translated node. 
+ :rtype: BaseNode + """ + # Note: Source of translated component should be same with Job + # And should be set after called _to_component/_to_node as job has no _source now. + raise NotImplementedError() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/__init__.py new file mode 100644 index 00000000..3ccde947 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/__init__.py @@ -0,0 +1,21 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +"""Classes in this package converts input & output set by user to pipeline job input & output.""" + +from .attr_dict import OutputsAttrDict, _GroupAttrDict +from .base import InputOutputBase, NodeInput, NodeOutput, PipelineInput, PipelineOutput +from .mixin import AutoMLNodeIOMixin, NodeWithGroupInputMixin, PipelineJobIOMixin + +__all__ = [ + "PipelineOutput", + "PipelineInput", + "NodeOutput", + "NodeInput", + "InputOutputBase", + "OutputsAttrDict", + "_GroupAttrDict", + "NodeWithGroupInputMixin", + "AutoMLNodeIOMixin", + "PipelineJobIOMixin", +] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py new file mode 100644 index 00000000..0ae08bcd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py @@ -0,0 +1,170 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from enum import Enum +from typing import Any, Dict, List, Optional, Union + +from azure.ai.ml.entities._assets import Data +from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output +from azure.ai.ml.entities._job.pipeline._attr_dict import K +from azure.ai.ml.entities._job.pipeline._io.base import NodeInput, NodeOutput, PipelineInput +from azure.ai.ml.exceptions import ( + ErrorCategory, + ErrorTarget, + UnexpectedAttributeError, + UnexpectedKeywordError, + ValidationException, +) + + +class InputsAttrDict(dict): + def __init__(self, inputs: dict, **kwargs: Any): + self._validate_inputs(inputs) + super(InputsAttrDict, self).__init__(**inputs, **kwargs) + + @classmethod + def _validate_inputs(cls, inputs: Any) -> None: + msg = "Pipeline/component input should be a \ + azure.ai.ml.entities._job.pipeline._io.NodeInput with owner, got {}." + for val in inputs.values(): + if isinstance(val, NodeInput) and val._owner is not None: # pylint: disable=protected-access + continue + if isinstance(val, _GroupAttrDict): + continue + raise ValidationException( + message=msg.format(val), + no_personal_data_message=msg.format("[val]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + + def __setattr__( + self, + key: str, + value: Union[int, bool, float, str, NodeOutput, PipelineInput, Input], + ) -> None: + # Extract enum value. + value = value.value if isinstance(value, Enum) else value + original_input = self.__getattr__(key) # Note that an exception will be raised if the keyword is invalid. + if isinstance(original_input, _GroupAttrDict) or isinstance(value, _GroupAttrDict): + # Set the value directly if is parameter group. 
+ self._set_group_with_type_check(key, GroupInput.custom_class_value_to_attr_dict(value)) + return + original_input._data = original_input._build_data(value) + + def _set_group_with_type_check(self, key: Any, value: Any) -> None: + msg = "{!r} is expected to be a parameter group, but got {}." + if not isinstance(value, _GroupAttrDict): + raise ValidationException( + message=msg.format(key, type(value)), + no_personal_data_message=msg.format("[key]", "[value_type]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + self.__setitem__(key, GroupInput.custom_class_value_to_attr_dict(value)) + + def __getattr__(self, item: Any) -> NodeInput: + res: NodeInput = self.__getitem__(item) + return res + + +class _GroupAttrDict(InputsAttrDict): + """This class is used for accessing values with instance.some_key.""" + + @classmethod + def _validate_inputs(cls, inputs: Any) -> None: + msg = "Pipeline/component input should be a azure.ai.ml.entities._job.pipeline._io.NodeInput, got {}." + for val in inputs.values(): + if isinstance(val, NodeInput) and val._owner is not None: # pylint: disable=protected-access + continue + if isinstance(val, _GroupAttrDict): + continue + # Allow PipelineInput as Group may appear at top level pipeline input. + if isinstance(val, PipelineInput): + continue + raise ValidationException( + message=msg.format(val), + no_personal_data_message=msg.format("[val]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + + def __getattr__(self, name: K) -> Any: + if name not in self: + raise UnexpectedAttributeError(keyword=name, keywords=list(self)) + return super().__getitem__(name) + + def __getitem__(self, item: K) -> Any: + # We raise this exception instead of KeyError + if item not in self: + raise UnexpectedKeywordError(func_name="ParameterGroup", keyword=item, keywords=list(self)) + return super().__getitem__(item) + + # For Jupyter Notebook auto-completion + def __dir__(self) -> List: + return list(super().__dir__()) + list(self.keys()) + + def flatten(self, group_parameter_name: Optional[str]) -> Dict: + # Return the flattened result of self + + group_parameter_name = group_parameter_name if group_parameter_name else "" + flattened_parameters = {} + msg = "'%s' in parameter group should be a azure.ai.ml.entities._job._io.NodeInput, got '%s'." + for k, v in self.items(): + flattened_name = ".".join([group_parameter_name, k]) + if isinstance(v, _GroupAttrDict): + flattened_parameters.update(v.flatten(flattened_name)) + elif isinstance(v, NodeInput): + flattened_parameters[flattened_name] = v._to_job_input() # pylint: disable=protected-access + else: + raise ValidationException( + message=msg % (flattened_name, type(v)), + no_personal_data_message=msg % ("name", "type"), + target=ErrorTarget.PIPELINE, + ) + return flattened_parameters + + def insert_group_name_for_items(self, group_name: Any) -> None: + # Insert one group name for all items. + for v in self.values(): + if isinstance(v, _GroupAttrDict): + v.insert_group_name_for_items(group_name) + elif isinstance(v, PipelineInput): + # Insert group names for pipeline input + v._group_names = [group_name] + v._group_names # pylint: disable=protected-access + + +class OutputsAttrDict(dict): + def __init__(self, outputs: dict, **kwargs: Any): + for val in outputs.values(): + if not isinstance(val, NodeOutput) or val._owner is None: + msg = "Pipeline/component output should be a azure.ai.ml.dsl.Output with owner, got {}." 
+ raise ValidationException( + message=msg.format(val), + no_personal_data_message=msg.format("[val]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + super(OutputsAttrDict, self).__init__(**outputs, **kwargs) + + def __getattr__(self, item: Any) -> NodeOutput: + return self.__getitem__(item) + + def __getitem__(self, item: Any) -> NodeOutput: + if item not in self: + # We raise this exception instead of KeyError as OutputsAttrDict doesn't support add new item after + # __init__. + raise UnexpectedAttributeError(keyword=item, keywords=list(self)) + res: NodeOutput = super().__getitem__(item) + return res + + def __setattr__(self, key: str, value: Union[Data, Output]) -> None: + if isinstance(value, Output): + mode = value.mode + value = Output(type=value.type, path=value.path, mode=mode, name=value.name, version=value.version) + original_output = self.__getattr__(key) # Note that an exception will be raised if the keyword is invalid. + original_output._data = original_output._build_data(value) + + def __setitem__(self, key: str, value: Output) -> None: + return self.__setattr__(key, value) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/base.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/base.py new file mode 100644 index 00000000..b17972ae --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/base.py @@ -0,0 +1,848 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import copy +import re +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, TypeVar, Union, cast, overload + +from azure.ai.ml._utils.utils import is_data_binding_expression +from azure.ai.ml.constants import AssetTypes +from azure.ai.ml.constants._component import IOConstants +from azure.ai.ml.entities._assets._artifacts.data import Data +from azure.ai.ml.entities._assets._artifacts.model import Model +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpressionMixin +from azure.ai.ml.entities._util import resolve_pipeline_parameter +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, UserErrorException, ValidationException + +# avoid circular import error +if TYPE_CHECKING: + from azure.ai.ml.entities import PipelineJob + from azure.ai.ml.entities._builders import BaseNode + +T = TypeVar("T") + + +def _build_data_binding(data: Union[str, "PipelineInput", "Output"]) -> Union[str, Output]: + """Build input builders to data bindings. + + :param data: The data to build a data binding from + :type data: Union[str, PipelineInput, Output] + :return: A data binding string if data isn't a str, otherwise data + :rtype: str + """ + result: Union[str, Output] = "" + + if isinstance(data, (InputOutputBase)): + # Build data binding when data is PipelineInput, Output + result = data._data_binding() + else: + # Otherwise just return the data + result = data + return result + + +def _resolve_builders_2_data_bindings( + data: Union[list, dict, str, "PipelineInput", "Output"] +) -> Union[dict, list, str, Output]: + """Traverse data and build input builders inside it to data bindings. 
+ + :param data: The bindings to resolve + :type data: Union[list, dict, str, "PipelineInput", "Output"] + :return: + * A dict if data was a dict + * A list if data was a list + * A str otherwise + :rtype: Union[list, dict, str] + """ + if isinstance(data, dict): + for key, val in data.items(): + if isinstance(val, (dict, list)): + data[key] = _resolve_builders_2_data_bindings(val) + else: + data[key] = _build_data_binding(val) + return data + if isinstance(data, list): + resolved_data = [] + for val in data: + resolved_data.append(_resolve_builders_2_data_bindings(val)) + return resolved_data + return _build_data_binding(data) + + +def _data_to_input(data: Union[Data, Model]) -> Input: + """Convert a Data object to an Input object. + + :param data: The data to convert + :type data: Data + :return: The Input object + :rtype: Input + """ + if data.id: + return Input(type=data.type, path=data.id) + return Input(type=data.type, path=f"{data.name}:{data.version}") + + +class InputOutputBase(ABC): + # TODO: refine this code, always use _data to store builder level settings and use _meta to store definition + # TODO: when _data missing, return value from _meta + + def __init__( + self, + meta: Optional[Union[Input, Output]], + data: Optional[Union[int, bool, float, str, Input, Output, "PipelineInput"]], + default_data: Optional[Union[int, bool, float, str, Input, Output]] = None, + **kwargs: Any, + ): + """Base class of input & output. + + :param meta: Metadata of this input/output, eg: type, min, max, etc. + :type meta: Union[Input, Output] + :param data: Actual value of input/output, None means un-configured data. + :type data: Union[None, int, bool, float, str, + azure.ai.ml.Input, + azure.ai.ml.Output] + :param default_data: default value of input/output, None means un-configured data. + :type default_data: Union[None, int, bool, float, str, + azure.ai.ml.Input, + azure.ai.ml.Output] + """ + self._meta = meta + self._original_data = data + self._data: Any = self._build_data(data) + self._default_data = default_data + self._type: str = meta.type if meta is not None else kwargs.pop("type", None) + self._mode = self._get_mode(original_data=data, data=self._data, kwargs=kwargs) + self._description = ( + self._data.description + if self._data is not None and hasattr(self._data, "description") and self._data.description + else kwargs.pop("description", None) + ) + # TODO: remove this + self._attribute_map: Dict = {} + self._name: Optional[str] = "" + self._version: Optional[str] = "" + super(InputOutputBase, self).__init__(**kwargs) + + @abstractmethod + def _build_data(self, data: T) -> Union[T, str, Input, "InputOutputBase"]: + """Validate if data matches type and translate it to Input/Output acceptable type. + + :param data: The data + :type data: T + :return: The built data + :rtype: Union[T, str, Input, InputOutputBase] + """ + + @abstractmethod + def _build_default_data(self) -> None: + """Build default data when data not configured.""" + + @property + def type(self) -> str: + """Type of input/output. + + :return: The type + :rtype: str + """ + return self._type + + @type.setter + def type(self, type: Any) -> None: # pylint: disable=redefined-builtin + # For un-configured input/output, we build a default data entry for them. 
+ self._build_default_data() + self._type = type + if isinstance(self._data, (Input, Output)): + self._data.type = type + elif self._data is not None and not isinstance( + self._data, (int, float, str) + ): # when type of self._data is InputOutputBase or its child class + self._data._type = type + + @property + def mode(self) -> Optional[str]: + return self._mode + + @mode.setter + def mode(self, mode: Optional[str]) -> None: + # For un-configured input/output, we build a default data entry for them. + self._build_default_data() + self._mode = mode + if isinstance(self._data, (Input, Output)): + self._data.mode = mode + elif self._data is not None and not isinstance(self._data, (int, float, str)): + self._data._mode = mode + + @property + def description(self) -> Any: + return self._description + + @description.setter + def description(self, description: str) -> None: + # For un-configured input/output, we build a default data entry for them. + self._build_default_data() + self._description = description + if isinstance(self._data, (Input, Output)): + self._data.description = description + elif self._data is not None and not isinstance(self._data, (int, float, str)): + self._data._description = description + + @property + def path(self) -> Optional[str]: + # This property is introduced for static intellisense. + if hasattr(self._data, "path"): + if self._data is not None and not isinstance(self._data, (int, float, str)): + res: Optional[str] = self._data.path + return res + msg = f"{type(self._data)} does not have path." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + + @path.setter + def path(self, path: str) -> None: + # For un-configured input/output, we build a default data entry for them. + self._build_default_data() + if hasattr(self._data, "path"): + if self._data is not None and not isinstance(self._data, (int, float, str)): + self._data.path = path + else: + msg = f"{type(self._data)} does not support setting path." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + + def _data_binding(self) -> str: + """Return data binding string representation for this input/output. + + :return: The data binding string + :rtype: str + """ + raise NotImplementedError() + + # Why did we have this function? It prevents the DictMixin from being applied. + # Unclear if we explicitly do NOT want the mapping protocol to be applied to this, or it this was just + # confirmation that it didn't at the time. + def keys(self) -> None: + # This property is introduced to raise catchable Exception in marshmallow mapping validation trial. + raise TypeError(f"'{type(self).__name__}' object is not a mapping") + + def __str__(self) -> str: + try: + return self._data_binding() + except AttributeError: + return super(InputOutputBase, self).__str__() + + def __hash__(self) -> int: + return id(self) + + @classmethod + def _get_mode( + cls, + original_data: Optional[Union[int, bool, float, str, Input, Output, "PipelineInput"]], + data: Optional[Union[int, bool, float, str, Input, Output]], + kwargs: dict, + ) -> Optional[str]: + """Get mode of this input/output builder. + + :param original_data: Original value of input/output. 
+ :type original_data: Union[None, int, bool, float, str + azure.ai.ml.Input, + azure.ai.ml.Output, + azure.ai.ml.entities._job.pipeline._io.PipelineInput] + :param data: Built input/output data. + :type data: Union[None, int, bool, float, str + azure.ai.ml.Input, + azure.ai.ml.Output] + :param kwargs: The kwargs + :type kwargs: Dict + :return: The mode + :rtype: Optional[str] + """ + # pipeline level inputs won't pass mode to bound node level inputs + if isinstance(original_data, PipelineInput): + return None + return data.mode if data is not None and hasattr(data, "mode") else kwargs.pop("mode", None) + + @property + def _is_primitive_type(self) -> bool: + return self.type in IOConstants.PRIMITIVE_STR_2_TYPE + + +class NodeInput(InputOutputBase): + """Define one input of a Component.""" + + def __init__( + self, + port_name: str, + meta: Optional[Input], + *, + data: Optional[Union[int, bool, float, str, Output, "PipelineInput", Input]] = None, + # TODO: Bug Item number: 2883405 + owner: Optional[Union["BaseComponent", "PipelineJob"]] = None, # type: ignore + **kwargs: Any, + ): + """Initialize an input of a component. + + :param name: The name of the input. + :type name: str + :param meta: Metadata of this input, eg: type, min, max, etc. + :type meta: Input + :param data: The input data. Valid types include int, bool, float, str, + Output of another component or pipeline input and Input. + Note that the output of another component or pipeline input associated should be reachable in the scope + of current pipeline. Input is introduced to support case like + TODO: new examples + component.inputs.xxx = Input(path="arm_id") + :type data: Union[int, bool, float, str + azure.ai.ml.Output, + azure.ai.ml.Input] + :param owner: The owner component of the input, used to calculate binding. + :type owner: Union[azure.ai.ml.entities.BaseNode, azure.ai.ml.entities.PipelineJob] + :param kwargs: A dictionary of additional configuration parameters. + :type kwargs: dict + """ + # TODO: validate data matches type in meta + # TODO: validate supported data + self._port_name = port_name + self._owner = owner + super().__init__(meta=meta, data=data, **kwargs) + + def _build_default_data(self) -> None: + """Build default data when input not configured.""" + if self._data is None: + self._data = Input() + + def _build_data(self, data: T) -> Union[T, str, Input, InputOutputBase]: + """Build input data according to assigned input + + eg: node.inputs.key = data + + :param data: The data + :type data: T + :return: The built data + :rtype: Union[T, str, Input, "PipelineInput", "NodeOutput"] + """ + _data: Union[T, str, NodeOutput] = resolve_pipeline_parameter(data) + if _data is None: + return _data + # Unidiomatic typecheck: Checks that data is _exactly_ this type, and not potentially a subtype + if type(_data) is NodeInput: # pylint: disable=unidiomatic-typecheck + msg = "Can not bind input to another component's input." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + if isinstance(_data, (PipelineInput, NodeOutput)): + # If value is input or output, it's a data binding, we require it have a owner so we can convert it to + # a data binding, eg: ${{inputs.xxx}} + if isinstance(_data, NodeOutput) and _data._owner is None: + msg = "Setting input binding {} to output without owner is not allowed." 
+ raise ValidationException( + message=msg.format(_data), + no_personal_data_message=msg.format("[_data]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + return _data + # for data binding case, set is_singular=False for case like "${{parent.inputs.job_in_folder}}/sample1.csv" + if isinstance(_data, Input) or is_data_binding_expression(_data, is_singular=False): + return _data + if isinstance(_data, (Data, Model)): + return _data_to_input(_data) + # self._meta.type could be None when sub pipeline has no annotation + if isinstance(self._meta, Input) and self._meta.type and not self._meta._is_primitive_type: + if isinstance(_data, str): + return Input(type=self._meta.type, path=_data) + msg = "only path input is supported now but get {}: {}." + raise UserErrorException( + message=msg.format(type(_data), _data), + no_personal_data_message=msg.format(type(_data), "[_data]"), + ) + return _data + + def _to_job_input(self) -> Optional[Union[Input, str, Output]]: + """convert the input to Input, this logic will change if backend contract changes.""" + result: Optional[Union[Input, str, Output]] = None + + if self._data is None: + # None data means this input is not configured. + result = None + elif isinstance(self._data, (PipelineInput, NodeOutput)): + # Build data binding when data is PipelineInput, Output + result = Input(path=self._data._data_binding(), mode=self.mode) + elif is_data_binding_expression(self._data): + result = Input(path=self._data, mode=self.mode) + else: + data_binding = _build_data_binding(self._data) + if is_data_binding_expression(self._data): + result = Input(path=data_binding, mode=self.mode) + else: + result = data_binding + # TODO: validate is self._data is supported + + return result + + def _data_binding(self) -> str: + msg = "Input binding {} can only come from a pipeline, currently got {}" + # call type(self._owner) to avoid circular import + raise ValidationException( + message=msg.format(self._port_name, type(self._owner)), + target=ErrorTarget.PIPELINE, + no_personal_data_message=msg.format("[port_name]", "[owner]"), + error_category=ErrorCategory.USER_ERROR, + ) + + def _copy(self, owner: Any) -> "NodeInput": + return NodeInput( + port_name=self._port_name, + data=self._data, + owner=owner, + meta=cast(Input, self._meta), + ) + + def _deepcopy(self) -> "NodeInput": + return NodeInput( + port_name=self._port_name, + data=copy.copy(self._data), + owner=self._owner, + meta=cast(Input, self._meta), + ) + + def _get_data_owner(self) -> Optional["BaseNode"]: + """Gets the data owner of the node + + Note: This only works for @pipeline, not for YAML pipeline. + + Note: Inner step will be returned as the owner when node's input is from sub pipeline's output. + @pipeline + def sub_pipeline(): + inner_node = component_func() + return inner_node.outputs + + @pipeline + def root_pipeline(): + pipeline_node = sub_pipeline() + node = copy_files_component_func(input_dir=pipeline_node.outputs.output_dir) + owner = node.inputs.input_dir._get_data_owner() + assert owner == pipeline_node.nodes[0] + + :return: The node if Input is from another node's output. Returns None for literal value. 
+ :rtype: Optional[BaseNode] + """ + from azure.ai.ml.entities import Pipeline + from azure.ai.ml.entities._builders import BaseNode + + def _resolve_data_owner(data: Any) -> Optional["BaseNode"]: + if isinstance(data, BaseNode) and not isinstance(data, Pipeline): + return data + while isinstance(data, PipelineInput): + # for pipeline input, it's original value(can be literal value or another node's output) + # is stored in _original_data + return _resolve_data_owner(data._original_data) + if isinstance(data, NodeOutput): + if isinstance(data._owner, Pipeline): + # for input from subgraph's output, trace back to inner node + return _resolve_data_owner(data._binding_output) + # for input from another node's output, return the node + return _resolve_data_owner(data._owner) + return None + + return _resolve_data_owner(self._data) + + +class NodeOutput(InputOutputBase, PipelineExpressionMixin): + """Define one output of a Component.""" + + def __init__( + self, + port_name: str, + meta: Optional[Union[Input, Output]], + *, + data: Optional[Union[Output, str]] = None, + # TODO: Bug Item number: 2883405 + owner: Optional[Union["BaseComponent", "PipelineJob"]] = None, # type: ignore + binding_output: Optional["NodeOutput"] = None, + **kwargs: Any, + ): + """Initialize an Output of a component. + + :param port_name: The port_name of the output. + :type port_name: str + :param name: The name used to register NodeOutput/PipelineOutput data. + :type name: str + :param version: The version used to register NodeOutput/PipelineOutput data. + :ype version: str + :param data: The output data. Valid types include str, Output + :type data: Union[str + azure.ai.ml.entities.Output] + :param mode: The mode of the output. + :type mode: str + :param owner: The owner component of the output, used to calculate binding. + :type owner: Union[azure.ai.ml.entities.BaseNode, azure.ai.ml.entities.PipelineJob] + :param binding_output: The node output bound to pipeline output, only available for pipeline. + :type binding_output: azure.ai.ml.entities.NodeOutput + :param kwargs: A dictionary of additional configuration parameters. + :type kwargs: dict + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if object cannot be successfully validated. + Details will be provided in the error message. + """ + # Allow inline output binding with string, eg: "component_out_path_1": "${{parents.outputs.job_out_data_1}}" + if data is not None and not isinstance(data, (Output, str)): + msg = "Got unexpected type for output: {}." + raise ValidationException( + message=msg.format(data), + target=ErrorTarget.PIPELINE, + no_personal_data_message=msg.format("[data]"), + ) + super().__init__(meta=meta, data=data, **kwargs) + self._port_name = port_name + self._owner = owner + self._name: Optional[str] = self._data.name if isinstance(self._data, Output) else None + self._version: Optional[str] = self._data.version if isinstance(self._data, Output) else None + + self._assert_name_and_version() + + # store original node output to be able to trace back to inner node from a pipeline output builder. + self._binding_output = binding_output + + @property + def port_name(self) -> str: + """The output port name, eg: node.outputs.port_name. + + :return: The port name + :rtype: str + """ + return self._port_name + + @property + def name(self) -> Optional[str]: + """Used in registering output data. 
+ + :return: The output name + :rtype: str + """ + return self._name + + @name.setter + def name(self, name: str) -> None: + """Assigns the name to NodeOutput/PipelineOutput and builds data according to the name. + + :param name: The new name + :type name: str + """ + self._build_default_data() + self._name = name + if isinstance(self._data, Output): + self._data.name = name + elif isinstance(self._data, InputOutputBase): + self._data._name = name + else: + raise UserErrorException( + f"We support self._data of Input, Output, InputOutputBase, NodeOutput and NodeInput," + f"but got type: {type(self._data)}." + ) + + @property + def version(self) -> Optional[str]: + """Used in registering output data. + + :return: The output data + :rtype: str + """ + return self._version + + @version.setter + def version(self, version: str) -> None: + """Assigns the version to NodeOutput/PipelineOutput and builds data according to the version. + + :param version: The new version + :type version: str + """ + self._build_default_data() + self._version = version + if isinstance(self._data, Output): + self._data.version = version + elif isinstance(self._data, InputOutputBase): + self._data._version = version + else: + raise UserErrorException( + f"We support self._data of Input, Output, InputOutputBase, NodeOutput and NodeInput," + f"but got type: {type(self._data)}." + ) + + @property + def path(self) -> Any: + # For node output path, + if self._data is not None and hasattr(self._data, "path"): + return self._data.path + return None + + @path.setter + def path(self, path: Optional[str]) -> None: + # For un-configured output, we build a default data entry for them. + self._build_default_data() + if self._data is not None and hasattr(self._data, "path"): + self._data.path = path + else: + # YAML job will have string output binding and do not support setting path for it. + msg = f"{type(self._data)} does not support setting path." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + + def _assert_name_and_version(self) -> None: + if self.name and not (re.match("^[A-Za-z0-9_-]*$", self.name) and len(self.name) <= 255): + raise UserErrorException( + f"The output name {self.name} can only contain alphanumeric characters, dashes and underscores, " + f"with a limit of 255 characters." + ) + if self.version and not self.name: + raise UserErrorException("Output name is required when output version is specified.") + + def _build_default_data(self) -> None: + """Build default data when output not configured.""" + if self._data is None: + # _meta will be None when node._component is not a Component object + # so we just leave the type inference work to backend + self._data = Output(type=None) # type: ignore[call-overload] + + def _build_data(self, data: T) -> Any: + """Build output data according to assigned input, eg: node.outputs.key = data + + :param data: The data + :type data: T + :return: `data` + :rtype: T + """ + if data is None: + return data + if not isinstance(data, (Output, str)): + msg = f"{self.__class__.__name__} only allow set {Output.__name__} object, {type(data)} is not supported." 
+ raise ValidationException( + message=msg, + target=ErrorTarget.PIPELINE, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + ) + res: T = cast(T, data) + return res + + def _to_job_output(self) -> Optional[Output]: + """Convert the output to Output, this logic will change if backend contract changes.""" + if self._data is None: + # None data means this output is not configured. + result = None + elif isinstance(self._data, str): + result = Output( + type=AssetTypes.URI_FOLDER, path=self._data, mode=self.mode, name=self.name, version=self.version + ) + elif isinstance(self._data, Output): + result = self._data + elif isinstance(self._data, PipelineOutput): + result = Output( + type=AssetTypes.URI_FOLDER, + path=self._data._data_binding(), + mode=self.mode, + name=self._data.name, + version=self._data.version, + description=self.description, + ) + else: + msg = "Got unexpected type for output: {}." + raise ValidationException( + message=msg.format(self._data), + target=ErrorTarget.PIPELINE, + no_personal_data_message=msg.format("[data]"), + ) + return result + + def _data_binding(self) -> str: + if self._owner is not None: + return f"${{{{parent.jobs.{self._owner.name}.outputs.{self._port_name}}}}}" + + return "" + + def _copy(self, owner: Any) -> "NodeOutput": + return NodeOutput( + port_name=self._port_name, + data=cast(Output, self._data), + owner=owner, + meta=self._meta, + ) + + def _deepcopy(self) -> "NodeOutput": + return NodeOutput( + port_name=self._port_name, + data=cast(Output, copy.copy(self._data)), + owner=self._owner, + meta=self._meta, + binding_output=self._binding_output, + ) + + +class PipelineInput(NodeInput, PipelineExpressionMixin): + """Define one input of a Pipeline.""" + + def __init__(self, name: str, meta: Optional[Input], group_names: Optional[List[str]] = None, **kwargs: Any): + """Initialize a PipelineInput. + + :param name: The name of the input. + :type name: str + :param meta: Metadata of this input, eg: type, min, max, etc. + :type meta: Input + :param group_names: The input parameter's group names. + :type group_names: List[str] + """ + super(PipelineInput, self).__init__(port_name=name, meta=meta, **kwargs) + self._group_names = group_names if group_names else [] + + def result(self) -> Any: + """Return original value of pipeline input. + + :return: The original value of pipeline input + :rtype: Any + + Example: + + .. code-block:: python + + @pipeline + def pipeline_func(param1): + # node1's param1 will get actual value of param1 instead of a input binding. + node1 = component_func(param1=param1.result()) + """ + + # use this to break self loop + original_data_cache: Set = set() + original_data = self._original_data + while isinstance(original_data, PipelineInput) and original_data not in original_data_cache: + original_data_cache.add(original_data) + original_data = original_data._original_data + return original_data + + def __str__(self) -> str: + return self._data_binding() + + @overload + def _build_data(self, data: Union[Model, Data]) -> Input: ... + + @overload + def _build_data(self, data: T) -> Any: ... + + def _build_data(self, data: Union[Model, Data, T]) -> Any: + """Build data according to input type. 
+ + :param data: The data + :type data: Union[Model, Data, T] + :return: + * Input if data is a Model or Data + * data otherwise + :rtype: Union[Input, T] + """ + if data is None: + return data + # Unidiomatic typecheck: Checks that data is _exactly_ this type, and not potentially a subtype + if type(data) is NodeInput: # pylint: disable=unidiomatic-typecheck + msg = "Can not bind input to another component's input." + raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.PIPELINE) + if isinstance(data, (PipelineInput, NodeOutput)): + # If value is input or output, it's a data binding, owner is required to convert it to + # a data binding, eg: ${{parent.inputs.xxx}} + if isinstance(data, NodeOutput) and data._owner is None: + msg = "Setting input binding {} to output without owner is not allowed." + raise ValidationException( + message=msg.format(data), + no_personal_data_message=msg.format("[data]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + return data + if isinstance(data, (Data, Model)): + # If value is Data, we convert it to an corresponding Input + return _data_to_input(data) + return data + + def _data_binding(self) -> str: + full_name = "%s.%s" % (".".join(self._group_names), self._port_name) if self._group_names else self._port_name + return f"${{{{parent.inputs.{full_name}}}}}" + + def _to_input(self) -> Optional[Union[Input, Output]]: + """Convert pipeline input to component input for pipeline component. + + :return: The component input + :rtype: Input + """ + if self._data is None: + # None data means this input is not configured. + return self._meta + data_type = self._data.type if isinstance(self._data, Input) else None + # If type is asset type, return data type without default. + # Else infer type from data and set it as default. + if data_type and data_type.lower() in AssetTypes.__dict__.values(): + if not isinstance(self._data, (int, float, str)): + result = Input(type=data_type, mode=self._data.mode) + elif type(self._data) in IOConstants.PRIMITIVE_TYPE_2_STR: + result = Input( + type=IOConstants.PRIMITIVE_TYPE_2_STR[type(self._data)], + default=self._data, + ) + else: + msg = f"Unsupported Input type {type(self._data)} detected when translate job to component." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + return result # pylint: disable=possibly-used-before-assignment + + +class PipelineOutput(NodeOutput): + """Define one output of a Pipeline.""" + + def _to_job_output(self) -> Optional[Output]: + result: Optional[Output] = None + if isinstance(self._data, Output): + # For pipeline output with type Output, always pass to backend. + result = self._data + elif self._data is None and self._meta and self._meta.type: + # For un-configured pipeline output with meta, we need to return Output with accurate type, + # so it won't default to uri_folder. + result = Output(type=self._meta.type, mode=self._meta.mode, description=self._meta.description) + else: + result = super(PipelineOutput, self)._to_job_output() + # Copy meta type to avoid built output's None type default to uri_folder. 
+ if self.type and result is not None and not result.type: + result.type = self.type + return result + + def _data_binding(self) -> str: + return f"${{{{parent.outputs.{self._port_name}}}}}" + + def _to_output(self) -> Optional[Output]: + """Convert pipeline output to component output for pipeline component.""" + if self._data is None: + # None data means this input is not configured. + return None + if isinstance(self._meta, Output): + return self._meta + # Assign type directly as we didn't have primitive output type for now. + if not isinstance(self._data, (int, float, str)): + return Output(type=self._data.type, mode=self._data.mode) + return Output() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/mixin.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/mixin.py new file mode 100644 index 00000000..6c3d9357 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/mixin.py @@ -0,0 +1,623 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import copy +from typing import Any, Dict, List, Optional, Tuple, Type, Union + +from azure.ai.ml._restclient.v2023_04_01_preview.models import JobInput as RestJobInput +from azure.ai.ml._restclient.v2023_04_01_preview.models import JobOutput as RestJobOutput +from azure.ai.ml.constants._component import ComponentJobConstants +from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output +from azure.ai.ml.entities._util import copy_output_setting +from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException + +from ..._input_output_helpers import ( + from_rest_data_outputs, + from_rest_inputs_to_dataset_literal, + to_rest_data_outputs, + to_rest_dataset_literal_inputs, +) +from .._pipeline_job_helpers import from_dict_to_rest_io, process_sdk_component_job_io +from .attr_dict import InputsAttrDict, OutputsAttrDict, _GroupAttrDict +from .base import NodeInput, NodeOutput, PipelineInput, PipelineOutput + + +class NodeIOMixin: + """Provides ability to wrap node inputs/outputs and build data bindings + dynamically.""" + + @classmethod + def _get_supported_inputs_types(cls) -> Optional[Any]: + return None + + @classmethod + def _get_supported_outputs_types(cls) -> Optional[Any]: + return None + + @classmethod + def _validate_io(cls, value: Any, allowed_types: Optional[tuple], *, key: Optional[str] = None) -> None: + if allowed_types is None: + return + + if value is None or isinstance(value, allowed_types): + pass + else: + msg = "Expecting {} for input/output {}, got {} instead." 
+ raise ValidationException( + message=msg.format(allowed_types, key, type(value)), + no_personal_data_message=msg.format(allowed_types, "[key]", type(value)), + target=ErrorTarget.PIPELINE, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + def _build_input( + self, + name: str, + meta: Optional[Input], + data: Optional[Union[dict, int, bool, float, str, Output, "PipelineInput", Input]], + ) -> NodeInput: + # output mode of last node should not affect input mode of next node + if isinstance(data, NodeOutput): + # Decoupled input and output + # value = copy.deepcopy(value) + data = data._deepcopy() # pylint: disable=protected-access + data.mode = None + elif isinstance(data, dict): + # Use type comparison instead of is_instance to skip _GroupAttrDict + # when loading from yaml io will be a dict, + # like {'job_data_path': '${{parent.inputs.pipeline_job_data_path}}'} + # parse dict to allowed type + data = Input(**data) + + # parameter group can be of custom type, so we don't check it here + if meta is not None and not isinstance(meta, GroupInput): + self._validate_io(data, self._get_supported_inputs_types(), key=name) + return NodeInput(port_name=name, meta=meta, data=data, owner=self) + + def _build_output(self, name: str, meta: Optional[Output], data: Optional[Union[Output, str]]) -> NodeOutput: + if isinstance(data, dict): + data = Output(**data) + + self._validate_io(data, self._get_supported_outputs_types(), key=name) + # For un-configured outputs, settings it to None, so we won't pass extra fields(eg: default mode) + return NodeOutput(port_name=name, meta=meta, data=data, owner=self) + + # pylint: disable=unused-argument + def _get_default_input_val(self, val: Any): # type: ignore + # use None value as data placeholder for unfilled inputs. + # server side will fill the default value + return None + + def _build_inputs_dict( + self, + inputs: Dict[str, Union[Input, str, bool, int, float]], + *, + input_definition_dict: Optional[dict] = None, + ) -> InputsAttrDict: + """Build an input attribute dict so user can get/set inputs by + accessing attribute, eg: node1.inputs.xxx. + + :param inputs: Provided kwargs when parameterizing component func. + :type inputs: Dict[str, Union[Input, str, bool, int, float]] + :keyword input_definition_dict: Static input definition dict. If not provided, will build inputs without meta. + :paramtype input_definition_dict: dict + :return: Built dynamic input attribute dict. + :rtype: InputsAttrDict + """ + if input_definition_dict is not None: + # TODO: validate inputs.keys() in input_definitions.keys() + input_dict = {} + for key, val in input_definition_dict.items(): + if key in inputs.keys(): + # If input is set through component functions' kwargs, create an input object with real value. + data = inputs[key] + else: + data = self._get_default_input_val(val) # pylint: disable=assignment-from-none + + val = self._build_input(name=key, meta=val, data=data) + input_dict[key] = val + else: + input_dict = {key: self._build_input(name=key, meta=None, data=val) for key, val in inputs.items()} + return InputsAttrDict(input_dict) + + def _build_outputs_dict( + self, outputs: Dict, *, output_definition_dict: Optional[dict] = None, none_data: bool = False + ) -> OutputsAttrDict: + """Build an output attribute dict so user can get/set outputs by + accessing attribute, eg: node1.outputs.xxx. + + :param outputs: Provided kwargs when parameterizing component func. + :type outputs: Dict[str, Output] + :keyword output_definition_dict: Static output definition dict. 
+ :paramtype output_definition_dict: Dict + :keyword none_data: If True, will set output data to None. + :paramtype none_data: bool + :return: Built dynamic output attribute dict. + :rtype: OutputsAttrDict + """ + if output_definition_dict is not None: + # TODO: check if we need another way to mark a un-configured output instead of just set None. + # Create None as data placeholder for all outputs. + output_dict = {} + for key, val in output_definition_dict.items(): + if key in outputs.keys(): + # If output has given value, create an output object with real value. + val = self._build_output(name=key, meta=val, data=outputs[key]) + else: + val = self._build_output(name=key, meta=val, data=None) + output_dict[key] = val + else: + output_dict = {} + for key, val in outputs.items(): + output_val = self._build_output(name=key, meta=None, data=val if not none_data else None) + output_dict[key] = output_val + return OutputsAttrDict(output_dict) + + def _build_inputs(self) -> Dict: + """Build inputs of this component to a dict dict which maps output to + actual value. + + The built input dict will have same input format as other jobs, eg: + { + "input_data": Input(path="path/to/input/data", mode="Mount"), + "input_value": 10, + "learning_rate": "${{jobs.step1.inputs.learning_rate}}" + } + + :return: The input dict + :rtype: Dict[str, Union[Input, str, bool, int, float]] + """ + inputs = {} + # pylint: disable=redefined-builtin + for name, input in self.inputs.items(): # type: ignore + if isinstance(input, _GroupAttrDict): + # Flatten group inputs into inputs dict + inputs.update(input.flatten(group_parameter_name=name)) + continue + inputs[name] = input._to_job_input() # pylint: disable=protected-access + return inputs + + def _build_outputs(self) -> Dict[str, Output]: + """Build outputs of this component to a dict which maps output to + actual value. + + The built output dict will have same output format as other jobs, eg: + { + "eval_output": "${{jobs.eval.outputs.eval_output}}" + } + + :return: The output dict + :rtype: Dict[str, Output] + """ + outputs = {} + for name, output in self.outputs.items(): # type: ignore + if isinstance(output, NodeOutput): + output = output._to_job_output() # pylint: disable=protected-access + outputs[name] = output + # Remove non-configured output + return {k: v for k, v in outputs.items() if v is not None} + + def _to_rest_inputs(self) -> Dict[str, Dict]: + """Translate input builders to rest input dicts. + + The built dictionary's format aligns with component job's input yaml, eg: + { + "input_data": {"data": {"path": "path/to/input/data"}, "mode"="Mount"}, + "input_value": 10, + "learning_rate": "${{jobs.step1.inputs.learning_rate}}" + } + + :return: The REST inputs + :rtype: Dict[str, Dict] + """ + built_inputs = self._build_inputs() + return self._input_entity_to_rest_inputs(input_entity=built_inputs) + + @classmethod + def _input_entity_to_rest_inputs(cls, input_entity: Dict[str, Input]) -> Dict[str, Dict]: + # Convert io entity to rest io objects + input_bindings, dataset_literal_inputs = process_sdk_component_job_io( + input_entity, [ComponentJobConstants.INPUT_PATTERN] + ) + + # parse input_bindings to InputLiteral(value=str(binding)) + rest_inputs = {**input_bindings, **dataset_literal_inputs} + # Note: The function will only be called from BaseNode, + # and job_type is used to enable dot in pipeline job input keys, + # so pass job_type as None directly here. 
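The binding/literal split that process_sdk_component_job_io performs on the dict built above can be illustrated with a simplified, standalone sketch. The regular expression and helper below are hypothetical stand-ins (the real pattern is ComponentJobConstants.INPUT_PATTERN), so treat this as an approximation of the behaviour, not the SDK's implementation:

import re
from typing import Any, Dict, Tuple

# Hypothetical simplified pattern; the SDK uses ComponentJobConstants.INPUT_PATTERN.
_BINDING = re.compile(r"^\$\{\{(parent\.)?(inputs|jobs|outputs)\..+\}\}$")

def split_bindings_and_literals(io: Dict[str, Any]) -> Tuple[Dict[str, str], Dict[str, Any]]:
    """Separate '${{...}}' data-binding strings from literal/dataset values."""
    bindings: Dict[str, str] = {}
    literals: Dict[str, Any] = {}
    for name, value in io.items():
        if isinstance(value, str) and _BINDING.match(value):
            bindings[name] = value
        else:
            literals[name] = value
    return bindings, literals

# One bound input and two literal inputs, mirroring the docstring example above.
bindings, literals = split_bindings_and_literals(
    {
        "learning_rate": "${{jobs.step1.inputs.learning_rate}}",
        "input_value": 10,
        "input_data": {"path": "path/to/input/data", "mode": "Mount"},
    }
)
assert set(bindings) == {"learning_rate"} and set(literals) == {"input_value", "input_data"}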
+ rest_inputs = to_rest_dataset_literal_inputs(rest_inputs, job_type=None) + + # convert rest io to dict + rest_dataset_literal_inputs = {} + for name, val in rest_inputs.items(): + rest_dataset_literal_inputs[name] = val.as_dict() + if hasattr(val, "mode") and val.mode: + rest_dataset_literal_inputs[name].update({"mode": val.mode.value}) + return rest_dataset_literal_inputs + + def _to_rest_outputs(self) -> Dict[str, Dict]: + """Translate output builders to rest output dicts. + + The built dictionary's format aligns with component job's output yaml, eg: + {"eval_output": "${{jobs.eval.outputs.eval_output}}"} + + :return: The REST outputs + :rtype: Dict[str, Dict] + """ + built_outputs = self._build_outputs() + + # Convert io entity to rest io objects + output_bindings, data_outputs = process_sdk_component_job_io( + built_outputs, [ComponentJobConstants.OUTPUT_PATTERN] + ) + rest_data_outputs = to_rest_data_outputs(data_outputs) + + # convert rest io to dict + # parse output_bindings to {"value": binding, "type": "literal"} since there's no mode for it + rest_output_bindings = {} + for key, binding in output_bindings.items(): + rest_output_bindings[key] = {"value": binding["value"], "type": "literal"} + if "mode" in binding: + rest_output_bindings[key].update({"mode": binding["mode"].value}) + if "name" in binding: + rest_output_bindings[key].update({"name": binding["name"]}) + if "version" in binding: + rest_output_bindings[key].update({"version": binding["version"]}) + + def _rename_name_and_version(output_dict: Dict) -> Dict: + # NodeOutput can only be registered with name and version, therefore we rename here + if "asset_name" in output_dict.keys(): + output_dict["name"] = output_dict.pop("asset_name") + if "asset_version" in output_dict.keys(): + output_dict["version"] = output_dict.pop("asset_version") + return output_dict + + rest_data_outputs = {name: _rename_name_and_version(val.as_dict()) for name, val in rest_data_outputs.items()} + self._update_output_types(rest_data_outputs) + rest_data_outputs.update(rest_output_bindings) + return rest_data_outputs + + @classmethod + def _from_rest_inputs(cls, inputs: Dict) -> Dict[str, Union[Input, str, bool, int, float]]: + """Load inputs from rest inputs. + + :param inputs: The REST inputs + :type inputs: Dict[str, Union[str, dict]] + :return: Input dict + :rtype: Dict[str, Union[Input, str, bool, int, float]] + """ + + # JObject -> RestJobInput/RestJobOutput + input_bindings, rest_inputs = from_dict_to_rest_io(inputs, RestJobInput, [ComponentJobConstants.INPUT_PATTERN]) + + # RestJobInput/RestJobOutput -> Input/Output + dataset_literal_inputs = from_rest_inputs_to_dataset_literal(rest_inputs) + + return {**dataset_literal_inputs, **input_bindings} + + @classmethod + def _from_rest_outputs(cls, outputs: Dict[str, Union[str, dict]]) -> Dict: + """Load outputs from rest outputs. + + :param outputs: The REST outputs + :type outputs: Dict[str, Union[str, dict]] + :return: Output dict + :rtype: Dict[str, Output] + """ + + # JObject -> RestJobInput/RestJobOutput + output_bindings, rest_outputs = from_dict_to_rest_io( + outputs, RestJobOutput, [ComponentJobConstants.OUTPUT_PATTERN] + ) + + # RestJobInput/RestJobOutput -> Input/Output + data_outputs = from_rest_data_outputs(rest_outputs) + + return {**data_outputs, **output_bindings} + + def _update_output_types(self, rest_data_outputs: dict) -> None: + """Update output types in rest_data_outputs according to meta level output. 
+ + :param rest_data_outputs: The REST data outputs + :type rest_data_outputs: Dict + """ + + for name, rest_output in rest_data_outputs.items(): + original_output = self.outputs[name] # type: ignore + # for configured output with meta, "correct" the output type to file to avoid the uri_folder default value + if original_output and original_output.type: + if original_output.type in ["AnyFile", "uri_file"]: + rest_output["job_output_type"] = "uri_file" + + +def flatten_dict( + dct: Optional[Dict], + _type: Union[Type["_GroupAttrDict"], Type[GroupInput]], + *, + allow_dict_fields: Optional[List[str]] = None, +) -> Dict: + """Flatten inputs/input_definitions dict for inputs dict build. + + :param dct: The dictionary to flatten + :type dct: Dict + :param _type: Either _GroupAttrDict or GroupInput (both have the method `flatten`) + :type _type: Union[Type["_GroupAttrDict"], Type[GroupInput]] + :keyword allow_dict_fields: A list of keys for dictionary values that will be included in flattened output + :paramtype allow_dict_fields: Optional[List[str]] + :return: The flattened dict + :rtype: Dict + """ + _result = {} + if dct is not None: + for key, val in dct.items(): + # to support passing dict value as parameter group + if allow_dict_fields and key in allow_dict_fields and isinstance(val, dict): + # for child dict, all values are allowed to be dict + for flattened_key, flattened_val in flatten_dict( + val, _type, allow_dict_fields=list(val.keys()) + ).items(): + _result[key + "." + flattened_key] = flattened_val + continue + val = GroupInput.custom_class_value_to_attr_dict(val) + if isinstance(val, _type): + _result.update(val.flatten(group_parameter_name=key)) + continue + _result[key] = val + return _result + + +class NodeWithGroupInputMixin(NodeIOMixin): + """This class provide build_inputs_dict for a node to use ParameterGroup as an input.""" + + @classmethod + def _validate_group_input_type( + cls, + input_definition_dict: dict, + inputs: Dict[str, Union[Input, str, bool, int, float]], + ) -> None: + """Raise error when group input receive a value not group type. + + :param input_definition_dict: The input definition dict + :type input_definition_dict: dict + :param inputs: The inputs + :type inputs: Dict[str, Union[Input, str, bool, int, float]] + """ + # Note: We put and extra validation here instead of doing it in pipeline._validate() + # due to group input will be discarded silently if assign it to a non-group parameter. + group_msg = "'%s' is defined as a parameter group but got input '%s' with type '%s'." + non_group_msg = "'%s' is defined as a parameter but got a parameter group as input." + for key, val in inputs.items(): + definition = input_definition_dict.get(key) + val = GroupInput.custom_class_value_to_attr_dict(val) + if val is None: + continue + # 1. inputs.group = 'a string' + if isinstance(definition, GroupInput) and not isinstance(val, (_GroupAttrDict, dict)): + raise ValidationException( + message=group_msg % (key, val, type(val)), + no_personal_data_message=group_msg % ("[key]", "[val]", "[type(val)]"), + target=ErrorTarget.PIPELINE, + type=ValidationErrorType.INVALID_VALUE, + ) + # 2. 
inputs.str_param = group + if not isinstance(definition, GroupInput) and isinstance(val, _GroupAttrDict): + raise ValidationException( + message=non_group_msg % key, + no_personal_data_message=non_group_msg % "[key]", + target=ErrorTarget.PIPELINE, + type=ValidationErrorType.INVALID_VALUE, + ) + + @classmethod + def _flatten_inputs_and_definition( + cls, + inputs: Dict[str, Union[Input, str, bool, int, float]], + input_definition_dict: dict, + ) -> Tuple[Dict, Dict]: + """ + Flatten all GroupInput(definition) and GroupAttrDict recursively and build input dict. + For example: + input_definition_dict = { + "group1": GroupInput( + values={ + "param1": GroupInput( + values={ + "param1_1": Input(type="str"), + } + ), + "param2": Input(type="int"), + } + ), + "group2": GroupInput( + values={ + "param3": Input(type="str"), + } + ), + } => { + "group1.param1.param1_1": Input(type="str"), + "group1.param2": Input(type="int"), + "group2.param3": Input(type="str"), + } + inputs = { + "group1": { + "param1": { + "param1_1": "value1", + }, + "param2": 2, + }, + "group2": { + "param3": "value3", + }, + } => { + "group1.param1.param1_1": "value1", + "group1.param2": 2, + "group2.param3": "value3", + } + :param inputs: The inputs + :type inputs: Dict[str, Union[Input, str, bool, int, float]] + :param input_definition_dict: The input definition dict + :type input_definition_dict: dict + :return: The flattened inputs and definition + :rtype: Tuple[Dict, Dict] + """ + group_input_names = [key for key, val in input_definition_dict.items() if isinstance(val, GroupInput)] + flattened_inputs = flatten_dict(inputs, _GroupAttrDict, allow_dict_fields=group_input_names) + flattened_definition_dict = flatten_dict(input_definition_dict, GroupInput) + return flattened_inputs, flattened_definition_dict + + def _build_inputs_dict( + self, + inputs: Dict[str, Union[Input, str, bool, int, float]], + *, + input_definition_dict: Optional[dict] = None, + ) -> InputsAttrDict: + """Build an input attribute dict so user can get/set inputs by + accessing attribute, eg: node1.inputs.xxx. + + :param inputs: Provided kwargs when parameterizing component func. + :type inputs: Dict[str, Union[Input, str, bool, int, float]] + :keyword input_definition_dict: Input definition dict from component entity. + :paramtype input_definition_dict: dict + :return: Built input attribute dict. + :rtype: InputsAttrDict + """ + + # TODO: should we support group input when there is no local input definition? 
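As a rough, SDK-independent illustration of the flattening shown in the docstring above, nested parameter-group values can be reduced to dotted keys as follows. This is a minimal sketch under the assumption that groups arrive as plain dicts; the real logic lives in flatten_dict/GroupInput and also handles _GroupAttrDict and custom group classes:

from typing import Any, Dict

def flatten_nested(values: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys, e.g. {'g': {'p': 1}} -> {'g.p': 1}."""
    flat: Dict[str, Any] = {}
    for key, val in values.items():
        full_key = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            flat.update(flatten_nested(val, full_key))
        else:
            flat[full_key] = val
    return flat

# Reproduces the mapping from the _flatten_inputs_and_definition docstring.
assert flatten_nested(
    {"group1": {"param1": {"param1_1": "value1"}, "param2": 2}, "group2": {"param3": "value3"}}
) == {"group1.param1.param1_1": "value1", "group1.param2": 2, "group2.param3": "value3"}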
+ if input_definition_dict is not None: + # Validate group mismatch + self._validate_group_input_type(input_definition_dict, inputs) + + # Flatten inputs and definition + flattened_inputs, flattened_definition_dict = self._flatten_inputs_and_definition( + inputs, input_definition_dict + ) + # Build: zip all flattened parameter with definition + inputs = super()._build_inputs_dict(flattened_inputs, input_definition_dict=flattened_definition_dict) + return InputsAttrDict(GroupInput.restore_flattened_inputs(inputs)) + return super()._build_inputs_dict(inputs) + + +class PipelineJobIOMixin(NodeWithGroupInputMixin): + """Provides ability to wrap pipeline job inputs/outputs and build data bindings + dynamically.""" + + def _build_input(self, name: str, meta: Optional[Input], data: Any) -> "PipelineInput": + return PipelineInput(name=name, meta=meta, data=data, owner=self) + + def _build_output( + self, name: str, meta: Optional[Union[Input, Output]], data: Optional[Union[Output, str]] + ) -> "PipelineOutput": + # TODO: settings data to None for un-configured outputs so we won't passing extra fields(eg: default mode) + result = PipelineOutput(port_name=name, meta=meta, data=data, owner=self) + return result + + def _build_inputs_dict( + self, + inputs: Dict[str, Union[Input, str, bool, int, float]], + *, + input_definition_dict: Optional[dict] = None, + ) -> InputsAttrDict: + """Build an input attribute dict so user can get/set inputs by + accessing attribute, eg: node1.inputs.xxx. + + :param inputs: Provided kwargs when parameterizing component func. + :type inputs: Dict[str, Union[Input, str, bool, int, float]] + :keyword input_definition_dict: Input definition dict from component entity. + :return: Built input attribute dict. + :rtype: InputsAttrDict + """ + input_dict = super()._build_inputs_dict(inputs, input_definition_dict=input_definition_dict) + # TODO: should we do this when input_definition_dict is not None? + # TODO: should we put this in super()._build_inputs_dict? + if input_definition_dict is None: + return InputsAttrDict(GroupInput.restore_flattened_inputs(input_dict)) + return input_dict + + def _build_output_for_pipeline(self, name: str, data: Optional[Union[Output, NodeOutput]]) -> "PipelineOutput": + """Build an output object for pipeline and copy settings from source output. + + :param name: Output name. + :type name: str + :param data: Output data. + :type data: Optional[Union[Output, NodeOutput]] + :return: Built output object. + :rtype: PipelineOutput + """ + # pylint: disable=protected-access + if data is None: + # For None output, build an empty output builder + output_val = self._build_output(name=name, meta=None, data=None) + elif isinstance(data, Output): + # For output entity, build an output builder with data points to it + output_val = self._build_output(name=name, meta=data, data=data) + elif isinstance(data, NodeOutput): + # For output builder, build a new output builder and copy settings from it + output_val = self._build_output(name=name, meta=data._meta, data=None) + copy_output_setting(source=data, target=output_val) + else: + message = "Unsupported output type: {} for pipeline output: {}: {}" + raise ValidationException( + message=message.format(type(data), name, data), + no_personal_data_message=message, + target=ErrorTarget.PIPELINE, + ) + return output_val + + def _build_pipeline_outputs_dict(self, outputs: Dict) -> OutputsAttrDict: + """Build an output attribute dict without output definition metadata. 
+ For pipeline outputs, its setting should be copied from node level outputs. + + :param outputs: Node output dict or pipeline component's outputs. + :type outputs: Dict[str, Union[Output, NodeOutput]] + :return: Built dynamic output attribute dict. + :rtype: OutputsAttrDict + """ + output_dict = {} + for key, val in outputs.items(): + output_dict[key] = self._build_output_for_pipeline(name=key, data=val) + return OutputsAttrDict(output_dict) + + def _build_outputs(self) -> Dict[str, Output]: + """Build outputs of this pipeline to a dict which maps output to actual + value. + + The built dictionary's format aligns with component job's output yaml, + un-configured outputs will be None, eg: + {"eval_output": "${{jobs.eval.outputs.eval_output}}", "un_configured": None} + + :return: The output dict + :rtype: Dict[str, Output] + """ + outputs = {} + for name, output in self.outputs.items(): # type: ignore + if isinstance(output, NodeOutput): + output = output._to_job_output() # pylint: disable=protected-access + outputs[name] = output + return outputs + + def _get_default_input_val(self, val: Any): # type: ignore + # use Default value as data placeholder for unfilled inputs. + # client side need to fill the default value for dsl.pipeline + if isinstance(val, GroupInput): + # Copy default value dict for group + return copy.deepcopy(val.default) + return val.default + + def _update_output_types(self, rest_data_outputs: Dict) -> None: + """Won't clear output type for pipeline level outputs since it's required in rest object. + + :param rest_data_outputs: The REST data outputs + :type rest_data_outputs: Dict + """ + + +class AutoMLNodeIOMixin(NodeIOMixin): + """Wrap outputs of automl node and build data bindings dynamically.""" + + def __init__(self, **kwargs): # type: ignore + # add a inputs field to align with other nodes + self.inputs = {} + super(AutoMLNodeIOMixin, self).__init__(**kwargs) + if getattr(self, "outputs", None): + self._outputs = self._build_outputs_dict(self.outputs or {}) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_load_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_load_component.py new file mode 100644 index 00000000..60c4cbe7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_load_component.py @@ -0,0 +1,313 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=protected-access +from typing import Any, Callable, Dict, List, Mapping, Optional, Union, cast + +from marshmallow import INCLUDE + +from azure.ai.ml import Output +from azure.ai.ml._schema import NestedField +from azure.ai.ml._schema.pipeline.component_job import SweepSchema +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, SOURCE_PATH_CONTEXT_KEY, CommonYamlFields +from azure.ai.ml.constants._component import ControlFlowType, DataTransferTaskType, NodeType +from azure.ai.ml.constants._compute import ComputeType +from azure.ai.ml.dsl._component_func import to_component_func +from azure.ai.ml.dsl._overrides_definition import OverrideDefinition +from azure.ai.ml.entities._builders import ( + BaseNode, + Command, + DataTransferCopy, + DataTransferExport, + DataTransferImport, + Import, + Parallel, + Spark, + Sweep, +) +from azure.ai.ml.entities._builders.condition_node import ConditionNode +from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode +from azure.ai.ml.entities._builders.do_while import DoWhile +from azure.ai.ml.entities._builders.parallel_for import ParallelFor +from azure.ai.ml.entities._builders.pipeline import Pipeline +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob +from azure.ai.ml.entities._util import get_type_from_spec +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException + + +class _PipelineNodeFactory: + """A class to create pipeline node instances from yaml dict or rest objects without hard-coded type check.""" + + def __init__(self) -> None: + self._create_instance_funcs: dict = {} + self._load_from_rest_object_funcs: dict = {} + + self.register_type( + _type=NodeType.COMMAND, + create_instance_func=lambda: Command.__new__(Command), + load_from_rest_object_func=Command._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=NodeType.IMPORT, + create_instance_func=lambda: Import.__new__(Import), + load_from_rest_object_func=Import._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=NodeType.PARALLEL, + create_instance_func=lambda: Parallel.__new__(Parallel), + load_from_rest_object_func=Parallel._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=NodeType.PIPELINE, + create_instance_func=lambda: Pipeline.__new__(Pipeline), + load_from_rest_object_func=Pipeline._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=NodeType.SWEEP, + create_instance_func=lambda: Sweep.__new__(Sweep), + load_from_rest_object_func=Sweep._from_rest_object, + nested_schema=NestedField(SweepSchema, unknown=INCLUDE), + ) + self.register_type( + _type=NodeType.AUTOML, + create_instance_func=None, + load_from_rest_object_func=self._automl_from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=NodeType.SPARK, + create_instance_func=lambda: Spark.__new__(Spark), + load_from_rest_object_func=Spark._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=ControlFlowType.DO_WHILE, + create_instance_func=None, + load_from_rest_object_func=DoWhile._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=ControlFlowType.IF_ELSE, + create_instance_func=None, + load_from_rest_object_func=ConditionNode._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=ControlFlowType.PARALLEL_FOR, + create_instance_func=None, + 
load_from_rest_object_func=ParallelFor._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.COPY_DATA]), + create_instance_func=lambda: DataTransferCopy.__new__(DataTransferCopy), + load_from_rest_object_func=DataTransferCopy._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.IMPORT_DATA]), + create_instance_func=lambda: DataTransferImport.__new__(DataTransferImport), + load_from_rest_object_func=DataTransferImport._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.EXPORT_DATA]), + create_instance_func=lambda: DataTransferExport.__new__(DataTransferExport), + load_from_rest_object_func=DataTransferExport._from_rest_object, + nested_schema=None, + ) + self.register_type( + _type=NodeType.FLOW_PARALLEL, + create_instance_func=lambda: Parallel.__new__(Parallel), + load_from_rest_object_func=None, + nested_schema=None, + ) + + @classmethod + def _get_func(cls, _type: str, funcs: Dict[str, Callable]) -> Callable: + if _type == NodeType._CONTAINER: + msg = ( + "Component returned by 'list' is abbreviated and can not be used directly, " + "please use result from 'get'." + ) + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.COMPONENT, + error_category=ErrorCategory.USER_ERROR, + ) + _type = get_type_from_spec({CommonYamlFields.TYPE: _type}, valid_keys=funcs) + return funcs[_type] + + def get_create_instance_func(self, _type: str) -> Callable[..., BaseNode]: + """Get the function to create a new instance of the node. + + :param _type: The type of the node. + :type _type: str + :return: The create instance function + :rtype: Callable[..., BaseNode] + """ + return self._get_func(_type, self._create_instance_funcs) + + def get_load_from_rest_object_func(self, _type: str) -> Callable: + """Get the function to load a node from a rest object. + + :param _type: The type of the node. + :type _type: str + :return: The `_load_from_rest_object` function + :rtype: Callable[[Any], Union[BaseNode, AutoMLJob, ControlFlowNode]] + """ + return self._get_func(_type, self._load_from_rest_object_funcs) + + def register_type( + self, + _type: str, + *, + create_instance_func: Optional[Callable[..., Union[BaseNode, AutoMLJob]]] = None, + load_from_rest_object_func: Optional[Callable] = None, + nested_schema: Optional[Union[NestedField, List[NestedField]]] = None, + ) -> None: + """Register a type of node. + + :param _type: The type of the node. 
+ :type _type: str + :keyword create_instance_func: A function to create a new instance of the node + :paramtype create_instance_func: typing.Optional[typing.Callable[..., typing.Union[BaseNode, AutoMLJob]]] + :keyword load_from_rest_object_func: A function to load a node from a rest object + :paramtype load_from_rest_object_func: typing.Optional[typing.Callable[[Any], typing.Union[BaseNode, AutoMLJob\ + , ControlFlowNode]]] + :keyword nested_schema: schema/schemas of corresponding nested field, will be used in \ + PipelineJobSchema.jobs.value + :paramtype nested_schema: typing.Optional[typing.Union[NestedField, List[NestedField]]] + """ + if create_instance_func is not None: + self._create_instance_funcs[_type] = create_instance_func + if load_from_rest_object_func is not None: + self._load_from_rest_object_funcs[_type] = load_from_rest_object_func + if nested_schema is not None: + from azure.ai.ml._schema.core.fields import TypeSensitiveUnionField + from azure.ai.ml._schema.pipeline.pipeline_component import PipelineComponentSchema + from azure.ai.ml._schema.pipeline.pipeline_job import PipelineJobSchema + + for declared_fields in [ + PipelineJobSchema._declared_fields, + PipelineComponentSchema._declared_fields, + ]: + jobs_value_field: TypeSensitiveUnionField = declared_fields["jobs"].value_field + if not isinstance(nested_schema, list): + nested_schema = [nested_schema] + for nested_field in nested_schema: + jobs_value_field.insert_type_sensitive_field(type_name=_type, field=nested_field) + + def load_from_dict(self, *, data: dict, _type: Optional[str] = None) -> Union[BaseNode, AutoMLJob]: + """Load a node from a dict. + + :keyword data: A dict containing the node's data. + :paramtype data: dict + :keyword _type: The type of the node. If not specified, it will be inferred from the data. + :paramtype _type: str + :return: The node + :rtype: Union[BaseNode, AutoMLJob] + """ + if _type is None: + _type = data[CommonYamlFields.TYPE] if CommonYamlFields.TYPE in data else NodeType.COMMAND + # todo: refine Hard code for now to support different task type for DataTransfer node + if _type == NodeType.DATA_TRANSFER: + _type = "_".join([NodeType.DATA_TRANSFER, data.get("task", " ")]) + else: + data[CommonYamlFields.TYPE] = _type + + new_instance: Union[BaseNode, AutoMLJob] = self.get_create_instance_func(_type)() + + if isinstance(new_instance, BaseNode): + # parse component + component_key = new_instance._get_component_attr_name() + if component_key in data and isinstance(data[component_key], dict): + data[component_key] = Component._load( + data=data[component_key], + yaml_path=data[component_key].pop(SOURCE_PATH_CONTEXT_KEY, None), + ) + # TODO: Bug Item number: 2883415 + new_instance.__init__(**data) # type: ignore + return new_instance + + def load_from_rest_object( + self, *, obj: dict, _type: Optional[str] = None, **kwargs: Any + ) -> Union[BaseNode, AutoMLJob, ControlFlowNode]: + """Load a node from a rest object. + + :keyword obj: A rest object containing the node's data. + :paramtype obj: dict + :keyword _type: The type of the node. If not specified, it will be inferred from the data. 
+ :paramtype _type: str + :return: The node + :rtype: Union[BaseNode, AutoMLJob, ControlFlowNode] + """ + + # TODO: Remove in PuP with native import job/component type support in MFE/Designer + if "computeId" in obj and obj["computeId"] and obj["computeId"].endswith("/" + ComputeType.ADF): + _type = NodeType.IMPORT + + if _type is None: + _type = obj[CommonYamlFields.TYPE] if CommonYamlFields.TYPE in obj else NodeType.COMMAND + # todo: refine Hard code for now to support different task type for DataTransfer node + if _type == NodeType.DATA_TRANSFER: + _type = "_".join([NodeType.DATA_TRANSFER, obj.get("task", " ")]) + else: + obj[CommonYamlFields.TYPE] = _type + + res: Union[BaseNode, AutoMLJob, ControlFlowNode] = self.get_load_from_rest_object_func(_type)(obj, **kwargs) + return res + + @classmethod + def _automl_from_rest_object(cls, node: Dict) -> AutoMLJob: + _outputs = cast(Dict[str, Union[str, dict]], node.get("outputs")) + # rest dict outputs -> Output objects + outputs = AutoMLJob._from_rest_outputs(_outputs) + # Output objects -> yaml dict outputs + parsed_outputs = {} + for key, val in outputs.items(): + if isinstance(val, Output): + val = val._to_dict() + parsed_outputs[key] = val + node["outputs"] = parsed_outputs + return AutoMLJob._load_from_dict( + node, + context={BASE_PATH_CONTEXT_KEY: "./"}, + additional_message="Failed to load automl task from backend.", + inside_pipeline=True, + ) + + +def _generate_component_function( + component_entity: Component, + override_definitions: Optional[Mapping[str, OverrideDefinition]] = None, # pylint: disable=unused-argument +) -> Callable[..., Union[Command, Parallel]]: + # Generate a function which returns a component node. + def create_component_func(**kwargs: Any) -> Union[BaseNode, AutoMLJob]: + # todo: refine Hard code for now to support different task type for DataTransfer node + _type = component_entity.type + if _type == NodeType.DATA_TRANSFER: + # TODO: Bug Item number: 2883431 + _type = "_".join([NodeType.DATA_TRANSFER, component_entity.task]) # type: ignore + if component_entity.task == DataTransferTaskType.IMPORT_DATA: # type: ignore + return pipeline_node_factory.load_from_dict( + data={"component": component_entity, "_from_component_func": True, **kwargs}, + _type=_type, + ) + return pipeline_node_factory.load_from_dict( + data={"component": component_entity, "inputs": kwargs, "_from_component_func": True}, + _type=_type, + ) + + res: Callable = to_component_func(component_entity, create_component_func) + return res + + +pipeline_node_factory = _PipelineNodeFactory() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_expression.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_expression.py new file mode 100644 index 00000000..49bb8a61 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_expression.py @@ -0,0 +1,662 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=protected-access + +import re +import tempfile +from collections import namedtuple +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast + +from azure.ai.ml._utils.utils import dump_yaml_to_file, get_all_data_binding_expressions, load_yaml +from azure.ai.ml.constants._common import AZUREML_PRIVATE_FEATURES_ENV_VAR, DefaultOpenEncoding +from azure.ai.ml.constants._component import ComponentParameterTypes, IOConstants +from azure.ai.ml.exceptions import UserErrorException + +if TYPE_CHECKING: + from azure.ai.ml.entities._builders import BaseNode + +ExpressionInput = namedtuple("ExpressionInput", ["name", "type", "value"]) +NONE_PARAMETER_TYPE = "None" + + +class PipelineExpressionOperator: + """Support operator in native Python experience.""" + + ADD = "+" + SUB = "-" + MUL = "*" + DIV = "/" + MOD = "%" + POW = "**" + FLOORDIV = "//" + LT = "<" + GT = ">" + LTE = "<=" + GTE = ">=" + EQ = "==" + NE = "!=" + AND = "&" + OR = "|" + XOR = "^" + + +_SUPPORTED_OPERATORS = { + getattr(PipelineExpressionOperator, attr) + for attr in PipelineExpressionOperator.__dict__ + if not attr.startswith("__") +} + + +def _enumerate_operation_combination() -> Dict[str, Union[str, Exception]]: + """Enumerate the result type of binary operations on types + + Leverages `eval` to validate operation and get its result type. + + :return: A dictionary that maps an operation to either: + * A result type + * An Exception + :rtype: Dict[str, Union[str, Exception]] + """ + res: Dict = {} + primitive_types_values = { + NONE_PARAMETER_TYPE: repr(None), + ComponentParameterTypes.BOOLEAN: repr(True), + ComponentParameterTypes.INTEGER: repr(1), + ComponentParameterTypes.NUMBER: repr(1.0), + ComponentParameterTypes.STRING: repr("1"), + } + for type1, operand1 in primitive_types_values.items(): + for type2, operand2 in primitive_types_values.items(): + for operator in _SUPPORTED_OPERATORS: + k = f"{type1} {operator} {type2}" + try: + eval_result = eval(f"{operand1} {operator} {operand2}") # pylint: disable=eval-used # nosec + res[k] = IOConstants.PRIMITIVE_TYPE_2_STR[type(eval_result)] + except TypeError: + error_message = ( + f"Operator '{operator}' is not supported between instances of '{type1}' and '{type2}'." + ) + res[k] = UserErrorException(message=error_message, no_personal_data_message=error_message) + return res + + +# enumerate and store as a lookup table: +# key format is "<operand1_type> <operator> <operand2_type>" +# value can be either result type as str and UserErrorException for invalid operation +_OPERATION_RESULT_TYPE_LOOKUP = _enumerate_operation_combination() + + +class PipelineExpressionMixin: + _SUPPORTED_PRIMITIVE_TYPES = (bool, int, float, str) + _SUPPORTED_PIPELINE_INPUT_TYPES = ( + ComponentParameterTypes.BOOLEAN, + ComponentParameterTypes.INTEGER, + ComponentParameterTypes.NUMBER, + ComponentParameterTypes.STRING, + ) + + def _validate_binary_operation(self, other: Any, operator: str) -> None: + from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput + + if ( + other is not None + and not isinstance(other, self._SUPPORTED_PRIMITIVE_TYPES) + and not isinstance(other, (PipelineInput, NodeOutput, PipelineExpression)) + ): + error_message = ( + f"Operator '{operator}' is not supported with {type(other)}; " + "currently only support primitive types (None, bool, int, float and str), " + "pipeline input, component output and expression." 
+ ) + raise UserErrorException(message=error_message, no_personal_data_message=error_message) + + def __add__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.ADD) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.ADD) + + def __radd__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.ADD) + return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.ADD) + + def __sub__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.SUB) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.SUB) + + def __rsub__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.SUB) + return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.SUB) + + def __mul__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.MUL) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.MUL) + + def __rmul__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.MUL) + return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.MUL) + + def __truediv__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.DIV) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.DIV) + + def __rtruediv__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.DIV) + return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.DIV) + + def __mod__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.MOD) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.MOD) + + def __rmod__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.MOD) + return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.MOD) + + def __pow__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.POW) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.POW) + + def __rpow__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.POW) + return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.POW) + + def __floordiv__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.FLOORDIV) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.FLOORDIV) + + def __rfloordiv__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.FLOORDIV) + return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.FLOORDIV) + + def __lt__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.LT) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.LT) + + def __gt__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, 
PipelineExpressionOperator.GT) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.GT) + + def __le__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.LTE) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.LTE) + + def __ge__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.GTE) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.GTE) + + # TODO: Bug Item number: 2883354 + def __eq__(self, other: Any) -> "PipelineExpression": # type: ignore + self._validate_binary_operation(other, PipelineExpressionOperator.EQ) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.EQ) + + # TODO: Bug Item number: 2883354 + def __ne__(self, other: Any) -> "PipelineExpression": # type: ignore + self._validate_binary_operation(other, PipelineExpressionOperator.NE) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.NE) + + def __and__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.AND) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.AND) + + def __or__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.OR) + return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.OR) + + def __xor__(self, other: Any) -> "PipelineExpression": + self._validate_binary_operation(other, PipelineExpressionOperator.XOR) + return PipelineExpression._from_operation(self, None, PipelineExpressionOperator.XOR) + + def __bool__(self) -> bool: + """Python method that is used to implement truth value testing and the built-in operation bool(). + + This method is not supported as PipelineExpressionMixin is designed to record operation history, + while this method can only return False or True, leading to history breaks here. + As overloadable boolean operators PEP (refer to: https://www.python.org/dev/peps/pep-0335/) + was rejected, logical operations are also not supported. + + :return: True if not inside dsl pipeline func, raises otherwise + :rtype: bool + """ + from azure.ai.ml.dsl._pipeline_component_builder import _is_inside_dsl_pipeline_func + + # note: unexpected bool test always be checking if the object is None; + # so for non-pipeline scenarios, directly return True to avoid unexpected breaking, + # and for pipeline scenarios, will use is not None to replace bool test. + if not _is_inside_dsl_pipeline_func(): + return True + + error_message = f"Type {type(self)} is not supported for operation bool()." + raise UserErrorException(message=error_message, no_personal_data_message=error_message) + + +class PipelineExpression(PipelineExpressionMixin): + """Pipeline expression entity. + + Use PipelineExpression to support simple and trivial parameter transformation tasks with constants + or other parameters. Operations are recorded in this class during executions, and expected result + will be generated for corresponding scenario. 
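How an expression over pipeline inputs is recorded and later resolved can be sketched as follows. This is an illustrative usage sketch, not SDK documentation: `add_one` and its YAML file are hypothetical, and the exact resolution (a data-binding string for pure string concatenation, otherwise an auto-generated "Expression: ..." command component) follows the resolve() logic further below.

from azure.ai.ml import load_component
from azure.ai.ml.dsl import pipeline

# Hypothetical command component with a primitive (integer) input named `value`.
add_one = load_component(source="./add_one.yml")

@pipeline()
def pipeline_func(base: int, factor: int):
    # Inside the dsl.pipeline function, `base` and `factor` are PipelineInput
    # objects; `base * factor + 1` is therefore captured as a PipelineExpression
    # and, because it is not pure string concatenation, resolves to an
    # auto-generated command component whose single output feeds `value`.
    add_one(value=base * factor + 1)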
+ """ + + _PIPELINE_INPUT_PREFIX = ["parent", "inputs"] + _PIPELINE_INPUT_PATTERN = re.compile(pattern=r"parent.inputs.(?P<pipeline_input_name>[^.]+)") + _PIPELINE_INPUT_NAME_GROUP = "pipeline_input_name" + # AML type to Python type, for generated Python code + _TO_PYTHON_TYPE = { + ComponentParameterTypes.BOOLEAN: bool.__name__, + ComponentParameterTypes.INTEGER: int.__name__, + ComponentParameterTypes.NUMBER: float.__name__, + ComponentParameterTypes.STRING: str.__name__, + } + + _INDENTATION = " " + _IMPORT_MLDESIGNER_LINE = "from mldesigner import command_component, Output" + _DECORATOR_LINE = "@command_component(@@decorator_parameters@@)" + _COMPONENT_FUNC_NAME = "expression_func" + _COMPONENT_FUNC_DECLARATION_LINE = ( + f"def {_COMPONENT_FUNC_NAME}(@@component_parameters@@)" " -> Output(type=@@return_type@@):" + ) + _PYTHON_CACHE_FOLDER_NAME = "__pycache__" + + def __init__(self, postfix: List[str], inputs: Dict[str, ExpressionInput]): + self._postfix = postfix + self._inputs = inputs.copy() # including PiplineInput and Output, extra stored name and type + self._result_type: Optional[str] = None + self._created_component = None + + @property + def expression(self) -> str: + """Infix expression string, wrapped with parentheses. + + :return: The infix expression + :rtype: str + """ + return self._to_infix() + + def __str__(self) -> str: + return self._to_data_binding() + + def _data_binding(self) -> str: + return self._to_data_binding() + + def _to_infix(self) -> str: + stack = [] + for token in self._postfix: + if token not in _SUPPORTED_OPERATORS: + stack.append(token) + continue + operand2, operand1 = stack.pop(), stack.pop() + stack.append(f"({operand1} {token} {operand2})") + return stack.pop() + + # pylint: disable=too-many-statements + @staticmethod + def _handle_operand( + operand: "PipelineExpression", + postfix: List[str], + expression_inputs: Dict[str, ExpressionInput], + pipeline_inputs: dict, + ) -> Tuple[List[str], Dict[str, ExpressionInput]]: + """Handle operand in expression, update postfix expression and expression inputs. + + :param operand: The operand + :type operand: "PipelineExpression" + :param postfix: + :type postfix: List[str] + :param expression_inputs: The expression inputs + :type expression_inputs: Dict[str, ExpressionInput] + :param pipeline_inputs: The pipeline inputs + :type pipeline_inputs: dict + :return: A 2-tuple of the updated postfix expression and expression inputs + :rtype: Tuple[List[str], Dict[str, ExpressionInput]] + """ + from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput + + def _update_postfix(_postfix: List[str], _old_name: str, _new_name: str) -> List[str]: + return list(map(lambda _x: _new_name if _x == _old_name else _x, _postfix)) + + def _get_or_create_input_name( + _original_name: str, + _operand: Union[PipelineInput, NodeOutput], + _expression_inputs: Dict[str, ExpressionInput], + ) -> str: + """Get or create expression input name as current operand may have appeared in expression. 
+ + :param _original_name: The original name + :type _original_name: str + :param _operand: The expression operand + :type _operand: Union[PipelineInput, NodeOutput] + :param _expression_inputs: The expression inputs + :type _expression_inputs: Dict[str, ExpressionInput] + :return: The input name + :rtype: str + """ + _existing_id_to_name = {id(_v.value): _k for _k, _v in _expression_inputs.items()} + if id(_operand) in _existing_id_to_name: + return _existing_id_to_name[id(_operand)] + # use a counter to generate a unique name for current operand + _name, _counter = _original_name, 0 + while _name in _expression_inputs: + _name = f"{_original_name}_{_counter}" + _counter += 1 + return _name + + def _handle_pipeline_input( + _pipeline_input: PipelineInput, + _postfix: List[str], + _expression_inputs: Dict[str, ExpressionInput], + ) -> Tuple[List[str], dict]: + _name = _pipeline_input._port_name + # 1. use name with counter for pipeline input; 2. add component's name to component output + if _name in _expression_inputs: + _seen_input = _expression_inputs[_name] + if isinstance(_seen_input.value, PipelineInput): + _name = _get_or_create_input_name(_name, _pipeline_input, _expression_inputs) + else: + _expression_inputs.pop(_name) + _new_name = f"{_seen_input.value._owner.component.name}__{_seen_input.value._port_name}" + _postfix = _update_postfix(_postfix, _name, _new_name) + _expression_inputs[_new_name] = ExpressionInput(_new_name, _seen_input.type, _seen_input) + _postfix.append(_name) + + param_input = pipeline_inputs + for group_name in _pipeline_input._group_names: + param_input = param_input[group_name].values + _expression_inputs[_name] = ExpressionInput( + _name, param_input[_pipeline_input._port_name].type, _pipeline_input + ) + return _postfix, _expression_inputs + + def _handle_component_output( + _component_output: NodeOutput, + _postfix: List[str], + _expression_inputs: Dict[str, ExpressionInput], + ) -> Tuple[List[str], dict]: + if _component_output._meta is not None and not _component_output._meta._is_primitive_type: + error_message = ( + f"Component output {_component_output._port_name} in expression must " + f"be a primitive type with value {True!r}, " + f"got {_component_output._meta._is_primitive_type!r}" + ) + raise UserErrorException(message=error_message, no_personal_data_message=error_message) + _name = _component_output._port_name + _has_prefix = False + # "output" is the default output name for command component, add component's name as prefix + if _name == "output": + if _component_output._owner is not None and not isinstance(_component_output._owner.component, str): + _name = f"{_component_output._owner.component.name}__output" + _has_prefix = True + # following loop is expected to execute at most twice: + # 1. add component's name to output(s) + # 2. 
use name with counter + while _name in _expression_inputs: + _seen_input = _expression_inputs[_name] + if isinstance(_seen_input.value, PipelineInput): + if not _has_prefix: + if _component_output._owner is not None and not isinstance( + _component_output._owner.component, str + ): + _name = f"{_component_output._owner.component.name}__{_component_output._port_name}" + _has_prefix = True + continue + _name = _get_or_create_input_name(_name, _component_output, _expression_inputs) + else: + if not _has_prefix: + _expression_inputs.pop(_name) + _new_name = f"{_seen_input.value._owner.component.name}__{_seen_input.value._port_name}" + _postfix = _update_postfix(_postfix, _name, _new_name) + _expression_inputs[_new_name] = ExpressionInput(_new_name, _seen_input.type, _seen_input) + if _component_output._owner is not None and not isinstance( + _component_output._owner.component, str + ): + _name = f"{_component_output._owner.component.name}__{_component_output._port_name}" + _has_prefix = True + _name = _get_or_create_input_name(_name, _component_output, _expression_inputs) + _postfix.append(_name) + _expression_inputs[_name] = ExpressionInput(_name, _component_output.type, _component_output) + return _postfix, _expression_inputs + + if operand is None or isinstance(operand, PipelineExpression._SUPPORTED_PRIMITIVE_TYPES): + postfix.append(repr(operand)) + elif isinstance(operand, PipelineInput): + postfix, expression_inputs = _handle_pipeline_input(operand, postfix, expression_inputs) + elif isinstance(operand, NodeOutput): + postfix, expression_inputs = _handle_component_output(operand, postfix, expression_inputs) + elif isinstance(operand, PipelineExpression): + postfix.extend(operand._postfix.copy()) + expression_inputs.update(operand._inputs.copy()) + return postfix, expression_inputs + + @staticmethod + def _from_operation(operand1: Any, operand2: Any, operator: str) -> "PipelineExpression": + if operator not in _SUPPORTED_OPERATORS: + error_message = ( + f"Operator '{operator}' is not supported operator, " + f"currently supported operators are {','.join(_SUPPORTED_OPERATORS)}." + ) + raise UserErrorException(message=error_message, no_personal_data_message=error_message) + + # get all pipeline input types from builder stack + # TODO: check if there is pipeline input we cannot know its type (missing in `PipelineComponentBuilder.inputs`)? + from azure.ai.ml.dsl._pipeline_component_builder import _definition_builder_stack + + res = _definition_builder_stack.top() + pipeline_inputs = res.inputs if res is not None else {} + postfix: List[str] = [] + inputs: Dict[str, ExpressionInput] = {} + postfix, inputs = PipelineExpression._handle_operand(operand1, postfix, inputs, pipeline_inputs) + postfix, inputs = PipelineExpression._handle_operand(operand2, postfix, inputs, pipeline_inputs) + postfix.append(operator) + return PipelineExpression(postfix, inputs) + + @property + def _string_concatenation(self) -> bool: + """If all operands are string and operations are addition, it is a string concatenation expression. 
+ + :return: Whether this represents string concatenation + :rtype: bool + """ + for token in self._postfix: + # operator can only be "+" for string concatenation + if token in _SUPPORTED_OPERATORS: + if token != PipelineExpressionOperator.ADD: + return False + continue + # constant and PiplineInput should be type string + if token in self._inputs: + if self._inputs[token].type != ComponentParameterTypes.STRING: + return False + else: + if not isinstance(eval(token), str): # pylint: disable=eval-used # nosec + return False + return True + + def _to_data_binding(self) -> str: + """Convert operands to data binding and concatenate them in the order of postfix expression. + + :return: The data binding + :rtype: str + """ + if not self._string_concatenation: + error_message = ( + "Only string concatenation expression is supported to convert to data binding, " + f"current expression is '{self.expression}'." + ) + raise UserErrorException(message=error_message, no_personal_data_message=error_message) + + stack = [] + for token in self._postfix: + if token != PipelineExpressionOperator.ADD: + if token in self._inputs: + stack.append(self._inputs[token].value._data_binding()) + else: + stack.append(eval(token)) # pylint: disable=eval-used # nosec + continue + operand2, operand1 = stack.pop(), stack.pop() + stack.append(operand1 + operand2) + res: str = stack.pop() + return res + + def resolve(self) -> Union[str, "BaseNode"]: + """Resolve expression to data binding or component, depend on the operations. + + :return: The data binding string or the component + :rtype: Union[str, BaseNode] + """ + if self._string_concatenation: + return self._to_data_binding() + return cast(Union[str, "BaseNode"], self._create_component()) + + @staticmethod + def parse_pipeline_inputs_from_data_binding(data_binding: str) -> List[str]: + """Parse all PipelineInputs name from data binding expression. + + :param data_binding: Data binding expression + :type data_binding: str + :return: List of PipelineInput's name from given data binding expression + :rtype: List[str] + """ + pipeline_input_names = [] + for single_data_binding in get_all_data_binding_expressions( + value=data_binding, + binding_prefix=PipelineExpression._PIPELINE_INPUT_PREFIX, + is_singular=False, + ): + m = PipelineExpression._PIPELINE_INPUT_PATTERN.match(single_data_binding) + # `get_all_data_binding_expressions` should work as pre-filter, so no need to concern `m` is None + if m is not None: + pipeline_input_names.append(m.group(PipelineExpression._PIPELINE_INPUT_NAME_GROUP)) + return pipeline_input_names + + @staticmethod + def _get_operation_result_type(type1: str, operator: str, type2: str) -> str: + def _validate_operand_type(_type: str) -> None: + if _type != NONE_PARAMETER_TYPE and _type not in PipelineExpression._SUPPORTED_PIPELINE_INPUT_TYPES: + error_message = ( + f"Pipeline input type {_type!r} is not supported in expression; " + f"currently only support None, " + + ", ".join(PipelineExpression._SUPPORTED_PIPELINE_INPUT_TYPES) + + "." 
+ ) + raise UserErrorException(message=error_message, no_personal_data_message=error_message) + + _validate_operand_type(type1) + _validate_operand_type(type2) + operation = f"{type1} {operator} {type2}" + lookup_value = _OPERATION_RESULT_TYPE_LOOKUP.get(operation) + if isinstance(lookup_value, str): + return lookup_value # valid operation, return result type + _user_exception: UserErrorException = lookup_value + raise _user_exception # invalid operation, raise UserErrorException + + def _get_operand_type(self, operand: str) -> str: + if operand in self._inputs: + res: str = self._inputs[operand].type + return res + primitive_type = type(eval(operand)) # pylint: disable=eval-used # nosec + res_type: str = IOConstants.PRIMITIVE_TYPE_2_STR.get(primitive_type, NONE_PARAMETER_TYPE) + return res_type + + @property + def _component_code(self) -> str: + def _generate_function_code_lines() -> Tuple[List[str], str]: + """Return lines of code and return type. + + :return: A 2-tuple of (function body, return type name) + :rtype: Tuple[List[str], str] + """ + _inter_id, _code, _stack = 0, [], [] + _line_recorder: Dict = {} + for _token in self._postfix: + if _token not in _SUPPORTED_OPERATORS: + _type = self._get_operand_type(_token) + _stack.append((_token, _type)) + continue + _operand2, _type2 = _stack.pop() + _operand1, _type1 = _stack.pop() + _current_line = f"{_operand1} {_token} {_operand2}" + if _current_line in _line_recorder: + _inter_var, _inter_var_type = _line_recorder[_current_line] + else: + _inter_var = f"inter_var_{_inter_id}" + _inter_id += 1 + _inter_var_type = self._get_operation_result_type(_type1, _token, _type2) + _code.append(f"{self._INDENTATION}{_inter_var} = {_current_line}") + _line_recorder[_current_line] = (_inter_var, _inter_var_type) + _stack.append((_inter_var, _inter_var_type)) + _return_var, _result_type = _stack.pop() + _code.append(f"{self._INDENTATION}return {_return_var}") + return _code, _result_type + + def _generate_function_decorator_and_declaration_lines(_return_type: str) -> List[str]: + # decorator parameters + _display_name = f'{self._INDENTATION}display_name="Expression: {self.expression}",' + _decorator_parameters = "\n" + "\n".join([_display_name]) + "\n" + # component parameters + _component_parameters = [] + for _name in sorted(self._inputs): + _type = self._TO_PYTHON_TYPE[self._inputs[_name].type] + _component_parameters.append(f"{_name}: {_type}") + _component_parameters_str = ( + "\n" + + "\n".join( + [f"{self._INDENTATION}{_component_parameter}," for _component_parameter in _component_parameters] + ) + + "\n" + ) + return [ + self._IMPORT_MLDESIGNER_LINE + "\n\n", + self._DECORATOR_LINE.replace("@@decorator_parameters@@", _decorator_parameters), + self._COMPONENT_FUNC_DECLARATION_LINE.replace( + "@@component_parameters@@", _component_parameters_str + ).replace("@@return_type@@", f'"{_return_type}"'), + ] + + lines, result_type = _generate_function_code_lines() + self._result_type = result_type + code = _generate_function_decorator_and_declaration_lines(result_type) + lines + return "\n".join(code) + "\n" + + def _create_component(self) -> Any: + def _generate_python_file(_folder: Path) -> None: + _folder.mkdir() + with open(_folder / "expression_component.py", "w", encoding=DefaultOpenEncoding.WRITE) as _f: + _f.write(self._component_code) + + def _generate_yaml_file(_path: Path) -> None: + _data_folder = Path(__file__).parent / "data" + # update YAML content from template and dump + with open(_data_folder / "expression_component_template.yml", 
"r", encoding=DefaultOpenEncoding.READ) as _f: + _data = load_yaml(_f) + _data["display_name"] = f"Expression: {self.expression}" + _data["inputs"] = {} + _data["outputs"]["output"]["type"] = self._result_type + _command_inputs_items = [] + for _name in sorted(self._inputs): + _type = self._inputs[_name].type + _data["inputs"][_name] = {"type": _type} + _command_inputs_items.append(_name + '="${{inputs.' + _name + '}}"') + _command_inputs_string = " ".join(_command_inputs_items) + _command_output_string = 'output="${{outputs.output}}"' + _command = ( + "mldesigner execute --source expression_component.py --name expression_func" + " --inputs " + _command_inputs_string + " --outputs " + _command_output_string + ) + _data["command"] = _data["command"].format(command_placeholder=_command) + dump_yaml_to_file(_path, _data) + + if self._created_component is None: + tmp_folder = Path(tempfile.mkdtemp()) + code_folder = tmp_folder / "src" + yaml_path = tmp_folder / "component_spec.yml" + _generate_python_file(code_folder) + _generate_yaml_file(yaml_path) + + from azure.ai.ml import load_component + + component_func = load_component(yaml_path) + component_kwargs = {k: v.value for k, v in self._inputs.items()} + self._created_component = component_func(**component_kwargs) + if self._created_component is not None: + self._created_component.environment_variables = {AZUREML_PRIVATE_FEATURES_ENV_VAR: "true"} + return self._created_component diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py new file mode 100644 index 00000000..3a7d89e7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py @@ -0,0 +1,182 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import re +from typing import Dict, List, Tuple, Type, Union + +from azure.ai.ml._restclient.v2023_04_01_preview.models import InputDeliveryMode +from azure.ai.ml._restclient.v2023_04_01_preview.models import JobInput as RestJobInput +from azure.ai.ml._restclient.v2023_04_01_preview.models import JobOutput as RestJobOutput +from azure.ai.ml._restclient.v2023_04_01_preview.models import Mpi, PyTorch, Ray, TensorFlow +from azure.ai.ml.constants._component import ComponentJobConstants +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job._input_output_helpers import ( + INPUT_MOUNT_MAPPING_FROM_REST, + INPUT_MOUNT_MAPPING_TO_REST, + OUTPUT_MOUNT_MAPPING_FROM_REST, + OUTPUT_MOUNT_MAPPING_TO_REST, +) +from azure.ai.ml.entities._util import normalize_job_input_output_type +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException + + +def process_sdk_component_job_io( + io: Dict, + io_binding_regex_list: List[str], +) -> Tuple: + """Separates SDK ComponentJob inputs that are data bindings (i.e. string inputs prefixed with 'inputs.' or + 'outputs.') and dataset and literal inputs/outputs. + + :param io: Input or output dictionary of an SDK ComponentJob + :type io: Dict[str, Union[str, float, bool, Input]] + :param io_binding_regex_list: A list of regexes for io bindings + :type io_binding_regex_list: List[str] + :return: A tuple of dictionaries: + * One mapping inputs to REST formatted ComponentJobInput/ComponentJobOutput for data binding io. 
+ * The other dictionary contains any IO that is not a databinding that is yet to be turned into REST form + :rtype: Tuple[Dict[str, str], Dict[str, Union[str, float, bool, Input]]] + """ + io_bindings: Dict = {} + dataset_literal_io: Dict = {} + legacy_io_binding_regex_list = [ + ComponentJobConstants.LEGACY_INPUT_PATTERN, + ComponentJobConstants.LEGACY_OUTPUT_PATTERN, + ] + for io_name, io_value in io.items(): + if isinstance(io_value, (Input, Output)) and isinstance(io_value.path, str): + mode = io_value.mode + path = io_value.path + name = io_value.name if hasattr(io_value, "name") else None + version = io_value.version if hasattr(io_value, "version") else None + if any(re.match(item, path) for item in io_binding_regex_list): + # Yaml syntax requires using ${{}} to enclose inputs and outputs bindings + # io_bindings[io_name] = io_value + io_bindings.update({io_name: {"value": path}}) + # add mode to literal value for binding input + if mode: + if isinstance(io_value, Input): + io_bindings[io_name].update({"mode": INPUT_MOUNT_MAPPING_TO_REST[mode]}) + else: + io_bindings[io_name].update({"mode": OUTPUT_MOUNT_MAPPING_TO_REST[mode]}) + if name or version: + assert isinstance(io_value, Output) + if name: + io_bindings[io_name].update({"name": name}) + if version: + io_bindings[io_name].update({"version": version}) + if isinstance(io_value, Output) and io_value.name is not None: + # when the output should be registered, + # we add io_value to dataset_literal_io for further to_rest_data_outputs + dataset_literal_io[io_name] = io_value + elif any(re.match(item, path) for item in legacy_io_binding_regex_list): + new_format = path.replace("{{", "{{parent.") + msg = "{} has changed to {}, please change to use new format." + raise ValidationException( + message=msg.format(path, new_format), + no_personal_data_message=msg.format("[io_value]", "[io_value_new_format]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + else: + dataset_literal_io[io_name] = io_value + else: + # Collect non-input data inputs + dataset_literal_io[io_name] = io_value + return io_bindings, dataset_literal_io + + +def from_dict_to_rest_io( + io: Dict[str, Union[str, dict]], + rest_object_class: Union[Type[RestJobInput], Type[RestJobOutput]], + io_binding_regex_list: List[str], +) -> Tuple[Dict[str, str], Dict[str, Union[RestJobInput, RestJobOutput]]]: + """Translate rest JObject dictionary to rest inputs/outputs and bindings. + + :param io: Input or output dictionary. + :type io: Dict[str, Union[str, dict]] + :param rest_object_class: RestJobInput or RestJobOutput + :type rest_object_class: Union[Type[RestJobInput], Type[RestJobOutput]] + :param io_binding_regex_list: A list of regexes for io bindings + :type io_binding_regex_list: List[str] + :return: Map from IO name to IO bindings and Map from IO name to IO objects. 
+ :rtype: Tuple[Dict[str, str], Dict[str, Union[RestJobInput, RestJobOutput]]] + """ + io_bindings: dict = {} + rest_io_objects = {} + DIRTY_MODE_MAPPING = { + "Mount": InputDeliveryMode.READ_ONLY_MOUNT, + "RoMount": InputDeliveryMode.READ_ONLY_MOUNT, + "RwMount": InputDeliveryMode.READ_WRITE_MOUNT, + } + for key, val in io.items(): + if isinstance(val, dict): + # convert the input of camel to snake to be compatible with the Jun api + # todo: backend help convert node level input/output type + normalize_job_input_output_type(val) + + # Add casting as sometimes we got value like 1(int) + io_value = str(val.get("value", "")) + io_mode = val.get("mode", None) + io_name = val.get("name", None) + io_version = val.get("version", None) + if any(re.match(item, io_value) for item in io_binding_regex_list): + io_bindings.update({key: {"path": io_value}}) + # add mode to literal value for binding input + if io_mode: + # deal with dirty mode data submitted before + if io_mode in DIRTY_MODE_MAPPING: + io_mode = DIRTY_MODE_MAPPING[io_mode] + val["mode"] = io_mode + if io_mode in OUTPUT_MOUNT_MAPPING_FROM_REST: + io_bindings[key].update({"mode": OUTPUT_MOUNT_MAPPING_FROM_REST[io_mode]}) + else: + io_bindings[key].update({"mode": INPUT_MOUNT_MAPPING_FROM_REST[io_mode]}) + # add name and version for binding input + if io_name or io_version: + assert rest_object_class.__name__ == "JobOutput" + # current code only support dump name and version for JobOutput + # this assert can be deleted if we need to dump name/version for JobInput + if io_name: + io_bindings[key].update({"name": io_name}) + if io_version: + io_bindings[key].update({"version": io_version}) + if not io_mode and not io_name and not io_version: + io_bindings[key] = io_value + else: + if rest_object_class.__name__ == "JobOutput": + # current code only support dump name and version for JobOutput + # this condition can be deleted if we need to dump name/version for JobInput + if "name" in val.keys(): + val["asset_name"] = val.pop("name") + if "version" in val.keys(): + val["asset_version"] = val.pop("version") + rest_obj = rest_object_class.from_dict(val) + rest_io_objects[key] = rest_obj + else: + msg = "Got unsupported type of input/output: {}:" + f"{type(val)}" + raise ValidationException( + message=msg.format(val), + no_personal_data_message=msg.format("[val]"), + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + return io_bindings, rest_io_objects + + +def from_dict_to_rest_distribution(distribution_dict: Dict) -> Union[PyTorch, Mpi, TensorFlow, Ray]: + target_type = distribution_dict["distribution_type"].lower() + if target_type == "pytorch": + return PyTorch(**distribution_dict) + if target_type == "mpi": + return Mpi(**distribution_dict) + if target_type == "tensorflow": + return TensorFlow(**distribution_dict) + if target_type == "ray": + return Ray(**distribution_dict) + msg = "Distribution type must be pytorch, mpi, tensorflow or ray: {}".format(target_type) + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/data/expression_component_template.yml b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/data/expression_component_template.yml new file mode 100644 index 00000000..10d391aa --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/data/expression_component_template.yml 
@@ -0,0 +1,16 @@ +$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json +type: command + +name: expression_component +version: 1 + +outputs: + output: + is_control: true + +code: ./src + +environment: azureml://registries/azureml/environments/mldesigner/labels/latest + +command: >- + {command_placeholder} diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job.py new file mode 100644 index 00000000..7ddbbc46 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job.py @@ -0,0 +1,711 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access +import itertools +import logging +import typing +from functools import partial +from pathlib import Path +from typing import Any, Dict, Generator, List, Optional, Union, cast + +from typing_extensions import Literal + +from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase +from azure.ai.ml._restclient.v2024_01_01_preview.models import PipelineJob as RestPipelineJob +from azure.ai.ml._schema import PathAwareSchema +from azure.ai.ml._schema.pipeline.pipeline_job import PipelineJobSchema +from azure.ai.ml._utils._arm_id_utils import get_resource_name_from_arm_id_safe +from azure.ai.ml._utils.utils import ( + camel_to_snake, + is_data_binding_expression, + is_private_preview_enabled, + transform_dict_keys, +) +from azure.ai.ml.constants import JobType +from azure.ai.ml.constants._common import AZUREML_PRIVATE_FEATURES_ENV_VAR, BASE_PATH_CONTEXT_KEY +from azure.ai.ml.constants._component import ComponentSource +from azure.ai.ml.constants._job.pipeline import ValidationErrorCode +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.entities._builders.condition_node import ConditionNode +from azure.ai.ml.entities._builders.control_flow_node import LoopNode +from azure.ai.ml.entities._builders.import_node import Import +from azure.ai.ml.entities._builders.parallel import Parallel +from azure.ai.ml.entities._builders.pipeline import Pipeline +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._component.pipeline_component import PipelineComponent + +# from azure.ai.ml.entities._job.identity import AmlToken, Identity, ManagedIdentity, UserIdentity +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, + _BaseJobIdentityConfiguration, +) +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._inputs_outputs.group_input import GroupInput +from azure.ai.ml.entities._job._input_output_helpers import ( + from_rest_data_outputs, + from_rest_inputs_to_dataset_literal, + to_rest_data_outputs, + to_rest_dataset_literal_inputs, +) +from azure.ai.ml.entities._job.import_job import ImportJob +from azure.ai.ml.entities._job.job import Job +from azure.ai.ml.entities._job.job_service import JobServiceBase +from azure.ai.ml.entities._job.pipeline._io import PipelineInput, PipelineJobIOMixin +from azure.ai.ml.entities._job.pipeline.pipeline_job_settings import PipelineJobSettings +from azure.ai.ml.entities._mixins import YamlTranslatableMixin +from azure.ai.ml.entities._system_data import SystemData +from 
azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin +from azure.ai.ml.exceptions import ErrorTarget, UserErrorException, ValidationException + +module_logger = logging.getLogger(__name__) + + +class PipelineJob(Job, YamlTranslatableMixin, PipelineJobIOMixin, PathAwareSchemaValidatableMixin): + """Pipeline job. + + You should not instantiate this class directly. Instead, you should + use the `@pipeline` decorator to create a `PipelineJob`. + + :param component: Pipeline component version. The field is mutually exclusive with 'jobs'. + :type component: Union[str, ~azure.ai.ml.entities._component.pipeline_component.PipelineComponent] + :param inputs: Inputs to the pipeline job. + :type inputs: dict[str, Union[~azure.ai.ml.entities.Input, str, bool, int, float]] + :param outputs: Outputs of the pipeline job. + :type outputs: dict[str, ~azure.ai.ml.entities.Output] + :param name: Name of the PipelineJob. Defaults to None. + :type name: str + :param description: Description of the pipeline job. Defaults to None + :type description: str + :param display_name: Display name of the pipeline job. Defaults to None + :type display_name: str + :param experiment_name: Name of the experiment the job will be created under. + If None is provided, the experiment will be set to the current directory. Defaults to None + :type experiment_name: str + :param jobs: Pipeline component node name to component object. Defaults to None + :type jobs: dict[str, ~azure.ai.ml.entities._builders.BaseNode] + :param settings: Setting of the pipeline job. Defaults to None + :type settings: ~azure.ai.ml.entities.PipelineJobSettings + :param identity: Identity that the training job will use while running on compute. Defaults to None + :type identity: Union[ + ~azure.ai.ml.entities._credentials.ManagedIdentityConfiguration, + ~azure.ai.ml.entities._credentials.AmlTokenConfiguration, + ~azure.ai.ml.entities._credentials.UserIdentityConfiguration + + ] + :param compute: Compute target name of the built pipeline. Defaults to None + :type compute: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None + :type tags: dict[str, str] + :param kwargs: A dictionary of additional configuration parameters. Defaults to None + :type kwargs: dict + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py + :start-after: [START configure_pipeline_job_and_settings] + :end-before: [END configure_pipeline_job_and_settings] + :language: python + :dedent: 8 + :caption: Shows how to create a pipeline using this class. 
+ """ + + def __init__( + self, + *, + component: Optional[Union[str, PipelineComponent, Component]] = None, + inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None, + outputs: Optional[Dict[str, Output]] = None, + name: Optional[str] = None, + description: Optional[str] = None, + display_name: Optional[str] = None, + experiment_name: Optional[str] = None, + jobs: Optional[Dict[str, BaseNode]] = None, + settings: Optional[PipelineJobSettings] = None, + identity: Optional[ + Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration] + ] = None, + compute: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + **kwargs: Any, + ) -> None: + # initialize io + inputs, outputs = inputs or {}, outputs or {} + if isinstance(component, PipelineComponent) and component._source in [ + ComponentSource.DSL, + ComponentSource.YAML_COMPONENT, + ]: + self._inputs = self._build_inputs_dict(inputs, input_definition_dict=component.inputs) + # for pipeline component created pipeline jobs, + # it's output should have same value with the component outputs, + # then override it with given outputs (filter out None value) + pipeline_outputs = {k: v for k, v in (outputs or {}).items() if v} + self._outputs = self._build_pipeline_outputs_dict({**component.outputs, **pipeline_outputs}) + else: + # Build inputs/outputs dict without meta when definition not available + self._inputs = self._build_inputs_dict(inputs) + # for node created pipeline jobs, + # it's output should have same value with the given outputs + self._outputs = self._build_pipeline_outputs_dict(outputs=outputs) + source = kwargs.pop("_source", ComponentSource.CLASS) + if component is None: + component = PipelineComponent( + jobs=jobs, + description=description, + display_name=display_name, + base_path=kwargs.get(BASE_PATH_CONTEXT_KEY), + _source=source, + ) + + # If component is Pipeline component, jobs will be component.jobs + self._jobs = (jobs or {}) if isinstance(component, str) else {} + + self.component: Union[PipelineComponent, str] = cast(Union[PipelineComponent, str], component) + if "type" not in kwargs: + kwargs["type"] = JobType.PIPELINE + if isinstance(component, PipelineComponent): + description = component.description if description is None else description + display_name = component.display_name if display_name is None else display_name + super(PipelineJob, self).__init__( + name=name, + description=description, + tags=tags, + display_name=display_name, + experiment_name=experiment_name, + compute=compute, + **kwargs, + ) + + self._remove_pipeline_input() + self.compute = compute + self._settings: Any = None + self.settings = settings + self.identity = identity + # TODO: remove default code & environment? + self._default_code = None + self._default_environment = None + + @property + def inputs(self) -> Dict: + """Inputs of the pipeline job. + + :return: Inputs of the pipeline job. + :rtype: dict[str, Union[~azure.ai.ml.entities.Input, str, bool, int, float]] + """ + return self._inputs + + @property + def outputs(self) -> Dict[str, Union[str, Output]]: + """Outputs of the pipeline job. + + :return: Outputs of the pipeline job. + :rtype: dict[str, Union[str, ~azure.ai.ml.entities.Output]] + """ + return self._outputs + + @property + def jobs(self) -> Dict: + """Return jobs of pipeline job. + + :return: Jobs of pipeline job. 
+ :rtype: dict + """ + res: dict = self.component.jobs if isinstance(self.component, PipelineComponent) else self._jobs + return res + + @property + def settings(self) -> Optional[PipelineJobSettings]: + """Settings of the pipeline job. + + :return: Settings of the pipeline job. + :rtype: ~azure.ai.ml.entities.PipelineJobSettings + """ + if self._settings is None: + self._settings = PipelineJobSettings() + res: Optional[PipelineJobSettings] = self._settings + return res + + @settings.setter + def settings(self, value: Union[Dict, PipelineJobSettings]) -> None: + """Set the pipeline job settings. + + :param value: The pipeline job settings. + :type value: Union[dict, ~azure.ai.ml.entities.PipelineJobSettings] + """ + if value is not None: + if isinstance(value, PipelineJobSettings): + # since PipelineJobSettings inherit _AttrDict, we need add this branch to distinguish with dict + pass + elif isinstance(value, dict): + value = PipelineJobSettings(**value) + else: + raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value))) + self._settings = value + + @classmethod + def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException: + return ValidationException( + message=message, + no_personal_data_message=no_personal_data_message, + target=ErrorTarget.PIPELINE, + ) + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema: + # import this to ensure that nodes are registered before schema is created. + + return PipelineJobSchema(context=context) + + @classmethod + def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]: + # jobs validations are done in _customized_validate() + return ["component", "jobs"] + + @property + def _skip_required_compute_missing_validation(self) -> Literal[True]: + return True + + def _validate_compute_is_set(self) -> MutableValidationResult: + validation_result = self._create_empty_validation_result() + if self.compute is not None: + return validation_result + if self.settings is not None and self.settings.default_compute is not None: + return validation_result + + if not isinstance(self.component, str): + validation_result.merge_with(self.component._validate_compute_is_set()) + return validation_result + + def _customized_validate(self) -> MutableValidationResult: + """Validate that all provided inputs and parameters are valid for current pipeline and components in it. + + :return: The validation result + :rtype: MutableValidationResult + """ + validation_result = super(PipelineJob, self)._customized_validate() + + if isinstance(self.component, PipelineComponent): + # Merge with pipeline component validate result for structure validation. 
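+            # (Structure errors come from the component's own validation; the compute, input, and on_init/on_finalize checks are merged in below.)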
+ # Skip top level parameter missing type error + validation_result.merge_with( + self.component._customized_validate(), + condition_skip=lambda x: x.error_code == ValidationErrorCode.PARAMETER_TYPE_UNKNOWN + and x.yaml_path.startswith("inputs"), + ) + # Validate compute + validation_result.merge_with(self._validate_compute_is_set()) + # Validate Input + validation_result.merge_with(self._validate_input()) + # Validate initialization & finalization jobs + validation_result.merge_with(self._validate_init_finalize_job()) + + return validation_result + + def _validate_input(self) -> MutableValidationResult: + validation_result = self._create_empty_validation_result() + if not isinstance(self.component, str): + # TODO(1979547): refine this logic: not all nodes have `_get_input_binding_dict` method + used_pipeline_inputs = set( + itertools.chain( + *[ + self.component._get_input_binding_dict(node if not isinstance(node, LoopNode) else node.body)[0] + for node in self.jobs.values() + if not isinstance(node, ConditionNode) + # condition node has no inputs + ] + ) + ) + # validate inputs + if not isinstance(self.component, Component): + return validation_result + for key, meta in self.component.inputs.items(): + if key not in used_pipeline_inputs: # pylint: disable=possibly-used-before-assignment + # Only validate inputs certainly used. + continue + # raise error when required input with no default value not set + if ( + self.inputs.get(key, None) is None # input not provided + and meta.optional is not True # and it's required + and meta.default is None # and it does not have default + ): + name = self.name or self.display_name + name = f"{name!r} " if name else "" + validation_result.append_error( + yaml_path=f"inputs.{key}", + message=f"Required input {key!r} for pipeline {name}not provided.", + ) + return validation_result + + def _validate_init_finalize_job(self) -> MutableValidationResult: # pylint: disable=too-many-statements + from azure.ai.ml.entities._job.pipeline._io import InputOutputBase, _GroupAttrDict + + validation_result = self._create_empty_validation_result() + # subgraph (PipelineComponent) should not have on_init/on_finalize set + for job_name, job in self.jobs.items(): + if job.type != "pipeline": + continue + if job.settings.on_init: + validation_result.append_error( + yaml_path=f"jobs.{job_name}.settings.on_init", + message="On_init is not supported for pipeline component.", + ) + if job.settings.on_finalize: + validation_result.append_error( + yaml_path=f"jobs.{job_name}.settings.on_finalize", + message="On_finalize is not supported for pipeline component.", + ) + + on_init = None + on_finalize = None + + if self.settings is not None: + # quick return if neither on_init nor on_finalize is set + if self.settings.on_init is None and self.settings.on_finalize is None: + return validation_result + + on_init, on_finalize = self.settings.on_init, self.settings.on_finalize + + append_on_init_error = partial(validation_result.append_error, "settings.on_init") + append_on_finalize_error = partial(validation_result.append_error, "settings.on_finalize") + # on_init and on_finalize cannot be same + if on_init == on_finalize: + append_on_init_error(f"Invalid on_init job {on_init}, it should be different from on_finalize.") + append_on_finalize_error(f"Invalid on_finalize job {on_finalize}, it should be different from on_init.") + # pipeline should have at least one normal node + if len(set(self.jobs.keys()) - {on_init, on_finalize}) == 0: + 
validation_result.append_error(yaml_path="jobs", message="No other job except for on_init/on_finalize job.") + + def _is_control_flow_node(_validate_job_name: str) -> bool: + from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode + + _validate_job = self.jobs[_validate_job_name] + return issubclass(type(_validate_job), ControlFlowNode) + + def _is_isolated_job(_validate_job_name: str) -> bool: + def _try_get_data_bindings( + _name: str, _input_output_data: Union["_GroupAttrDict", "InputOutputBase"] + ) -> Optional[List]: + """Try to get data bindings from input/output data, return None if not found. + :param _name: The name to use when flattening GroupAttrDict + :type _name: str + :param _input_output_data: The input/output data + :type _input_output_data: Union[_GroupAttrDict, str, InputOutputBase] + :return: A list of data bindings, or None if not found + :rtype: Optional[List[str]] + """ + # handle group input + if GroupInput._is_group_attr_dict(_input_output_data): + _new_input_output_data: _GroupAttrDict = cast(_GroupAttrDict, _input_output_data) + # flatten to avoid nested cases + flattened_values: List[Input] = list(_new_input_output_data.flatten(_name).values()) + # handle invalid empty group + if len(flattened_values) == 0: + return None + return [_value.path for _value in flattened_values] + _input_output_data = _input_output_data._data + if isinstance(_input_output_data, str): + return [_input_output_data] + if not hasattr(_input_output_data, "_data_binding"): + return None + return [_input_output_data._data_binding()] + + _validate_job = self.jobs[_validate_job_name] + # no input to validate job + for _input_name in _validate_job.inputs: + _data_bindings = _try_get_data_bindings(_input_name, _validate_job.inputs[_input_name]) + if _data_bindings is None: + continue + for _data_binding in _data_bindings: + if is_data_binding_expression(_data_binding, ["parent", "jobs"]): + return False + # no output from validate job - iterate other jobs input(s) to validate + for _job_name, _job in self.jobs.items(): + # exclude control flow node as it does not have inputs + if _is_control_flow_node(_job_name): + continue + for _input_name in _job.inputs: + _data_bindings = _try_get_data_bindings(_input_name, _job.inputs[_input_name]) + if _data_bindings is None: + continue + for _data_binding in _data_bindings: + if is_data_binding_expression(_data_binding, ["parent", "jobs", _validate_job_name]): + return False + return True + + # validate on_init + if on_init is not None: + if on_init not in self.jobs: + append_on_init_error(f"On_init job name {on_init} not exists in jobs.") + else: + if _is_control_flow_node(on_init): + append_on_init_error("On_init job should not be a control flow node.") + elif not _is_isolated_job(on_init): + append_on_init_error("On_init job should not have connection to other execution node.") + # validate on_finalize + if on_finalize is not None: + if on_finalize not in self.jobs: + append_on_finalize_error(f"On_finalize job name {on_finalize} not exists in jobs.") + else: + if _is_control_flow_node(on_finalize): + append_on_finalize_error("On_finalize job should not be a control flow node.") + elif not _is_isolated_job(on_finalize): + append_on_finalize_error("On_finalize job should not have connection to other execution node.") + return validation_result + + def _remove_pipeline_input(self) -> None: + """Remove None pipeline input.If not remove, it will pass "None" to backend.""" + redundant_pipeline_inputs = [] + for pipeline_input_name, 
pipeline_input in self._inputs.items(): + if isinstance(pipeline_input, PipelineInput) and pipeline_input._data is None: + redundant_pipeline_inputs.append(pipeline_input_name) + for redundant_pipeline_input in redundant_pipeline_inputs: + self._inputs.pop(redundant_pipeline_input) + + def _check_private_preview_features(self) -> None: + """Checks is private preview features included in pipeline. + + If private preview environment not set, raise exception. + """ + if not is_private_preview_enabled(): + error_msg = ( + "{} is a private preview feature, " + f"please set environment variable {AZUREML_PRIVATE_FEATURES_ENV_VAR} to true to use it." + ) + # check has not supported nodes + for _, node in self.jobs.items(): + # TODO: Remove in PuP + if isinstance(node, (ImportJob, Import)): + msg = error_msg.format("Import job in pipeline") + raise UserErrorException(message=msg, no_personal_data_message=msg) + + def _to_node(self, context: Optional[Dict] = None, **kwargs: Any) -> "Pipeline": + """Translate a command job to a pipeline node when load schema. + + (Write a pipeline job as node in yaml is not supported presently.) + + :param context: Context of command job YAML file. + :type context: dict + :return: Translated command component. + :rtype: Pipeline + """ + component = self._to_component(context, **kwargs) + + return Pipeline( + component=component, + compute=self.compute, + # Need to supply the inputs with double curly. + inputs=self.inputs, + outputs=self.outputs, + description=self.description, + tags=self.tags, + display_name=self.display_name, + properties=self.properties, + ) + + def _to_rest_object(self) -> JobBase: + """Build current parameterized pipeline instance to a pipeline job object before submission. + + :return: Rest pipeline job. + :rtype: JobBase + """ + # Check if there are private preview features in it + self._check_private_preview_features() + + # Build the inputs to dict. Handle both value & binding assignment. + # Example: { + # "input_data": {"data": {"path": "path/to/input/data"}, "mode"="Mount"}, + # "input_value": 10, + # "learning_rate": "${{jobs.step1.inputs.learning_rate}}" + # } + built_inputs = self._build_inputs() + + # Build the outputs to dict + # example: {"eval_output": "${{jobs.eval.outputs.eval_output}}"} + built_outputs = self._build_outputs() + + if self.settings is not None: + settings_dict = self.settings._to_dict() + + if isinstance(self.component, PipelineComponent): + source = self.component._source + # Build the jobs to dict + rest_component_jobs = self.component._build_rest_component_jobs() + else: + source = ComponentSource.REMOTE_WORKSPACE_JOB + rest_component_jobs = {} + # add _source on pipeline job.settings + if "_source" not in settings_dict: # pylint: disable=possibly-used-before-assignment + settings_dict.update({"_source": source}) + + # TODO: Revisit this logic when multiple types of component jobs are supported + rest_compute = self.compute + # This will be resolved in job_operations _resolve_arm_id_or_upload_dependencies. + component_id = self.component if isinstance(self.component, str) else self.component.id + + # TODO remove it in the future. + # MFE not support pass None or empty input value. Remove the empty inputs in pipeline job. 
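+        # e.g. built inputs like {"learning_rate": 0.01, "note": "", "data": None} are reduced to {"learning_rate": 0.01} (illustrative values only).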
+ built_inputs = {k: v for k, v in built_inputs.items() if v is not None and v != ""} + + pipeline_job = RestPipelineJob( + compute_id=rest_compute, + component_id=component_id, + display_name=self.display_name, + tags=self.tags, + description=self.description, + properties=self.properties, + experiment_name=self.experiment_name, + jobs=rest_component_jobs, + inputs=to_rest_dataset_literal_inputs(built_inputs, job_type=self.type), + outputs=to_rest_data_outputs(built_outputs), + settings=settings_dict, + services={k: v._to_rest_object() for k, v in self.services.items()} if self.services else None, + identity=self.identity._to_job_rest_object() if self.identity else None, + ) + + rest_job = JobBase(properties=pipeline_job) + rest_job.name = self.name + return rest_job + + @classmethod + def _load_from_rest(cls, obj: JobBase) -> "PipelineJob": + """Build a pipeline instance from rest pipeline object. + + :param obj: The REST Pipeline Object + :type obj: JobBase + :return: pipeline job. + :rtype: PipelineJob + """ + properties: RestPipelineJob = obj.properties + # Workaround for BatchEndpoint as these fields are not filled in + # Unpack the inputs + from_rest_inputs = from_rest_inputs_to_dataset_literal(properties.inputs) or {} + from_rest_outputs = from_rest_data_outputs(properties.outputs) or {} + # Unpack the component jobs + sub_nodes = PipelineComponent._resolve_sub_nodes(properties.jobs) if properties.jobs else {} + # backend may still store Camel settings, eg: DefaultDatastore, translate them to snake when load back + settings_dict = transform_dict_keys(properties.settings, camel_to_snake) if properties.settings else None + settings_sdk = PipelineJobSettings(**settings_dict) if settings_dict else PipelineJobSettings() + # Create component or use component id + if getattr(properties, "component_id", None): + component = properties.component_id + else: + component = PipelineComponent._load_from_rest_pipeline_job( + { + "inputs": from_rest_inputs, + "outputs": from_rest_outputs, + "display_name": properties.display_name, + "description": properties.description, + "jobs": sub_nodes, + } + ) + + job = PipelineJob( + component=component, + inputs=from_rest_inputs, + outputs=from_rest_outputs, + name=obj.name, + id=obj.id, + jobs=sub_nodes, + display_name=properties.display_name, + tags=properties.tags, + properties=properties.properties, + experiment_name=properties.experiment_name, + status=properties.status, + creation_context=SystemData._from_rest_object(obj.system_data) if obj.system_data else None, + services=JobServiceBase._from_rest_job_services(properties.services) if properties.services else None, + compute=get_resource_name_from_arm_id_safe(properties.compute_id), + settings=settings_sdk, + identity=( + _BaseJobIdentityConfiguration._from_rest_object(properties.identity) if properties.identity else None + ), + ) + + return job + + def _to_dict(self) -> Dict: + res: dict = self._dump_for_validation() + return res + + @classmethod + def _component_items_from_path(cls, data: Dict) -> Generator: + if "jobs" in data: + for node_name, job_instance in data["jobs"].items(): + potential_component_path = job_instance["component"] if "component" in job_instance else None + if isinstance(potential_component_path, str) and potential_component_path.startswith("file:"): + yield node_name, potential_component_path + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "PipelineJob": + path_first_occurrence: dict = {} + 
component_first_occurrence = {} + for node_name, component_path in cls._component_items_from_path(data): + if component_path in path_first_occurrence: + component_first_occurrence[node_name] = path_first_occurrence[component_path] + # set components to be replaced here may break the validation logic + else: + path_first_occurrence[component_path] = node_name + + # use this instead of azure.ai.ml.entities._util.load_from_dict to avoid parsing + loaded_schema = cls._create_schema_for_validation(context=context).load(data, **kwargs) + + # replace repeat component with first occurrence to reduce arm id resolution + # current load yaml file logic is in azure.ai.ml._schema.core.schema.YamlFileSchema.load_from_file + # is it possible to load the same yaml file only once in 1 pipeline loading? + for node_name, first_occurrence in component_first_occurrence.items(): + job = loaded_schema["jobs"][node_name] + job._component = loaded_schema["jobs"][first_occurrence].component + # For Parallel job, should also align task attribute which is usually from component.task + if isinstance(job, Parallel): + job.task = job._component.task + # parallel.task.code is based on parallel._component.base_path, so need to update it + job._base_path = job._component.base_path + return PipelineJob( + base_path=context[BASE_PATH_CONTEXT_KEY], + _source=ComponentSource.YAML_JOB, + **loaded_schema, + ) + + def __str__(self) -> str: + try: + res_to_yaml: str = self._to_yaml() + return res_to_yaml + except BaseException: # pylint: disable=W0718 + res: str = super(PipelineJob, self).__str__() + return res + + def _get_telemetry_values(self) -> Dict: + telemetry_values: dict = super()._get_telemetry_values() + if isinstance(self.component, PipelineComponent): + telemetry_values.update(self.component._get_telemetry_values()) + else: + telemetry_values.update({"source": ComponentSource.REMOTE_WORKSPACE_JOB}) + telemetry_values.pop("is_anonymous") + return telemetry_values + + def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> "PipelineComponent": + """Translate a pipeline job to pipeline component. + + :param context: Context of pipeline job YAML file. + :type context: dict + :return: Translated pipeline component. + :rtype: PipelineComponent + """ + ignored_keys = PipelineComponent._check_ignored_keys(self) + if ignored_keys: + name = self.name or self.display_name + name = f"{name!r} " if name else "" + module_logger.warning("%s ignored when translating PipelineJob %sto PipelineComponent.", ignored_keys, name) + pipeline_job_dict = kwargs.get("pipeline_job_dict", {}) + context = context or {BASE_PATH_CONTEXT_KEY: Path("./")} + + # Create anonymous pipeline component with default version as 1 + return PipelineComponent( + base_path=context[BASE_PATH_CONTEXT_KEY], + display_name=self.display_name, + inputs=self._to_inputs(inputs=self.inputs, pipeline_job_dict=pipeline_job_dict), + outputs=self._to_outputs(outputs=self.outputs, pipeline_job_dict=pipeline_job_dict), + jobs=self.jobs, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job_settings.py new file mode 100644 index 00000000..0fe41e2e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job_settings.py @@ -0,0 +1,75 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +from typing import Any, Dict, Generator, Optional + +from azure.ai.ml.entities._job.pipeline._attr_dict import _AttrDict + + +class PipelineJobSettings(_AttrDict): + """Settings of PipelineJob. + + :param default_datastore: The default datastore of the pipeline. + :type default_datastore: str + :param default_compute: The default compute target of the pipeline. + :type default_compute: str + :param continue_on_step_failure: Flag indicating whether to continue pipeline execution if a step fails. + :type continue_on_step_failure: bool + :param force_rerun: Flag indicating whether to force rerun pipeline execution. + :type force_rerun: bool + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py + :start-after: [START configure_pipeline_job_and_settings] + :end-before: [END configure_pipeline_job_and_settings] + :language: python + :dedent: 8 + :caption: Shows how to set pipeline properties using this class. + """ + + def __init__( + self, + default_datastore: Optional[str] = None, + default_compute: Optional[str] = None, + continue_on_step_failure: Optional[bool] = None, + force_rerun: Optional[bool] = None, + **kwargs: Any + ) -> None: + self._init = True + super().__init__() + self.default_compute: Any = default_compute + self.default_datastore: Any = default_datastore + self.continue_on_step_failure = continue_on_step_failure + self.force_rerun = force_rerun + self.on_init = kwargs.get("on_init", None) + self.on_finalize = kwargs.get("on_finalize", None) + for k, v in kwargs.items(): + setattr(self, k, v) + self._init = False + + def _get_valid_keys(self) -> Generator[str, Any, None]: + for k, v in self.__dict__.items(): + if v is None: + continue + # skip private attributes inherited from _AttrDict + if k in ["_logger", "_allowed_keys", "_init", "_key_restriction"]: + continue + yield k + + def _to_dict(self) -> Dict: + result = {} + for k in self._get_valid_keys(): + result[k] = self.__dict__[k] + result.update(self._get_attrs()) + return result + + def _initializing(self) -> bool: + return self._init + + def __bool__(self) -> bool: + for _ in self._get_valid_keys(): + return True + # _attr_dict will return False if no extra attributes are set + return self.__len__() > 0 |
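
The entities added in this diff (PipelineJob, PipelineJobSettings, the expression-component machinery, and the REST IO helpers) are normally exercised through the public azure.ai.ml surface rather than constructed directly. The sketch below illustrates that typical flow; it is not taken from this package, and the component YAML path, the component's input/output names (input_data, learning_rate, model), and the compute, datastore, and dataset identifiers are placeholder assumptions for illustration.

from azure.ai.ml import Input, MLClient, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import PipelineJob, PipelineJobSettings
from azure.identity import DefaultAzureCredential

# Load a command component from a local YAML spec (hypothetical path and port names).
train_component = load_component(source="./components/train.yml")


@pipeline(description="Minimal example pipeline")
def example_pipeline(training_data: Input, learning_rate: float = 0.01):
    # Each call to a loaded component adds a node; pipeline parameters arrive as PipelineInput objects.
    train_step = train_component(input_data=training_data, learning_rate=learning_rate)
    return {"model": train_step.outputs.model}


# Calling the decorated function builds a PipelineJob; nothing runs locally.
pipeline_job: PipelineJob = example_pipeline(
    training_data=Input(type="uri_folder", path="azureml:sample-data:1"),
    learning_rate=0.05,
)

# Pipeline-level defaults live on PipelineJobSettings; assigning a plain dict also works,
# since the settings setter converts it.
pipeline_job.settings = PipelineJobSettings(
    default_compute="cpu-cluster",
    default_datastore="workspaceblobstore",
    continue_on_step_failure=False,
    force_rerun=False,
)

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)
submitted_job = ml_client.jobs.create_or_update(pipeline_job, experiment_name="pipeline-samples")

Because PipelineJobSettings extends _AttrDict, settings not listed in its constructor (for example on_init and on_finalize, passed as keyword arguments) are also recorded and serialized with the job.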
