# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access
import copy
import inspect
import logging
import typing
from collections import OrderedDict
from inspect import Parameter, signature
from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union

from azure.ai.ml._utils._func_utils import get_outputs_and_locals
from azure.ai.ml._utils.utils import is_valid_node_name, parse_args_description_from_docstring
from azure.ai.ml.constants._component import ComponentSource, IOConstants
from azure.ai.ml.constants._job.pipeline import COMPONENT_IO_KEYWORDS
from azure.ai.ml.dsl._utils import _sanitize_python_variable_name
from azure.ai.ml.entities import PipelineJob
from azure.ai.ml.entities._builders import BaseNode
from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
from azure.ai.ml.entities._component.component import Component
from azure.ai.ml.entities._component.pipeline_component import PipelineComponent
from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output, _get_param_with_standard_annotation
from azure.ai.ml.entities._inputs_outputs.utils import _get_annotation_by_value, is_group
from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe
from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, PipelineOutput, _GroupAttrDict
from azure.ai.ml.entities._util import copy_output_setting

# We need to limit the depth of a pipeline to avoid the built graph going too deep and to prevent a potential
# stack overflow in dsl.pipeline.
from azure.ai.ml.exceptions import UserErrorException

_BUILDER_STACK_MAX_DEPTH = 100

module_logger = logging.getLogger(__name__)


class _PipelineComponentBuilderStack:
    def __init__(self) -> None:
        self.items: List["PipelineComponentBuilder"] = []

    def top(self) -> Optional["PipelineComponentBuilder"]:
        if self.is_empty():
            return None
        return self.items[-1]

    def pop(self) -> Optional["PipelineComponentBuilder"]:
        if self.is_empty():
            return None
        return self.items.pop()

    def push(self, item: object) -> None:
        error_msg = f"{self.__class__.__name__} only " f"allows pushing `{PipelineComponentBuilder.__name__}` element"
        assert isinstance(item, PipelineComponentBuilder), error_msg

        # TODO: validate cycle
        self.items.append(item)
        if self.size() >= _BUILDER_STACK_MAX_DEPTH:
            current_pipeline = self.items[0].name
            # clear current pipeline stack
            self.items = []
            msg = "Pipeline {} depth exceeds limitation. Max depth: {}"
            raise UserErrorException(
                message=msg.format(current_pipeline, _BUILDER_STACK_MAX_DEPTH),
                no_personal_data_message=msg.format("[current_pipeline]", _BUILDER_STACK_MAX_DEPTH),
            )

    def is_empty(self) -> bool:
        return len(self.items) == 0

    def size(self) -> int:
        return len(self.items)


# This collection is used to record pipeline component builders in the current call stack
_definition_builder_stack = _PipelineComponentBuilderStack()
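
# Illustrative sketch (comments only, not executed): each call to a dsl.pipeline-decorated function
# inside another pipeline definition pushes its builder onto this stack while the function body runs,
# so a hypothetical chain of nested pipelines like the one below would eventually exceed
# _BUILDER_STACK_MAX_DEPTH and raise UserErrorException:
#
#     @dsl.pipeline()
#     def level_1():
#         level_2()   # building level_2 pushes its own builder on top of level_1's
#
#     @dsl.pipeline()
#     def level_2():
#         level_3()   # ...and so on, one stack entry per nesting level
#
# The level_* names are made up for illustration.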


def _is_inside_dsl_pipeline_func() -> bool:
    """Checks whether executing within a dsl pipeline func

    :return: True if is inside DSL pipeline func.
    :rtype: bool
    """
    return _definition_builder_stack.size() > 0


def _add_component_to_current_definition_builder(component: Union[BaseNode, AutoMLJob]) -> None:
    if _is_inside_dsl_pipeline_func():
        builder = _definition_builder_stack.top()
        if builder is not None:
            builder.add_node(component)


class PipelineComponentBuilder:
    # pylint: disable=too-many-instance-attributes
    # map from python built-in type to component type
    DEFAULT_DATA_TYPE_MAPPING = {
        "float": "number",
        "int": "integer",
        "bool": "boolean",
        "str": "string",
    }
    DEFAULT_OUTPUT_NAME = "output"

    def __init__(
        self,
        func: Callable,
        name: Optional[str] = None,
        version: Optional[str] = None,
        display_name: Optional[str] = None,
        description: Optional[str] = None,
        default_datastore: Any = None,
        tags: Optional[Union[Dict[str, str], str]] = None,
        source_path: Optional[str] = None,
        non_pipeline_inputs: Optional[List] = None,
    ):
        self.func = func
        name = name if name is not None else func.__name__
        display_name = display_name if display_name else name
        description = description if description else func.__doc__
        self._args_description = parse_args_description_from_docstring(func.__doc__)
        # List of nodes, ordered by their creation order in the pipeline.
        self.nodes: List = []
        self.non_pipeline_parameter_names = non_pipeline_inputs or []
        # A dict mapping input name to input definition.
        # TODO: infer pipeline component input meta from assignment
        self.inputs = self._build_inputs(func)
        self.output_annotation = self._get_output_annotation(func)
        self._name = name
        self.version = version
        self.display_name = display_name
        self.description = description
        self.default_datastore = default_datastore
        self.tags = tags
        self.source_path = source_path

    @property
    def name(self) -> str:
        """Name of pipeline builder, it's name will be same as the pipeline definition it builds.

        :return: Pipeline builder name
        :rtype: str
        """
        return self._name

    def add_node(self, node: Union[BaseNode, AutoMLJob]) -> None:
        """Add node to pipeline builder.

        :param node: A pipeline node.
        :type node: Union[BaseNode, AutoMLJob]
        """
        self.nodes.append(node)

    def build(
        self,
        *,
        user_provided_kwargs: Optional[Dict] = None,
        non_pipeline_inputs_dict: Optional[Dict] = None,
        non_pipeline_inputs: Optional[List] = None,
    ) -> PipelineComponent:
        """Build a pipeline component from current pipeline builder.

        :keyword user_provided_kwargs: The kwargs user provided to dsl pipeline function. None if not provided.
        :keyword non_pipeline_inputs_dict: The non-pipeline input provided key-value. None if not exist.
        :keyword non_pipeline_inputs: List of non-pipeline input name. None if not exist.
        :return: The built PipelineComponent
        :rtype: PipelineComponent
        """
        if user_provided_kwargs is None:
            user_provided_kwargs = {}
        # Clear nodes as we may call build multiple times.
        self.nodes = []

        kwargs = _build_pipeline_parameter(
            func=self.func,
            user_provided_kwargs=user_provided_kwargs,
            # TODO: support result() for pipeline input inside parameter group
            group_default_kwargs=self._get_group_parameter_defaults(),
            non_pipeline_inputs=non_pipeline_inputs,
        )
        kwargs.update(non_pipeline_inputs_dict or {})

        # Variables in self.func are captured into a dict by get_outputs_and_locals below.
        # We use this stack to record the dsl pipeline definition hierarchy.
        _definition_builder_stack.push(self)

        try:
            outputs, _locals = get_outputs_and_locals(self.func, kwargs)
        finally:
            _definition_builder_stack.pop()

        if outputs is None:
            outputs = {}

        jobs: Dict = self._update_nodes_variable_names(_locals)
        pipeline_component = PipelineComponent(
            name=self.name,
            version=self.version,
            display_name=self.display_name,
            description=self.description,
            inputs=self.inputs,
            jobs=jobs,
            tags=self.tags,  # type: ignore[arg-type]
            source_path=self.source_path,
            _source=ComponentSource.DSL,
        )
        # TODO: Refine here. The outputs cannot be built first and then passed into pipeline component creation;
        # an exception would be raised in component.build_validate_io().
        pipeline_component._outputs = self._build_pipeline_outputs(outputs)
        return pipeline_component
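
    # Rough usage sketch (comments only, not executed): the builder is normally driven by the
    # @dsl.pipeline decorator, which creates a PipelineComponentBuilder for the decorated function
    # and calls build() when a pipeline is constructed, approximately:
    #
    #     builder = PipelineComponentBuilder(func=my_pipeline_func)
    #     pipeline_component = builder.build(user_provided_kwargs={"epochs": 10})
    #
    # my_pipeline_func and the kwargs are hypothetical; see azure.ai.ml.dsl for the real entry point.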

    def _validate_group_annotation(self, name: str, val: GroupInput) -> None:
        for k, v in val.values.items():
            if isinstance(v, GroupInput):
                self._validate_group_annotation(k, v)
            elif isinstance(v, Output):
                # TODO(2097468): automatically change it to Input when used in input annotation
                raise UserErrorException("Output annotation cannot be used in @pipeline.")
            elif isinstance(v, Input):
                if v.type not in IOConstants.PRIMITIVE_STR_2_TYPE:
                    # TODO(2097478): support port type groups
                    raise UserErrorException(f"Only primitive types can be used as input of group, got {v.type}")
            else:
                raise UserErrorException(f"Unsupported annotation type {type(v)} for group field {name}.{k}")

    def _build_inputs(self, func: Union[Callable, Type]) -> Dict:
        inputs: Dict = _get_param_with_standard_annotation(
            func, is_func=True, skip_params=self.non_pipeline_parameter_names
        )
        for k, v in inputs.items():
            if isinstance(v, GroupInput):
                self._validate_group_annotation(name=k, val=v)
            # add arg description
            if k in self._args_description:
                v["description"] = self._args_description[k]
        return inputs

    def _build_pipeline_outputs(self, outputs: typing.Dict[str, NodeOutput]) -> Dict[str, PipelineOutput]:
        """Validate if dsl.pipeline returns valid outputs and set output binding. Create PipelineOutput as pipeline's
        output definition based on node outputs from return.

        :param outputs: Outputs of pipeline
        :type outputs: Mapping[str, azure.ai.ml.Output]
        :return: The mapping of output names to PipelineOutput
        :rtype: Dict[str, PipelineOutput]
        """
        error_msg = (
            "The return type of dsl.pipeline decorated function should be a mapping from output name to "
            "azure.ai.ml.Output with owner."
        )
        if is_group(outputs):
            outputs = {key: val for key, val in outputs.__dict__.items() if val}
        if not isinstance(outputs, dict):
            raise UserErrorException(message=error_msg, no_personal_data_message=error_msg)
        output_dict = {}
        output_meta_dict = {}
        for key, value in outputs.items():
            if not isinstance(key, str) or not isinstance(value, NodeOutput) or value._owner is None:
                raise UserErrorException(message=error_msg, no_personal_data_message=error_msg)
            if value._meta is not None:
                meta = value._meta
            else:
                meta = Output(type=value.type, path=value.path, mode=value.mode, description=value.description)

            # Hack: map internal output type to pipeline output type
            def _map_internal_output_type(_meta: Output) -> str:
                """Map component output type to valid pipeline output type.

                :param _meta: The output
                :type _meta: Output
                :return: Output type
                :rtype: str
                """
                if type(_meta).__name__ != "InternalOutput":
                    return str(_meta.type)
                return str(_meta.map_pipeline_output_type())  # type: ignore[attr-defined]

            # Note: Here we set PipelineOutput as Pipeline's output definition as we need output binding.
            output_meta = Output(
                type=_map_internal_output_type(meta),  # type: ignore[arg-type]
                description=meta.description,
                mode=meta.mode,
            )
            pipeline_output = PipelineOutput(
                port_name=key,
                data=None,
                # meta is used to create pipeline component, store it here to make sure pipeline component and inner
                # node output type are consistent
                meta=output_meta,
                owner="pipeline",
                description=self._args_description.get(key, None),
                # store original node output to be able to trace back to inner node from a pipeline output builder.
                binding_output=value,
            )
            # copy node level output setting to pipeline output
            copy_output_setting(
                source=value._owner.outputs[value._port_name], target=pipeline_output  # type: ignore[arg-type]
            )

            value._owner.outputs[value._port_name]._data = pipeline_output  # type: ignore[union-attr]

            output_dict[key] = pipeline_output
            output_meta_dict[key] = output_meta._to_dict()

        self._validate_inferred_outputs(output_meta_dict, output_dict)
        return output_dict
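
    # Hedged example (comments only): a valid return value is a mapping (or @group instance) from
    # output name to a node output that has an owner node, e.g. roughly:
    #
    #     @dsl.pipeline()
    #     def my_pipeline():
    #         train = train_component()
    #         return {"trained_model": train.outputs.model}
    #
    # my_pipeline, train_component and model are hypothetical names; each value must be a NodeOutput
    # produced by a node inside the pipeline, otherwise the error above is raised.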

    def _get_group_parameter_defaults(self) -> Dict:
        group_defaults = {}
        for key, val in self.inputs.items():
            if not isinstance(val, GroupInput):
                continue
            # Copy and insert top-level parameter name into group names for all items
            group_defaults[key] = copy.deepcopy(val.default)
            group_defaults[key].insert_group_name_for_items(key)
        return group_defaults

    def _update_nodes_variable_names(self, func_variables: dict) -> Dict[str, Union[BaseNode, AutoMLJob]]:
        """Update nodes list to ordered dict with variable name key and component object value.

        Variable naming priority:
             1. Specified by using xxx.name.
                 e.g.
                 module1 = module_func()
                 module1.name = "node1"     # final node name is "node1"

             2. Variable name
                 e.g.
                 my_node = module_func()     # final node name is "my_node"

             3. Anonymous node, but another node with same component.name has user-defined name
                 e.g.
                 my_node = module_func()     # final node name is "my_node"
                 module_func()               # final node name is "my_node_1"
                 module_func()               # final node name is "my_node_2"

             4. Anonymous node
                 e.g.
                 my_node = module_func()     # final node name is "my_node"
                 module_func_1()             # final node name is its component name

        :param func_variables: The function variables
        :type func_variables: dict
        :return: Map of variable name to component object
        :rtype: Dict[str, Union[BaseNode, AutoMLJob]]
        """

        def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]) -> Optional[Union[str, Component]]:
            # TODO(1979547): refactor this
            if isinstance(node, AutoMLJob):
                return node.name or _sanitize_python_variable_name(node.__class__.__name__)
            if isinstance(node, ControlFlowNode):
                return _sanitize_python_variable_name(node.__class__.__name__)
            return node.name or node._get_component_name()

        valid_component_ids = set(item._instance_id for item in self.nodes)
        id_name_dict = {}
        name_count_dict = {}
        compname_udfname_dict = {}
        local_names = set()
        result = OrderedDict()

        for k, v in func_variables.items():
            # TODO(1979547): refactor this
            if not isinstance(v, (BaseNode, AutoMLJob, PipelineJob, ControlFlowNode)):
                continue
            instance_id = getattr(v, "_instance_id", None)
            if instance_id not in valid_component_ids:
                continue
            name = getattr(v, "name", None) or k
            # for node name "_", treat it as an anonymous node with its name unset
            if name == "_":
                continue

            # User-defined name must be a valid python identifier
            if not is_valid_node_name(name):
                raise UserErrorException(
                    f"Invalid node name found: {name!r}. Node name must start with a lowercase letter or "
                    "underscore, and can only contain lowercase letters, numbers and underscores."
                )

            # Raise an error when setting a name that already exists, as it likely conflicts with a variable name
            if name in local_names and instance_id not in id_name_dict:
                raise UserErrorException(
                    f"Duplicate node name found in pipeline: {self.name!r}, "
                    f"node name: {name!r}. Duplicate check is case-insensitive."
                )
            local_names.add(name)
            id_name_dict[v._instance_id] = name  # type: ignore[union-attr]
            name_count_dict[name] = 1

        # Find the last user-defined name for the same type of components
        for node in self.nodes:
            _id = node._instance_id
            if _id in id_name_dict:
                compname_udfname_dict[_get_name_or_component_name(node)] = id_name_dict[_id]

        # Refine and fill in default names
        # If the component name is the same, append a '_{count}' suffix
        for node in self.nodes:
            _id = node._instance_id
            if _id not in id_name_dict:
                target_name = _get_name_or_component_name(node)
                if node.name is None and target_name in compname_udfname_dict:
                    target_name = compname_udfname_dict[target_name]
                if target_name not in name_count_dict:
                    name_count_dict[target_name] = 0
                name_count_dict[target_name] += 1
                suffix = "" if name_count_dict[target_name] == 1 else f"_{name_count_dict[target_name] - 1}"
                id_name_dict[_id] = f"{_sanitize_python_variable_name(str(target_name))}{suffix}"
            final_name = id_name_dict[_id]
            node.name = final_name
            result[final_name] = node

            # Validate the node's IO names with the final node name, and warn if a reserved keyword is used.
            self._validate_keyword_in_node_io(node)
        return result

    def _update_inputs(self, pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]) -> None:
        """Update the pipeline inputs by the dict.

        :param pipeline_inputs: The pipeline inputs
        :type pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]
        """
        for input_name, value in pipeline_inputs.items():
            anno: Any = None
            if input_name not in self.inputs:
                if isinstance(value, PipelineInput):
                    value = value._data
                if isinstance(value, Input):
                    anno = copy.copy(value)
                elif isinstance(value, NodeOutput):
                    anno = Input(type=value.type)
                else:
                    anno = _get_annotation_by_value(value)
                anno.name = input_name
                anno.description = self._args_description.get(input_name)
                self.inputs[input_name] = anno

    @classmethod
    def _get_output_annotation(cls, func: Callable) -> Dict[str, Dict]:
        """Get the output annotation of the function, validate & refine it.

        :param func: The function to retrieve output annotations from
        :type func: Callable
        :return: A dict of output annotations
        :rtype: Dict[str, Dict]
        """
        return_annotation = inspect.signature(func).return_annotation

        if is_group(return_annotation):
            outputs = _get_param_with_standard_annotation(return_annotation, is_func=False)
        elif isinstance(return_annotation, Output):
            outputs = {cls.DEFAULT_OUTPUT_NAME: return_annotation}
        else:
            # skip if return annotation is not group or output
            return {}

        output_annotations = {}
        for key, val in outputs.items():
            if isinstance(val, GroupInput):
                raise UserErrorException(message="Nested group annotation is not supported in pipeline output.")
            # normalize the annotation since annotations inside @group are currently converted to Input
            if isinstance(val, Input):
                val = Output(type=val.type)
            if not isinstance(val, Output):
                raise UserErrorException(
                    message="Invalid output annotation. "
                    f"Only Output annotation in return annotation is supported. Got {type(val)}."
                )
            output_annotations[key] = val._to_dict()
        return output_annotations
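
    # Hedged example (comments only): supported return annotations are a single Output or a @group of
    # outputs (Input annotations inside the group are normalized to Output), e.g. roughly:
    #
    #     def my_pipeline() -> Output(type="uri_folder"):     # -> {"output": {...}}
    #         ...
    #
    #     @group
    #     class PipelineOutputs:
    #         model: Output = Output(type="mlflow_model")
    #
    #     def my_pipeline_2() -> PipelineOutputs:             # -> {"model": {...}}
    #         ...
    #
    # All names here are made up for illustration.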

    def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: Dict[str, PipelineOutput]) -> None:
        """Validate inferred output dict against annotation.

        :param output_meta_dict: The output meta dict
        :type output_meta_dict: dict
        :param output_dict: The output dict
        :type output_dict: Dict[str, PipelineOutput]
        """
        if not self.output_annotation:
            return
        error_prefix = "Unmatched outputs between actual pipeline output and output in annotation"
        if output_meta_dict.keys() != self.output_annotation.keys():
            raise UserErrorException(
                "{}: actual pipeline component outputs: {}, annotation outputs: {}".format(
                    error_prefix, output_meta_dict.keys(), self.output_annotation.keys()
                )
            )

        unmatched_outputs = []
        for key, actual_output in output_meta_dict.items():
            expected_output = self.output_annotation[key]
            actual_output.pop("description", None)
            expected_description = expected_output.pop("description", None)
            # skip comparing mode since, when the component is from remote, the output mode is not available
            actual_output.pop("mode", None)
            expected_mode = expected_output.pop("mode", None)
            if expected_output != actual_output:
                unmatched_outputs.append(
                    f"{key}: pipeline component output: {actual_output} != annotation output {expected_output}"
                )
            res = output_dict[key]._meta
            if expected_description:
                if res is not None:
                    res.description = expected_description
                # also copy the description to pipeline job
                output_dict[key].description = expected_description
            if expected_mode:
                if res is not None:
                    res.mode = expected_mode
                # also copy the mode to pipeline job
                output_dict[key].mode = expected_mode

        if unmatched_outputs:
            raise UserErrorException(f"{error_prefix}: {unmatched_outputs}")

    @staticmethod
    def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]) -> None:
        if has_attr_safe(node, "inputs"):
            for input_name in set(node.inputs) & COMPONENT_IO_KEYWORDS:  # type: ignore[arg-type]
                module_logger.warning(
                    'Reserved word "%s" is used as input name in node "%s", '
                    "can only be accessed with '%s.inputs[\"%s\"]'",
                    input_name,
                    node.name,
                    node.name,
                    input_name,
                )
        if has_attr_safe(node, "outputs"):
            for output_name in set(node.outputs) & COMPONENT_IO_KEYWORDS:  # type: ignore[arg-type]
                module_logger.warning(
                    'Reserved word "%s" is used as output name in node "%s", '
                    "can only be accessed with '%s.outputs[\"%s\"]'",
                    output_name,
                    node.name,
                    node.name,
                    output_name,
                )
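
    # Hedged example (comments only): if a node input or output name collides with a reserved keyword
    # in COMPONENT_IO_KEYWORDS, attribute access on node.inputs/node.outputs would be shadowed, so a
    # warning is logged and dictionary access must be used instead, e.g. with a hypothetical reserved
    # name:
    #
    #     node.inputs["some_reserved_name"]    # works
    #     node.inputs.some_reserved_name       # may not resolve to the input as expected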


def _build_pipeline_parameter(
    func: Optional[Callable],
    *,
    user_provided_kwargs: Dict,
    group_default_kwargs: Optional[Dict] = None,
    non_pipeline_inputs: Optional[List] = None,
) -> Dict:
    # Pass group defaults into kwargs so that group.item can be used even if the function declares no default.
    # example:
    # @group
    # class Group:
    #   key = 'val'
    #
    # @pipeline
    # def pipeline_func(param: Group):
    #   component_func(input=param.key)  <--- param.key should be val.

    # transform kwargs
    transformed_kwargs, non_pipeline_inputs = {}, non_pipeline_inputs or []
    if group_default_kwargs:
        transformed_kwargs.update(
            {
                key: _wrap_pipeline_parameter(key, default_value=value, actual_value=value)
                for key, value in group_default_kwargs.items()
                if key not in non_pipeline_inputs
            }
        )

    def all_params(parameters: Any) -> Generator:
        yield from parameters.values()

    if func is None:
        return transformed_kwargs

    parameters = all_params(signature(func).parameters)
    # transform default values
    for left_args in parameters:
        if (
            left_args.name not in transformed_kwargs
            and left_args.kind != Parameter.VAR_KEYWORD
            and left_args.name not in non_pipeline_inputs
        ):
            default_value = left_args.default if left_args.default is not Parameter.empty else None
            actual_value = user_provided_kwargs.get(left_args.name)
            transformed_kwargs[left_args.name] = _wrap_pipeline_parameter(
                key=left_args.name, default_value=default_value, actual_value=actual_value
            )
    # Add variable kwargs to transformed_kwargs.
    for key, value in user_provided_kwargs.items():
        if key not in transformed_kwargs:
            transformed_kwargs[key] = _wrap_pipeline_parameter(key=key, default_value=None, actual_value=value)
    return transformed_kwargs
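
# Hedged example (comments only): for a hypothetical pipeline function
#
#     def my_pipeline(lr: float = 0.01, data=None): ...
#
# called with user_provided_kwargs={"data": some_input}, the returned dict looks roughly like:
#
#     {
#         "lr": PipelineInput(name="lr", meta=None, default_data=0.01, data=None, group_names=[]),
#         "data": PipelineInput(name="data", meta=None, default_data=None, data=some_input, group_names=[]),
#     }
#
# i.e. every pipeline parameter is wrapped so it acts as a data binding while the function body executes.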


def _wrap_pipeline_parameter(
    key: str, default_value: Any, actual_value: Any, group_names: Optional[List[str]] = None
) -> Union[_GroupAttrDict, PipelineInput]:
    # Append parameter path in group
    group_names = [*group_names] if group_names else []
    if isinstance(default_value, _GroupAttrDict):
        group_names.append(key)
        return _GroupAttrDict(
            {
                k: _wrap_pipeline_parameter(k, default_value=v, actual_value=v, group_names=group_names)
                for k, v in default_value.items()
            }
        )
    # Note: this PipelineInput object is built to mark the input as a data binding. It only exists at
    # dsl.pipeline function execution time and is not stored in the pipeline job or pipeline component.
    return PipelineInput(name=key, meta=None, default_data=default_value, data=actual_value, group_names=group_names)
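

# Hedged example (comments only): when the default value is a group default (_GroupAttrDict), the
# wrapping recurses and records the parameter path, e.g. for a hypothetical group parameter
# "settings" with a field "epochs" the result is roughly:
#
#     _GroupAttrDict({
#         "epochs": PipelineInput(name="epochs", meta=None, default_data=10, data=10, group_names=["settings"]),
#     })
#
# so that `settings.epochs` inside the pipeline function body resolves to a data binding.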