about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/__init__.py5
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_attr_dict.py161
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_component_translatable.py412
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/__init__.py21
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py170
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/base.py848
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/mixin.py623
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_load_component.py313
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_expression.py662
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py182
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/data/expression_component_template.yml16
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job.py711
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job_settings.py75
13 files changed, 4199 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/__init__.py
new file mode 100644
index 00000000..fdf8caba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/__init__.py
@@ -0,0 +1,5 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_attr_dict.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_attr_dict.py
new file mode 100644
index 00000000..cf8d92be
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_attr_dict.py
@@ -0,0 +1,161 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+from abc import ABC
+from typing import Any, Dict, Generic, List, Optional, TypeVar
+
+K = TypeVar("K")
+V = TypeVar("V")
+
+
+class _AttrDict(Generic[K, V], Dict, ABC):
+    """This class is used for accessing values with instance.some_key. It supports the following scenarios:
+
+    1. Setting arbitrary attribute, eg: obj.resource_layout.node_count = 2
+      1.1 Setting same nested filed twice will return same object, eg:
+              obj.resource_layout.node_count = 2
+              obj.resource_layout.process_count_per_node = 2
+              obj.resource_layout will be {"node_count": 2, "process_count_per_node": 2}
+      1.2 Only public attribute is supported, eg: obj._resource_layout._node_count = 2 will raise AttributeError
+      1.3 All set attribute can be recorded, eg:
+              obj.target = "aml"
+              obj.resource_layout.process_count_per_node = 2
+              obj.get_attr() will return {"target": "aml", "resource_layout": {"process_count_per_node": 2}}
+    2. Getting arbitrary attribute, getting non-exist attribute will return an empty dict.
+    3. Calling arbitrary methods is not allowed, eg: obj.resource_layout() should raise AttributeError
+    """
+
+    def __init__(self, allowed_keys: Optional[Dict] = None, **kwargs: Any):
+        """Initialize a attribute dictionary.
+
+        :param allowed_keys: A dictionary of keys that allowed to set as arbitrary attributes. None means all keys can
+            be set as arbitrary attributes.
+
+        :type dict
+        :param kwargs: A dictionary of additional configuration parameters.
+        :type kwargs: dict
+        """
+        super(_AttrDict, self).__init__(**kwargs)
+        if allowed_keys is None:
+            # None allowed_keys means no restriction on keys can be set for _AttrDict
+            self._allowed_keys = {}
+            self._key_restriction = False
+        else:
+            # Otherwise use allowed_keys to restrict keys can be set for _AttrDict
+            self._allowed_keys = dict(allowed_keys)
+            self._key_restriction = True
+        self._logger = logging.getLogger("attr_dict")
+
+    def _initializing(self) -> bool:
+        # use this to indicate ongoing init process, sub class need to make sure this return True during init process.
+        return False
+
+    def _get_attrs(self) -> dict:
+        """Get all arbitrary attributes which has been set, empty values are excluded.
+
+        :return: A dict which contains all arbitrary attributes set by user.
+        :rtype: dict
+        """
+
+        # TODO: check this
+        def remove_empty_values(data: Dict) -> Dict:
+            if not isinstance(data, dict):
+                return data
+            # skip empty dicts as default value of _AttrDict is empty dict
+            return {k: remove_empty_values(v) for k, v in data.items() if v or not isinstance(v, dict)}
+
+        return remove_empty_values(self)
+
+    def _is_arbitrary_attr(self, attr_name: str) -> bool:
+        """Checks if a given attribute name should be treat as arbitrary attribute.
+
+        Attributes inside _AttrDict can be non-arbitrary attribute or arbitrary attribute.
+        Non-arbitrary attributes are normal attributes like other object which stores in self.__dict__.
+        Arbitrary attributes are attributes stored in the dictionary it self, what makes it special it it's value
+        can be an instance of _AttrDict
+        Take `obj = _AttrDict(allowed_keys={"resource_layout": {"node_count": None}})` as an example.
+        `obj.some_key` is accessing non-arbitrary attribute.
+        `obj.resource_layout` is accessing arbitrary attribute, user can use `obj.resource_layout.node_count = 1` to
+        assign value to it.
+
+        :param attr_name: Attribute name
+        :type attr_name: str
+        :return: If the given attribute name should be treated as arbitrary attribute.
+        :rtype: bool
+        """
+        # Internal attribute won't be set as arbitrary attribute.
+        if attr_name.startswith("_"):
+            return False
+        # All attributes set in __init__ won't be set as arbitrary attribute
+        if self._initializing():
+            return False
+        # If there's key restriction, only keys in it can be set as arbitrary attribute.
+        if self._key_restriction and attr_name not in self._allowed_keys:
+            return False
+        # Attributes already in attribute dict will not be set as arbitrary attribute.
+        try:
+            self.__getattribute__(attr_name)
+        except AttributeError:
+            return True
+        return False
+
+    def __getattr__(self, key: Any) -> Any:
+        if not self._is_arbitrary_attr(key):
+            return super().__getattribute__(key)
+        self._logger.debug("getting %s", key)
+        try:
+            return super().__getitem__(key)
+        except KeyError:
+            allowed_keys = self._allowed_keys.get(key, None) if self._key_restriction else None
+            result: Any = _AttrDict(allowed_keys=allowed_keys)
+            self.__setattr__(key, result)
+            return result
+
+    def __setattr__(self, key: Any, value: V) -> None:
+        if not self._is_arbitrary_attr(key):
+            super().__setattr__(key, value)
+        else:
+            self._logger.debug("setting %s to %s", key, value)
+            super().__setitem__(key, value)
+
+    def __setitem__(self, key: Any, value: V) -> None:
+        self.__setattr__(key, value)
+
+    def __getitem__(self, item: V) -> Any:
+        # support attr_dict[item] since dumping it in marshmallow requires this.
+        return self.__getattr__(item)
+
+    def __dir__(self) -> List:
+        # For Jupyter Notebook auto-completion
+        return list(super().__dir__()) + list(self.keys())
+
+
+def has_attr_safe(obj: Any, attr: Any) -> bool:
+    if isinstance(obj, _AttrDict):
+        has_attr = not obj._is_arbitrary_attr(attr)
+    elif isinstance(obj, dict):
+        return attr in obj
+    else:
+        has_attr = hasattr(obj, attr)
+    return has_attr
+
+
+def try_get_non_arbitrary_attr(obj: Any, attr: str) -> Optional[Any]:
+    """Try to get non-arbitrary attribute for potential attribute dict.
+
+    Will not create target attribute if it is an arbitrary attribute in _AttrDict.
+
+    :param obj: The obj
+    :type obj: Any
+    :param attr: The attribute name
+    :type attr: str
+    :return: obj.attr
+    :rtype: Any
+    """
+    if has_attr_safe(obj, attr):
+        return obj[attr] if isinstance(obj, dict) else getattr(obj, attr)
+    return None
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_component_translatable.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_component_translatable.py
new file mode 100644
index 00000000..22be939d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_component_translatable.py
@@ -0,0 +1,412 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use input as argument name
+import re
+from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
+
+from pydash import get
+
+from azure.ai.ml._utils.utils import is_data_binding_expression
+from azure.ai.ml.constants._common import AssetTypes
+from azure.ai.ml.constants._component import ComponentJobConstants
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.pipeline._io import PipelineInput, PipelineOutput
+from azure.ai.ml.entities._job.sweep.search_space import Choice, Randint, SweepDistribution
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, JobException
+
+# avoid circular import error
+if TYPE_CHECKING:
+    from azure.ai.ml.entities._builders import BaseNode
+    from azure.ai.ml.entities._component.component import Component
+
+
+class ComponentTranslatableMixin:
+    _PYTHON_SDK_TYPE_MAPPING = {
+        float: "number",
+        int: "integer",
+        bool: "boolean",
+        str: "string",
+    }
+
+    @classmethod
+    def _find_source_from_parent_inputs(cls, input: str, pipeline_job_inputs: dict) -> Tuple[str, Optional[str]]:
+        """Find source type and mode of input/output from parent input.
+
+        :param input: The input name
+        :type input: str
+        :param pipeline_job_inputs: The pipeline job inputs
+        :type pipeline_job_inputs: dict
+        :return: A 2-tuple of the type and the mode
+        :rtype: Tuple[str, Optional[str]]
+        """
+        _input_name = input.split(".")[2][:-2]
+        if _input_name not in pipeline_job_inputs.keys():
+            msg = "Failed to find top level definition for input binding {}."
+            raise JobException(
+                message=msg.format(input),
+                no_personal_data_message=msg.format("[input]"),
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        input_data = pipeline_job_inputs[_input_name]
+        input_type = type(input_data)
+        if input_type in cls._PYTHON_SDK_TYPE_MAPPING:
+            return cls._PYTHON_SDK_TYPE_MAPPING[input_type], None
+        return getattr(input_data, "type", AssetTypes.URI_FOLDER), getattr(input_data, "mode", None)
+
+    @classmethod
+    def _find_source_from_parent_outputs(cls, input: str, pipeline_job_outputs: dict) -> Tuple[str, Optional[str]]:
+        """Find source type and mode of input/output from parent output.
+
+        :param input: The input name
+        :type input: str
+        :param pipeline_job_outputs: The pipeline job outputs
+        :type pipeline_job_outputs: dict
+        :return: A 2-tuple of the type and the mode
+        :rtype: Tuple[str, Optional[str]]
+        """
+        _output_name = input.split(".")[2][:-2]
+        if _output_name not in pipeline_job_outputs.keys():
+            msg = "Failed to find top level definition for output binding {}."
+            raise JobException(
+                message=msg.format(input),
+                no_personal_data_message=msg.format("[input]"),
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        output_data = pipeline_job_outputs[_output_name]
+        output_type = type(output_data)
+        if output_type in cls._PYTHON_SDK_TYPE_MAPPING:
+            return cls._PYTHON_SDK_TYPE_MAPPING[output_type], None
+        if isinstance(output_data, dict):
+            if "type" in output_data:
+                output_data_type = output_data["type"]
+            else:
+                output_data_type = AssetTypes.URI_FOLDER
+            if "mode" in output_data:
+                output_data_mode = output_data["mode"]
+            else:
+                output_data_mode = None
+            return output_data_type, output_data_mode
+        return getattr(output_data, "type", AssetTypes.URI_FOLDER), getattr(output_data, "mode", None)
+
+    @classmethod
+    def _find_source_from_other_jobs(
+        cls, input: str, jobs_dict: dict, pipeline_job_dict: dict
+    ) -> Tuple[str, Optional[str]]:
+        """Find source type and mode of input/output from other job.
+
+        :param input: The input name
+        :type input: str
+        :param jobs_dict: The job dict
+        :type jobs_dict:
+        :param pipeline_job_dict: The pipeline job dict
+        :type pipeline_job_dict: dict
+        :return: A 2-tuple of the type and the mode
+        :rtype: Tuple[str, Optional[str]]
+        """
+        from azure.ai.ml.entities import CommandJob
+        from azure.ai.ml.entities._builders import BaseNode
+        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+        from azure.ai.ml.parallel import ParallelJob
+
+        _input_regex = r"\${{parent.jobs.([^.]+).([^.]+).([^.]+)}}"
+        m = re.match(_input_regex, input)
+        if m is None:
+            msg = "Failed to find top level definition for job binding {}."
+            raise JobException(
+                message=msg.format(input),
+                no_personal_data_message=msg.format("[input]"),
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        _input_job_name, _io_type, _name = m.groups()
+        _input_job = jobs_dict[_input_job_name]
+
+        # we only support input of one job is from output of another output, but input mode should be decoupled with
+        # output mode, so we always return None source_mode
+        source_mode = None
+        if isinstance(_input_job, BaseNode):
+            # If source is base node, get type from io builder
+            _source = _input_job[_io_type][_name]
+            try:
+                source_type = _source.type
+                # Todo: get component type for registered component, and no need following codes
+                # source_type is None means _input_job's component is registered component which results in its
+                # input/output type is None.
+                if source_type is None:
+                    if _source._data is None:
+                        # return default type if _input_job's output data is None
+                        source_type = AssetTypes.URI_FOLDER
+                    elif isinstance(_source._data, Output):
+                        # if _input_job data is a Output object and we return its type.
+                        source_type = _source._data.type
+                    else:
+                        #  otherwise _input_job's input/output is bound to pipeline input/output, we continue
+                        #  infer the type according to _source._data. Will return corresponding pipeline
+                        #  input/output type because we didn't get the component.
+                        source_type, _ = cls._find_source_input_output_type(_source._data, pipeline_job_dict)
+                return source_type, source_mode
+            except AttributeError as e:
+                msg = "Failed to get referenced component type {}."
+                raise JobException(
+                    message=msg.format(_input_regex),
+                    no_personal_data_message=msg.format("[_input_regex]"),
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                ) from e
+        if isinstance(_input_job, (CommandJob, ParallelJob)):
+            # If source has not parsed to Command yet, infer type
+            _source = get(_input_job, f"{_io_type}.{_name}")
+            if isinstance(_source, str):
+                source_type, _ = cls._find_source_input_output_type(_source, pipeline_job_dict)
+                return source_type, source_mode
+            return getattr(_source, "type", AssetTypes.URI_FOLDER), source_mode
+        if isinstance(_input_job, AutoMLJob):
+            # If source is AutoMLJob, only outputs is supported
+            if _io_type != "outputs":
+                msg = f"Only binding to AutoMLJob output is supported, currently got {_io_type}"
+                raise JobException(
+                    message=msg,
+                    no_personal_data_message=msg,
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+            # AutoMLJob's output type can only be MLTABLE
+            return AssetTypes.MLTABLE, source_mode
+        msg = f"Unknown referenced source job type: {type(_input_job)}."
+        raise JobException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.PIPELINE,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+    @classmethod
+    def _find_source_input_output_type(cls, input: str, pipeline_job_dict: dict) -> Tuple[str, Optional[str]]:
+        """Find source type and mode of input/output.
+
+        :param input: The input binding
+        :type input: str
+        :param pipeline_job_dict: The pipeline job dict
+        :type pipeline_job_dict: dict
+        :return: A 2-tuple of the type and the mode
+        :rtype: Tuple[str, Optional[str]]
+        """
+        pipeline_job_inputs = pipeline_job_dict.get("inputs", {})
+        pipeline_job_outputs = pipeline_job_dict.get("outputs", {})
+        jobs_dict = pipeline_job_dict.get("jobs", {})
+        if is_data_binding_expression(input, ["parent", "inputs"]):
+            return cls._find_source_from_parent_inputs(input, pipeline_job_inputs)
+        if is_data_binding_expression(input, ["parent", "outputs"]):
+            return cls._find_source_from_parent_outputs(input, pipeline_job_outputs)
+        if is_data_binding_expression(input, ["parent", "jobs"]):
+            try:
+                return cls._find_source_from_other_jobs(input, jobs_dict, pipeline_job_dict)
+            except JobException as e:
+                raise e
+            except Exception as e:
+                msg = "Failed to find referenced source for input binding {}"
+                raise JobException(
+                    message=msg.format(input),
+                    no_personal_data_message=msg.format("[input]"),
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.SYSTEM_ERROR,
+                ) from e
+        else:
+            msg = "Job input in a pipeline can bind only to a job output or a pipeline input"
+            raise JobException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    @classmethod
+    def _to_input(
+        cls,  # pylint: disable=unused-argument
+        input: Union[Input, str, bool, int, float],
+        pipeline_job_dict: Optional[dict] = None,
+        **kwargs: Any,
+    ) -> Input:
+        """Convert a single job input value to component input.
+
+        :param input: The input
+        :type input: Union[Input, str, bool, int, float]
+        :param pipeline_job_dict: The pipeline job dict
+        :type pipeline_job_dict: Optional[dict]
+        :return: The Component Input
+        :rtype: Input
+        """
+        pipeline_job_dict = pipeline_job_dict or {}
+        input_variable: Dict = {}
+
+        if isinstance(input, str) and bool(re.search(ComponentJobConstants.INPUT_PATTERN, input)):
+            # handle input bindings
+            input_variable["type"], input_variable["mode"] = cls._find_source_input_output_type(
+                input, pipeline_job_dict
+            )
+
+        elif isinstance(input, Input):
+            input_variable = input._to_dict()
+        elif isinstance(input, SweepDistribution):
+            if isinstance(input, Choice):
+                if input.values is not None:
+                    input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input.values[0])]
+            elif isinstance(input, Randint):
+                input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[int]
+            else:
+                input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[float]
+
+            input_variable["optional"] = False
+        elif type(input) in cls._PYTHON_SDK_TYPE_MAPPING:
+            input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input)]
+            input_variable["default"] = input
+        elif isinstance(input, PipelineInput):
+            # Infer input type from input data
+            input_variable = input._to_input()._to_dict()
+        else:
+            msg = "'{}' is not supported as component input, supported types are '{}'.".format(
+                type(input), cls._PYTHON_SDK_TYPE_MAPPING.keys()
+            )
+            raise JobException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        return Input(**input_variable)
+
+    @classmethod
+    def _to_input_builder_function(cls, input: Union[Dict, SweepDistribution, Input, str, bool, int, float]) -> Input:
+        input_variable = {}
+
+        if isinstance(input, Input):
+            input_variable = input._to_dict()
+        elif isinstance(input, SweepDistribution):
+            if isinstance(input, Choice):
+                if input.values is not None:
+                    input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input.values[0])]
+            elif isinstance(input, Randint):
+                input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[int]
+            else:
+                input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[float]
+
+            input_variable["optional"] = False
+        else:
+            input_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(input)]
+            input_variable["default"] = input
+        return Input(**input_variable)
+
+    @classmethod
+    def _to_output(
+        cls,  # pylint: disable=unused-argument
+        output: Optional[Union[Output, Dict, str, bool, int, float]],
+        pipeline_job_dict: Optional[dict] = None,
+        **kwargs: Any,
+    ) -> Output:
+        """Translate output value to Output and infer component output type
+        from linked pipeline output, its original type or default type.
+
+        :param output: The output
+        :type output: Union[Output, str, bool, int, float]
+        :param pipeline_job_dict: The pipeline job dict
+        :type pipeline_job_dict: Optional[dict]
+        :return: The output object
+        :rtype: Output
+        """
+        pipeline_job_dict = pipeline_job_dict or {}
+        output_type = None
+        if not pipeline_job_dict or output is None:
+            try:
+                output_type = output.type  # type: ignore
+            except AttributeError:
+                # default to url_folder if failed to get type
+                output_type = AssetTypes.URI_FOLDER
+            output_variable = {"type": output_type}
+            return Output(**output_variable)
+        output_variable = {}
+
+        if isinstance(output, str) and bool(re.search(ComponentJobConstants.OUTPUT_PATTERN, output)):
+            # handle output bindings
+            output_variable["type"], output_variable["mode"] = cls._find_source_input_output_type(
+                output, pipeline_job_dict
+            )
+
+        elif isinstance(output, Output):
+            output_variable = output._to_dict()
+
+        elif isinstance(output, PipelineOutput):
+            output_variable = output._to_output()._to_dict()
+
+        elif type(output) in cls._PYTHON_SDK_TYPE_MAPPING:
+            output_variable["type"] = cls._PYTHON_SDK_TYPE_MAPPING[type(output)]
+            output_variable["default"] = output
+        else:
+            msg = "'{}' is not supported as component output, supported types are '{}'.".format(
+                type(output), cls._PYTHON_SDK_TYPE_MAPPING.keys()
+            )
+            raise JobException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        return Output(**output_variable)
+
+    def _to_inputs(self, inputs: Optional[Dict], **kwargs: Any) -> Dict:
+        """Translate inputs to Inputs.
+
+        :param inputs: mapping from input name to input object.
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        :return: mapping from input name to translated component input.
+        :rtype: Dict[str, Input]
+        """
+        pipeline_job_dict = kwargs.get("pipeline_job_dict", {})
+        translated_component_inputs = {}
+        if inputs is not None:
+            for io_name, io_value in inputs.items():
+                translated_component_inputs[io_name] = self._to_input(io_value, pipeline_job_dict)
+        return translated_component_inputs
+
+    def _to_outputs(self, outputs: Optional[Dict], **kwargs: Any) -> Dict:
+        """Translate outputs to Outputs.
+
+        :param outputs: mapping from output name to output object.
+        :type outputs: Dict[str, Output]
+        :return: mapping from output name to translated component output.
+        :rtype: Dict[str, Output]
+        """
+        # Translate outputs to Outputs.
+        pipeline_job_dict = kwargs.get("pipeline_job_dict", {})
+        translated_component_outputs = {}
+        if outputs is not None:
+            for output_name, output_value in outputs.items():
+                translated_component_outputs[output_name] = self._to_output(output_value, pipeline_job_dict)
+        return translated_component_outputs
+
+    def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> Union["Component", str]:
+        """Translate to Component.
+
+        :param context: The context
+        :type context: Optional[context]
+        :return: Translated Component.
+        :rtype: Component
+        """
+        # Note: Source of translated component should be same with Job
+        # And should be set after called _to_component/_to_node as job has no _source now.
+        raise NotImplementedError()
+
+    def _to_node(self, context: Optional[Dict] = None, **kwargs: Any) -> "BaseNode":
+        """Translate to pipeline node.
+
+        :param context: The context
+        :type context: Optional[context]
+        :return: Translated node.
+        :rtype: BaseNode
+        """
+        # Note: Source of translated component should be same with Job
+        # And should be set after called _to_component/_to_node as job has no _source now.
+        raise NotImplementedError()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/__init__.py
new file mode 100644
index 00000000..3ccde947
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/__init__.py
@@ -0,0 +1,21 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+"""Classes in this package converts input & output set by user to pipeline job input & output."""
+
+from .attr_dict import OutputsAttrDict, _GroupAttrDict
+from .base import InputOutputBase, NodeInput, NodeOutput, PipelineInput, PipelineOutput
+from .mixin import AutoMLNodeIOMixin, NodeWithGroupInputMixin, PipelineJobIOMixin
+
+__all__ = [
+    "PipelineOutput",
+    "PipelineInput",
+    "NodeOutput",
+    "NodeInput",
+    "InputOutputBase",
+    "OutputsAttrDict",
+    "_GroupAttrDict",
+    "NodeWithGroupInputMixin",
+    "AutoMLNodeIOMixin",
+    "PipelineJobIOMixin",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py
new file mode 100644
index 00000000..0ae08bcd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/attr_dict.py
@@ -0,0 +1,170 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from enum import Enum
+from typing import Any, Dict, List, Optional, Union
+
+from azure.ai.ml.entities._assets import Data
+from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output
+from azure.ai.ml.entities._job.pipeline._attr_dict import K
+from azure.ai.ml.entities._job.pipeline._io.base import NodeInput, NodeOutput, PipelineInput
+from azure.ai.ml.exceptions import (
+    ErrorCategory,
+    ErrorTarget,
+    UnexpectedAttributeError,
+    UnexpectedKeywordError,
+    ValidationException,
+)
+
+
+class InputsAttrDict(dict):
+    def __init__(self, inputs: dict, **kwargs: Any):
+        self._validate_inputs(inputs)
+        super(InputsAttrDict, self).__init__(**inputs, **kwargs)
+
+    @classmethod
+    def _validate_inputs(cls, inputs: Any) -> None:
+        msg = "Pipeline/component input should be a \
+        azure.ai.ml.entities._job.pipeline._io.NodeInput with owner, got {}."
+        for val in inputs.values():
+            if isinstance(val, NodeInput) and val._owner is not None:  # pylint: disable=protected-access
+                continue
+            if isinstance(val, _GroupAttrDict):
+                continue
+            raise ValidationException(
+                message=msg.format(val),
+                no_personal_data_message=msg.format("[val]"),
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    def __setattr__(
+        self,
+        key: str,
+        value: Union[int, bool, float, str, NodeOutput, PipelineInput, Input],
+    ) -> None:
+        # Extract enum value.
+        value = value.value if isinstance(value, Enum) else value
+        original_input = self.__getattr__(key)  # Note that an exception will be raised if the keyword is invalid.
+        if isinstance(original_input, _GroupAttrDict) or isinstance(value, _GroupAttrDict):
+            # Set the value directly if is parameter group.
+            self._set_group_with_type_check(key, GroupInput.custom_class_value_to_attr_dict(value))
+            return
+        original_input._data = original_input._build_data(value)
+
+    def _set_group_with_type_check(self, key: Any, value: Any) -> None:
+        msg = "{!r} is expected to be a parameter group, but got {}."
+        if not isinstance(value, _GroupAttrDict):
+            raise ValidationException(
+                message=msg.format(key, type(value)),
+                no_personal_data_message=msg.format("[key]", "[value_type]"),
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        self.__setitem__(key, GroupInput.custom_class_value_to_attr_dict(value))
+
+    def __getattr__(self, item: Any) -> NodeInput:
+        res: NodeInput = self.__getitem__(item)
+        return res
+
+
+class _GroupAttrDict(InputsAttrDict):
+    """This class is used for accessing values with instance.some_key."""
+
+    @classmethod
+    def _validate_inputs(cls, inputs: Any) -> None:
+        msg = "Pipeline/component input should be a azure.ai.ml.entities._job.pipeline._io.NodeInput, got {}."
+        for val in inputs.values():
+            if isinstance(val, NodeInput) and val._owner is not None:  # pylint: disable=protected-access
+                continue
+            if isinstance(val, _GroupAttrDict):
+                continue
+            # Allow PipelineInput as Group may appear at top level pipeline input.
+            if isinstance(val, PipelineInput):
+                continue
+            raise ValidationException(
+                message=msg.format(val),
+                no_personal_data_message=msg.format("[val]"),
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    def __getattr__(self, name: K) -> Any:
+        if name not in self:
+            raise UnexpectedAttributeError(keyword=name, keywords=list(self))
+        return super().__getitem__(name)
+
+    def __getitem__(self, item: K) -> Any:
+        # We raise this exception instead of KeyError
+        if item not in self:
+            raise UnexpectedKeywordError(func_name="ParameterGroup", keyword=item, keywords=list(self))
+        return super().__getitem__(item)
+
+    # For Jupyter Notebook auto-completion
+    def __dir__(self) -> List:
+        return list(super().__dir__()) + list(self.keys())
+
+    def flatten(self, group_parameter_name: Optional[str]) -> Dict:
+        # Return the flattened result of self
+
+        group_parameter_name = group_parameter_name if group_parameter_name else ""
+        flattened_parameters = {}
+        msg = "'%s' in parameter group should be a azure.ai.ml.entities._job._io.NodeInput, got '%s'."
+        for k, v in self.items():
+            flattened_name = ".".join([group_parameter_name, k])
+            if isinstance(v, _GroupAttrDict):
+                flattened_parameters.update(v.flatten(flattened_name))
+            elif isinstance(v, NodeInput):
+                flattened_parameters[flattened_name] = v._to_job_input()  # pylint: disable=protected-access
+            else:
+                raise ValidationException(
+                    message=msg % (flattened_name, type(v)),
+                    no_personal_data_message=msg % ("name", "type"),
+                    target=ErrorTarget.PIPELINE,
+                )
+        return flattened_parameters
+
+    def insert_group_name_for_items(self, group_name: Any) -> None:
+        # Insert one group name for all items.
+        for v in self.values():
+            if isinstance(v, _GroupAttrDict):
+                v.insert_group_name_for_items(group_name)
+            elif isinstance(v, PipelineInput):
+                # Insert group names for pipeline input
+                v._group_names = [group_name] + v._group_names  # pylint: disable=protected-access
+
+
+class OutputsAttrDict(dict):
+    def __init__(self, outputs: dict, **kwargs: Any):
+        for val in outputs.values():
+            if not isinstance(val, NodeOutput) or val._owner is None:
+                msg = "Pipeline/component output should be a azure.ai.ml.dsl.Output with owner, got {}."
+                raise ValidationException(
+                    message=msg.format(val),
+                    no_personal_data_message=msg.format("[val]"),
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+        super(OutputsAttrDict, self).__init__(**outputs, **kwargs)
+
+    def __getattr__(self, item: Any) -> NodeOutput:
+        return self.__getitem__(item)
+
+    def __getitem__(self, item: Any) -> NodeOutput:
+        if item not in self:
+            # We raise this exception instead of KeyError as OutputsAttrDict doesn't support add new item after
+            # __init__.
+            raise UnexpectedAttributeError(keyword=item, keywords=list(self))
+        res: NodeOutput = super().__getitem__(item)
+        return res
+
+    def __setattr__(self, key: str, value: Union[Data, Output]) -> None:
+        if isinstance(value, Output):
+            mode = value.mode
+            value = Output(type=value.type, path=value.path, mode=mode, name=value.name, version=value.version)
+        original_output = self.__getattr__(key)  # Note that an exception will be raised if the keyword is invalid.
+        original_output._data = original_output._build_data(value)
+
+    def __setitem__(self, key: str, value: Output) -> None:
+        return self.__setattr__(key, value)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/base.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/base.py
new file mode 100644
index 00000000..b17972ae
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/base.py
@@ -0,0 +1,848 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import copy
+import re
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, TypeVar, Union, cast, overload
+
+from azure.ai.ml._utils.utils import is_data_binding_expression
+from azure.ai.ml.constants import AssetTypes
+from azure.ai.ml.constants._component import IOConstants
+from azure.ai.ml.entities._assets._artifacts.data import Data
+from azure.ai.ml.entities._assets._artifacts.model import Model
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpressionMixin
+from azure.ai.ml.entities._util import resolve_pipeline_parameter
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, UserErrorException, ValidationException
+
+# avoid circular import error
+if TYPE_CHECKING:
+    from azure.ai.ml.entities import PipelineJob
+    from azure.ai.ml.entities._builders import BaseNode
+
+T = TypeVar("T")
+
+
+def _build_data_binding(data: Union[str, "PipelineInput", "Output"]) -> Union[str, Output]:
+    """Build input builders to data bindings.
+
+    :param data: The data to build a data binding from
+    :type data: Union[str, PipelineInput, Output]
+    :return: A data binding string if data isn't a str, otherwise data
+    :rtype: str
+    """
+    result: Union[str, Output] = ""
+
+    if isinstance(data, (InputOutputBase)):
+        # Build data binding when data is PipelineInput, Output
+        result = data._data_binding()
+    else:
+        # Otherwise just return the data
+        result = data
+    return result
+
+
+def _resolve_builders_2_data_bindings(
+    data: Union[list, dict, str, "PipelineInput", "Output"]
+) -> Union[dict, list, str, Output]:
+    """Traverse data and build input builders inside it to data bindings.
+
+    :param data: The bindings to resolve
+    :type data: Union[list, dict, str, "PipelineInput", "Output"]
+    :return:
+       * A dict if data was a dict
+       * A list if data was a list
+       * A str otherwise
+    :rtype: Union[list, dict, str]
+    """
+    if isinstance(data, dict):
+        for key, val in data.items():
+            if isinstance(val, (dict, list)):
+                data[key] = _resolve_builders_2_data_bindings(val)
+            else:
+                data[key] = _build_data_binding(val)
+        return data
+    if isinstance(data, list):
+        resolved_data = []
+        for val in data:
+            resolved_data.append(_resolve_builders_2_data_bindings(val))
+        return resolved_data
+    return _build_data_binding(data)
+
+
+def _data_to_input(data: Union[Data, Model]) -> Input:
+    """Convert a Data object to an Input object.
+
+    :param data: The data to convert
+    :type data: Data
+    :return: The Input object
+    :rtype: Input
+    """
+    if data.id:
+        return Input(type=data.type, path=data.id)
+    return Input(type=data.type, path=f"{data.name}:{data.version}")
+
+
+class InputOutputBase(ABC):
+    # TODO: refine this code, always use _data to store builder level settings and use _meta to store definition
+    # TODO: when _data missing, return value from _meta
+
+    def __init__(
+        self,
+        meta: Optional[Union[Input, Output]],
+        data: Optional[Union[int, bool, float, str, Input, Output, "PipelineInput"]],
+        default_data: Optional[Union[int, bool, float, str, Input, Output]] = None,
+        **kwargs: Any,
+    ):
+        """Base class of input & output.
+
+        :param meta: Metadata of this input/output, eg: type, min, max, etc.
+        :type meta: Union[Input, Output]
+        :param data: Actual value of input/output, None means un-configured data.
+        :type data: Union[None, int, bool, float, str,
+                          azure.ai.ml.Input,
+                          azure.ai.ml.Output]
+        :param default_data: default value of input/output, None means un-configured data.
+        :type default_data: Union[None, int, bool, float, str,
+                          azure.ai.ml.Input,
+                          azure.ai.ml.Output]
+        """
+        self._meta = meta
+        self._original_data = data
+        self._data: Any = self._build_data(data)
+        self._default_data = default_data
+        self._type: str = meta.type if meta is not None else kwargs.pop("type", None)
+        self._mode = self._get_mode(original_data=data, data=self._data, kwargs=kwargs)
+        self._description = (
+            self._data.description
+            if self._data is not None and hasattr(self._data, "description") and self._data.description
+            else kwargs.pop("description", None)
+        )
+        # TODO: remove this
+        self._attribute_map: Dict = {}
+        self._name: Optional[str] = ""
+        self._version: Optional[str] = ""
+        super(InputOutputBase, self).__init__(**kwargs)
+
+    @abstractmethod
+    def _build_data(self, data: T) -> Union[T, str, Input, "InputOutputBase"]:
+        """Validate if data matches type and translate it to Input/Output acceptable type.
+
+        :param data: The data
+        :type data: T
+        :return: The built data
+        :rtype: Union[T, str, Input, InputOutputBase]
+        """
+
+    @abstractmethod
+    def _build_default_data(self) -> None:
+        """Build default data when data not configured."""
+
+    @property
+    def type(self) -> str:
+        """Type of input/output.
+
+        :return: The type
+        :rtype: str
+        """
+        return self._type
+
+    @type.setter
+    def type(self, type: Any) -> None:  # pylint: disable=redefined-builtin
+        # For un-configured input/output, we build a default data entry for them.
+        self._build_default_data()
+        self._type = type
+        if isinstance(self._data, (Input, Output)):
+            self._data.type = type
+        elif self._data is not None and not isinstance(
+            self._data, (int, float, str)
+        ):  # when type of self._data is InputOutputBase or its child class
+            self._data._type = type
+
+    @property
+    def mode(self) -> Optional[str]:
+        return self._mode
+
+    @mode.setter
+    def mode(self, mode: Optional[str]) -> None:
+        # For un-configured input/output, we build a default data entry for them.
+        self._build_default_data()
+        self._mode = mode
+        if isinstance(self._data, (Input, Output)):
+            self._data.mode = mode
+        elif self._data is not None and not isinstance(self._data, (int, float, str)):
+            self._data._mode = mode
+
+    @property
+    def description(self) -> Any:
+        return self._description
+
+    @description.setter
+    def description(self, description: str) -> None:
+        # For un-configured input/output, we build a default data entry for them.
+        self._build_default_data()
+        self._description = description
+        if isinstance(self._data, (Input, Output)):
+            self._data.description = description
+        elif self._data is not None and not isinstance(self._data, (int, float, str)):
+            self._data._description = description
+
+    @property
+    def path(self) -> Optional[str]:
+        # This property is introduced for static intellisense.
+        if hasattr(self._data, "path"):
+            if self._data is not None and not isinstance(self._data, (int, float, str)):
+                res: Optional[str] = self._data.path
+                return res
+        msg = f"{type(self._data)} does not have path."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.PIPELINE,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+    @path.setter
+    def path(self, path: str) -> None:
+        # For un-configured input/output, we build a default data entry for them.
+        self._build_default_data()
+        if hasattr(self._data, "path"):
+            if self._data is not None and not isinstance(self._data, (int, float, str)):
+                self._data.path = path
+        else:
+            msg = f"{type(self._data)} does not support setting path."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    def _data_binding(self) -> str:
+        """Return data binding string representation for this input/output.
+
+        :return: The data binding string
+        :rtype: str
+        """
+        raise NotImplementedError()
+
+    # Why did we have this function? It prevents the DictMixin from being applied.
+    # Unclear if we explicitly do NOT want the mapping protocol to be applied to this, or it this was just
+    # confirmation that it didn't at the time.
+    def keys(self) -> None:
+        # This property is introduced to raise catchable Exception in marshmallow mapping validation trial.
+        raise TypeError(f"'{type(self).__name__}' object is not a mapping")
+
+    def __str__(self) -> str:
+        try:
+            return self._data_binding()
+        except AttributeError:
+            return super(InputOutputBase, self).__str__()
+
+    def __hash__(self) -> int:
+        return id(self)
+
+    @classmethod
+    def _get_mode(
+        cls,
+        original_data: Optional[Union[int, bool, float, str, Input, Output, "PipelineInput"]],
+        data: Optional[Union[int, bool, float, str, Input, Output]],
+        kwargs: dict,
+    ) -> Optional[str]:
+        """Get mode of this input/output builder.
+
+        :param original_data: Original value of input/output.
+        :type original_data: Union[None, int, bool, float, str
+                          azure.ai.ml.Input,
+                          azure.ai.ml.Output,
+                          azure.ai.ml.entities._job.pipeline._io.PipelineInput]
+        :param data: Built input/output data.
+        :type data: Union[None, int, bool, float, str
+                          azure.ai.ml.Input,
+                          azure.ai.ml.Output]
+        :param kwargs: The kwargs
+        :type kwargs: Dict
+        :return: The mode
+        :rtype: Optional[str]
+        """
+        # pipeline level inputs won't pass mode to bound node level inputs
+        if isinstance(original_data, PipelineInput):
+            return None
+        return data.mode if data is not None and hasattr(data, "mode") else kwargs.pop("mode", None)
+
+    @property
+    def _is_primitive_type(self) -> bool:
+        return self.type in IOConstants.PRIMITIVE_STR_2_TYPE
+
+
+class NodeInput(InputOutputBase):
+    """Define one input of a Component."""
+
+    def __init__(
+        self,
+        port_name: str,
+        meta: Optional[Input],
+        *,
+        data: Optional[Union[int, bool, float, str, Output, "PipelineInput", Input]] = None,
+        # TODO: Bug Item number: 2883405
+        owner: Optional[Union["BaseComponent", "PipelineJob"]] = None,  # type: ignore
+        **kwargs: Any,
+    ):
+        """Initialize an input of a component.
+
+        :param name: The name of the input.
+        :type name: str
+        :param meta: Metadata of this input, eg: type, min, max, etc.
+        :type meta: Input
+        :param data: The input data. Valid types include int, bool, float, str,
+            Output of another component or pipeline input and Input.
+            Note that the output of another component or pipeline input associated should be reachable in the scope
+            of current pipeline. Input is introduced to support case like
+            TODO: new examples
+            component.inputs.xxx = Input(path="arm_id")
+        :type data: Union[int, bool, float, str
+                          azure.ai.ml.Output,
+                          azure.ai.ml.Input]
+        :param owner: The owner component of the input, used to calculate binding.
+        :type owner: Union[azure.ai.ml.entities.BaseNode, azure.ai.ml.entities.PipelineJob]
+        :param kwargs: A dictionary of additional configuration parameters.
+        :type kwargs: dict
+        """
+        # TODO: validate data matches type in meta
+        # TODO: validate supported data
+        self._port_name = port_name
+        self._owner = owner
+        super().__init__(meta=meta, data=data, **kwargs)
+
+    def _build_default_data(self) -> None:
+        """Build default data when input not configured."""
+        if self._data is None:
+            self._data = Input()
+
+    def _build_data(self, data: T) -> Union[T, str, Input, InputOutputBase]:
+        """Build input data according to assigned input
+
+        eg: node.inputs.key = data
+
+        :param data: The data
+        :type data: T
+        :return: The built data
+        :rtype: Union[T, str, Input, "PipelineInput", "NodeOutput"]
+        """
+        _data: Union[T, str, NodeOutput] = resolve_pipeline_parameter(data)
+        if _data is None:
+            return _data
+        # Unidiomatic typecheck: Checks that data is _exactly_ this type, and not potentially a subtype
+        if type(_data) is NodeInput:  # pylint: disable=unidiomatic-typecheck
+            msg = "Can not bind input to another component's input."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if isinstance(_data, (PipelineInput, NodeOutput)):
+            # If value is input or output, it's a data binding, we require it have a owner so we can convert it to
+            # a data binding, eg: ${{inputs.xxx}}
+            if isinstance(_data, NodeOutput) and _data._owner is None:
+                msg = "Setting input binding {} to output without owner is not allowed."
+                raise ValidationException(
+                    message=msg.format(_data),
+                    no_personal_data_message=msg.format("[_data]"),
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+            return _data
+        # for data binding case, set is_singular=False for case like "${{parent.inputs.job_in_folder}}/sample1.csv"
+        if isinstance(_data, Input) or is_data_binding_expression(_data, is_singular=False):
+            return _data
+        if isinstance(_data, (Data, Model)):
+            return _data_to_input(_data)
+        # self._meta.type could be None when sub pipeline has no annotation
+        if isinstance(self._meta, Input) and self._meta.type and not self._meta._is_primitive_type:
+            if isinstance(_data, str):
+                return Input(type=self._meta.type, path=_data)
+            msg = "only path input is supported now but get {}: {}."
+            raise UserErrorException(
+                message=msg.format(type(_data), _data),
+                no_personal_data_message=msg.format(type(_data), "[_data]"),
+            )
+        return _data
+
+    def _to_job_input(self) -> Optional[Union[Input, str, Output]]:
+        """convert the input to Input, this logic will change if backend contract changes."""
+        result: Optional[Union[Input, str, Output]] = None
+
+        if self._data is None:
+            # None data means this input is not configured.
+            result = None
+        elif isinstance(self._data, (PipelineInput, NodeOutput)):
+            # Build data binding when data is PipelineInput, Output
+            result = Input(path=self._data._data_binding(), mode=self.mode)
+        elif is_data_binding_expression(self._data):
+            result = Input(path=self._data, mode=self.mode)
+        else:
+            data_binding = _build_data_binding(self._data)
+            if is_data_binding_expression(self._data):
+                result = Input(path=data_binding, mode=self.mode)
+            else:
+                result = data_binding
+            # TODO: validate is self._data is supported
+
+        return result
+
+    def _data_binding(self) -> str:
+        msg = "Input binding {} can only come from a pipeline, currently got {}"
+        # call type(self._owner) to avoid circular import
+        raise ValidationException(
+            message=msg.format(self._port_name, type(self._owner)),
+            target=ErrorTarget.PIPELINE,
+            no_personal_data_message=msg.format("[port_name]", "[owner]"),
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+    def _copy(self, owner: Any) -> "NodeInput":
+        return NodeInput(
+            port_name=self._port_name,
+            data=self._data,
+            owner=owner,
+            meta=cast(Input, self._meta),
+        )
+
+    def _deepcopy(self) -> "NodeInput":
+        return NodeInput(
+            port_name=self._port_name,
+            data=copy.copy(self._data),
+            owner=self._owner,
+            meta=cast(Input, self._meta),
+        )
+
+    def _get_data_owner(self) -> Optional["BaseNode"]:
+        """Gets the data owner of the node
+
+        Note: This only works for @pipeline, not for YAML pipeline.
+
+        Note: Inner step will be returned as the owner when node's input is from sub pipeline's output.
+            @pipeline
+            def sub_pipeline():
+                inner_node = component_func()
+                return inner_node.outputs
+
+            @pipeline
+            def root_pipeline():
+                pipeline_node = sub_pipeline()
+                node = copy_files_component_func(input_dir=pipeline_node.outputs.output_dir)
+                owner = node.inputs.input_dir._get_data_owner()
+                assert owner == pipeline_node.nodes[0]
+
+        :return: The node if Input is from another node's output. Returns None for literal value.
+        :rtype: Optional[BaseNode]
+        """
+        from azure.ai.ml.entities import Pipeline
+        from azure.ai.ml.entities._builders import BaseNode
+
+        def _resolve_data_owner(data: Any) -> Optional["BaseNode"]:
+            if isinstance(data, BaseNode) and not isinstance(data, Pipeline):
+                return data
+            while isinstance(data, PipelineInput):
+                # for pipeline input, it's original value(can be literal value or another node's output)
+                # is stored in _original_data
+                return _resolve_data_owner(data._original_data)
+            if isinstance(data, NodeOutput):
+                if isinstance(data._owner, Pipeline):
+                    # for input from subgraph's output, trace back to inner node
+                    return _resolve_data_owner(data._binding_output)
+                # for input from another node's output, return the node
+                return _resolve_data_owner(data._owner)
+            return None
+
+        return _resolve_data_owner(self._data)
+
+
+class NodeOutput(InputOutputBase, PipelineExpressionMixin):
+    """Define one output of a Component."""
+
+    def __init__(
+        self,
+        port_name: str,
+        meta: Optional[Union[Input, Output]],
+        *,
+        data: Optional[Union[Output, str]] = None,
+        # TODO: Bug Item number: 2883405
+        owner: Optional[Union["BaseComponent", "PipelineJob"]] = None,  # type: ignore
+        binding_output: Optional["NodeOutput"] = None,
+        **kwargs: Any,
+    ):
+        """Initialize an Output of a component.
+
+        :param port_name: The port_name of the output.
+        :type port_name: str
+        :param name: The name used to register NodeOutput/PipelineOutput data.
+        :type name: str
+        :param version: The version used to register NodeOutput/PipelineOutput data.
+        :ype version: str
+        :param data: The output data. Valid types include str, Output
+        :type data: Union[str
+                          azure.ai.ml.entities.Output]
+        :param mode: The mode of the output.
+        :type mode: str
+        :param owner: The owner component of the output, used to calculate binding.
+        :type owner: Union[azure.ai.ml.entities.BaseNode, azure.ai.ml.entities.PipelineJob]
+        :param binding_output: The node output bound to pipeline output, only available for pipeline.
+        :type binding_output: azure.ai.ml.entities.NodeOutput
+        :param kwargs: A dictionary of additional configuration parameters.
+        :type kwargs: dict
+        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if object cannot be successfully validated.
+            Details will be provided in the error message.
+        """
+        # Allow inline output binding with string, eg: "component_out_path_1": "${{parents.outputs.job_out_data_1}}"
+        if data is not None and not isinstance(data, (Output, str)):
+            msg = "Got unexpected type for output: {}."
+            raise ValidationException(
+                message=msg.format(data),
+                target=ErrorTarget.PIPELINE,
+                no_personal_data_message=msg.format("[data]"),
+            )
+        super().__init__(meta=meta, data=data, **kwargs)
+        self._port_name = port_name
+        self._owner = owner
+        self._name: Optional[str] = self._data.name if isinstance(self._data, Output) else None
+        self._version: Optional[str] = self._data.version if isinstance(self._data, Output) else None
+
+        self._assert_name_and_version()
+
+        # store original node output to be able to trace back to inner node from a pipeline output builder.
+        self._binding_output = binding_output
+
+    @property
+    def port_name(self) -> str:
+        """The output port name, eg: node.outputs.port_name.
+
+        :return: The port name
+        :rtype: str
+        """
+        return self._port_name
+
+    @property
+    def name(self) -> Optional[str]:
+        """Used in registering output data.
+
+        :return: The output name
+        :rtype: str
+        """
+        return self._name
+
+    @name.setter
+    def name(self, name: str) -> None:
+        """Assigns the name to NodeOutput/PipelineOutput and builds data according to the name.
+
+        :param name: The new name
+        :type name: str
+        """
+        self._build_default_data()
+        self._name = name
+        if isinstance(self._data, Output):
+            self._data.name = name
+        elif isinstance(self._data, InputOutputBase):
+            self._data._name = name
+        else:
+            raise UserErrorException(
+                f"We support self._data of Input, Output, InputOutputBase, NodeOutput and NodeInput,"
+                f"but got type: {type(self._data)}."
+            )
+
+    @property
+    def version(self) -> Optional[str]:
+        """Used in registering output data.
+
+        :return: The output data
+        :rtype: str
+        """
+        return self._version
+
+    @version.setter
+    def version(self, version: str) -> None:
+        """Assigns the version to NodeOutput/PipelineOutput and builds data according to the version.
+
+        :param version: The new version
+        :type version: str
+        """
+        self._build_default_data()
+        self._version = version
+        if isinstance(self._data, Output):
+            self._data.version = version
+        elif isinstance(self._data, InputOutputBase):
+            self._data._version = version
+        else:
+            raise UserErrorException(
+                f"We support self._data of Input, Output, InputOutputBase, NodeOutput and NodeInput,"
+                f"but got type: {type(self._data)}."
+            )
+
+    @property
+    def path(self) -> Any:
+        # For node output path,
+        if self._data is not None and hasattr(self._data, "path"):
+            return self._data.path
+        return None
+
+    @path.setter
+    def path(self, path: Optional[str]) -> None:
+        # For un-configured output, we build a default data entry for them.
+        self._build_default_data()
+        if self._data is not None and hasattr(self._data, "path"):
+            self._data.path = path
+        else:
+            # YAML job will have string output binding and do not support setting path for it.
+            msg = f"{type(self._data)} does not support setting path."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    def _assert_name_and_version(self) -> None:
+        if self.name and not (re.match("^[A-Za-z0-9_-]*$", self.name) and len(self.name) <= 255):
+            raise UserErrorException(
+                f"The output name {self.name} can only contain alphanumeric characters, dashes and underscores, "
+                f"with a limit of 255 characters."
+            )
+        if self.version and not self.name:
+            raise UserErrorException("Output name is required when output version is specified.")
+
+    def _build_default_data(self) -> None:
+        """Build default data when output not configured."""
+        if self._data is None:
+            # _meta will be None when node._component is not a Component object
+            # so we just leave the type inference work to backend
+            self._data = Output(type=None)  # type: ignore[call-overload]
+
+    def _build_data(self, data: T) -> Any:
+        """Build output data according to assigned input, eg: node.outputs.key = data
+
+        :param data: The data
+        :type data: T
+        :return: `data`
+        :rtype: T
+        """
+        if data is None:
+            return data
+        if not isinstance(data, (Output, str)):
+            msg = f"{self.__class__.__name__} only allow set {Output.__name__} object, {type(data)} is not supported."
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.PIPELINE,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        res: T = cast(T, data)
+        return res
+
+    def _to_job_output(self) -> Optional[Output]:
+        """Convert the output to Output, this logic will change if backend contract changes."""
+        if self._data is None:
+            # None data means this output is not configured.
+            result = None
+        elif isinstance(self._data, str):
+            result = Output(
+                type=AssetTypes.URI_FOLDER, path=self._data, mode=self.mode, name=self.name, version=self.version
+            )
+        elif isinstance(self._data, Output):
+            result = self._data
+        elif isinstance(self._data, PipelineOutput):
+            result = Output(
+                type=AssetTypes.URI_FOLDER,
+                path=self._data._data_binding(),
+                mode=self.mode,
+                name=self._data.name,
+                version=self._data.version,
+                description=self.description,
+            )
+        else:
+            msg = "Got unexpected type for output: {}."
+            raise ValidationException(
+                message=msg.format(self._data),
+                target=ErrorTarget.PIPELINE,
+                no_personal_data_message=msg.format("[data]"),
+            )
+        return result
+
+    def _data_binding(self) -> str:
+        if self._owner is not None:
+            return f"${{{{parent.jobs.{self._owner.name}.outputs.{self._port_name}}}}}"
+
+        return ""
+
+    def _copy(self, owner: Any) -> "NodeOutput":
+        return NodeOutput(
+            port_name=self._port_name,
+            data=cast(Output, self._data),
+            owner=owner,
+            meta=self._meta,
+        )
+
+    def _deepcopy(self) -> "NodeOutput":
+        return NodeOutput(
+            port_name=self._port_name,
+            data=cast(Output, copy.copy(self._data)),
+            owner=self._owner,
+            meta=self._meta,
+            binding_output=self._binding_output,
+        )
+
+
+class PipelineInput(NodeInput, PipelineExpressionMixin):
+    """Define one input of a Pipeline."""
+
+    def __init__(self, name: str, meta: Optional[Input], group_names: Optional[List[str]] = None, **kwargs: Any):
+        """Initialize a PipelineInput.
+
+        :param name: The name of the input.
+        :type name: str
+        :param meta: Metadata of this input, eg: type, min, max, etc.
+        :type meta: Input
+        :param group_names: The input parameter's group names.
+        :type group_names: List[str]
+        """
+        super(PipelineInput, self).__init__(port_name=name, meta=meta, **kwargs)
+        self._group_names = group_names if group_names else []
+
+    def result(self) -> Any:
+        """Return original value of pipeline input.
+
+        :return: The original value of pipeline input
+        :rtype: Any
+
+        Example:
+
+        .. code-block:: python
+
+           @pipeline
+           def pipeline_func(param1):
+             # node1's param1 will get actual value of param1 instead of a input binding.
+             node1 = component_func(param1=param1.result())
+        """
+
+        # use this to break self loop
+        original_data_cache: Set = set()
+        original_data = self._original_data
+        while isinstance(original_data, PipelineInput) and original_data not in original_data_cache:
+            original_data_cache.add(original_data)
+            original_data = original_data._original_data
+        return original_data
+
+    def __str__(self) -> str:
+        return self._data_binding()
+
+    @overload
+    def _build_data(self, data: Union[Model, Data]) -> Input: ...
+
+    @overload
+    def _build_data(self, data: T) -> Any: ...
+
+    def _build_data(self, data: Union[Model, Data, T]) -> Any:
+        """Build data according to input type.
+
+        :param data: The data
+        :type data: Union[Model, Data, T]
+        :return:
+            * Input if data is a Model or Data
+            * data otherwise
+        :rtype: Union[Input, T]
+        """
+        if data is None:
+            return data
+        # Unidiomatic typecheck: Checks that data is _exactly_ this type, and not potentially a subtype
+        if type(data) is NodeInput:  # pylint: disable=unidiomatic-typecheck
+            msg = "Can not bind input to another component's input."
+            raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.PIPELINE)
+        if isinstance(data, (PipelineInput, NodeOutput)):
+            # If value is input or output, it's a data binding, owner is required to convert it to
+            # a data binding, eg: ${{parent.inputs.xxx}}
+            if isinstance(data, NodeOutput) and data._owner is None:
+                msg = "Setting input binding {} to output without owner is not allowed."
+                raise ValidationException(
+                    message=msg.format(data),
+                    no_personal_data_message=msg.format("[data]"),
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+            return data
+        if isinstance(data, (Data, Model)):
+            # If value is Data, we convert it to an corresponding Input
+            return _data_to_input(data)
+        return data
+
+    def _data_binding(self) -> str:
+        full_name = "%s.%s" % (".".join(self._group_names), self._port_name) if self._group_names else self._port_name
+        return f"${{{{parent.inputs.{full_name}}}}}"
+
+    def _to_input(self) -> Optional[Union[Input, Output]]:
+        """Convert pipeline input to component input for pipeline component.
+
+        :return: The component input
+        :rtype: Input
+        """
+        if self._data is None:
+            # None data means this input is not configured.
+            return self._meta
+        data_type = self._data.type if isinstance(self._data, Input) else None
+        # If type is asset type, return data type without default.
+        # Else infer type from data and set it as default.
+        if data_type and data_type.lower() in AssetTypes.__dict__.values():
+            if not isinstance(self._data, (int, float, str)):
+                result = Input(type=data_type, mode=self._data.mode)
+        elif type(self._data) in IOConstants.PRIMITIVE_TYPE_2_STR:
+            result = Input(
+                type=IOConstants.PRIMITIVE_TYPE_2_STR[type(self._data)],
+                default=self._data,
+            )
+        else:
+            msg = f"Unsupported Input type {type(self._data)} detected when translate job to component."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        return result  # pylint: disable=possibly-used-before-assignment
+
+
+class PipelineOutput(NodeOutput):
+    """Define one output of a Pipeline."""
+
+    def _to_job_output(self) -> Optional[Output]:
+        result: Optional[Output] = None
+        if isinstance(self._data, Output):
+            # For pipeline output with type Output, always pass to backend.
+            result = self._data
+        elif self._data is None and self._meta and self._meta.type:
+            # For un-configured pipeline output with meta, we need to return Output with accurate type,
+            # so it won't default to uri_folder.
+            result = Output(type=self._meta.type, mode=self._meta.mode, description=self._meta.description)
+        else:
+            result = super(PipelineOutput, self)._to_job_output()
+        # Copy meta type to avoid built output's None type default to uri_folder.
+        if self.type and result is not None and not result.type:
+            result.type = self.type
+        return result
+
+    def _data_binding(self) -> str:
+        return f"${{{{parent.outputs.{self._port_name}}}}}"
+
+    def _to_output(self) -> Optional[Output]:
+        """Convert pipeline output to component output for pipeline component."""
+        if self._data is None:
+            # None data means this input is not configured.
+            return None
+        if isinstance(self._meta, Output):
+            return self._meta
+        # Assign type directly as we didn't have primitive output type for now.
+        if not isinstance(self._data, (int, float, str)):
+            return Output(type=self._data.type, mode=self._data.mode)
+        return Output()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/mixin.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/mixin.py
new file mode 100644
index 00000000..6c3d9357
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_io/mixin.py
@@ -0,0 +1,623 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import copy
+from typing import Any, Dict, List, Optional, Tuple, Type, Union
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobInput as RestJobInput
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobOutput as RestJobOutput
+from azure.ai.ml.constants._component import ComponentJobConstants
+from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output
+from azure.ai.ml.entities._util import copy_output_setting
+from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException
+
+from ..._input_output_helpers import (
+    from_rest_data_outputs,
+    from_rest_inputs_to_dataset_literal,
+    to_rest_data_outputs,
+    to_rest_dataset_literal_inputs,
+)
+from .._pipeline_job_helpers import from_dict_to_rest_io, process_sdk_component_job_io
+from .attr_dict import InputsAttrDict, OutputsAttrDict, _GroupAttrDict
+from .base import NodeInput, NodeOutput, PipelineInput, PipelineOutput
+
+
+class NodeIOMixin:
+    """Provides ability to wrap node inputs/outputs and build data bindings
+    dynamically."""
+
+    @classmethod
+    def _get_supported_inputs_types(cls) -> Optional[Any]:
+        return None
+
+    @classmethod
+    def _get_supported_outputs_types(cls) -> Optional[Any]:
+        return None
+
+    @classmethod
+    def _validate_io(cls, value: Any, allowed_types: Optional[tuple], *, key: Optional[str] = None) -> None:
+        if allowed_types is None:
+            return
+
+        if value is None or isinstance(value, allowed_types):
+            pass
+        else:
+            msg = "Expecting {} for input/output {}, got {} instead."
+            raise ValidationException(
+                message=msg.format(allowed_types, key, type(value)),
+                no_personal_data_message=msg.format(allowed_types, "[key]", type(value)),
+                target=ErrorTarget.PIPELINE,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+    def _build_input(
+        self,
+        name: str,
+        meta: Optional[Input],
+        data: Optional[Union[dict, int, bool, float, str, Output, "PipelineInput", Input]],
+    ) -> NodeInput:
+        # output mode of last node should not affect input mode of next node
+        if isinstance(data, NodeOutput):
+            # Decoupled input and output
+            # value = copy.deepcopy(value)
+            data = data._deepcopy()  # pylint: disable=protected-access
+            data.mode = None
+        elif isinstance(data, dict):
+            # Use type comparison instead of is_instance to skip _GroupAttrDict
+            # when loading from yaml io will be a dict,
+            # like {'job_data_path': '${{parent.inputs.pipeline_job_data_path}}'}
+            # parse dict to allowed type
+            data = Input(**data)
+
+            # parameter group can be of custom type, so we don't check it here
+            if meta is not None and not isinstance(meta, GroupInput):
+                self._validate_io(data, self._get_supported_inputs_types(), key=name)
+        return NodeInput(port_name=name, meta=meta, data=data, owner=self)
+
+    def _build_output(self, name: str, meta: Optional[Output], data: Optional[Union[Output, str]]) -> NodeOutput:
+        if isinstance(data, dict):
+            data = Output(**data)
+
+        self._validate_io(data, self._get_supported_outputs_types(), key=name)
+        # For un-configured outputs, settings it to None, so we won't pass extra fields(eg: default mode)
+        return NodeOutput(port_name=name, meta=meta, data=data, owner=self)
+
+    # pylint: disable=unused-argument
+    def _get_default_input_val(self, val: Any):  # type: ignore
+        # use None value as data placeholder for unfilled inputs.
+        # server side will fill the default value
+        return None
+
+    def _build_inputs_dict(
+        self,
+        inputs: Dict[str, Union[Input, str, bool, int, float]],
+        *,
+        input_definition_dict: Optional[dict] = None,
+    ) -> InputsAttrDict:
+        """Build an input attribute dict so user can get/set inputs by
+        accessing attribute, eg: node1.inputs.xxx.
+
+        :param inputs: Provided kwargs when parameterizing component func.
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        :keyword input_definition_dict: Static input definition dict. If not provided, will build inputs without meta.
+        :paramtype input_definition_dict: dict
+        :return: Built dynamic input attribute dict.
+        :rtype: InputsAttrDict
+        """
+        if input_definition_dict is not None:
+            # TODO: validate inputs.keys() in input_definitions.keys()
+            input_dict = {}
+            for key, val in input_definition_dict.items():
+                if key in inputs.keys():
+                    # If input is set through component functions' kwargs, create an input object with real value.
+                    data = inputs[key]
+                else:
+                    data = self._get_default_input_val(val)  # pylint: disable=assignment-from-none
+
+                val = self._build_input(name=key, meta=val, data=data)
+                input_dict[key] = val
+        else:
+            input_dict = {key: self._build_input(name=key, meta=None, data=val) for key, val in inputs.items()}
+        return InputsAttrDict(input_dict)
+
+    def _build_outputs_dict(
+        self, outputs: Dict, *, output_definition_dict: Optional[dict] = None, none_data: bool = False
+    ) -> OutputsAttrDict:
+        """Build an output attribute dict so user can get/set outputs by
+        accessing attribute, eg: node1.outputs.xxx.
+
+        :param outputs: Provided kwargs when parameterizing component func.
+        :type outputs: Dict[str, Output]
+        :keyword output_definition_dict: Static output definition dict.
+        :paramtype output_definition_dict: Dict
+        :keyword none_data: If True, will set output data to None.
+        :paramtype none_data: bool
+        :return: Built dynamic output attribute dict.
+        :rtype: OutputsAttrDict
+        """
+        if output_definition_dict is not None:
+            # TODO: check if we need another way to mark a un-configured output instead of just set None.
+            # Create None as data placeholder for all outputs.
+            output_dict = {}
+            for key, val in output_definition_dict.items():
+                if key in outputs.keys():
+                    # If output has given value, create an output object with real value.
+                    val = self._build_output(name=key, meta=val, data=outputs[key])
+                else:
+                    val = self._build_output(name=key, meta=val, data=None)
+                output_dict[key] = val
+        else:
+            output_dict = {}
+            for key, val in outputs.items():
+                output_val = self._build_output(name=key, meta=None, data=val if not none_data else None)
+                output_dict[key] = output_val
+        return OutputsAttrDict(output_dict)
+
+    def _build_inputs(self) -> Dict:
+        """Build inputs of this component to a dict dict which maps output to
+        actual value.
+
+        The built input dict will have same input format as other jobs, eg:
+        {
+           "input_data": Input(path="path/to/input/data", mode="Mount"),
+           "input_value": 10,
+           "learning_rate": "${{jobs.step1.inputs.learning_rate}}"
+        }
+
+        :return: The input dict
+        :rtype: Dict[str, Union[Input, str, bool, int, float]]
+        """
+        inputs = {}
+        # pylint: disable=redefined-builtin
+        for name, input in self.inputs.items():  # type: ignore
+            if isinstance(input, _GroupAttrDict):
+                # Flatten group inputs into inputs dict
+                inputs.update(input.flatten(group_parameter_name=name))
+                continue
+            inputs[name] = input._to_job_input()  # pylint: disable=protected-access
+        return inputs
+
+    def _build_outputs(self) -> Dict[str, Output]:
+        """Build outputs of this component to a dict which maps output to
+        actual value.
+
+        The built output dict will have same output format as other jobs, eg:
+        {
+            "eval_output": "${{jobs.eval.outputs.eval_output}}"
+        }
+
+        :return: The output dict
+        :rtype: Dict[str, Output]
+        """
+        outputs = {}
+        for name, output in self.outputs.items():  # type: ignore
+            if isinstance(output, NodeOutput):
+                output = output._to_job_output()  # pylint: disable=protected-access
+            outputs[name] = output
+        # Remove non-configured output
+        return {k: v for k, v in outputs.items() if v is not None}
+
+    def _to_rest_inputs(self) -> Dict[str, Dict]:
+        """Translate input builders to rest input dicts.
+
+        The built dictionary's format aligns with component job's input yaml, eg:
+        {
+           "input_data": {"data": {"path": "path/to/input/data"},  "mode"="Mount"},
+           "input_value": 10,
+           "learning_rate": "${{jobs.step1.inputs.learning_rate}}"
+        }
+
+        :return: The REST inputs
+        :rtype: Dict[str, Dict]
+        """
+        built_inputs = self._build_inputs()
+        return self._input_entity_to_rest_inputs(input_entity=built_inputs)
+
+    @classmethod
+    def _input_entity_to_rest_inputs(cls, input_entity: Dict[str, Input]) -> Dict[str, Dict]:
+        # Convert io entity to rest io objects
+        input_bindings, dataset_literal_inputs = process_sdk_component_job_io(
+            input_entity, [ComponentJobConstants.INPUT_PATTERN]
+        )
+
+        # parse input_bindings to InputLiteral(value=str(binding))
+        rest_inputs = {**input_bindings, **dataset_literal_inputs}
+        # Note: The function will only be called from BaseNode,
+        # and job_type is used to enable dot in pipeline job input keys,
+        # so pass job_type as None directly here.
+        rest_inputs = to_rest_dataset_literal_inputs(rest_inputs, job_type=None)
+
+        # convert rest io to dict
+        rest_dataset_literal_inputs = {}
+        for name, val in rest_inputs.items():
+            rest_dataset_literal_inputs[name] = val.as_dict()
+            if hasattr(val, "mode") and val.mode:
+                rest_dataset_literal_inputs[name].update({"mode": val.mode.value})
+        return rest_dataset_literal_inputs
+
+    def _to_rest_outputs(self) -> Dict[str, Dict]:
+        """Translate output builders to rest output dicts.
+
+        The built dictionary's format aligns with component job's output yaml, eg:
+        {"eval_output": "${{jobs.eval.outputs.eval_output}}"}
+
+        :return: The REST outputs
+        :rtype: Dict[str, Dict]
+        """
+        built_outputs = self._build_outputs()
+
+        # Convert io entity to rest io objects
+        output_bindings, data_outputs = process_sdk_component_job_io(
+            built_outputs, [ComponentJobConstants.OUTPUT_PATTERN]
+        )
+        rest_data_outputs = to_rest_data_outputs(data_outputs)
+
+        # convert rest io to dict
+        # parse output_bindings to {"value": binding, "type": "literal"} since there's no mode for it
+        rest_output_bindings = {}
+        for key, binding in output_bindings.items():
+            rest_output_bindings[key] = {"value": binding["value"], "type": "literal"}
+            if "mode" in binding:
+                rest_output_bindings[key].update({"mode": binding["mode"].value})
+            if "name" in binding:
+                rest_output_bindings[key].update({"name": binding["name"]})
+            if "version" in binding:
+                rest_output_bindings[key].update({"version": binding["version"]})
+
+        def _rename_name_and_version(output_dict: Dict) -> Dict:
+            # NodeOutput can only be registered with name and version, therefore we rename here
+            if "asset_name" in output_dict.keys():
+                output_dict["name"] = output_dict.pop("asset_name")
+            if "asset_version" in output_dict.keys():
+                output_dict["version"] = output_dict.pop("asset_version")
+            return output_dict
+
+        rest_data_outputs = {name: _rename_name_and_version(val.as_dict()) for name, val in rest_data_outputs.items()}
+        self._update_output_types(rest_data_outputs)
+        rest_data_outputs.update(rest_output_bindings)
+        return rest_data_outputs
+
+    @classmethod
+    def _from_rest_inputs(cls, inputs: Dict) -> Dict[str, Union[Input, str, bool, int, float]]:
+        """Load inputs from rest inputs.
+
+        :param inputs: The REST inputs
+        :type inputs: Dict[str, Union[str, dict]]
+        :return: Input dict
+        :rtype: Dict[str, Union[Input, str, bool, int, float]]
+        """
+
+        # JObject -> RestJobInput/RestJobOutput
+        input_bindings, rest_inputs = from_dict_to_rest_io(inputs, RestJobInput, [ComponentJobConstants.INPUT_PATTERN])
+
+        # RestJobInput/RestJobOutput -> Input/Output
+        dataset_literal_inputs = from_rest_inputs_to_dataset_literal(rest_inputs)
+
+        return {**dataset_literal_inputs, **input_bindings}
+
+    @classmethod
+    def _from_rest_outputs(cls, outputs: Dict[str, Union[str, dict]]) -> Dict:
+        """Load outputs from rest outputs.
+
+        :param outputs: The REST outputs
+        :type outputs: Dict[str, Union[str, dict]]
+        :return: Output dict
+        :rtype: Dict[str, Output]
+        """
+
+        # JObject -> RestJobInput/RestJobOutput
+        output_bindings, rest_outputs = from_dict_to_rest_io(
+            outputs, RestJobOutput, [ComponentJobConstants.OUTPUT_PATTERN]
+        )
+
+        # RestJobInput/RestJobOutput -> Input/Output
+        data_outputs = from_rest_data_outputs(rest_outputs)
+
+        return {**data_outputs, **output_bindings}
+
+    def _update_output_types(self, rest_data_outputs: dict) -> None:
+        """Update output types in rest_data_outputs according to meta level output.
+
+        :param rest_data_outputs: The REST data outputs
+        :type rest_data_outputs: Dict
+        """
+
+        for name, rest_output in rest_data_outputs.items():
+            original_output = self.outputs[name]  # type: ignore
+            # for configured output with meta, "correct" the output type to file to avoid the uri_folder default value
+            if original_output and original_output.type:
+                if original_output.type in ["AnyFile", "uri_file"]:
+                    rest_output["job_output_type"] = "uri_file"
+
+
+def flatten_dict(
+    dct: Optional[Dict],
+    _type: Union[Type["_GroupAttrDict"], Type[GroupInput]],
+    *,
+    allow_dict_fields: Optional[List[str]] = None,
+) -> Dict:
+    """Flatten inputs/input_definitions dict for inputs dict build.
+
+    :param dct: The dictionary to flatten
+    :type dct: Dict
+    :param _type: Either _GroupAttrDict or GroupInput (both have the method `flatten`)
+    :type _type: Union[Type["_GroupAttrDict"], Type[GroupInput]]
+    :keyword allow_dict_fields: A list of keys for dictionary values that will be included in flattened output
+    :paramtype allow_dict_fields: Optional[List[str]]
+    :return: The flattened dict
+    :rtype: Dict
+    """
+    _result = {}
+    if dct is not None:
+        for key, val in dct.items():
+            # to support passing dict value as parameter group
+            if allow_dict_fields and key in allow_dict_fields and isinstance(val, dict):
+                # for child dict, all values are allowed to be dict
+                for flattened_key, flattened_val in flatten_dict(
+                    val, _type, allow_dict_fields=list(val.keys())
+                ).items():
+                    _result[key + "." + flattened_key] = flattened_val
+                continue
+            val = GroupInput.custom_class_value_to_attr_dict(val)
+            if isinstance(val, _type):
+                _result.update(val.flatten(group_parameter_name=key))
+                continue
+            _result[key] = val
+    return _result
+
+
+class NodeWithGroupInputMixin(NodeIOMixin):
+    """This class provide build_inputs_dict for a node to use ParameterGroup as an input."""
+
+    @classmethod
+    def _validate_group_input_type(
+        cls,
+        input_definition_dict: dict,
+        inputs: Dict[str, Union[Input, str, bool, int, float]],
+    ) -> None:
+        """Raise error when group input receive a value not group type.
+
+        :param input_definition_dict: The input definition dict
+        :type input_definition_dict: dict
+        :param inputs: The inputs
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        """
+        # Note: We put and extra validation here instead of doing it in pipeline._validate()
+        # due to group input will be discarded silently if assign it to a non-group parameter.
+        group_msg = "'%s' is defined as a parameter group but got input '%s' with type '%s'."
+        non_group_msg = "'%s' is defined as a parameter but got a parameter group as input."
+        for key, val in inputs.items():
+            definition = input_definition_dict.get(key)
+            val = GroupInput.custom_class_value_to_attr_dict(val)
+            if val is None:
+                continue
+            # 1. inputs.group = 'a string'
+            if isinstance(definition, GroupInput) and not isinstance(val, (_GroupAttrDict, dict)):
+                raise ValidationException(
+                    message=group_msg % (key, val, type(val)),
+                    no_personal_data_message=group_msg % ("[key]", "[val]", "[type(val)]"),
+                    target=ErrorTarget.PIPELINE,
+                    type=ValidationErrorType.INVALID_VALUE,
+                )
+            # 2. inputs.str_param = group
+            if not isinstance(definition, GroupInput) and isinstance(val, _GroupAttrDict):
+                raise ValidationException(
+                    message=non_group_msg % key,
+                    no_personal_data_message=non_group_msg % "[key]",
+                    target=ErrorTarget.PIPELINE,
+                    type=ValidationErrorType.INVALID_VALUE,
+                )
+
+    @classmethod
+    def _flatten_inputs_and_definition(
+        cls,
+        inputs: Dict[str, Union[Input, str, bool, int, float]],
+        input_definition_dict: dict,
+    ) -> Tuple[Dict, Dict]:
+        """
+        Flatten all GroupInput(definition) and GroupAttrDict recursively and build input dict.
+        For example:
+        input_definition_dict = {
+            "group1": GroupInput(
+                values={
+                    "param1": GroupInput(
+                         values={
+                             "param1_1": Input(type="str"),
+                         }
+                    ),
+                    "param2": Input(type="int"),
+                }
+            ),
+            "group2": GroupInput(
+                values={
+                    "param3": Input(type="str"),
+                }
+            ),
+        } => {
+            "group1.param1.param1_1": Input(type="str"),
+            "group1.param2": Input(type="int"),
+            "group2.param3": Input(type="str"),
+        }
+        inputs = {
+            "group1": {
+                "param1": {
+                    "param1_1": "value1",
+                },
+                "param2": 2,
+            },
+            "group2": {
+                "param3": "value3",
+            },
+        } => {
+            "group1.param1.param1_1": "value1",
+            "group1.param2": 2,
+            "group2.param3": "value3",
+        }
+        :param inputs: The inputs
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        :param input_definition_dict: The input definition dict
+        :type input_definition_dict: dict
+        :return: The flattened inputs and definition
+        :rtype: Tuple[Dict, Dict]
+        """
+        group_input_names = [key for key, val in input_definition_dict.items() if isinstance(val, GroupInput)]
+        flattened_inputs = flatten_dict(inputs, _GroupAttrDict, allow_dict_fields=group_input_names)
+        flattened_definition_dict = flatten_dict(input_definition_dict, GroupInput)
+        return flattened_inputs, flattened_definition_dict
+
+    def _build_inputs_dict(
+        self,
+        inputs: Dict[str, Union[Input, str, bool, int, float]],
+        *,
+        input_definition_dict: Optional[dict] = None,
+    ) -> InputsAttrDict:
+        """Build an input attribute dict so user can get/set inputs by
+        accessing attribute, eg: node1.inputs.xxx.
+
+        :param inputs: Provided kwargs when parameterizing component func.
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        :keyword input_definition_dict: Input definition dict from component entity.
+        :paramtype input_definition_dict: dict
+        :return: Built input attribute dict.
+        :rtype: InputsAttrDict
+        """
+
+        # TODO: should we support group input when there is no local input definition?
+        if input_definition_dict is not None:
+            # Validate group mismatch
+            self._validate_group_input_type(input_definition_dict, inputs)
+
+            # Flatten inputs and definition
+            flattened_inputs, flattened_definition_dict = self._flatten_inputs_and_definition(
+                inputs, input_definition_dict
+            )
+            # Build: zip all flattened parameter with definition
+            inputs = super()._build_inputs_dict(flattened_inputs, input_definition_dict=flattened_definition_dict)
+            return InputsAttrDict(GroupInput.restore_flattened_inputs(inputs))
+        return super()._build_inputs_dict(inputs)
+
+
+class PipelineJobIOMixin(NodeWithGroupInputMixin):
+    """Provides ability to wrap pipeline job inputs/outputs and build data bindings
+    dynamically."""
+
+    def _build_input(self, name: str, meta: Optional[Input], data: Any) -> "PipelineInput":
+        return PipelineInput(name=name, meta=meta, data=data, owner=self)
+
+    def _build_output(
+        self, name: str, meta: Optional[Union[Input, Output]], data: Optional[Union[Output, str]]
+    ) -> "PipelineOutput":
+        # TODO: settings data to None for un-configured outputs so we won't passing extra fields(eg: default mode)
+        result = PipelineOutput(port_name=name, meta=meta, data=data, owner=self)
+        return result
+
+    def _build_inputs_dict(
+        self,
+        inputs: Dict[str, Union[Input, str, bool, int, float]],
+        *,
+        input_definition_dict: Optional[dict] = None,
+    ) -> InputsAttrDict:
+        """Build an input attribute dict so user can get/set inputs by
+        accessing attribute, eg: node1.inputs.xxx.
+
+        :param inputs: Provided kwargs when parameterizing component func.
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        :keyword input_definition_dict: Input definition dict from component entity.
+        :return: Built input attribute dict.
+        :rtype: InputsAttrDict
+        """
+        input_dict = super()._build_inputs_dict(inputs, input_definition_dict=input_definition_dict)
+        # TODO: should we do this when input_definition_dict is not None?
+        # TODO: should we put this in super()._build_inputs_dict?
+        if input_definition_dict is None:
+            return InputsAttrDict(GroupInput.restore_flattened_inputs(input_dict))
+        return input_dict
+
+    def _build_output_for_pipeline(self, name: str, data: Optional[Union[Output, NodeOutput]]) -> "PipelineOutput":
+        """Build an output object for pipeline and copy settings from source output.
+
+        :param name: Output name.
+        :type name: str
+        :param data: Output data.
+        :type data: Optional[Union[Output, NodeOutput]]
+        :return: Built output object.
+        :rtype: PipelineOutput
+        """
+        # pylint: disable=protected-access
+        if data is None:
+            # For None output, build an empty output builder
+            output_val = self._build_output(name=name, meta=None, data=None)
+        elif isinstance(data, Output):
+            # For output entity, build an output builder with data points to it
+            output_val = self._build_output(name=name, meta=data, data=data)
+        elif isinstance(data, NodeOutput):
+            # For output builder, build a new output builder and copy settings from it
+            output_val = self._build_output(name=name, meta=data._meta, data=None)
+            copy_output_setting(source=data, target=output_val)
+        else:
+            message = "Unsupported output type: {} for pipeline output: {}: {}"
+            raise ValidationException(
+                message=message.format(type(data), name, data),
+                no_personal_data_message=message,
+                target=ErrorTarget.PIPELINE,
+            )
+        return output_val
+
+    def _build_pipeline_outputs_dict(self, outputs: Dict) -> OutputsAttrDict:
+        """Build an output attribute dict without output definition metadata.
+        For pipeline outputs, its setting should be copied from node level outputs.
+
+        :param outputs: Node output dict or pipeline component's outputs.
+        :type outputs: Dict[str, Union[Output, NodeOutput]]
+        :return: Built dynamic output attribute dict.
+        :rtype: OutputsAttrDict
+        """
+        output_dict = {}
+        for key, val in outputs.items():
+            output_dict[key] = self._build_output_for_pipeline(name=key, data=val)
+        return OutputsAttrDict(output_dict)
+
+    def _build_outputs(self) -> Dict[str, Output]:
+        """Build outputs of this pipeline to a dict which maps output to actual
+        value.
+
+        The built dictionary's format aligns with component job's output yaml,
+        un-configured outputs will be None, eg:
+        {"eval_output": "${{jobs.eval.outputs.eval_output}}", "un_configured": None}
+
+        :return: The output dict
+        :rtype: Dict[str, Output]
+        """
+        outputs = {}
+        for name, output in self.outputs.items():  # type: ignore
+            if isinstance(output, NodeOutput):
+                output = output._to_job_output()  # pylint: disable=protected-access
+            outputs[name] = output
+        return outputs
+
+    def _get_default_input_val(self, val: Any):  # type: ignore
+        # use Default value as data placeholder for unfilled inputs.
+        # client side need to fill the default value for dsl.pipeline
+        if isinstance(val, GroupInput):
+            # Copy default value dict for group
+            return copy.deepcopy(val.default)
+        return val.default
+
+    def _update_output_types(self, rest_data_outputs: Dict) -> None:
+        """Won't clear output type for pipeline level outputs since it's required in rest object.
+
+        :param rest_data_outputs: The REST data outputs
+        :type rest_data_outputs: Dict
+        """
+
+
+class AutoMLNodeIOMixin(NodeIOMixin):
+    """Wrap outputs of automl node and build data bindings dynamically."""
+
+    def __init__(self, **kwargs):  # type: ignore
+        # add a inputs field to align with other nodes
+        self.inputs = {}
+        super(AutoMLNodeIOMixin, self).__init__(**kwargs)
+        if getattr(self, "outputs", None):
+            self._outputs = self._build_outputs_dict(self.outputs or {})
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_load_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_load_component.py
new file mode 100644
index 00000000..60c4cbe7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_load_component.py
@@ -0,0 +1,313 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+from typing import Any, Callable, Dict, List, Mapping, Optional, Union, cast
+
+from marshmallow import INCLUDE
+
+from azure.ai.ml import Output
+from azure.ai.ml._schema import NestedField
+from azure.ai.ml._schema.pipeline.component_job import SweepSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, SOURCE_PATH_CONTEXT_KEY, CommonYamlFields
+from azure.ai.ml.constants._component import ControlFlowType, DataTransferTaskType, NodeType
+from azure.ai.ml.constants._compute import ComputeType
+from azure.ai.ml.dsl._component_func import to_component_func
+from azure.ai.ml.dsl._overrides_definition import OverrideDefinition
+from azure.ai.ml.entities._builders import (
+    BaseNode,
+    Command,
+    DataTransferCopy,
+    DataTransferExport,
+    DataTransferImport,
+    Import,
+    Parallel,
+    Spark,
+    Sweep,
+)
+from azure.ai.ml.entities._builders.condition_node import ConditionNode
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+from azure.ai.ml.entities._builders.do_while import DoWhile
+from azure.ai.ml.entities._builders.parallel_for import ParallelFor
+from azure.ai.ml.entities._builders.pipeline import Pipeline
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._util import get_type_from_spec
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+
+class _PipelineNodeFactory:
+    """A class to create pipeline node instances from yaml dict or rest objects without hard-coded type check."""
+
+    def __init__(self) -> None:
+        self._create_instance_funcs: dict = {}
+        self._load_from_rest_object_funcs: dict = {}
+
+        self.register_type(
+            _type=NodeType.COMMAND,
+            create_instance_func=lambda: Command.__new__(Command),
+            load_from_rest_object_func=Command._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=NodeType.IMPORT,
+            create_instance_func=lambda: Import.__new__(Import),
+            load_from_rest_object_func=Import._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=NodeType.PARALLEL,
+            create_instance_func=lambda: Parallel.__new__(Parallel),
+            load_from_rest_object_func=Parallel._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=NodeType.PIPELINE,
+            create_instance_func=lambda: Pipeline.__new__(Pipeline),
+            load_from_rest_object_func=Pipeline._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=NodeType.SWEEP,
+            create_instance_func=lambda: Sweep.__new__(Sweep),
+            load_from_rest_object_func=Sweep._from_rest_object,
+            nested_schema=NestedField(SweepSchema, unknown=INCLUDE),
+        )
+        self.register_type(
+            _type=NodeType.AUTOML,
+            create_instance_func=None,
+            load_from_rest_object_func=self._automl_from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=NodeType.SPARK,
+            create_instance_func=lambda: Spark.__new__(Spark),
+            load_from_rest_object_func=Spark._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=ControlFlowType.DO_WHILE,
+            create_instance_func=None,
+            load_from_rest_object_func=DoWhile._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=ControlFlowType.IF_ELSE,
+            create_instance_func=None,
+            load_from_rest_object_func=ConditionNode._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=ControlFlowType.PARALLEL_FOR,
+            create_instance_func=None,
+            load_from_rest_object_func=ParallelFor._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.COPY_DATA]),
+            create_instance_func=lambda: DataTransferCopy.__new__(DataTransferCopy),
+            load_from_rest_object_func=DataTransferCopy._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.IMPORT_DATA]),
+            create_instance_func=lambda: DataTransferImport.__new__(DataTransferImport),
+            load_from_rest_object_func=DataTransferImport._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.EXPORT_DATA]),
+            create_instance_func=lambda: DataTransferExport.__new__(DataTransferExport),
+            load_from_rest_object_func=DataTransferExport._from_rest_object,
+            nested_schema=None,
+        )
+        self.register_type(
+            _type=NodeType.FLOW_PARALLEL,
+            create_instance_func=lambda: Parallel.__new__(Parallel),
+            load_from_rest_object_func=None,
+            nested_schema=None,
+        )
+
+    @classmethod
+    def _get_func(cls, _type: str, funcs: Dict[str, Callable]) -> Callable:
+        if _type == NodeType._CONTAINER:
+            msg = (
+                "Component returned by 'list' is abbreviated and can not be used directly, "
+                "please use result from 'get'."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.COMPONENT,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        _type = get_type_from_spec({CommonYamlFields.TYPE: _type}, valid_keys=funcs)
+        return funcs[_type]
+
+    def get_create_instance_func(self, _type: str) -> Callable[..., BaseNode]:
+        """Get the function to create a new instance of the node.
+
+        :param _type: The type of the node.
+        :type _type: str
+        :return: The create instance function
+        :rtype: Callable[..., BaseNode]
+        """
+        return self._get_func(_type, self._create_instance_funcs)
+
+    def get_load_from_rest_object_func(self, _type: str) -> Callable:
+        """Get the function to load a node from a rest object.
+
+        :param _type: The type of the node.
+        :type _type: str
+        :return: The `_load_from_rest_object` function
+        :rtype: Callable[[Any], Union[BaseNode, AutoMLJob, ControlFlowNode]]
+        """
+        return self._get_func(_type, self._load_from_rest_object_funcs)
+
+    def register_type(
+        self,
+        _type: str,
+        *,
+        create_instance_func: Optional[Callable[..., Union[BaseNode, AutoMLJob]]] = None,
+        load_from_rest_object_func: Optional[Callable] = None,
+        nested_schema: Optional[Union[NestedField, List[NestedField]]] = None,
+    ) -> None:
+        """Register a type of node.
+
+        :param _type: The type of the node.
+        :type _type: str
+        :keyword create_instance_func: A function to create a new instance of the node
+        :paramtype create_instance_func: typing.Optional[typing.Callable[..., typing.Union[BaseNode, AutoMLJob]]]
+        :keyword load_from_rest_object_func: A function to load a node from a rest object
+        :paramtype load_from_rest_object_func: typing.Optional[typing.Callable[[Any], typing.Union[BaseNode, AutoMLJob\
+            , ControlFlowNode]]]
+        :keyword nested_schema: schema/schemas of corresponding nested field, will be used in \
+            PipelineJobSchema.jobs.value
+        :paramtype nested_schema: typing.Optional[typing.Union[NestedField, List[NestedField]]]
+        """
+        if create_instance_func is not None:
+            self._create_instance_funcs[_type] = create_instance_func
+        if load_from_rest_object_func is not None:
+            self._load_from_rest_object_funcs[_type] = load_from_rest_object_func
+        if nested_schema is not None:
+            from azure.ai.ml._schema.core.fields import TypeSensitiveUnionField
+            from azure.ai.ml._schema.pipeline.pipeline_component import PipelineComponentSchema
+            from azure.ai.ml._schema.pipeline.pipeline_job import PipelineJobSchema
+
+            for declared_fields in [
+                PipelineJobSchema._declared_fields,
+                PipelineComponentSchema._declared_fields,
+            ]:
+                jobs_value_field: TypeSensitiveUnionField = declared_fields["jobs"].value_field
+                if not isinstance(nested_schema, list):
+                    nested_schema = [nested_schema]
+                for nested_field in nested_schema:
+                    jobs_value_field.insert_type_sensitive_field(type_name=_type, field=nested_field)
+
+    def load_from_dict(self, *, data: dict, _type: Optional[str] = None) -> Union[BaseNode, AutoMLJob]:
+        """Load a node from a dict.
+
+        :keyword data: A dict containing the node's data.
+        :paramtype data: dict
+        :keyword _type: The type of the node. If not specified, it will be inferred from the data.
+        :paramtype _type: str
+        :return: The node
+        :rtype: Union[BaseNode, AutoMLJob]
+        """
+        if _type is None:
+            _type = data[CommonYamlFields.TYPE] if CommonYamlFields.TYPE in data else NodeType.COMMAND
+            # todo: refine Hard code for now to support different task type for DataTransfer node
+            if _type == NodeType.DATA_TRANSFER:
+                _type = "_".join([NodeType.DATA_TRANSFER, data.get("task", " ")])
+        else:
+            data[CommonYamlFields.TYPE] = _type
+
+        new_instance: Union[BaseNode, AutoMLJob] = self.get_create_instance_func(_type)()
+
+        if isinstance(new_instance, BaseNode):
+            # parse component
+            component_key = new_instance._get_component_attr_name()
+            if component_key in data and isinstance(data[component_key], dict):
+                data[component_key] = Component._load(
+                    data=data[component_key],
+                    yaml_path=data[component_key].pop(SOURCE_PATH_CONTEXT_KEY, None),
+                )
+        # TODO: Bug Item number: 2883415
+        new_instance.__init__(**data)  # type: ignore
+        return new_instance
+
+    def load_from_rest_object(
+        self, *, obj: dict, _type: Optional[str] = None, **kwargs: Any
+    ) -> Union[BaseNode, AutoMLJob, ControlFlowNode]:
+        """Load a node from a rest object.
+
+        :keyword obj: A rest object containing the node's data.
+        :paramtype obj: dict
+        :keyword _type: The type of the node. If not specified, it will be inferred from the data.
+        :paramtype _type: str
+        :return: The node
+        :rtype: Union[BaseNode, AutoMLJob, ControlFlowNode]
+        """
+
+        # TODO: Remove in PuP with native import job/component type support in MFE/Designer
+        if "computeId" in obj and obj["computeId"] and obj["computeId"].endswith("/" + ComputeType.ADF):
+            _type = NodeType.IMPORT
+
+        if _type is None:
+            _type = obj[CommonYamlFields.TYPE] if CommonYamlFields.TYPE in obj else NodeType.COMMAND
+            # todo: refine Hard code for now to support different task type for DataTransfer node
+            if _type == NodeType.DATA_TRANSFER:
+                _type = "_".join([NodeType.DATA_TRANSFER, obj.get("task", " ")])
+        else:
+            obj[CommonYamlFields.TYPE] = _type
+
+        res: Union[BaseNode, AutoMLJob, ControlFlowNode] = self.get_load_from_rest_object_func(_type)(obj, **kwargs)
+        return res
+
+    @classmethod
+    def _automl_from_rest_object(cls, node: Dict) -> AutoMLJob:
+        _outputs = cast(Dict[str, Union[str, dict]], node.get("outputs"))
+        # rest dict outputs -> Output objects
+        outputs = AutoMLJob._from_rest_outputs(_outputs)
+        # Output objects -> yaml dict outputs
+        parsed_outputs = {}
+        for key, val in outputs.items():
+            if isinstance(val, Output):
+                val = val._to_dict()
+            parsed_outputs[key] = val
+        node["outputs"] = parsed_outputs
+        return AutoMLJob._load_from_dict(
+            node,
+            context={BASE_PATH_CONTEXT_KEY: "./"},
+            additional_message="Failed to load automl task from backend.",
+            inside_pipeline=True,
+        )
+
+
+def _generate_component_function(
+    component_entity: Component,
+    override_definitions: Optional[Mapping[str, OverrideDefinition]] = None,  # pylint: disable=unused-argument
+) -> Callable[..., Union[Command, Parallel]]:
+    # Generate a function which returns a component node.
+    def create_component_func(**kwargs: Any) -> Union[BaseNode, AutoMLJob]:
+        # todo: refine Hard code for now to support different task type for DataTransfer node
+        _type = component_entity.type
+        if _type == NodeType.DATA_TRANSFER:
+            # TODO: Bug Item number: 2883431
+            _type = "_".join([NodeType.DATA_TRANSFER, component_entity.task])  # type: ignore
+            if component_entity.task == DataTransferTaskType.IMPORT_DATA:  # type: ignore
+                return pipeline_node_factory.load_from_dict(
+                    data={"component": component_entity, "_from_component_func": True, **kwargs},
+                    _type=_type,
+                )
+        return pipeline_node_factory.load_from_dict(
+            data={"component": component_entity, "inputs": kwargs, "_from_component_func": True},
+            _type=_type,
+        )
+
+    res: Callable = to_component_func(component_entity, create_component_func)
+    return res
+
+
+pipeline_node_factory = _PipelineNodeFactory()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_expression.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_expression.py
new file mode 100644
index 00000000..49bb8a61
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_expression.py
@@ -0,0 +1,662 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import re
+import tempfile
+from collections import namedtuple
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
+
+from azure.ai.ml._utils.utils import dump_yaml_to_file, get_all_data_binding_expressions, load_yaml
+from azure.ai.ml.constants._common import AZUREML_PRIVATE_FEATURES_ENV_VAR, DefaultOpenEncoding
+from azure.ai.ml.constants._component import ComponentParameterTypes, IOConstants
+from azure.ai.ml.exceptions import UserErrorException
+
+if TYPE_CHECKING:
+    from azure.ai.ml.entities._builders import BaseNode
+
+ExpressionInput = namedtuple("ExpressionInput", ["name", "type", "value"])
+NONE_PARAMETER_TYPE = "None"
+
+
+class PipelineExpressionOperator:
+    """Support operator in native Python experience."""
+
+    ADD = "+"
+    SUB = "-"
+    MUL = "*"
+    DIV = "/"
+    MOD = "%"
+    POW = "**"
+    FLOORDIV = "//"
+    LT = "<"
+    GT = ">"
+    LTE = "<="
+    GTE = ">="
+    EQ = "=="
+    NE = "!="
+    AND = "&"
+    OR = "|"
+    XOR = "^"
+
+
+_SUPPORTED_OPERATORS = {
+    getattr(PipelineExpressionOperator, attr)
+    for attr in PipelineExpressionOperator.__dict__
+    if not attr.startswith("__")
+}
+
+
+def _enumerate_operation_combination() -> Dict[str, Union[str, Exception]]:
+    """Enumerate the result type of binary operations on types
+
+    Leverages `eval` to validate operation and get its result type.
+
+    :return: A dictionary that maps an operation to either:
+      * A result type
+      * An Exception
+    :rtype: Dict[str, Union[str, Exception]]
+    """
+    res: Dict = {}
+    primitive_types_values = {
+        NONE_PARAMETER_TYPE: repr(None),
+        ComponentParameterTypes.BOOLEAN: repr(True),
+        ComponentParameterTypes.INTEGER: repr(1),
+        ComponentParameterTypes.NUMBER: repr(1.0),
+        ComponentParameterTypes.STRING: repr("1"),
+    }
+    for type1, operand1 in primitive_types_values.items():
+        for type2, operand2 in primitive_types_values.items():
+            for operator in _SUPPORTED_OPERATORS:
+                k = f"{type1} {operator} {type2}"
+                try:
+                    eval_result = eval(f"{operand1} {operator} {operand2}")  # pylint: disable=eval-used # nosec
+                    res[k] = IOConstants.PRIMITIVE_TYPE_2_STR[type(eval_result)]
+                except TypeError:
+                    error_message = (
+                        f"Operator '{operator}' is not supported between instances of '{type1}' and '{type2}'."
+                    )
+                    res[k] = UserErrorException(message=error_message, no_personal_data_message=error_message)
+    return res
+
+
+# enumerate and store as a lookup table:
+#   key format is "<operand1_type> <operator> <operand2_type>"
+#   value can be either result type as str and UserErrorException for invalid operation
+_OPERATION_RESULT_TYPE_LOOKUP = _enumerate_operation_combination()
+
+
+class PipelineExpressionMixin:
+    _SUPPORTED_PRIMITIVE_TYPES = (bool, int, float, str)
+    _SUPPORTED_PIPELINE_INPUT_TYPES = (
+        ComponentParameterTypes.BOOLEAN,
+        ComponentParameterTypes.INTEGER,
+        ComponentParameterTypes.NUMBER,
+        ComponentParameterTypes.STRING,
+    )
+
+    def _validate_binary_operation(self, other: Any, operator: str) -> None:
+        from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
+
+        if (
+            other is not None
+            and not isinstance(other, self._SUPPORTED_PRIMITIVE_TYPES)
+            and not isinstance(other, (PipelineInput, NodeOutput, PipelineExpression))
+        ):
+            error_message = (
+                f"Operator '{operator}' is not supported with {type(other)}; "
+                "currently only support primitive types (None, bool, int, float and str), "
+                "pipeline input, component output and expression."
+            )
+            raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+
+    def __add__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.ADD)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.ADD)
+
+    def __radd__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.ADD)
+        return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.ADD)
+
+    def __sub__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.SUB)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.SUB)
+
+    def __rsub__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.SUB)
+        return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.SUB)
+
+    def __mul__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.MUL)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.MUL)
+
+    def __rmul__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.MUL)
+        return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.MUL)
+
+    def __truediv__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.DIV)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.DIV)
+
+    def __rtruediv__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.DIV)
+        return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.DIV)
+
+    def __mod__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.MOD)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.MOD)
+
+    def __rmod__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.MOD)
+        return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.MOD)
+
+    def __pow__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.POW)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.POW)
+
+    def __rpow__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.POW)
+        return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.POW)
+
+    def __floordiv__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.FLOORDIV)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.FLOORDIV)
+
+    def __rfloordiv__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.FLOORDIV)
+        return PipelineExpression._from_operation(other, self, PipelineExpressionOperator.FLOORDIV)
+
+    def __lt__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.LT)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.LT)
+
+    def __gt__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.GT)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.GT)
+
+    def __le__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.LTE)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.LTE)
+
+    def __ge__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.GTE)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.GTE)
+
+    # TODO: Bug Item number: 2883354
+    def __eq__(self, other: Any) -> "PipelineExpression":  # type: ignore
+        self._validate_binary_operation(other, PipelineExpressionOperator.EQ)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.EQ)
+
+    # TODO: Bug Item number: 2883354
+    def __ne__(self, other: Any) -> "PipelineExpression":  # type: ignore
+        self._validate_binary_operation(other, PipelineExpressionOperator.NE)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.NE)
+
+    def __and__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.AND)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.AND)
+
+    def __or__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.OR)
+        return PipelineExpression._from_operation(self, other, PipelineExpressionOperator.OR)
+
+    def __xor__(self, other: Any) -> "PipelineExpression":
+        self._validate_binary_operation(other, PipelineExpressionOperator.XOR)
+        return PipelineExpression._from_operation(self, None, PipelineExpressionOperator.XOR)
+
+    def __bool__(self) -> bool:
+        """Python method that is used to implement truth value testing and the built-in operation bool().
+
+        This method is not supported as PipelineExpressionMixin is designed to record operation history,
+        while this method can only return False or True, leading to history breaks here.
+        As overloadable boolean operators PEP (refer to: https://www.python.org/dev/peps/pep-0335/)
+        was rejected, logical operations are also not supported.
+
+        :return: True if not inside dsl pipeline func, raises otherwise
+        :rtype: bool
+        """
+        from azure.ai.ml.dsl._pipeline_component_builder import _is_inside_dsl_pipeline_func
+
+        # note: unexpected bool test always be checking if the object is None;
+        # so for non-pipeline scenarios, directly return True to avoid unexpected breaking,
+        # and for pipeline scenarios, will use is not None to replace bool test.
+        if not _is_inside_dsl_pipeline_func():
+            return True
+
+        error_message = f"Type {type(self)} is not supported for operation bool()."
+        raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+
+
+class PipelineExpression(PipelineExpressionMixin):
+    """Pipeline expression entity.
+
+    Use PipelineExpression to support simple and trivial parameter transformation tasks with constants
+    or other parameters. Operations are recorded in this class during executions, and expected result
+    will be generated for corresponding scenario.
+    """
+
+    _PIPELINE_INPUT_PREFIX = ["parent", "inputs"]
+    _PIPELINE_INPUT_PATTERN = re.compile(pattern=r"parent.inputs.(?P<pipeline_input_name>[^.]+)")
+    _PIPELINE_INPUT_NAME_GROUP = "pipeline_input_name"
+    # AML type to Python type, for generated Python code
+    _TO_PYTHON_TYPE = {
+        ComponentParameterTypes.BOOLEAN: bool.__name__,
+        ComponentParameterTypes.INTEGER: int.__name__,
+        ComponentParameterTypes.NUMBER: float.__name__,
+        ComponentParameterTypes.STRING: str.__name__,
+    }
+
+    _INDENTATION = "    "
+    _IMPORT_MLDESIGNER_LINE = "from mldesigner import command_component, Output"
+    _DECORATOR_LINE = "@command_component(@@decorator_parameters@@)"
+    _COMPONENT_FUNC_NAME = "expression_func"
+    _COMPONENT_FUNC_DECLARATION_LINE = (
+        f"def {_COMPONENT_FUNC_NAME}(@@component_parameters@@)" " -> Output(type=@@return_type@@):"
+    )
+    _PYTHON_CACHE_FOLDER_NAME = "__pycache__"
+
+    def __init__(self, postfix: List[str], inputs: Dict[str, ExpressionInput]):
+        self._postfix = postfix
+        self._inputs = inputs.copy()  # including PiplineInput and Output, extra stored name and type
+        self._result_type: Optional[str] = None
+        self._created_component = None
+
+    @property
+    def expression(self) -> str:
+        """Infix expression string, wrapped with parentheses.
+
+        :return: The infix expression
+        :rtype: str
+        """
+        return self._to_infix()
+
+    def __str__(self) -> str:
+        return self._to_data_binding()
+
+    def _data_binding(self) -> str:
+        return self._to_data_binding()
+
+    def _to_infix(self) -> str:
+        stack = []
+        for token in self._postfix:
+            if token not in _SUPPORTED_OPERATORS:
+                stack.append(token)
+                continue
+            operand2, operand1 = stack.pop(), stack.pop()
+            stack.append(f"({operand1} {token} {operand2})")
+        return stack.pop()
+
+    # pylint: disable=too-many-statements
+    @staticmethod
+    def _handle_operand(
+        operand: "PipelineExpression",
+        postfix: List[str],
+        expression_inputs: Dict[str, ExpressionInput],
+        pipeline_inputs: dict,
+    ) -> Tuple[List[str], Dict[str, ExpressionInput]]:
+        """Handle operand in expression, update postfix expression and expression inputs.
+
+        :param operand: The operand
+        :type operand: "PipelineExpression"
+        :param postfix:
+        :type postfix: List[str]
+        :param expression_inputs: The expression inputs
+        :type expression_inputs: Dict[str, ExpressionInput]
+        :param pipeline_inputs: The pipeline inputs
+        :type pipeline_inputs: dict
+        :return: A 2-tuple of the updated postfix expression and expression inputs
+        :rtype: Tuple[List[str], Dict[str, ExpressionInput]]
+        """
+        from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
+
+        def _update_postfix(_postfix: List[str], _old_name: str, _new_name: str) -> List[str]:
+            return list(map(lambda _x: _new_name if _x == _old_name else _x, _postfix))
+
+        def _get_or_create_input_name(
+            _original_name: str,
+            _operand: Union[PipelineInput, NodeOutput],
+            _expression_inputs: Dict[str, ExpressionInput],
+        ) -> str:
+            """Get or create expression input name as current operand may have appeared in expression.
+
+            :param _original_name: The original name
+            :type _original_name: str
+            :param _operand: The expression operand
+            :type _operand: Union[PipelineInput, NodeOutput]
+            :param _expression_inputs: The expression inputs
+            :type _expression_inputs: Dict[str, ExpressionInput]
+            :return: The input name
+            :rtype: str
+            """
+            _existing_id_to_name = {id(_v.value): _k for _k, _v in _expression_inputs.items()}
+            if id(_operand) in _existing_id_to_name:
+                return _existing_id_to_name[id(_operand)]
+            # use a counter to generate a unique name for current operand
+            _name, _counter = _original_name, 0
+            while _name in _expression_inputs:
+                _name = f"{_original_name}_{_counter}"
+                _counter += 1
+            return _name
+
+        def _handle_pipeline_input(
+            _pipeline_input: PipelineInput,
+            _postfix: List[str],
+            _expression_inputs: Dict[str, ExpressionInput],
+        ) -> Tuple[List[str], dict]:
+            _name = _pipeline_input._port_name
+            # 1. use name with counter for pipeline input; 2. add component's name to component output
+            if _name in _expression_inputs:
+                _seen_input = _expression_inputs[_name]
+                if isinstance(_seen_input.value, PipelineInput):
+                    _name = _get_or_create_input_name(_name, _pipeline_input, _expression_inputs)
+                else:
+                    _expression_inputs.pop(_name)
+                    _new_name = f"{_seen_input.value._owner.component.name}__{_seen_input.value._port_name}"
+                    _postfix = _update_postfix(_postfix, _name, _new_name)
+                    _expression_inputs[_new_name] = ExpressionInput(_new_name, _seen_input.type, _seen_input)
+            _postfix.append(_name)
+
+            param_input = pipeline_inputs
+            for group_name in _pipeline_input._group_names:
+                param_input = param_input[group_name].values
+            _expression_inputs[_name] = ExpressionInput(
+                _name, param_input[_pipeline_input._port_name].type, _pipeline_input
+            )
+            return _postfix, _expression_inputs
+
+        def _handle_component_output(
+            _component_output: NodeOutput,
+            _postfix: List[str],
+            _expression_inputs: Dict[str, ExpressionInput],
+        ) -> Tuple[List[str], dict]:
+            if _component_output._meta is not None and not _component_output._meta._is_primitive_type:
+                error_message = (
+                    f"Component output {_component_output._port_name} in expression must "
+                    f"be a primitive type with value {True!r}, "
+                    f"got {_component_output._meta._is_primitive_type!r}"
+                )
+                raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+            _name = _component_output._port_name
+            _has_prefix = False
+            # "output" is the default output name for command component, add component's name as prefix
+            if _name == "output":
+                if _component_output._owner is not None and not isinstance(_component_output._owner.component, str):
+                    _name = f"{_component_output._owner.component.name}__output"
+                _has_prefix = True
+            # following loop is expected to execute at most twice:
+            #   1. add component's name to output(s)
+            #   2. use name with counter
+            while _name in _expression_inputs:
+                _seen_input = _expression_inputs[_name]
+                if isinstance(_seen_input.value, PipelineInput):
+                    if not _has_prefix:
+                        if _component_output._owner is not None and not isinstance(
+                            _component_output._owner.component, str
+                        ):
+                            _name = f"{_component_output._owner.component.name}__{_component_output._port_name}"
+                        _has_prefix = True
+                        continue
+                    _name = _get_or_create_input_name(_name, _component_output, _expression_inputs)
+                else:
+                    if not _has_prefix:
+                        _expression_inputs.pop(_name)
+                        _new_name = f"{_seen_input.value._owner.component.name}__{_seen_input.value._port_name}"
+                        _postfix = _update_postfix(_postfix, _name, _new_name)
+                        _expression_inputs[_new_name] = ExpressionInput(_new_name, _seen_input.type, _seen_input)
+                        if _component_output._owner is not None and not isinstance(
+                            _component_output._owner.component, str
+                        ):
+                            _name = f"{_component_output._owner.component.name}__{_component_output._port_name}"
+                        _has_prefix = True
+                    _name = _get_or_create_input_name(_name, _component_output, _expression_inputs)
+            _postfix.append(_name)
+            _expression_inputs[_name] = ExpressionInput(_name, _component_output.type, _component_output)
+            return _postfix, _expression_inputs
+
+        if operand is None or isinstance(operand, PipelineExpression._SUPPORTED_PRIMITIVE_TYPES):
+            postfix.append(repr(operand))
+        elif isinstance(operand, PipelineInput):
+            postfix, expression_inputs = _handle_pipeline_input(operand, postfix, expression_inputs)
+        elif isinstance(operand, NodeOutput):
+            postfix, expression_inputs = _handle_component_output(operand, postfix, expression_inputs)
+        elif isinstance(operand, PipelineExpression):
+            postfix.extend(operand._postfix.copy())
+            expression_inputs.update(operand._inputs.copy())
+        return postfix, expression_inputs
+
+    @staticmethod
+    def _from_operation(operand1: Any, operand2: Any, operator: str) -> "PipelineExpression":
+        if operator not in _SUPPORTED_OPERATORS:
+            error_message = (
+                f"Operator '{operator}' is not supported operator, "
+                f"currently supported operators are {','.join(_SUPPORTED_OPERATORS)}."
+            )
+            raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+
+        # get all pipeline input types from builder stack
+        # TODO: check if there is pipeline input we cannot know its type (missing in `PipelineComponentBuilder.inputs`)?
+        from azure.ai.ml.dsl._pipeline_component_builder import _definition_builder_stack
+
+        res = _definition_builder_stack.top()
+        pipeline_inputs = res.inputs if res is not None else {}
+        postfix: List[str] = []
+        inputs: Dict[str, ExpressionInput] = {}
+        postfix, inputs = PipelineExpression._handle_operand(operand1, postfix, inputs, pipeline_inputs)
+        postfix, inputs = PipelineExpression._handle_operand(operand2, postfix, inputs, pipeline_inputs)
+        postfix.append(operator)
+        return PipelineExpression(postfix, inputs)
+
+    @property
+    def _string_concatenation(self) -> bool:
+        """If all operands are string and operations are addition, it is a string concatenation expression.
+
+        :return: Whether this represents string concatenation
+        :rtype: bool
+        """
+        for token in self._postfix:
+            # operator can only be "+" for string concatenation
+            if token in _SUPPORTED_OPERATORS:
+                if token != PipelineExpressionOperator.ADD:
+                    return False
+                continue
+            # constant and PiplineInput should be type string
+            if token in self._inputs:
+                if self._inputs[token].type != ComponentParameterTypes.STRING:
+                    return False
+            else:
+                if not isinstance(eval(token), str):  # pylint: disable=eval-used # nosec
+                    return False
+        return True
+
+    def _to_data_binding(self) -> str:
+        """Convert operands to data binding and concatenate them in the order of postfix expression.
+
+        :return: The data binding
+        :rtype: str
+        """
+        if not self._string_concatenation:
+            error_message = (
+                "Only string concatenation expression is supported to convert to data binding, "
+                f"current expression is '{self.expression}'."
+            )
+            raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+
+        stack = []
+        for token in self._postfix:
+            if token != PipelineExpressionOperator.ADD:
+                if token in self._inputs:
+                    stack.append(self._inputs[token].value._data_binding())
+                else:
+                    stack.append(eval(token))  # pylint: disable=eval-used # nosec
+                continue
+            operand2, operand1 = stack.pop(), stack.pop()
+            stack.append(operand1 + operand2)
+        res: str = stack.pop()
+        return res
+
+    def resolve(self) -> Union[str, "BaseNode"]:
+        """Resolve expression to data binding or component, depend on the operations.
+
+        :return: The data binding string or the component
+        :rtype: Union[str, BaseNode]
+        """
+        if self._string_concatenation:
+            return self._to_data_binding()
+        return cast(Union[str, "BaseNode"], self._create_component())
+
+    @staticmethod
+    def parse_pipeline_inputs_from_data_binding(data_binding: str) -> List[str]:
+        """Parse all PipelineInputs name from data binding expression.
+
+        :param data_binding: Data binding expression
+        :type data_binding: str
+        :return: List of PipelineInput's name from given data binding expression
+        :rtype: List[str]
+        """
+        pipeline_input_names = []
+        for single_data_binding in get_all_data_binding_expressions(
+            value=data_binding,
+            binding_prefix=PipelineExpression._PIPELINE_INPUT_PREFIX,
+            is_singular=False,
+        ):
+            m = PipelineExpression._PIPELINE_INPUT_PATTERN.match(single_data_binding)
+            # `get_all_data_binding_expressions` should work as pre-filter, so no need to concern `m` is None
+            if m is not None:
+                pipeline_input_names.append(m.group(PipelineExpression._PIPELINE_INPUT_NAME_GROUP))
+        return pipeline_input_names
+
+    @staticmethod
+    def _get_operation_result_type(type1: str, operator: str, type2: str) -> str:
+        def _validate_operand_type(_type: str) -> None:
+            if _type != NONE_PARAMETER_TYPE and _type not in PipelineExpression._SUPPORTED_PIPELINE_INPUT_TYPES:
+                error_message = (
+                    f"Pipeline input type {_type!r} is not supported in expression; "
+                    f"currently only support None, "
+                    + ", ".join(PipelineExpression._SUPPORTED_PIPELINE_INPUT_TYPES)
+                    + "."
+                )
+                raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+
+        _validate_operand_type(type1)
+        _validate_operand_type(type2)
+        operation = f"{type1} {operator} {type2}"
+        lookup_value = _OPERATION_RESULT_TYPE_LOOKUP.get(operation)
+        if isinstance(lookup_value, str):
+            return lookup_value  # valid operation, return result type
+        _user_exception: UserErrorException = lookup_value
+        raise _user_exception  # invalid operation, raise UserErrorException
+
+    def _get_operand_type(self, operand: str) -> str:
+        if operand in self._inputs:
+            res: str = self._inputs[operand].type
+            return res
+        primitive_type = type(eval(operand))  # pylint: disable=eval-used # nosec
+        res_type: str = IOConstants.PRIMITIVE_TYPE_2_STR.get(primitive_type, NONE_PARAMETER_TYPE)
+        return res_type
+
+    @property
+    def _component_code(self) -> str:
+        def _generate_function_code_lines() -> Tuple[List[str], str]:
+            """Return lines of code and return type.
+
+            :return: A 2-tuple of (function body, return type name)
+            :rtype: Tuple[List[str], str]
+            """
+            _inter_id, _code, _stack = 0, [], []
+            _line_recorder: Dict = {}
+            for _token in self._postfix:
+                if _token not in _SUPPORTED_OPERATORS:
+                    _type = self._get_operand_type(_token)
+                    _stack.append((_token, _type))
+                    continue
+                _operand2, _type2 = _stack.pop()
+                _operand1, _type1 = _stack.pop()
+                _current_line = f"{_operand1} {_token} {_operand2}"
+                if _current_line in _line_recorder:
+                    _inter_var, _inter_var_type = _line_recorder[_current_line]
+                else:
+                    _inter_var = f"inter_var_{_inter_id}"
+                    _inter_id += 1
+                    _inter_var_type = self._get_operation_result_type(_type1, _token, _type2)
+                    _code.append(f"{self._INDENTATION}{_inter_var} = {_current_line}")
+                    _line_recorder[_current_line] = (_inter_var, _inter_var_type)
+                _stack.append((_inter_var, _inter_var_type))
+            _return_var, _result_type = _stack.pop()
+            _code.append(f"{self._INDENTATION}return {_return_var}")
+            return _code, _result_type
+
+        def _generate_function_decorator_and_declaration_lines(_return_type: str) -> List[str]:
+            # decorator parameters
+            _display_name = f'{self._INDENTATION}display_name="Expression: {self.expression}",'
+            _decorator_parameters = "\n" + "\n".join([_display_name]) + "\n"
+            # component parameters
+            _component_parameters = []
+            for _name in sorted(self._inputs):
+                _type = self._TO_PYTHON_TYPE[self._inputs[_name].type]
+                _component_parameters.append(f"{_name}: {_type}")
+            _component_parameters_str = (
+                "\n"
+                + "\n".join(
+                    [f"{self._INDENTATION}{_component_parameter}," for _component_parameter in _component_parameters]
+                )
+                + "\n"
+            )
+            return [
+                self._IMPORT_MLDESIGNER_LINE + "\n\n",
+                self._DECORATOR_LINE.replace("@@decorator_parameters@@", _decorator_parameters),
+                self._COMPONENT_FUNC_DECLARATION_LINE.replace(
+                    "@@component_parameters@@", _component_parameters_str
+                ).replace("@@return_type@@", f'"{_return_type}"'),
+            ]
+
+        lines, result_type = _generate_function_code_lines()
+        self._result_type = result_type
+        code = _generate_function_decorator_and_declaration_lines(result_type) + lines
+        return "\n".join(code) + "\n"
+
+    def _create_component(self) -> Any:
+        def _generate_python_file(_folder: Path) -> None:
+            _folder.mkdir()
+            with open(_folder / "expression_component.py", "w", encoding=DefaultOpenEncoding.WRITE) as _f:
+                _f.write(self._component_code)
+
+        def _generate_yaml_file(_path: Path) -> None:
+            _data_folder = Path(__file__).parent / "data"
+            # update YAML content from template and dump
+            with open(_data_folder / "expression_component_template.yml", "r", encoding=DefaultOpenEncoding.READ) as _f:
+                _data = load_yaml(_f)
+            _data["display_name"] = f"Expression: {self.expression}"
+            _data["inputs"] = {}
+            _data["outputs"]["output"]["type"] = self._result_type
+            _command_inputs_items = []
+            for _name in sorted(self._inputs):
+                _type = self._inputs[_name].type
+                _data["inputs"][_name] = {"type": _type}
+                _command_inputs_items.append(_name + '="${{inputs.' + _name + '}}"')
+            _command_inputs_string = " ".join(_command_inputs_items)
+            _command_output_string = 'output="${{outputs.output}}"'
+            _command = (
+                "mldesigner execute --source expression_component.py --name expression_func"
+                " --inputs " + _command_inputs_string + " --outputs " + _command_output_string
+            )
+            _data["command"] = _data["command"].format(command_placeholder=_command)
+            dump_yaml_to_file(_path, _data)
+
+        if self._created_component is None:
+            tmp_folder = Path(tempfile.mkdtemp())
+            code_folder = tmp_folder / "src"
+            yaml_path = tmp_folder / "component_spec.yml"
+            _generate_python_file(code_folder)
+            _generate_yaml_file(yaml_path)
+
+            from azure.ai.ml import load_component
+
+            component_func = load_component(yaml_path)
+            component_kwargs = {k: v.value for k, v in self._inputs.items()}
+            self._created_component = component_func(**component_kwargs)
+            if self._created_component is not None:
+                self._created_component.environment_variables = {AZUREML_PRIVATE_FEATURES_ENV_VAR: "true"}
+        return self._created_component
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py
new file mode 100644
index 00000000..3a7d89e7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py
@@ -0,0 +1,182 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import re
+from typing import Dict, List, Tuple, Type, Union
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import InputDeliveryMode
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobInput as RestJobInput
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobOutput as RestJobOutput
+from azure.ai.ml._restclient.v2023_04_01_preview.models import Mpi, PyTorch, Ray, TensorFlow
+from azure.ai.ml.constants._component import ComponentJobConstants
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job._input_output_helpers import (
+    INPUT_MOUNT_MAPPING_FROM_REST,
+    INPUT_MOUNT_MAPPING_TO_REST,
+    OUTPUT_MOUNT_MAPPING_FROM_REST,
+    OUTPUT_MOUNT_MAPPING_TO_REST,
+)
+from azure.ai.ml.entities._util import normalize_job_input_output_type
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+
+def process_sdk_component_job_io(
+    io: Dict,
+    io_binding_regex_list: List[str],
+) -> Tuple:
+    """Separates SDK ComponentJob inputs that are data bindings (i.e. string inputs prefixed with 'inputs.' or
+    'outputs.') and dataset and literal inputs/outputs.
+
+    :param io: Input or output dictionary of an SDK ComponentJob
+    :type io:  Dict[str, Union[str, float, bool, Input]]
+    :param io_binding_regex_list: A list of regexes for io bindings
+    :type io_binding_regex_list: List[str]
+    :return: A tuple of dictionaries:
+      * One mapping inputs to REST formatted ComponentJobInput/ComponentJobOutput for data binding io.
+      * The other dictionary contains any IO that is not a databinding that is yet to be turned into REST form
+    :rtype: Tuple[Dict[str, str], Dict[str, Union[str, float, bool, Input]]]
+    """
+    io_bindings: Dict = {}
+    dataset_literal_io: Dict = {}
+    legacy_io_binding_regex_list = [
+        ComponentJobConstants.LEGACY_INPUT_PATTERN,
+        ComponentJobConstants.LEGACY_OUTPUT_PATTERN,
+    ]
+    for io_name, io_value in io.items():
+        if isinstance(io_value, (Input, Output)) and isinstance(io_value.path, str):
+            mode = io_value.mode
+            path = io_value.path
+            name = io_value.name if hasattr(io_value, "name") else None
+            version = io_value.version if hasattr(io_value, "version") else None
+            if any(re.match(item, path) for item in io_binding_regex_list):
+                # Yaml syntax requires using ${{}} to enclose inputs and outputs bindings
+                # io_bindings[io_name] = io_value
+                io_bindings.update({io_name: {"value": path}})
+                # add mode to literal value for binding input
+                if mode:
+                    if isinstance(io_value, Input):
+                        io_bindings[io_name].update({"mode": INPUT_MOUNT_MAPPING_TO_REST[mode]})
+                    else:
+                        io_bindings[io_name].update({"mode": OUTPUT_MOUNT_MAPPING_TO_REST[mode]})
+                if name or version:
+                    assert isinstance(io_value, Output)
+                    if name:
+                        io_bindings[io_name].update({"name": name})
+                    if version:
+                        io_bindings[io_name].update({"version": version})
+                if isinstance(io_value, Output) and io_value.name is not None:
+                    # when the output should be registered,
+                    # we add io_value to dataset_literal_io for further to_rest_data_outputs
+                    dataset_literal_io[io_name] = io_value
+            elif any(re.match(item, path) for item in legacy_io_binding_regex_list):
+                new_format = path.replace("{{", "{{parent.")
+                msg = "{} has changed to {}, please change to use new format."
+                raise ValidationException(
+                    message=msg.format(path, new_format),
+                    no_personal_data_message=msg.format("[io_value]", "[io_value_new_format]"),
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+            else:
+                dataset_literal_io[io_name] = io_value
+        else:
+            # Collect non-input data inputs
+            dataset_literal_io[io_name] = io_value
+    return io_bindings, dataset_literal_io
+
+
+def from_dict_to_rest_io(
+    io: Dict[str, Union[str, dict]],
+    rest_object_class: Union[Type[RestJobInput], Type[RestJobOutput]],
+    io_binding_regex_list: List[str],
+) -> Tuple[Dict[str, str], Dict[str, Union[RestJobInput, RestJobOutput]]]:
+    """Translate rest JObject dictionary to rest inputs/outputs and bindings.
+
+    :param io: Input or output dictionary.
+    :type io: Dict[str, Union[str, dict]]
+    :param rest_object_class: RestJobInput or RestJobOutput
+    :type rest_object_class: Union[Type[RestJobInput], Type[RestJobOutput]]
+    :param io_binding_regex_list: A list of regexes for io bindings
+    :type io_binding_regex_list: List[str]
+    :return: Map from IO name to IO bindings and Map from IO name to IO objects.
+    :rtype: Tuple[Dict[str, str], Dict[str, Union[RestJobInput, RestJobOutput]]]
+    """
+    io_bindings: dict = {}
+    rest_io_objects = {}
+    DIRTY_MODE_MAPPING = {
+        "Mount": InputDeliveryMode.READ_ONLY_MOUNT,
+        "RoMount": InputDeliveryMode.READ_ONLY_MOUNT,
+        "RwMount": InputDeliveryMode.READ_WRITE_MOUNT,
+    }
+    for key, val in io.items():
+        if isinstance(val, dict):
+            # convert the input of camel to snake to be compatible with the Jun api
+            # todo: backend help convert node level input/output type
+            normalize_job_input_output_type(val)
+
+            # Add casting as sometimes we got value like 1(int)
+            io_value = str(val.get("value", ""))
+            io_mode = val.get("mode", None)
+            io_name = val.get("name", None)
+            io_version = val.get("version", None)
+            if any(re.match(item, io_value) for item in io_binding_regex_list):
+                io_bindings.update({key: {"path": io_value}})
+                # add mode to literal value for binding input
+                if io_mode:
+                    # deal with dirty mode data submitted before
+                    if io_mode in DIRTY_MODE_MAPPING:
+                        io_mode = DIRTY_MODE_MAPPING[io_mode]
+                        val["mode"] = io_mode
+                    if io_mode in OUTPUT_MOUNT_MAPPING_FROM_REST:
+                        io_bindings[key].update({"mode": OUTPUT_MOUNT_MAPPING_FROM_REST[io_mode]})
+                    else:
+                        io_bindings[key].update({"mode": INPUT_MOUNT_MAPPING_FROM_REST[io_mode]})
+                # add name and version for binding input
+                if io_name or io_version:
+                    assert rest_object_class.__name__ == "JobOutput"
+                    # current code only support dump name and version for JobOutput
+                    # this assert can be deleted if we need to dump name/version for JobInput
+                    if io_name:
+                        io_bindings[key].update({"name": io_name})
+                    if io_version:
+                        io_bindings[key].update({"version": io_version})
+                if not io_mode and not io_name and not io_version:
+                    io_bindings[key] = io_value
+            else:
+                if rest_object_class.__name__ == "JobOutput":
+                    # current code only support dump name and version for JobOutput
+                    # this condition can be deleted if we need to dump name/version for JobInput
+                    if "name" in val.keys():
+                        val["asset_name"] = val.pop("name")
+                    if "version" in val.keys():
+                        val["asset_version"] = val.pop("version")
+                rest_obj = rest_object_class.from_dict(val)
+                rest_io_objects[key] = rest_obj
+        else:
+            msg = "Got unsupported type of input/output: {}:" + f"{type(val)}"
+            raise ValidationException(
+                message=msg.format(val),
+                no_personal_data_message=msg.format("[val]"),
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+    return io_bindings, rest_io_objects
+
+
+def from_dict_to_rest_distribution(distribution_dict: Dict) -> Union[PyTorch, Mpi, TensorFlow, Ray]:
+    target_type = distribution_dict["distribution_type"].lower()
+    if target_type == "pytorch":
+        return PyTorch(**distribution_dict)
+    if target_type == "mpi":
+        return Mpi(**distribution_dict)
+    if target_type == "tensorflow":
+        return TensorFlow(**distribution_dict)
+    if target_type == "ray":
+        return Ray(**distribution_dict)
+    msg = "Distribution type must be pytorch, mpi, tensorflow or ray: {}".format(target_type)
+    raise ValidationException(
+        message=msg,
+        no_personal_data_message=msg,
+        target=ErrorTarget.PIPELINE,
+        error_category=ErrorCategory.USER_ERROR,
+    )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/data/expression_component_template.yml b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/data/expression_component_template.yml
new file mode 100644
index 00000000..10d391aa
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/data/expression_component_template.yml
@@ -0,0 +1,16 @@
+$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
+type: command
+
+name: expression_component
+version: 1
+
+outputs:
+  output:
+    is_control: true
+
+code: ./src
+
+environment: azureml://registries/azureml/environments/mldesigner/labels/latest
+
+command: >-
+  {command_placeholder}
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job.py
new file mode 100644
index 00000000..7ddbbc46
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job.py
@@ -0,0 +1,711 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+import itertools
+import logging
+import typing
+from functools import partial
+from pathlib import Path
+from typing import Any, Dict, Generator, List, Optional, Union, cast
+
+from typing_extensions import Literal
+
+from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase
+from azure.ai.ml._restclient.v2024_01_01_preview.models import PipelineJob as RestPipelineJob
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._schema.pipeline.pipeline_job import PipelineJobSchema
+from azure.ai.ml._utils._arm_id_utils import get_resource_name_from_arm_id_safe
+from azure.ai.ml._utils.utils import (
+    camel_to_snake,
+    is_data_binding_expression,
+    is_private_preview_enabled,
+    transform_dict_keys,
+)
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import AZUREML_PRIVATE_FEATURES_ENV_VAR, BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.condition_node import ConditionNode
+from azure.ai.ml.entities._builders.control_flow_node import LoopNode
+from azure.ai.ml.entities._builders.import_node import Import
+from azure.ai.ml.entities._builders.parallel import Parallel
+from azure.ai.ml.entities._builders.pipeline import Pipeline
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._component.pipeline_component import PipelineComponent
+
+# from azure.ai.ml.entities._job.identity import AmlToken, Identity, ManagedIdentity, UserIdentity
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+    _BaseJobIdentityConfiguration,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._inputs_outputs.group_input import GroupInput
+from azure.ai.ml.entities._job._input_output_helpers import (
+    from_rest_data_outputs,
+    from_rest_inputs_to_dataset_literal,
+    to_rest_data_outputs,
+    to_rest_dataset_literal_inputs,
+)
+from azure.ai.ml.entities._job.import_job import ImportJob
+from azure.ai.ml.entities._job.job import Job
+from azure.ai.ml.entities._job.job_service import JobServiceBase
+from azure.ai.ml.entities._job.pipeline._io import PipelineInput, PipelineJobIOMixin
+from azure.ai.ml.entities._job.pipeline.pipeline_job_settings import PipelineJobSettings
+from azure.ai.ml.entities._mixins import YamlTranslatableMixin
+from azure.ai.ml.entities._system_data import SystemData
+from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin
+from azure.ai.ml.exceptions import ErrorTarget, UserErrorException, ValidationException
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineJob(Job, YamlTranslatableMixin, PipelineJobIOMixin, PathAwareSchemaValidatableMixin):
+    """Pipeline job.
+
+    You should not instantiate this class directly. Instead, you should
+    use the `@pipeline` decorator to create a `PipelineJob`.
+
+    :param component: Pipeline component version. The field is mutually exclusive with 'jobs'.
+    :type component: Union[str, ~azure.ai.ml.entities._component.pipeline_component.PipelineComponent]
+    :param inputs: Inputs to the pipeline job.
+    :type inputs: dict[str, Union[~azure.ai.ml.entities.Input, str, bool, int, float]]
+    :param outputs: Outputs of the pipeline job.
+    :type outputs: dict[str, ~azure.ai.ml.entities.Output]
+    :param name: Name of the PipelineJob. Defaults to None.
+    :type name: str
+    :param description: Description of the pipeline job. Defaults to None
+    :type description: str
+    :param display_name: Display name of the pipeline job. Defaults to None
+    :type display_name: str
+    :param experiment_name: Name of the experiment the job will be created under.
+        If None is provided, the experiment will be set to the current directory. Defaults to None
+    :type experiment_name: str
+    :param jobs: Pipeline component node name to component object. Defaults to None
+    :type jobs: dict[str, ~azure.ai.ml.entities._builders.BaseNode]
+    :param settings: Setting of the pipeline job. Defaults to None
+    :type settings: ~azure.ai.ml.entities.PipelineJobSettings
+    :param identity: Identity that the training job will use while running on compute. Defaults to None
+    :type identity: Union[
+        ~azure.ai.ml.entities._credentials.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities._credentials.AmlTokenConfiguration,
+        ~azure.ai.ml.entities._credentials.UserIdentityConfiguration
+
+    ]
+    :param compute: Compute target name of the built pipeline. Defaults to None
+    :type compute: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None
+    :type tags: dict[str, str]
+    :param kwargs: A dictionary of additional configuration parameters. Defaults to None
+    :type kwargs: dict
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py
+            :start-after: [START configure_pipeline_job_and_settings]
+            :end-before: [END configure_pipeline_job_and_settings]
+            :language: python
+            :dedent: 8
+            :caption: Shows how to create a pipeline using this class.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Optional[Union[str, PipelineComponent, Component]] = None,
+        inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None,
+        outputs: Optional[Dict[str, Output]] = None,
+        name: Optional[str] = None,
+        description: Optional[str] = None,
+        display_name: Optional[str] = None,
+        experiment_name: Optional[str] = None,
+        jobs: Optional[Dict[str, BaseNode]] = None,
+        settings: Optional[PipelineJobSettings] = None,
+        identity: Optional[
+            Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
+        ] = None,
+        compute: Optional[str] = None,
+        tags: Optional[Dict[str, str]] = None,
+        **kwargs: Any,
+    ) -> None:
+        # initialize io
+        inputs, outputs = inputs or {}, outputs or {}
+        if isinstance(component, PipelineComponent) and component._source in [
+            ComponentSource.DSL,
+            ComponentSource.YAML_COMPONENT,
+        ]:
+            self._inputs = self._build_inputs_dict(inputs, input_definition_dict=component.inputs)
+            # for pipeline component created pipeline jobs,
+            # it's output should have same value with the component outputs,
+            # then override it with given outputs (filter out None value)
+            pipeline_outputs = {k: v for k, v in (outputs or {}).items() if v}
+            self._outputs = self._build_pipeline_outputs_dict({**component.outputs, **pipeline_outputs})
+        else:
+            # Build inputs/outputs dict without meta when definition not available
+            self._inputs = self._build_inputs_dict(inputs)
+            # for node created pipeline jobs,
+            # it's output should have same value with the given outputs
+            self._outputs = self._build_pipeline_outputs_dict(outputs=outputs)
+        source = kwargs.pop("_source", ComponentSource.CLASS)
+        if component is None:
+            component = PipelineComponent(
+                jobs=jobs,
+                description=description,
+                display_name=display_name,
+                base_path=kwargs.get(BASE_PATH_CONTEXT_KEY),
+                _source=source,
+            )
+
+        # If component is Pipeline component, jobs will be component.jobs
+        self._jobs = (jobs or {}) if isinstance(component, str) else {}
+
+        self.component: Union[PipelineComponent, str] = cast(Union[PipelineComponent, str], component)
+        if "type" not in kwargs:
+            kwargs["type"] = JobType.PIPELINE
+        if isinstance(component, PipelineComponent):
+            description = component.description if description is None else description
+            display_name = component.display_name if display_name is None else display_name
+        super(PipelineJob, self).__init__(
+            name=name,
+            description=description,
+            tags=tags,
+            display_name=display_name,
+            experiment_name=experiment_name,
+            compute=compute,
+            **kwargs,
+        )
+
+        self._remove_pipeline_input()
+        self.compute = compute
+        self._settings: Any = None
+        self.settings = settings
+        self.identity = identity
+        # TODO: remove default code & environment?
+        self._default_code = None
+        self._default_environment = None
+
+    @property
+    def inputs(self) -> Dict:
+        """Inputs of the pipeline job.
+
+        :return: Inputs of the pipeline job.
+        :rtype: dict[str, Union[~azure.ai.ml.entities.Input, str, bool, int, float]]
+        """
+        return self._inputs
+
+    @property
+    def outputs(self) -> Dict[str, Union[str, Output]]:
+        """Outputs of the pipeline job.
+
+        :return: Outputs of the pipeline job.
+        :rtype: dict[str, Union[str, ~azure.ai.ml.entities.Output]]
+        """
+        return self._outputs
+
+    @property
+    def jobs(self) -> Dict:
+        """Return jobs of pipeline job.
+
+        :return: Jobs of pipeline job.
+        :rtype: dict
+        """
+        res: dict = self.component.jobs if isinstance(self.component, PipelineComponent) else self._jobs
+        return res
+
+    @property
+    def settings(self) -> Optional[PipelineJobSettings]:
+        """Settings of the pipeline job.
+
+        :return: Settings of the pipeline job.
+        :rtype: ~azure.ai.ml.entities.PipelineJobSettings
+        """
+        if self._settings is None:
+            self._settings = PipelineJobSettings()
+        res: Optional[PipelineJobSettings] = self._settings
+        return res
+
+    @settings.setter
+    def settings(self, value: Union[Dict, PipelineJobSettings]) -> None:
+        """Set the pipeline job settings.
+
+        :param value: The pipeline job settings.
+        :type value: Union[dict, ~azure.ai.ml.entities.PipelineJobSettings]
+        """
+        if value is not None:
+            if isinstance(value, PipelineJobSettings):
+                # since PipelineJobSettings inherit _AttrDict, we need add this branch to distinguish with dict
+                pass
+            elif isinstance(value, dict):
+                value = PipelineJobSettings(**value)
+            else:
+                raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value)))
+        self._settings = value
+
+    @classmethod
+    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+        return ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.PIPELINE,
+        )
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+        # import this to ensure that nodes are registered before schema is created.
+
+        return PipelineJobSchema(context=context)
+
+    @classmethod
+    def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
+        # jobs validations are done in _customized_validate()
+        return ["component", "jobs"]
+
+    @property
+    def _skip_required_compute_missing_validation(self) -> Literal[True]:
+        return True
+
+    def _validate_compute_is_set(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        if self.compute is not None:
+            return validation_result
+        if self.settings is not None and self.settings.default_compute is not None:
+            return validation_result
+
+        if not isinstance(self.component, str):
+            validation_result.merge_with(self.component._validate_compute_is_set())
+        return validation_result
+
+    def _customized_validate(self) -> MutableValidationResult:
+        """Validate that all provided inputs and parameters are valid for current pipeline and components in it.
+
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        validation_result = super(PipelineJob, self)._customized_validate()
+
+        if isinstance(self.component, PipelineComponent):
+            # Merge with pipeline component validate result for structure validation.
+            # Skip top level parameter missing type error
+            validation_result.merge_with(
+                self.component._customized_validate(),
+                condition_skip=lambda x: x.error_code == ValidationErrorCode.PARAMETER_TYPE_UNKNOWN
+                and x.yaml_path.startswith("inputs"),
+            )
+            # Validate compute
+            validation_result.merge_with(self._validate_compute_is_set())
+        # Validate Input
+        validation_result.merge_with(self._validate_input())
+        # Validate initialization & finalization jobs
+        validation_result.merge_with(self._validate_init_finalize_job())
+
+        return validation_result
+
+    def _validate_input(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        if not isinstance(self.component, str):
+            # TODO(1979547): refine this logic: not all nodes have `_get_input_binding_dict` method
+            used_pipeline_inputs = set(
+                itertools.chain(
+                    *[
+                        self.component._get_input_binding_dict(node if not isinstance(node, LoopNode) else node.body)[0]
+                        for node in self.jobs.values()
+                        if not isinstance(node, ConditionNode)
+                        # condition node has no inputs
+                    ]
+                )
+            )
+        # validate inputs
+        if not isinstance(self.component, Component):
+            return validation_result
+        for key, meta in self.component.inputs.items():
+            if key not in used_pipeline_inputs:  # pylint: disable=possibly-used-before-assignment
+                # Only validate inputs certainly used.
+                continue
+            # raise error when required input with no default value not set
+            if (
+                self.inputs.get(key, None) is None  # input not provided
+                and meta.optional is not True  # and it's required
+                and meta.default is None  # and it does not have default
+            ):
+                name = self.name or self.display_name
+                name = f"{name!r} " if name else ""
+                validation_result.append_error(
+                    yaml_path=f"inputs.{key}",
+                    message=f"Required input {key!r} for pipeline {name}not provided.",
+                )
+        return validation_result
+
+    def _validate_init_finalize_job(self) -> MutableValidationResult:  # pylint: disable=too-many-statements
+        from azure.ai.ml.entities._job.pipeline._io import InputOutputBase, _GroupAttrDict
+
+        validation_result = self._create_empty_validation_result()
+        # subgraph (PipelineComponent) should not have on_init/on_finalize set
+        for job_name, job in self.jobs.items():
+            if job.type != "pipeline":
+                continue
+            if job.settings.on_init:
+                validation_result.append_error(
+                    yaml_path=f"jobs.{job_name}.settings.on_init",
+                    message="On_init is not supported for pipeline component.",
+                )
+            if job.settings.on_finalize:
+                validation_result.append_error(
+                    yaml_path=f"jobs.{job_name}.settings.on_finalize",
+                    message="On_finalize is not supported for pipeline component.",
+                )
+
+        on_init = None
+        on_finalize = None
+
+        if self.settings is not None:
+            # quick return if neither on_init nor on_finalize is set
+            if self.settings.on_init is None and self.settings.on_finalize is None:
+                return validation_result
+
+            on_init, on_finalize = self.settings.on_init, self.settings.on_finalize
+
+        append_on_init_error = partial(validation_result.append_error, "settings.on_init")
+        append_on_finalize_error = partial(validation_result.append_error, "settings.on_finalize")
+        # on_init and on_finalize cannot be same
+        if on_init == on_finalize:
+            append_on_init_error(f"Invalid on_init job {on_init}, it should be different from on_finalize.")
+            append_on_finalize_error(f"Invalid on_finalize job {on_finalize}, it should be different from on_init.")
+        # pipeline should have at least one normal node
+        if len(set(self.jobs.keys()) - {on_init, on_finalize}) == 0:
+            validation_result.append_error(yaml_path="jobs", message="No other job except for on_init/on_finalize job.")
+
+        def _is_control_flow_node(_validate_job_name: str) -> bool:
+            from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+
+            _validate_job = self.jobs[_validate_job_name]
+            return issubclass(type(_validate_job), ControlFlowNode)
+
+        def _is_isolated_job(_validate_job_name: str) -> bool:
+            def _try_get_data_bindings(
+                _name: str, _input_output_data: Union["_GroupAttrDict", "InputOutputBase"]
+            ) -> Optional[List]:
+                """Try to get data bindings from input/output data, return None if not found.
+                :param _name: The name to use when flattening GroupAttrDict
+                :type _name: str
+                :param _input_output_data: The input/output data
+                :type _input_output_data: Union[_GroupAttrDict, str, InputOutputBase]
+                :return: A list of data bindings, or None if not found
+                :rtype: Optional[List[str]]
+                """
+                # handle group input
+                if GroupInput._is_group_attr_dict(_input_output_data):
+                    _new_input_output_data: _GroupAttrDict = cast(_GroupAttrDict, _input_output_data)
+                    # flatten to avoid nested cases
+                    flattened_values: List[Input] = list(_new_input_output_data.flatten(_name).values())
+                    # handle invalid empty group
+                    if len(flattened_values) == 0:
+                        return None
+                    return [_value.path for _value in flattened_values]
+                _input_output_data = _input_output_data._data
+                if isinstance(_input_output_data, str):
+                    return [_input_output_data]
+                if not hasattr(_input_output_data, "_data_binding"):
+                    return None
+                return [_input_output_data._data_binding()]
+
+            _validate_job = self.jobs[_validate_job_name]
+            # no input to validate job
+            for _input_name in _validate_job.inputs:
+                _data_bindings = _try_get_data_bindings(_input_name, _validate_job.inputs[_input_name])
+                if _data_bindings is None:
+                    continue
+                for _data_binding in _data_bindings:
+                    if is_data_binding_expression(_data_binding, ["parent", "jobs"]):
+                        return False
+            # no output from validate job - iterate other jobs input(s) to validate
+            for _job_name, _job in self.jobs.items():
+                # exclude control flow node as it does not have inputs
+                if _is_control_flow_node(_job_name):
+                    continue
+                for _input_name in _job.inputs:
+                    _data_bindings = _try_get_data_bindings(_input_name, _job.inputs[_input_name])
+                    if _data_bindings is None:
+                        continue
+                    for _data_binding in _data_bindings:
+                        if is_data_binding_expression(_data_binding, ["parent", "jobs", _validate_job_name]):
+                            return False
+            return True
+
+        # validate on_init
+        if on_init is not None:
+            if on_init not in self.jobs:
+                append_on_init_error(f"On_init job name {on_init} not exists in jobs.")
+            else:
+                if _is_control_flow_node(on_init):
+                    append_on_init_error("On_init job should not be a control flow node.")
+                elif not _is_isolated_job(on_init):
+                    append_on_init_error("On_init job should not have connection to other execution node.")
+        # validate on_finalize
+        if on_finalize is not None:
+            if on_finalize not in self.jobs:
+                append_on_finalize_error(f"On_finalize job name {on_finalize} not exists in jobs.")
+            else:
+                if _is_control_flow_node(on_finalize):
+                    append_on_finalize_error("On_finalize job should not be a control flow node.")
+                elif not _is_isolated_job(on_finalize):
+                    append_on_finalize_error("On_finalize job should not have connection to other execution node.")
+        return validation_result
+
+    def _remove_pipeline_input(self) -> None:
+        """Remove None pipeline input.If not remove, it will pass "None" to backend."""
+        redundant_pipeline_inputs = []
+        for pipeline_input_name, pipeline_input in self._inputs.items():
+            if isinstance(pipeline_input, PipelineInput) and pipeline_input._data is None:
+                redundant_pipeline_inputs.append(pipeline_input_name)
+        for redundant_pipeline_input in redundant_pipeline_inputs:
+            self._inputs.pop(redundant_pipeline_input)
+
+    def _check_private_preview_features(self) -> None:
+        """Checks is private preview features included in pipeline.
+
+        If private preview environment not set, raise exception.
+        """
+        if not is_private_preview_enabled():
+            error_msg = (
+                "{} is a private preview feature, "
+                f"please set environment variable {AZUREML_PRIVATE_FEATURES_ENV_VAR} to true to use it."
+            )
+            # check has not supported nodes
+            for _, node in self.jobs.items():
+                # TODO: Remove in PuP
+                if isinstance(node, (ImportJob, Import)):
+                    msg = error_msg.format("Import job in pipeline")
+                    raise UserErrorException(message=msg, no_personal_data_message=msg)
+
+    def _to_node(self, context: Optional[Dict] = None, **kwargs: Any) -> "Pipeline":
+        """Translate a command job to a pipeline node when load schema.
+
+        (Write a pipeline job as node in yaml is not supported presently.)
+
+        :param context: Context of command job YAML file.
+        :type context: dict
+        :return: Translated command component.
+        :rtype: Pipeline
+        """
+        component = self._to_component(context, **kwargs)
+
+        return Pipeline(
+            component=component,
+            compute=self.compute,
+            # Need to supply the inputs with double curly.
+            inputs=self.inputs,
+            outputs=self.outputs,
+            description=self.description,
+            tags=self.tags,
+            display_name=self.display_name,
+            properties=self.properties,
+        )
+
+    def _to_rest_object(self) -> JobBase:
+        """Build current parameterized pipeline instance to a pipeline job object before submission.
+
+        :return: Rest pipeline job.
+        :rtype: JobBase
+        """
+        # Check if there are private preview features in it
+        self._check_private_preview_features()
+
+        # Build the inputs to dict. Handle both value & binding assignment.
+        # Example: {
+        #   "input_data": {"data": {"path": "path/to/input/data"},  "mode"="Mount"},
+        #   "input_value": 10,
+        #   "learning_rate": "${{jobs.step1.inputs.learning_rate}}"
+        # }
+        built_inputs = self._build_inputs()
+
+        # Build the outputs to dict
+        # example: {"eval_output": "${{jobs.eval.outputs.eval_output}}"}
+        built_outputs = self._build_outputs()
+
+        if self.settings is not None:
+            settings_dict = self.settings._to_dict()
+
+        if isinstance(self.component, PipelineComponent):
+            source = self.component._source
+            # Build the jobs to dict
+            rest_component_jobs = self.component._build_rest_component_jobs()
+        else:
+            source = ComponentSource.REMOTE_WORKSPACE_JOB
+            rest_component_jobs = {}
+        # add _source on pipeline job.settings
+        if "_source" not in settings_dict:  # pylint: disable=possibly-used-before-assignment
+            settings_dict.update({"_source": source})
+
+        # TODO: Revisit this logic when multiple types of component jobs are supported
+        rest_compute = self.compute
+        # This will be resolved in job_operations _resolve_arm_id_or_upload_dependencies.
+        component_id = self.component if isinstance(self.component, str) else self.component.id
+
+        # TODO remove it in the future.
+        # MFE not support pass None or empty input value. Remove the empty inputs in pipeline job.
+        built_inputs = {k: v for k, v in built_inputs.items() if v is not None and v != ""}
+
+        pipeline_job = RestPipelineJob(
+            compute_id=rest_compute,
+            component_id=component_id,
+            display_name=self.display_name,
+            tags=self.tags,
+            description=self.description,
+            properties=self.properties,
+            experiment_name=self.experiment_name,
+            jobs=rest_component_jobs,
+            inputs=to_rest_dataset_literal_inputs(built_inputs, job_type=self.type),
+            outputs=to_rest_data_outputs(built_outputs),
+            settings=settings_dict,
+            services={k: v._to_rest_object() for k, v in self.services.items()} if self.services else None,
+            identity=self.identity._to_job_rest_object() if self.identity else None,
+        )
+
+        rest_job = JobBase(properties=pipeline_job)
+        rest_job.name = self.name
+        return rest_job
+
+    @classmethod
+    def _load_from_rest(cls, obj: JobBase) -> "PipelineJob":
+        """Build a pipeline instance from rest pipeline object.
+
+        :param obj: The REST Pipeline Object
+        :type obj: JobBase
+        :return: pipeline job.
+        :rtype: PipelineJob
+        """
+        properties: RestPipelineJob = obj.properties
+        # Workaround for BatchEndpoint as these fields are not filled in
+        # Unpack the inputs
+        from_rest_inputs = from_rest_inputs_to_dataset_literal(properties.inputs) or {}
+        from_rest_outputs = from_rest_data_outputs(properties.outputs) or {}
+        # Unpack the component jobs
+        sub_nodes = PipelineComponent._resolve_sub_nodes(properties.jobs) if properties.jobs else {}
+        # backend may still store Camel settings, eg: DefaultDatastore, translate them to snake when load back
+        settings_dict = transform_dict_keys(properties.settings, camel_to_snake) if properties.settings else None
+        settings_sdk = PipelineJobSettings(**settings_dict) if settings_dict else PipelineJobSettings()
+        # Create component or use component id
+        if getattr(properties, "component_id", None):
+            component = properties.component_id
+        else:
+            component = PipelineComponent._load_from_rest_pipeline_job(
+                {
+                    "inputs": from_rest_inputs,
+                    "outputs": from_rest_outputs,
+                    "display_name": properties.display_name,
+                    "description": properties.description,
+                    "jobs": sub_nodes,
+                }
+            )
+
+        job = PipelineJob(
+            component=component,
+            inputs=from_rest_inputs,
+            outputs=from_rest_outputs,
+            name=obj.name,
+            id=obj.id,
+            jobs=sub_nodes,
+            display_name=properties.display_name,
+            tags=properties.tags,
+            properties=properties.properties,
+            experiment_name=properties.experiment_name,
+            status=properties.status,
+            creation_context=SystemData._from_rest_object(obj.system_data) if obj.system_data else None,
+            services=JobServiceBase._from_rest_job_services(properties.services) if properties.services else None,
+            compute=get_resource_name_from_arm_id_safe(properties.compute_id),
+            settings=settings_sdk,
+            identity=(
+                _BaseJobIdentityConfiguration._from_rest_object(properties.identity) if properties.identity else None
+            ),
+        )
+
+        return job
+
+    def _to_dict(self) -> Dict:
+        res: dict = self._dump_for_validation()
+        return res
+
+    @classmethod
+    def _component_items_from_path(cls, data: Dict) -> Generator:
+        if "jobs" in data:
+            for node_name, job_instance in data["jobs"].items():
+                potential_component_path = job_instance["component"] if "component" in job_instance else None
+                if isinstance(potential_component_path, str) and potential_component_path.startswith("file:"):
+                    yield node_name, potential_component_path
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "PipelineJob":
+        path_first_occurrence: dict = {}
+        component_first_occurrence = {}
+        for node_name, component_path in cls._component_items_from_path(data):
+            if component_path in path_first_occurrence:
+                component_first_occurrence[node_name] = path_first_occurrence[component_path]
+                # set components to be replaced here may break the validation logic
+            else:
+                path_first_occurrence[component_path] = node_name
+
+        # use this instead of azure.ai.ml.entities._util.load_from_dict to avoid parsing
+        loaded_schema = cls._create_schema_for_validation(context=context).load(data, **kwargs)
+
+        # replace repeat component with first occurrence to reduce arm id resolution
+        # current load yaml file logic is in azure.ai.ml._schema.core.schema.YamlFileSchema.load_from_file
+        # is it possible to load the same yaml file only once in 1 pipeline loading?
+        for node_name, first_occurrence in component_first_occurrence.items():
+            job = loaded_schema["jobs"][node_name]
+            job._component = loaded_schema["jobs"][first_occurrence].component
+            # For Parallel job, should also align task attribute which is usually from component.task
+            if isinstance(job, Parallel):
+                job.task = job._component.task
+                # parallel.task.code is based on parallel._component.base_path, so need to update it
+                job._base_path = job._component.base_path
+        return PipelineJob(
+            base_path=context[BASE_PATH_CONTEXT_KEY],
+            _source=ComponentSource.YAML_JOB,
+            **loaded_schema,
+        )
+
+    def __str__(self) -> str:
+        try:
+            res_to_yaml: str = self._to_yaml()
+            return res_to_yaml
+        except BaseException:  # pylint: disable=W0718
+            res: str = super(PipelineJob, self).__str__()
+            return res
+
+    def _get_telemetry_values(self) -> Dict:
+        telemetry_values: dict = super()._get_telemetry_values()
+        if isinstance(self.component, PipelineComponent):
+            telemetry_values.update(self.component._get_telemetry_values())
+        else:
+            telemetry_values.update({"source": ComponentSource.REMOTE_WORKSPACE_JOB})
+        telemetry_values.pop("is_anonymous")
+        return telemetry_values
+
+    def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> "PipelineComponent":
+        """Translate a pipeline job to pipeline component.
+
+        :param context: Context of pipeline job YAML file.
+        :type context: dict
+        :return: Translated pipeline component.
+        :rtype: PipelineComponent
+        """
+        ignored_keys = PipelineComponent._check_ignored_keys(self)
+        if ignored_keys:
+            name = self.name or self.display_name
+            name = f"{name!r} " if name else ""
+            module_logger.warning("%s ignored when translating PipelineJob %sto PipelineComponent.", ignored_keys, name)
+        pipeline_job_dict = kwargs.get("pipeline_job_dict", {})
+        context = context or {BASE_PATH_CONTEXT_KEY: Path("./")}
+
+        # Create anonymous pipeline component with default version as 1
+        return PipelineComponent(
+            base_path=context[BASE_PATH_CONTEXT_KEY],
+            display_name=self.display_name,
+            inputs=self._to_inputs(inputs=self.inputs, pipeline_job_dict=pipeline_job_dict),
+            outputs=self._to_outputs(outputs=self.outputs, pipeline_job_dict=pipeline_job_dict),
+            jobs=self.jobs,
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job_settings.py
new file mode 100644
index 00000000..0fe41e2e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/pipeline/pipeline_job_settings.py
@@ -0,0 +1,75 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Generator, Optional
+
+from azure.ai.ml.entities._job.pipeline._attr_dict import _AttrDict
+
+
+class PipelineJobSettings(_AttrDict):
+    """Settings of PipelineJob.
+
+    :param default_datastore: The default datastore of the pipeline.
+    :type default_datastore: str
+    :param default_compute: The default compute target of the pipeline.
+    :type default_compute: str
+    :param continue_on_step_failure: Flag indicating whether to continue pipeline execution if a step fails.
+    :type continue_on_step_failure: bool
+    :param force_rerun: Flag indicating whether to force rerun pipeline execution.
+    :type force_rerun: bool
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py
+            :start-after: [START configure_pipeline_job_and_settings]
+            :end-before: [END configure_pipeline_job_and_settings]
+            :language: python
+            :dedent: 8
+            :caption: Shows how to set pipeline properties using this class.
+    """
+
+    def __init__(
+        self,
+        default_datastore: Optional[str] = None,
+        default_compute: Optional[str] = None,
+        continue_on_step_failure: Optional[bool] = None,
+        force_rerun: Optional[bool] = None,
+        **kwargs: Any
+    ) -> None:
+        self._init = True
+        super().__init__()
+        self.default_compute: Any = default_compute
+        self.default_datastore: Any = default_datastore
+        self.continue_on_step_failure = continue_on_step_failure
+        self.force_rerun = force_rerun
+        self.on_init = kwargs.get("on_init", None)
+        self.on_finalize = kwargs.get("on_finalize", None)
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+        self._init = False
+
+    def _get_valid_keys(self) -> Generator[str, Any, None]:
+        for k, v in self.__dict__.items():
+            if v is None:
+                continue
+            # skip private attributes inherited from _AttrDict
+            if k in ["_logger", "_allowed_keys", "_init", "_key_restriction"]:
+                continue
+            yield k
+
+    def _to_dict(self) -> Dict:
+        result = {}
+        for k in self._get_valid_keys():
+            result[k] = self.__dict__[k]
+        result.update(self._get_attrs())
+        return result
+
+    def _initializing(self) -> bool:
+        return self._init
+
+    def __bool__(self) -> bool:
+        for _ in self._get_valid_keys():
+            return True
+        # _attr_dict will return False if no extra attributes are set
+        return self.__len__() > 0