about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/__init__.py28
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/base_node.py568
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command.py1017
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py314
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/condition_node.py146
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/control_flow_node.py170
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py575
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer_func.py335
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/do_while.py357
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/fl_scatter_gather.py886
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_func.py93
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_node.py205
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel.py551
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_for.py362
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py285
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py225
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark.py663
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py306
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/subcomponents.py59
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/sweep.py454
20 files changed, 7599 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/__init__.py
new file mode 100644
index 00000000..95dfca0a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/__init__.py
@@ -0,0 +1,28 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from .base_node import BaseNode, parse_inputs_outputs
+from .command import Command
+from .do_while import DoWhile
+from .import_node import Import
+from .parallel import Parallel
+from .pipeline import Pipeline
+from .spark import Spark
+from .sweep import Sweep
+from .data_transfer import DataTransfer, DataTransferCopy, DataTransferImport, DataTransferExport
+
+__all__ = [
+    "BaseNode",
+    "Sweep",
+    "Parallel",
+    "Command",
+    "Import",
+    "Spark",
+    "Pipeline",
+    "parse_inputs_outputs",
+    "DoWhile",
+    "DataTransfer",
+    "DataTransferCopy",
+    "DataTransferImport",
+    "DataTransferExport",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/base_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/base_node.py
new file mode 100644
index 00000000..98eba6a5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/base_node.py
@@ -0,0 +1,568 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+import logging
+import os
+import uuid
+from abc import abstractmethod
+from enum import Enum
+from functools import wraps
+from typing import Any, Dict, List, Optional, Union
+
+from azure.ai.ml._utils._arm_id_utils import get_resource_name_from_arm_id_safe
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import CommonYamlFields
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.entities import Data, Model
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job._input_output_helpers import build_input_output
+from azure.ai.ml.entities._job.job import Job
+from azure.ai.ml.entities._job.pipeline._attr_dict import _AttrDict
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
+from azure.ai.ml.entities._job.pipeline._io.mixin import NodeWithGroupInputMixin
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression
+from azure.ai.ml.entities._job.sweep.search_space import SweepDistribution
+from azure.ai.ml.entities._mixins import YamlTranslatableMixin
+from azure.ai.ml.entities._util import convert_ordered_dict_to_dict, resolve_pipeline_parameters
+from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin
+from azure.ai.ml.exceptions import ErrorTarget, ValidationException
+
+module_logger = logging.getLogger(__name__)
+
+
+def parse_inputs_outputs(data: dict) -> dict:
+    """Parse inputs and outputs from data. If data is a list, parse each item in the list.
+
+    :param data: A dict that may contain "inputs" or "outputs" keys
+    :type data: dict
+    :return: Dict with parsed "inputs" and "outputs" keys
+    :rtype: Dict
+    """
+
+    if "inputs" in data:
+        data["inputs"] = {key: build_input_output(val) for key, val in data["inputs"].items()}
+    if "outputs" in data:
+        data["outputs"] = {key: build_input_output(val, inputs=False) for key, val in data["outputs"].items()}
+    return data
+
+
+def pipeline_node_decorator(func: Any) -> Any:
+    """Wrap a function and add its return value to the current DSL pipeline.
+
+    :param func: The function to be wrapped.
+    :type func: callable
+    :return: The wrapped function.
+    :rtype: callable
+    """
+
+    @wraps(func)
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
+        automl_job = func(*args, **kwargs)
+        from azure.ai.ml.dsl._pipeline_component_builder import (
+            _add_component_to_current_definition_builder,
+            _is_inside_dsl_pipeline_func,
+        )
+
+        if _is_inside_dsl_pipeline_func():
+            # Build automl job to automl node if it's defined inside DSL pipeline func.
+            automl_job._instance_id = str(uuid.uuid4())
+            _add_component_to_current_definition_builder(automl_job)
+        return automl_job
+
+    return wrapper
+
+
+# pylint: disable=too-many-instance-attributes
+class BaseNode(Job, YamlTranslatableMixin, _AttrDict, PathAwareSchemaValidatableMixin, NodeWithGroupInputMixin):
+    """Base class for node in pipeline, used for component version consumption. Can't be instantiated directly.
+
+    You should not instantiate this class directly. Instead, you should
+    create from a builder function.
+
+    :param type: Type of pipeline node. Defaults to JobType.COMPONENT.
+    :type type: str
+    :param component: Id or instance of the component version to be run for the step
+    :type component: Component
+    :param inputs: The inputs for the node.
+    :type inputs: Optional[Dict[str, Union[
+        ~azure.ai.ml.entities._job.pipeline._io.PipelineInput,
+        ~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
+        ~azure.ai.ml.entities.Input,
+        str,
+        bool,
+        int,
+        float,
+        Enum,
+        'Input']]]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Optional[Dict[str, Union[str, ~azure.ai.ml.entities.Output, 'Output']]]
+    :param name: The name of the node.
+    :type name: Optional[str]
+    :param display_name: The display name of the node.
+    :type display_name: Optional[str]
+    :param description: The description of the node.
+    :type description: Optional[str]
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: Optional[Dict]
+    :param properties: The properties of the job.
+    :type properties: Optional[Dict]
+    :param comment: Comment of the pipeline node, which will be shown in designer canvas.
+    :type comment: Optional[str]
+    :param compute: Compute definition containing the compute information for the step.
+    :type compute: Optional[str]
+    :param experiment_name: Name of the experiment the job will be created under,
+        if None is provided, default will be set to current directory name.
+        Will be ignored as a pipeline step.
+    :type experiment_name: Optional[str]
+    :param kwargs: Additional keyword arguments for future compatibility.
+    """
+
+    def __init__(
+        self,
+        *,
+        type: str = JobType.COMPONENT,  # pylint: disable=redefined-builtin
+        component: Any,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        name: Optional[str] = None,
+        display_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        comment: Optional[str] = None,
+        compute: Optional[str] = None,
+        experiment_name: Optional[str] = None,
+        **kwargs: Any,
+    ) -> None:
+        self._init = True
+        # property _source can't be set
+        source = kwargs.pop("_source", None)
+        _from_component_func = kwargs.pop("_from_component_func", False)
+        self._name: Optional[str] = None
+        super(BaseNode, self).__init__(
+            type=type,
+            name=name,
+            display_name=display_name,
+            description=description,
+            tags=tags,
+            properties=properties,
+            compute=compute,
+            experiment_name=experiment_name,
+            **kwargs,
+        )
+        self.comment = comment
+
+        # initialize io
+        inputs = resolve_pipeline_parameters(inputs)
+        inputs, outputs = inputs or {}, outputs or {}
+        # parse empty dict to None so we won't pass default mode, type to backend
+        # add `isinstance` to avoid converting to expression
+        for k, v in inputs.items():
+            if isinstance(v, dict) and v == {}:
+                inputs[k] = None
+
+        # TODO: get rid of self._job_inputs, self._job_outputs once we have unified Input
+        self._job_inputs, self._job_outputs = inputs, outputs
+        if isinstance(component, Component):
+            # Build the inputs from component input definition and given inputs, unfilled inputs will be None
+            self._inputs = self._build_inputs_dict(inputs or {}, input_definition_dict=component.inputs)
+            # Build the outputs from component output definition and given outputs, unfilled outputs will be None
+            self._outputs = self._build_outputs_dict(outputs or {}, output_definition_dict=component.outputs)
+        else:
+            # Build inputs/outputs dict without meta when definition not available
+            self._inputs = self._build_inputs_dict(inputs or {})
+            self._outputs = self._build_outputs_dict(outputs or {})
+
+        self._component = component
+        self._referenced_control_flow_node_instance_id: Optional[str] = None
+        self.kwargs = kwargs
+
+        # Generate an id for every instance
+        self._instance_id = str(uuid.uuid4())
+        if _from_component_func:
+            # add current component in pipeline stack for dsl scenario
+            self._register_in_current_pipeline_component_builder()
+
+        if source is None:
+            if isinstance(component, Component):
+                source = self._component._source
+            else:
+                source = Component._resolve_component_source_from_id(id=self._component)
+        self._source = source
+        self._validate_required_input_not_provided = True
+        self._init = False
+
+    @property
+    def name(self) -> Optional[str]:
+        """Get the name of the node.
+
+        :return: The name of the node.
+        :rtype: str
+        """
+        return self._name
+
+    @name.setter
+    def name(self, value: str) -> None:
+        """Set the name of the node.
+
+        :param value: The name to set for the node.
+        :type value: str
+        :return: None
+        """
+        # when name is not lower case, lower it to make sure it's a valid node name
+        if value and value != value.lower():
+            module_logger.warning(
+                "Changing node name %s to lower case: %s since upper case is not allowed node name.",
+                value,
+                value.lower(),
+            )
+            value = value.lower()
+        self._name = value
+
+    @classmethod
+    def _get_supported_inputs_types(cls) -> Any:
+        """Get the supported input types for node input.
+
+        :param cls: The class (or instance) to retrieve supported input types for.
+        :type cls: object
+
+        :return: A tuple of supported input types.
+        :rtype: tuple
+        """
+        # supported input types for node input
+        return (
+            PipelineInput,
+            NodeOutput,
+            Input,
+            Data,
+            Model,
+            str,
+            bool,
+            int,
+            float,
+            Enum,
+            PipelineExpression,
+        )
+
+    @property
+    def _skip_required_compute_missing_validation(self) -> bool:
+        return False
+
+    def _initializing(self) -> bool:
+        # use this to indicate ongoing init process so all attributes set during init process won't be set as
+        # arbitrary attribute in _AttrDict
+        # TODO: replace this hack
+        return self._init
+
+    def _set_base_path(self, base_path: Optional[Union[str, os.PathLike]]) -> None:
+        """Set the base path for the node.
+
+        Will be used for schema validation. If not set, will use Path.cwd() as the base path
+        (default logic defined in SchemaValidatableMixin._base_path_for_validation).
+
+        :param base_path: The new base path
+        :type base_path: Union[str, os.PathLike]
+        """
+        self._base_path = base_path
+
+    def _set_referenced_control_flow_node_instance_id(self, instance_id: str) -> None:
+        """Set the referenced control flow node instance id.
+
+        If this node is referenced to a control flow node, the instance_id will not be modified.
+
+        :param instance_id: The new instance id
+        :type instance_id: str
+        """
+        if not self._referenced_control_flow_node_instance_id:
+            self._referenced_control_flow_node_instance_id = instance_id
+
+    def _get_component_id(self) -> Union[str, Component]:
+        """Return component id if possible.
+
+        :return: The component id
+        :rtype: Union[str, Component]
+        """
+        if isinstance(self._component, Component) and self._component.id:
+            # If component is remote, return it's asset id
+            return self._component.id
+        # Otherwise, return the component version or arm id.
+        res: Union[str, Component] = self._component
+        return res
+
+    def _get_component_name(self) -> Optional[str]:
+        # first use component version/job's display name or name as component name
+        # make it unique when pipeline build finished.
+        if self._component is None:
+            return None
+        if isinstance(self._component, str):
+            return self._component
+        return str(self._component.name)
+
+    def _to_dict(self) -> Dict:
+        return dict(convert_ordered_dict_to_dict(self._dump_for_validation()))
+
+    @classmethod
+    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+        return ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.PIPELINE,
+        )
+
+    def _validate_inputs(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        if self._validate_required_input_not_provided:
+            # validate required inputs not provided
+            if isinstance(self._component, Component):
+                for key, meta in self._component.inputs.items():
+                    # raise error when required input with no default value not set
+                    if (
+                        not self._is_input_set(input_name=key)  # input not provided
+                        and meta.optional is not True  # and it's required
+                        and meta.default is None  # and it does not have default
+                    ):
+                        validation_result.append_error(
+                            yaml_path=f"inputs.{key}",
+                            message=f"Required input {key!r} for component {self.name!r} not provided.",
+                        )
+
+        inputs = self._build_inputs()
+        for input_name, input_obj in inputs.items():
+            if isinstance(input_obj, SweepDistribution):
+                validation_result.append_error(
+                    yaml_path=f"inputs.{input_name}",
+                    message=f"Input of command {self.name} is a SweepDistribution, "
+                    f"please use command.sweep to transform the command into a sweep node.",
+                )
+        return validation_result
+
+    def _customized_validate(self) -> MutableValidationResult:
+        """Validate the resource with customized logic.
+
+        Override this method to add customized validation logic.
+
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        validate_result = self._validate_inputs()
+        return validate_result
+
+    @classmethod
+    def _get_skip_fields_in_schema_validation(cls) -> List[str]:
+        return [
+            "inputs",  # processed separately
+            "outputs",  # processed separately
+            "name",
+            "display_name",
+            "experiment_name",  # name is not part of schema but may be set in dsl/yml file
+            "kwargs",
+        ]
+
+    @classmethod
+    def _get_component_attr_name(cls) -> str:
+        return "component"
+
+    @abstractmethod
+    def _to_job(self) -> Job:
+        """This private function is used by the CLI to get a plain job object
+        so that the CLI can properly serialize the object.
+
+        It is needed as BaseNode._to_dict() dumps objects using pipeline child job schema instead of standalone job
+        schema, for example Command objects dump have a nested component property, which doesn't apply to stand alone
+        command jobs. BaseNode._to_dict() needs to be able to dump to both pipeline child job dict as well as stand
+        alone job dict base on context.
+        """
+
+    @classmethod
+    def _from_rest_object(cls, obj: dict) -> "BaseNode":
+        if CommonYamlFields.TYPE not in obj:
+            obj[CommonYamlFields.TYPE] = NodeType.COMMAND
+
+        from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory
+
+        # todo: refine Hard code for now to support different task type for DataTransfer node
+        _type = obj[CommonYamlFields.TYPE]
+        if _type == NodeType.DATA_TRANSFER:
+            _type = "_".join([NodeType.DATA_TRANSFER, obj.get("task", "")])
+        instance: BaseNode = pipeline_node_factory.get_create_instance_func(_type)()
+        init_kwargs = instance._from_rest_object_to_init_params(obj)
+        # TODO: Bug Item number: 2883415
+        instance.__init__(**init_kwargs)  # type: ignore
+        return instance
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: dict) -> Dict:
+        """Convert the rest object to a dict containing items to init the node.
+
+        Will be used in _from_rest_object. Please override this method instead of _from_rest_object to make the logic
+        reusable.
+
+        :param obj: The REST object
+        :type obj: dict
+        :return: The init params
+        :rtype: Dict
+        """
+        inputs = obj.get("inputs", {})
+        outputs = obj.get("outputs", {})
+
+        obj["inputs"] = BaseNode._from_rest_inputs(inputs)
+        obj["outputs"] = BaseNode._from_rest_outputs(outputs)
+
+        # Change computeId -> compute
+        compute_id = obj.pop("computeId", None)
+        obj["compute"] = get_resource_name_from_arm_id_safe(compute_id)
+
+        # Change componentId -> component. Note that sweep node has no componentId.
+        if "componentId" in obj:
+            obj["component"] = obj.pop("componentId")
+
+        # distribution, sweep won't have distribution
+        if "distribution" in obj and obj["distribution"]:
+            from azure.ai.ml.entities._job.distribution import DistributionConfiguration
+
+            obj["distribution"] = DistributionConfiguration._from_rest_object(obj["distribution"])
+
+        return obj
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        """List of fields to be picked from self._to_dict() in self._to_rest_object().
+
+        By default, returns an empty list.
+
+        Override this method to add custom fields.
+
+        :return: List of fields to pick
+        :rtype: List[str]
+        """
+
+        return []
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:  # pylint: disable=unused-argument
+        """Convert self to a rest object for remote call.
+
+        :return: The rest object
+        :rtype: dict
+        """
+        base_dict, rest_obj = self._to_dict(), {}
+        for key in self._picked_fields_from_dict_to_rest_object():
+            if key in base_dict:
+                rest_obj[key] = base_dict.get(key)
+
+        rest_obj.update(
+            dict(  # pylint: disable=use-dict-literal
+                name=self.name,
+                type=self.type,
+                display_name=self.display_name,
+                tags=self.tags,
+                computeId=self.compute,
+                inputs=self._to_rest_inputs(),
+                outputs=self._to_rest_outputs(),
+                properties=self.properties,
+                _source=self._source,
+                # add all arbitrary attributes to support setting unknown attributes
+                **self._get_attrs(),
+            )
+        )
+        # only add comment in REST object when it is set
+        if self.comment is not None:
+            rest_obj.update({"comment": self.comment})
+
+        return dict(convert_ordered_dict_to_dict(rest_obj))
+
+    @property
+    def inputs(self) -> Dict:
+        """Get the inputs for the object.
+
+        :return: A dictionary containing the inputs for the object.
+        :rtype: Dict[str, Union[Input, str, bool, int, float]]
+        """
+        return self._inputs  # type: ignore
+
+    @property
+    def outputs(self) -> Dict:
+        """Get the outputs of the object.
+
+        :return: A dictionary containing the outputs for the object.
+        :rtype: Dict[str, Union[str, Output]]
+        """
+        return self._outputs  # type: ignore
+
+    def __str__(self) -> str:
+        try:
+            return str(self._to_yaml())
+        except BaseException:  # pylint: disable=W0718
+            # add try catch in case component job failed in schema parse
+            _obj: _AttrDict = _AttrDict()
+            return _obj.__str__()
+
+    def __hash__(self) -> int:  # type: ignore
+        return hash(self.__str__())
+
+    def __help__(self) -> Any:
+        # only show help when component has definition
+        if isinstance(self._component, Component):
+            # TODO: Bug Item number: 2883422
+            return self._component.__help__()  # type: ignore
+        return None
+
+    def __bool__(self) -> bool:
+        # _attr_dict will return False if no extra attributes are set
+        return True
+
+    def _get_origin_job_outputs(self) -> Dict[str, Union[str, Output]]:
+        """Restore outputs to JobOutput/BindingString and return them.
+
+        :return: The origin job outputs
+        :rtype: Dict[str, Union[str, Output]]
+        """
+        outputs: Dict = {}
+        if self.outputs is not None:
+            for output_name, output_obj in self.outputs.items():
+                if isinstance(output_obj, NodeOutput):
+                    outputs[output_name] = output_obj._data
+                else:
+                    raise TypeError("unsupported built output type: {}: {}".format(output_name, type(output_obj)))
+        return outputs
+
+    def _get_telemetry_values(self) -> Dict:
+        telemetry_values = {"type": self.type, "source": self._source}
+        return telemetry_values
+
+    def _register_in_current_pipeline_component_builder(self) -> None:
+        """Register this node in current pipeline component builder by adding self to a global stack."""
+        from azure.ai.ml.dsl._pipeline_component_builder import _add_component_to_current_definition_builder
+
+        # TODO: would it be better if we make _add_component_to_current_definition_builder a public function of
+        #  _PipelineComponentBuilderStack and make _PipelineComponentBuilderStack a singleton?
+        _add_component_to_current_definition_builder(self)
+
+    def _is_input_set(self, input_name: str) -> bool:
+        built_inputs = self._build_inputs()
+        return input_name in built_inputs and built_inputs[input_name] is not None
+
+    @classmethod
+    def _refine_optional_inputs_with_no_value(cls, node: "BaseNode", kwargs: Any) -> None:
+        """Refine optional inputs that have no default value and no value is provided when calling command/parallel
+        function.
+
+        This is to align with behavior of calling component to generate a pipeline node.
+
+        :param node: The node
+        :type node: BaseNode
+        :param kwargs: The kwargs
+        :type kwargs: dict
+        """
+        for key, value in node.inputs.items():
+            meta = value._data
+            if (
+                isinstance(meta, Input)
+                and meta._is_primitive_type is False
+                and meta.optional is True
+                and not meta.path
+                and key not in kwargs
+            ):
+                value._data = None
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command.py
new file mode 100644
index 00000000..0073307c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command.py
@@ -0,0 +1,1017 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access,too-many-lines
+import copy
+import logging
+import os
+from enum import Enum
+from os import PathLike
+from typing import Any, Dict, List, Optional, Tuple, Union, cast, overload
+
+from marshmallow import INCLUDE, Schema
+
+from azure.ai.ml._restclient.v2025_01_01_preview.models import CommandJob as RestCommandJob
+from azure.ai.ml._restclient.v2025_01_01_preview.models import JobBase
+from azure.ai.ml._schema.core.fields import ExperimentalField, NestedField, UnionField
+from azure.ai.ml._schema.job.command_job import CommandJobSchema
+from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
+from azure.ai.ml._schema.job.services import JobServiceSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, LOCAL_COMPUTE_PROPERTY, LOCAL_COMPUTE_TARGET
+from azure.ai.ml.constants._component import ComponentSource, NodeType
+from azure.ai.ml.entities._assets import Environment
+from azure.ai.ml.entities._component.command_component import CommandComponent
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+    _BaseJobIdentityConfiguration,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job._input_output_helpers import from_rest_data_outputs, from_rest_inputs_to_dataset_literal
+from azure.ai.ml.entities._job.command_job import CommandJob
+from azure.ai.ml.entities._job.distribution import (
+    DistributionConfiguration,
+    MpiDistribution,
+    PyTorchDistribution,
+    RayDistribution,
+    TensorFlowDistribution,
+)
+from azure.ai.ml.entities._job.job_limits import CommandJobLimits
+from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
+from azure.ai.ml.entities._job.job_service import (
+    JobService,
+    JobServiceBase,
+    JupyterLabJobService,
+    SshJobService,
+    TensorBoardJobService,
+    VsCodeJobService,
+)
+from azure.ai.ml.entities._job.queue_settings import QueueSettings
+from azure.ai.ml.entities._job.sweep.early_termination_policy import EarlyTerminationPolicy
+from azure.ai.ml.entities._job.sweep.objective import Objective
+from azure.ai.ml.entities._job.sweep.search_space import (
+    Choice,
+    LogNormal,
+    LogUniform,
+    Normal,
+    QLogNormal,
+    QLogUniform,
+    QNormal,
+    QUniform,
+    Randint,
+    SweepDistribution,
+    Uniform,
+)
+from azure.ai.ml.entities._system_data import SystemData
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from ..._schema import PathAwareSchema
+from ..._schema.job.distribution import (
+    MPIDistributionSchema,
+    PyTorchDistributionSchema,
+    RayDistributionSchema,
+    TensorFlowDistributionSchema,
+)
+from .._job.pipeline._io import NodeWithGroupInputMixin
+from .._util import (
+    convert_ordered_dict_to_dict,
+    from_rest_dict_to_dummy_rest_object,
+    get_rest_dict_for_node_attrs,
+    load_from_dict,
+    validate_attribute_type,
+)
+from .base_node import BaseNode
+from .sweep import Sweep
+
+module_logger = logging.getLogger(__name__)
+
+
+class Command(BaseNode, NodeWithGroupInputMixin):
+    """Base class for command node, used for command component version consumption.
+
+    You should not instantiate this class directly. Instead, you should create it using the builder function: command().
+
+    :keyword component: The ID or instance of the command component or job to be run for the step.
+    :paramtype component: Union[str, ~azure.ai.ml.entities.CommandComponent]
+    :keyword compute: The compute target the job will run on.
+    :paramtype compute: Optional[str]
+    :keyword inputs: A mapping of input names to input data sources used in the job.
+    :paramtype inputs: Optional[dict[str, Union[
+        ~azure.ai.ml.Input, str, bool, int, float, Enum]]]
+    :keyword outputs: A mapping of output names to output data sources used in the job.
+    :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
+    :keyword limits: The limits for the command component or job.
+    :paramtype limits: ~azure.ai.ml.entities.CommandJobLimits
+    :keyword identity: The identity that the command job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        dict[str, str],
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :keyword distribution: The configuration for distributed jobs.
+    :paramtype distribution: Optional[Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
+        ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]]
+    :keyword environment: The environment that the job will run in.
+    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+    :keyword environment_variables:  A dictionary of environment variable names and values.
+        These environment variables are set on the process where the user script is being executed.
+    :paramtype environment_variables: Optional[dict[str, str]]
+    :keyword resources: The compute resource configuration for the command.
+    :paramtype resources: Optional[~azure.ai.ml.entities.JobResourceConfiguration]
+    :keyword services: The interactive services for the node. This is an experimental parameter, and may change at any
+        time. Please see https://aka.ms/azuremlexperimental for more information.
+    :paramtype services: Optional[dict[str, Union[~azure.ai.ml.entities.JobService,
+        ~azure.ai.ml.entities.JupyterLabJobService,
+        ~azure.ai.ml.entities.SshJobService, ~azure.ai.ml.entities.TensorBoardJobService,
+        ~azure.ai.ml.entities.VsCodeJobService]]]
+    :keyword queue_settings: Queue settings for the job.
+    :paramtype queue_settings: Optional[~azure.ai.ml.entities.QueueSettings]
+    :keyword parent_job_name: parent job id for command job
+    :paramtype parent_job_name: Optional[str]
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Command cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    # pylint: disable=too-many-instance-attributes
+    def __init__(
+        self,
+        *,
+        component: Union[str, CommandComponent],
+        compute: Optional[str] = None,
+        inputs: Optional[
+            Dict[
+                str,
+                Union[
+                    Input,
+                    str,
+                    bool,
+                    int,
+                    float,
+                    Enum,
+                ],
+            ]
+        ] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        limits: Optional[CommandJobLimits] = None,
+        identity: Optional[
+            Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
+        ] = None,
+        distribution: Optional[
+            Union[
+                Dict,
+                MpiDistribution,
+                TensorFlowDistribution,
+                PyTorchDistribution,
+                RayDistribution,
+                DistributionConfiguration,
+            ]
+        ] = None,
+        environment: Optional[Union[Environment, str]] = None,
+        environment_variables: Optional[Dict] = None,
+        resources: Optional[JobResourceConfiguration] = None,
+        services: Optional[
+            Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]
+        ] = None,
+        queue_settings: Optional[QueueSettings] = None,
+        parent_job_name: Optional[str] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        # resolve normal dict to dict[str, JobService]
+        services = _resolve_job_services(services)
+        kwargs.pop("type", None)
+        self._parameters: dict = kwargs.pop("parameters", {})
+        BaseNode.__init__(
+            self,
+            type=NodeType.COMMAND,
+            inputs=inputs,
+            outputs=outputs,
+            component=component,
+            compute=compute,
+            services=services,
+            **kwargs,
+        )
+
+        # init mark for _AttrDict
+        self._init = True
+        # initialize command job properties
+        self.limits = limits
+        self.identity = identity
+        self._distribution = distribution
+        self.environment_variables = {} if environment_variables is None else environment_variables
+        self.environment: Any = environment
+        self._resources = resources
+        self._services = services
+        self.queue_settings = queue_settings
+        self.parent_job_name = parent_job_name
+
+        if isinstance(self.component, CommandComponent):
+            self.resources = self.resources or self.component.resources  # type: ignore[assignment]
+            self.distribution = self.distribution or self.component.distribution
+
+        self._swept: bool = False
+        self._init = False
+
+    @classmethod
+    def _get_supported_inputs_types(cls) -> Tuple:
+        supported_types = super()._get_supported_inputs_types() or ()
+        return (
+            SweepDistribution,
+            *supported_types,
+        )
+
+    @classmethod
+    def _get_supported_outputs_types(cls) -> Tuple:
+        return str, Output
+
+    @property
+    def parameters(self) -> Dict[str, str]:
+        """MLFlow parameters to be logged during the job.
+
+        :return: The MLFlow parameters to be logged during the job.
+        :rtype: dict[str, str]
+        """
+        return self._parameters
+
+    @property
+    def distribution(
+        self,
+    ) -> Optional[
+        Union[
+            Dict,
+            MpiDistribution,
+            TensorFlowDistribution,
+            PyTorchDistribution,
+            RayDistribution,
+            DistributionConfiguration,
+        ]
+    ]:
+        """The configuration for the distributed command component or job.
+
+        :return: The configuration for distributed jobs.
+        :rtype: Union[~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
+            ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]
+        """
+        return self._distribution
+
+    @distribution.setter
+    def distribution(
+        self,
+        value: Union[Dict, PyTorchDistribution, TensorFlowDistribution, MpiDistribution, RayDistribution],
+    ) -> None:
+        """Sets the configuration for the distributed command component or job.
+
+        :param value: The configuration for distributed jobs.
+        :type value: Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
+            ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]
+        """
+        if isinstance(value, dict):
+            dist_schema = UnionField(
+                [
+                    NestedField(PyTorchDistributionSchema, unknown=INCLUDE),
+                    NestedField(TensorFlowDistributionSchema, unknown=INCLUDE),
+                    NestedField(MPIDistributionSchema, unknown=INCLUDE),
+                    ExperimentalField(NestedField(RayDistributionSchema, unknown=INCLUDE)),
+                ]
+            )
+            value = dist_schema._deserialize(value=value, attr=None, data=None)
+        self._distribution = value
+
+    @property
+    def resources(self) -> JobResourceConfiguration:
+        """The compute resource configuration for the command component or job.
+
+        :rtype: ~azure.ai.ml.entities.JobResourceConfiguration
+        """
+        return cast(JobResourceConfiguration, self._resources)
+
+    @resources.setter
+    def resources(self, value: Union[Dict, JobResourceConfiguration]) -> None:
+        """Sets the compute resource configuration for the command component or job.
+
+        :param value: The compute resource configuration for the command component or job.
+        :type value: Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration]
+        """
+        if isinstance(value, dict):
+            value = JobResourceConfiguration(**value)
+        self._resources = value
+
+    @property
+    def queue_settings(self) -> Optional[QueueSettings]:
+        """The queue settings for the command component or job.
+
+        :return: The queue settings for the command component or job.
+        :rtype: ~azure.ai.ml.entities.QueueSettings
+        """
+        return self._queue_settings
+
+    @queue_settings.setter
+    def queue_settings(self, value: Union[Dict, QueueSettings]) -> None:
+        """Sets the queue settings for the command component or job.
+
+        :param value: The queue settings for the command component or job.
+        :type value: Union[dict, ~azure.ai.ml.entities.QueueSettings]
+        """
+        if isinstance(value, dict):
+            value = QueueSettings(**value)
+        self._queue_settings = value
+
+    @property
+    def identity(
+        self,
+    ) -> Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]:
+        """The identity that the job will use while running on compute.
+
+        :return: The identity that the job will use while running on compute.
+        :rtype: Optional[Union[~azure.ai.ml.ManagedIdentityConfiguration, ~azure.ai.ml.AmlTokenConfiguration,
+            ~azure.ai.ml.UserIdentityConfiguration]]
+        """
+        return self._identity
+
+    @identity.setter
+    def identity(
+        self,
+        value: Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]],
+    ) -> None:
+        """Sets the identity that the job will use while running on compute.
+
+        :param value: The identity that the job will use while running on compute.
+        :type value: Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration,
+            ~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration]
+        """
+        if isinstance(value, dict):
+            identity_schema = UnionField(
+                [
+                    NestedField(ManagedIdentitySchema, unknown=INCLUDE),
+                    NestedField(AMLTokenIdentitySchema, unknown=INCLUDE),
+                    NestedField(UserIdentitySchema, unknown=INCLUDE),
+                ]
+            )
+            value = identity_schema._deserialize(value=value, attr=None, data=None)
+        self._identity = value
+
+    @property
+    def services(
+        self,
+    ) -> Optional[
+        Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]
+    ]:
+        """The interactive services for the node.
+
+        This is an experimental parameter, and may change at any time.
+        Please see https://aka.ms/azuremlexperimental for more information.
+
+        :rtype: dict[str, Union[~azure.ai.ml.entities.JobService, ~azure.ai.ml.entities.JupyterLabJobService,
+            ~azure.ai.ml.entities.SshJobService, ~azure.ai.ml.entities.TensorBoardJobService,
+            ~azure.ai.ml.entities.VsCodeJobService]]
+        """
+        return self._services
+
+    @services.setter
+    def services(
+        self,
+        value: Dict,
+    ) -> None:
+        """Sets the interactive services for the node.
+
+        This is an experimental parameter, and may change at any time.
+        Please see https://aka.ms/azuremlexperimental for more information.
+
+        :param value: The interactive services for the node.
+        :type value: dict[str, Union[~azure.ai.ml.entities.JobService, ~azure.ai.ml.entities.JupyterLabJobService,
+            ~azure.ai.ml.entities.SshJobService, ~azure.ai.ml.entities.TensorBoardJobService,
+            ~azure.ai.ml.entities.VsCodeJobService]]
+        """
+        self._services = _resolve_job_services(value)  # type: ignore[assignment]
+
+    @property
+    def component(self) -> Union[str, CommandComponent]:
+        """The ID or instance of the command component or job to be run for the step.
+
+        :return: The ID or instance of the command component or job to be run for the step.
+        :rtype: Union[str, ~azure.ai.ml.entities.CommandComponent]
+        """
+        return self._component
+
+    @property
+    def command(self) -> Optional[str]:
+        """The command to be executed.
+
+        :rtype: Optional[str]
+        """
+        # the same as code
+        if not isinstance(self.component, CommandComponent):
+            return None
+
+        if self.component.command is None:
+            return None
+        return str(self.component.command)
+
+    @command.setter
+    def command(self, value: str) -> None:
+        """Sets the command to be executed.
+
+        :param value: The command to be executed.
+        :type value: str
+        """
+        if isinstance(self.component, CommandComponent):
+            self.component.command = value
+        else:
+            msg = "Can't set command property for a registered component {}. Tried to set it to {}."
+            raise ValidationException(
+                message=msg.format(self.component, value),
+                no_personal_data_message=msg,
+                target=ErrorTarget.COMMAND_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+    @property
+    def code(self) -> Optional[Union[str, PathLike]]:
+        """The source code to run the job.
+
+        :rtype: Optional[Union[str, os.PathLike]]
+        """
+        # BaseNode is an _AttrDict to allow dynamic attributes, so that lower version of SDK can work with attributes
+        # added in higher version of SDK.
+        # self.code will be treated as an Arbitrary attribute if it raises AttributeError in getting
+        # (when self.component doesn't have attribute code, self.component = 'azureml:xxx:1' e.g.
+        # you may check _AttrDict._is_arbitrary_attr for detailed logic for Arbitrary judgement),
+        # then its value will be set to _AttrDict and be deserialized as {"shape": {}} instead of None,
+        # which is invalid in schema validation.
+        if not isinstance(self.component, CommandComponent):
+            return None
+
+        if self.component.code is None:
+            return None
+
+        return str(self.component.code)
+
+    @code.setter
+    def code(self, value: str) -> None:
+        """Sets the source code to run the job.
+
+        :param value: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url
+            pointing to a remote location.
+        :type value: str
+        """
+        if isinstance(self.component, CommandComponent):
+            self.component.code = value
+        else:
+            msg = "Can't set code property for a registered component {}"
+            raise ValidationException(
+                message=msg.format(self.component),
+                no_personal_data_message=msg,
+                target=ErrorTarget.COMMAND_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+    def set_resources(
+        self,
+        *,
+        instance_type: Optional[Union[str, List[str]]] = None,
+        instance_count: Optional[int] = None,
+        locations: Optional[List[str]] = None,
+        properties: Optional[Dict] = None,
+        docker_args: Optional[Union[str, List[str]]] = None,
+        shm_size: Optional[str] = None,
+        # pylint: disable=unused-argument
+        **kwargs: Any,
+    ) -> None:
+        """Set resources for Command.
+
+        :keyword instance_type: The type of compute instance to run the job on. If not specified, the job will run on
+            the default compute target.
+        :paramtype instance_type: Optional[Union[str, List[str]]]
+        :keyword instance_count: The number of instances to run the job on. If not specified, the job will run on a
+            single instance.
+        :paramtype instance_count: Optional[int]
+        :keyword locations: The list of locations where the job will run. If not specified, the job will run on the
+            default compute target.
+        :paramtype locations: Optional[List[str]]
+        :keyword properties: The properties of the job.
+        :paramtype properties: Optional[dict]
+        :keyword docker_args: The Docker arguments for the job.
+        :paramtype docker_args: Optional[Union[str,List[str]]]
+        :keyword shm_size: The size of the docker container's shared memory block. This should be in the
+            format of (number)(unit) where the number has to be greater than 0 and the unit can be one of
+            b(bytes), k(kilobytes), m(megabytes), or g(gigabytes).
+        :paramtype shm_size: Optional[str]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_command_configurations.py
+                :start-after: [START command_set_resources]
+                :end-before: [END command_set_resources]
+                :language: python
+                :dedent: 8
+                :caption: Setting resources on a Command.
+        """
+        if self.resources is None:
+            self.resources = JobResourceConfiguration()
+
+        if locations is not None:
+            self.resources.locations = locations
+        if instance_type is not None:
+            self.resources.instance_type = instance_type
+        if instance_count is not None:
+            self.resources.instance_count = instance_count
+        if properties is not None:
+            self.resources.properties = properties
+        if docker_args is not None:
+            self.resources.docker_args = docker_args
+        if shm_size is not None:
+            self.resources.shm_size = shm_size
+
+        # Save the resources to internal component as well, otherwise calling sweep() will loose the settings
+        if isinstance(self.component, CommandComponent):
+            self.component.resources = self.resources
+
+    def set_limits(self, *, timeout: int, **kwargs: Any) -> None:  # pylint: disable=unused-argument
+        """Set limits for Command.
+
+        :keyword timeout: The timeout for the job in seconds.
+        :paramtype timeout: int
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_command_configurations.py
+                :start-after: [START command_set_limits]
+                :end-before: [END command_set_limits]
+                :language: python
+                :dedent: 8
+                :caption: Setting a timeout limit of 10 seconds on a Command.
+        """
+        if isinstance(self.limits, CommandJobLimits):
+            self.limits.timeout = timeout
+        else:
+            self.limits = CommandJobLimits(timeout=timeout)
+
+    def set_queue_settings(self, *, job_tier: Optional[str] = None, priority: Optional[str] = None) -> None:
+        """Set QueueSettings for the job.
+
+        :keyword job_tier: The job tier. Accepted values are "Spot", "Basic", "Standard", or "Premium".
+        :paramtype job_tier: Optional[str]
+        :keyword priority:  The priority of the job on the compute. Defaults to "Medium".
+        :paramtype priority: Optional[str]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_command_configurations.py
+                :start-after: [START command_set_queue_settings]
+                :end-before: [END command_set_queue_settings]
+                :language: python
+                :dedent: 8
+                :caption: Configuring queue settings on a Command.
+        """
+        if isinstance(self.queue_settings, QueueSettings):
+            self.queue_settings.job_tier = job_tier
+            self.queue_settings.priority = priority
+        else:
+            self.queue_settings = QueueSettings(job_tier=job_tier, priority=priority)
+
+    def sweep(
+        self,
+        *,
+        primary_metric: str,
+        goal: str,
+        sampling_algorithm: str = "random",
+        compute: Optional[str] = None,
+        max_concurrent_trials: Optional[int] = None,
+        max_total_trials: Optional[int] = None,
+        timeout: Optional[int] = None,
+        trial_timeout: Optional[int] = None,
+        early_termination_policy: Optional[Union[EarlyTerminationPolicy, str]] = None,
+        search_space: Optional[
+            Dict[
+                str,
+                Union[
+                    Choice, LogNormal, LogUniform, Normal, QLogNormal, QLogUniform, QNormal, QUniform, Randint, Uniform
+                ],
+            ]
+        ] = None,
+        identity: Optional[
+            Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
+        ] = None,
+        queue_settings: Optional[QueueSettings] = None,
+        job_tier: Optional[str] = None,
+        priority: Optional[str] = None,
+    ) -> Sweep:
+        """Turns the command into a sweep node with extra sweep run setting. The command component
+        in the current command node will be used as its trial component. A command node can sweep
+        multiple times, and the generated sweep node will share the same trial component.
+
+        :keyword primary_metric: The primary metric of the sweep objective - e.g. AUC (Area Under the Curve).
+            The metric must be logged while running the trial component.
+        :paramtype primary_metric: str
+        :keyword goal: The goal of the Sweep objective. Accepted values are "minimize" or "maximize".
+        :paramtype goal: str
+        :keyword sampling_algorithm: The sampling algorithm to use inside the search space.
+            Acceptable values are "random", "grid", or "bayesian". Defaults to "random".
+        :paramtype sampling_algorithm: str
+        :keyword compute: The target compute to run the node on. If not specified, the current node's compute
+            will be used.
+        :paramtype compute: Optional[str]
+        :keyword max_total_trials: The maximum number of total trials to run. This value will overwrite the value in
+            CommandJob.limits if specified.
+        :paramtype max_total_trials: Optional[int]
+        :keyword max_concurrent_trials: The maximum number of concurrent trials for the Sweep job.
+        :paramtype max_concurrent_trials: Optional[int]
+        :keyword timeout: The maximum run duration in seconds, after which the job will be cancelled.
+        :paramtype timeout: Optional[int]
+        :keyword trial_timeout: The Sweep Job trial timeout value, in seconds.
+        :paramtype trial_timeout: Optional[int]
+        :keyword early_termination_policy: The early termination policy of the sweep node. Acceptable
+            values are "bandit", "median_stopping", or "truncation_selection". Defaults to None.
+        :paramtype early_termination_policy: Optional[Union[~azure.ai.ml.sweep.BanditPolicy,
+            ~azure.ai.ml.sweep.TruncationSelectionPolicy, ~azure.ai.ml.sweep.MedianStoppingPolicy, str]]
+        :keyword identity: The identity that the job will use while running on compute.
+        :paramtype identity: Optional[Union[
+            ~azure.ai.ml.ManagedIdentityConfiguration,
+            ~azure.ai.ml.AmlTokenConfiguration,
+            ~azure.ai.ml.UserIdentityConfiguration]]
+        :keyword search_space: The search space to use for the sweep job.
+        :paramtype search_space: Optional[Dict[str, Union[
+            Choice,
+            LogNormal,
+            LogUniform,
+            Normal,
+            QLogNormal,
+            QLogUniform,
+            QNormal,
+            QUniform,
+            Randint,
+            Uniform
+
+        ]]]
+
+        :keyword queue_settings: The queue settings for the job.
+        :paramtype queue_settings: Optional[~azure.ai.ml.entities.QueueSettings]
+        :keyword job_tier: **Experimental** The job tier. Accepted values are "Spot", "Basic",
+            "Standard", or "Premium".
+        :paramtype job_tier: Optional[str]
+        :keyword priority: **Experimental** The compute priority. Accepted values are "low",
+            "medium", and "high".
+        :paramtype priority: Optional[str]
+        :return: A Sweep node with the component from current Command node as its trial component.
+        :rtype: ~azure.ai.ml.entities.Sweep
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_sweep_configurations.py
+                :start-after: [START configure_sweep_job_bandit_policy]
+                :end-before: [END configure_sweep_job_bandit_policy]
+                :language: python
+                :dedent: 8
+                :caption: Creating a Sweep node from a Command job.
+        """
+        self._swept = True
+        # inputs & outputs are already built in source Command obj
+        inputs, inputs_search_space = Sweep._get_origin_inputs_and_search_space(self.inputs)
+        if search_space:
+            inputs_search_space.update(search_space)
+
+        if not queue_settings:
+            queue_settings = self.queue_settings
+        if queue_settings is not None:
+            if job_tier is not None:
+                queue_settings.job_tier = job_tier
+            if priority is not None:
+                queue_settings.priority = priority
+
+        sweep_node = Sweep(
+            trial=copy.deepcopy(
+                self.component
+            ),  # Make a copy of the underneath Component so that the original node can still be used.
+            compute=self.compute if compute is None else compute,
+            objective=Objective(goal=goal, primary_metric=primary_metric),
+            sampling_algorithm=sampling_algorithm,
+            inputs=inputs,
+            outputs=self._get_origin_job_outputs(),
+            search_space=inputs_search_space,
+            early_termination=early_termination_policy,
+            name=self.name,
+            description=self.description,
+            display_name=self.display_name,
+            tags=self.tags,
+            properties=self.properties,
+            experiment_name=self.experiment_name,
+            identity=self.identity if not identity else identity,
+            _from_component_func=True,
+            queue_settings=queue_settings,
+        )
+        sweep_node.set_limits(
+            max_total_trials=max_total_trials,
+            max_concurrent_trials=max_concurrent_trials,
+            timeout=timeout,
+            trial_timeout=trial_timeout,
+        )
+        return sweep_node
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, CommandComponent),
+            "environment": (str, Environment),
+            "environment_variables": dict,
+            "resources": (dict, JobResourceConfiguration),
+            "limits": (dict, CommandJobLimits),
+            "code": (str, os.PathLike),
+        }
+
+    def _to_job(self) -> CommandJob:
+        if isinstance(self.component, CommandComponent):
+            return CommandJob(
+                id=self.id,
+                name=self.name,
+                display_name=self.display_name,
+                description=self.description,
+                tags=self.tags,
+                properties=self.properties,
+                command=self.component.command,
+                experiment_name=self.experiment_name,
+                code=self.component.code,
+                compute=self.compute,
+                status=self.status,
+                environment=self.environment,
+                distribution=self.distribution,
+                identity=self.identity,
+                environment_variables=self.environment_variables,
+                resources=self.resources,
+                limits=self.limits,
+                inputs=self._job_inputs,
+                outputs=self._job_outputs,
+                services=self.services,
+                creation_context=self.creation_context,
+                parameters=self.parameters,
+                queue_settings=self.queue_settings,
+                parent_job_name=self.parent_job_name,
+            )
+
+        return CommandJob(
+            id=self.id,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            properties=self.properties,
+            command=None,
+            experiment_name=self.experiment_name,
+            code=None,
+            compute=self.compute,
+            status=self.status,
+            environment=self.environment,
+            distribution=self.distribution,
+            identity=self.identity,
+            environment_variables=self.environment_variables,
+            resources=self.resources,
+            limits=self.limits,
+            inputs=self._job_inputs,
+            outputs=self._job_outputs,
+            services=self.services,
+            creation_context=self.creation_context,
+            parameters=self.parameters,
+            queue_settings=self.queue_settings,
+            parent_job_name=self.parent_job_name,
+        )
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["resources", "distribution", "limits", "environment_variables", "queue_settings"]
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        for key, value in {
+            "componentId": self._get_component_id(),
+            "distribution": get_rest_dict_for_node_attrs(self.distribution, clear_empty_value=True),
+            "limits": get_rest_dict_for_node_attrs(self.limits, clear_empty_value=True),
+            "resources": get_rest_dict_for_node_attrs(self.resources, clear_empty_value=True),
+            "services": get_rest_dict_for_node_attrs(self.services),
+            "identity": get_rest_dict_for_node_attrs(self.identity),
+            "queue_settings": get_rest_dict_for_node_attrs(self.queue_settings, clear_empty_value=True),
+        }.items():
+            if value is not None:
+                rest_obj[key] = value
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Command":
+        from .command_func import command
+
+        loaded_data = load_from_dict(CommandJobSchema, data, context, additional_message, **kwargs)
+
+        # resources a limits properties are flatten in command() function, exact them and set separately
+        resources = loaded_data.pop("resources", None)
+        limits = loaded_data.pop("limits", None)
+
+        command_job: Command = command(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        command_job.resources = resources
+        command_job.limits = limits
+        return command_job
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: dict) -> Dict:
+        obj = BaseNode._from_rest_object_to_init_params(obj)
+
+        if "resources" in obj and obj["resources"]:
+            obj["resources"] = JobResourceConfiguration._from_rest_object(obj["resources"])
+
+        # services, sweep won't have services
+        if "services" in obj and obj["services"]:
+            # pipeline node rest object are dicts while _from_rest_job_services expect RestJobService
+            services = {}
+            for service_name, service in obj["services"].items():
+                # in rest object of a pipeline job, service will be transferred to a dict as
+                # it's attributes of a node, but JobService._from_rest_object expect a
+                # RestJobService, so we need to convert it back. Here we convert the dict to a
+                # dummy rest object which may work as a RestJobService instead.
+                services[service_name] = from_rest_dict_to_dummy_rest_object(service)
+            obj["services"] = JobServiceBase._from_rest_job_services(services)
+
+        # handle limits
+        if "limits" in obj and obj["limits"]:
+            obj["limits"] = CommandJobLimits._from_rest_object(obj["limits"])
+
+        if "identity" in obj and obj["identity"]:
+            obj["identity"] = _BaseJobIdentityConfiguration._from_rest_object(obj["identity"])
+
+        if "queue_settings" in obj and obj["queue_settings"]:
+            obj["queue_settings"] = QueueSettings._from_rest_object(obj["queue_settings"])
+
+        return obj
+
+    @classmethod
+    def _load_from_rest_job(cls, obj: JobBase) -> "Command":
+        from .command_func import command
+
+        rest_command_job: RestCommandJob = obj.properties
+
+        command_job: Command = command(
+            name=obj.name,
+            display_name=rest_command_job.display_name,
+            description=rest_command_job.description,
+            tags=rest_command_job.tags,
+            properties=rest_command_job.properties,
+            command=rest_command_job.command,
+            experiment_name=rest_command_job.experiment_name,
+            services=JobServiceBase._from_rest_job_services(rest_command_job.services),
+            status=rest_command_job.status,
+            creation_context=SystemData._from_rest_object(obj.system_data) if obj.system_data else None,
+            code=rest_command_job.code_id,
+            compute=rest_command_job.compute_id,
+            environment=rest_command_job.environment_id,
+            distribution=DistributionConfiguration._from_rest_object(rest_command_job.distribution),
+            parameters=rest_command_job.parameters,
+            identity=(
+                _BaseJobIdentityConfiguration._from_rest_object(rest_command_job.identity)
+                if rest_command_job.identity
+                else None
+            ),
+            environment_variables=rest_command_job.environment_variables,
+            inputs=from_rest_inputs_to_dataset_literal(rest_command_job.inputs),
+            outputs=from_rest_data_outputs(rest_command_job.outputs),
+        )
+        command_job._id = obj.id
+        command_job.resources = cast(
+            JobResourceConfiguration, JobResourceConfiguration._from_rest_object(rest_command_job.resources)
+        )
+        command_job.limits = CommandJobLimits._from_rest_object(rest_command_job.limits)
+        command_job.queue_settings = QueueSettings._from_rest_object(rest_command_job.queue_settings)
+        if isinstance(command_job.component, CommandComponent):
+            command_job.component._source = (
+                ComponentSource.REMOTE_WORKSPACE_JOB
+            )  # This is used by pipeline job telemetries.
+
+        # Handle special case of local job
+        if (
+            command_job.resources is not None
+            and command_job.resources.properties is not None
+            and command_job.resources.properties.get(LOCAL_COMPUTE_PROPERTY, None)
+        ):
+            command_job.compute = LOCAL_COMPUTE_TARGET
+            command_job.resources.properties.pop(LOCAL_COMPUTE_PROPERTY)
+        return command_job
+
+    def _build_inputs(self) -> Dict:
+        inputs = super(Command, self)._build_inputs()
+        built_inputs = {}
+        # Validate and remove non-specified inputs
+        for key, value in inputs.items():
+            if value is not None:
+                built_inputs[key] = value
+
+        return built_inputs
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import CommandSchema
+
+        return CommandSchema(context=context)
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> "Command":
+        """Call Command as a function will return a new instance each time.
+
+        :return: A Command node
+        :rtype: Command
+        """
+        if isinstance(self._component, CommandComponent):
+            # call this to validate inputs
+            node: Command = self._component(*args, **kwargs)
+            # merge inputs
+            for name, original_input in self.inputs.items():
+                if name not in kwargs:
+                    # use setattr here to make sure owner of input won't change
+                    setattr(node.inputs, name, original_input._data)
+                    node._job_inputs[name] = original_input._data
+                # get outputs
+            for name, original_output in self.outputs.items():
+                # use setattr here to make sure owner of input won't change
+                if not isinstance(original_output, str):
+                    setattr(node.outputs, name, original_output._data)
+                    node._job_outputs[name] = original_output._data
+            self._refine_optional_inputs_with_no_value(node, kwargs)
+            # set default values: compute, environment_variables, outputs
+            # won't copy name to be able to distinguish if a node's name is assigned by user
+            # e.g. node_1 = command_func()
+            # In above example, node_1.name will be None so we can apply node_1 as it's name
+            node.compute = self.compute
+            node.tags = self.tags
+            # Pass through the display name only if the display name is not system generated.
+            node.display_name = self.display_name if self.display_name != self.name else None
+            node.environment = copy.deepcopy(self.environment)
+            # deep copy for complex object
+            node.environment_variables = copy.deepcopy(self.environment_variables)
+            node.limits = copy.deepcopy(self.limits)
+            node.distribution = copy.deepcopy(self.distribution)
+            node.resources = copy.deepcopy(self.resources)
+            node.queue_settings = copy.deepcopy(self.queue_settings)
+            node.services = copy.deepcopy(self.services)
+            node.identity = copy.deepcopy(self.identity)
+            return node
+        msg = "Command can be called as a function only when referenced component is {}, currently got {}."
+        raise ValidationException(
+            message=msg.format(type(CommandComponent), self._component),
+            no_personal_data_message=msg.format(type(CommandComponent), "self._component"),
+            target=ErrorTarget.COMMAND_JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+
+
+@overload
+def _resolve_job_services(services: Optional[Dict]): ...
+
+
+@overload
+def _resolve_job_services(
+    services: Dict[str, Union[JobServiceBase, Dict]],
+) -> Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]: ...
+
+
+def _resolve_job_services(
+    services: Optional[Dict[str, Union[JobServiceBase, Dict]]],
+) -> Optional[Dict]:
+    """Resolve normal dict to dict[str, JobService]
+
+    :param services: A dict that maps service names to either a JobServiceBase object, or a Dict used to build one
+    :type services: Optional[Dict[str, Union[JobServiceBase, Dict]]]
+    :return:
+        * None if `services` is None
+        * A map of job service names to job services
+    :rtype: Optional[
+            Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]
+        ]
+    """
+    if services is None:
+        return None
+
+    if not isinstance(services, dict):
+        msg = f"Services must be a dict, got {type(services)} instead."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.COMMAND_JOB,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+    result = {}
+    for name, service in services.items():
+        if isinstance(service, dict):
+            service = load_from_dict(JobServiceSchema, service, context={BASE_PATH_CONTEXT_KEY: "."})
+        elif not isinstance(
+            service, (JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService)
+        ):
+            msg = f"Service value for key {name!r} must be a dict or JobService object, got {type(service)} instead."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.COMMAND_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        result[name] = service
+    return result
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py
new file mode 100644
index 00000000..c542f880
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py
@@ -0,0 +1,314 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import os
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+from azure.ai.ml.constants._common import AssetTypes, LegacyAssetTypes
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities._assets.environment import Environment
+from azure.ai.ml.entities._component.command_component import CommandComponent
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.distribution import (
+    DistributionConfiguration,
+    MpiDistribution,
+    PyTorchDistribution,
+    RayDistribution,
+    TensorFlowDistribution,
+)
+from azure.ai.ml.entities._job.job_service import (
+    JobService,
+    JupyterLabJobService,
+    SshJobService,
+    TensorBoardJobService,
+    VsCodeJobService,
+)
+from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+from azure.ai.ml.entities._job.sweep.search_space import SweepDistribution
+from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException
+
+from .command import Command
+
+SUPPORTED_INPUTS = [
+    LegacyAssetTypes.PATH,
+    AssetTypes.URI_FILE,
+    AssetTypes.URI_FOLDER,
+    AssetTypes.CUSTOM_MODEL,
+    AssetTypes.MLFLOW_MODEL,
+    AssetTypes.MLTABLE,
+    AssetTypes.TRITON_MODEL,
+]
+
+
+def _parse_input(input_value: Union[Input, Dict, SweepDistribution, str, bool, int, float]) -> Tuple:
+    component_input = None
+    job_input: Optional[Union[Input, Dict, SweepDistribution, str, bool, int, float]] = None
+
+    if isinstance(input_value, Input):
+        component_input = Input(**input_value._to_dict())
+        input_type = input_value.type
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value._to_dict())
+    elif isinstance(input_value, dict):
+        # if user provided dict, we try to parse it to Input.
+        # for job input, only parse for path type
+        input_type = input_value.get("type", None)
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value)
+        component_input = Input(**input_value)
+    elif isinstance(input_value, (SweepDistribution, str, bool, int, float)):
+        # Input bindings are not supported
+        component_input = ComponentTranslatableMixin._to_input_builder_function(input_value)
+        job_input = input_value
+    else:
+        msg = f"Unsupported input type: {type(input_value)}"
+        msg += ", only Input, dict, str, bool, int and float are supported."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+    return component_input, job_input
+
+
+def _parse_output(output_value: Optional[Union[Output, Dict, str]]) -> Tuple:
+    component_output = None
+    job_output: Optional[Union[Output, Dict, str]] = None
+
+    if isinstance(output_value, Output):
+        component_output = Output(**output_value._to_dict())
+        job_output = Output(**output_value._to_dict())
+    elif not output_value:
+        # output value can be None or empty dictionary
+        # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder
+        component_output = ComponentTranslatableMixin._to_output(output_value)
+        job_output = output_value
+    elif isinstance(output_value, dict):  # When output value is a non-empty dictionary
+        job_output = Output(**output_value)
+        component_output = Output(**output_value)
+    elif isinstance(output_value, str):  # When output is passed in from pipeline job yaml
+        job_output = output_value
+    else:
+        msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+    return component_output, job_output
+
+
+def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]:
+    component_io_dict, job_io_dict = {}, {}
+    if io_dict:
+        for key, val in io_dict.items():
+            component_io, job_io = parse_func(val)
+            component_io_dict[key] = component_io
+            job_io_dict[key] = job_io
+    return component_io_dict, job_io_dict
+
+
+def command(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    properties: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    command: Optional[str] = None,  # pylint: disable=redefined-outer-name
+    experiment_name: Optional[str] = None,
+    environment: Optional[Union[str, Environment]] = None,
+    environment_variables: Optional[Dict] = None,
+    distribution: Optional[
+        Union[
+            Dict,
+            MpiDistribution,
+            TensorFlowDistribution,
+            PyTorchDistribution,
+            RayDistribution,
+            DistributionConfiguration,
+        ]
+    ] = None,
+    compute: Optional[str] = None,
+    inputs: Optional[Dict] = None,
+    outputs: Optional[Dict] = None,
+    instance_count: Optional[int] = None,
+    instance_type: Optional[str] = None,
+    locations: Optional[List[str]] = None,
+    docker_args: Optional[Union[str, List[str]]] = None,
+    shm_size: Optional[str] = None,
+    timeout: Optional[int] = None,
+    code: Optional[Union[str, os.PathLike]] = None,
+    identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None,
+    is_deterministic: bool = True,
+    services: Optional[
+        Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]
+    ] = None,
+    job_tier: Optional[str] = None,
+    priority: Optional[str] = None,
+    parent_job_name: Optional[str] = None,
+    **kwargs: Any,
+) -> Command:
+    """Creates a Command object which can be used inside a dsl.pipeline function or used as a standalone Command job.
+
+    :keyword name: The name of the Command job or component.
+    :paramtype name: Optional[str]
+    :keyword description: The description of the Command. Defaults to None.
+    :paramtype description: Optional[str]
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
+    :paramtype tags: Optional[dict[str, str]]
+    :keyword properties: The job property dictionary. Defaults to None.
+    :paramtype properties: Optional[dict[str, str]]
+    :keyword display_name: The display name of the job. Defaults to a randomly generated name.
+    :paramtype display_name: Optional[str]
+    :keyword command: The command to be executed. Defaults to None.
+    :paramtype command: Optional[str]
+    :keyword experiment_name: The name of the experiment that the job will be created under. Defaults to current
+        directory name.
+    :paramtype experiment_name: Optional[str]
+    :keyword environment: The environment that the job will run in.
+    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+    :keyword environment_variables: A dictionary of environment variable names and values.
+        These environment variables are set on the process where user script is being executed.
+        Defaults to None.
+    :paramtype environment_variables: Optional[dict[str, str]]
+    :keyword distribution: The configuration for distributed jobs. Defaults to None.
+    :paramtype distribution: Optional[Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
+        ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]]
+    :keyword compute: The compute target the job will run on. Defaults to default compute.
+    :paramtype compute: Optional[str]
+    :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
+    :paramtype inputs: Optional[dict[str, Union[~azure.ai.ml.Input, str, bool, int, float, Enum]]]
+    :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
+    :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
+    :keyword instance_count: The number of instances or nodes to be used by the compute target. Defaults to 1.
+    :paramtype instance_count: Optional[int]
+    :keyword instance_type: The type of VM to be used by the compute target.
+    :paramtype instance_type: Optional[str]
+    :keyword locations: The list of locations where the job will run.
+    :paramtype locations: Optional[List[str]]
+    :keyword docker_args: Extra arguments to pass to the Docker run command. This would override any
+        parameters that have already been set by the system, or in this section. This parameter is only
+        supported for Azure ML compute types. Defaults to None.
+    :paramtype docker_args: Optional[Union[str,List[str]]]
+    :keyword shm_size: The size of the Docker container's shared memory block. This should be in the
+        format of (number)(unit) where the number has to be greater than 0 and the unit can be one of
+        b(bytes), k(kilobytes), m(megabytes), or g(gigabytes).
+    :paramtype shm_size: Optional[str]
+    :keyword timeout: The number, in seconds, after which the job will be cancelled.
+    :paramtype timeout: Optional[int]
+    :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url
+        pointing to a remote location.
+    :paramtype code: Optional[Union[str, os.PathLike]]
+    :keyword identity: The identity that the command job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :keyword is_deterministic: Specifies whether the Command will return the same output given the same input.
+        Defaults to True. When True, if a Command Component is deterministic and has been run before in the
+        current workspace with the same input and settings, it will reuse results from a previously submitted
+        job when used as a node or step in a pipeline. In that scenario, no compute resources will be used.
+    :paramtype is_deterministic: bool
+    :keyword services: The interactive services for the node. Defaults to None. This is an experimental parameter,
+        and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
+    :paramtype services: Optional[dict[str, Union[~azure.ai.ml.entities.JobService,
+        ~azure.ai.ml.entities.JupyterLabJobService, ~azure.ai.ml.entities.SshJobService,
+        ~azure.ai.ml.entities.TensorBoardJobService, ~azure.ai.ml.entities.VsCodeJobService]]]
+    :keyword job_tier: The job tier. Accepted values are "Spot", "Basic", "Standard", or "Premium".
+    :paramtype job_tier: Optional[str]
+    :keyword priority: The priority of the job on the compute. Accepted values are "low", "medium", and "high".
+        Defaults to "medium".
+    :paramtype priority: Optional[str]
+    :keyword parent_job_name: parent job id for command job
+    :paramtype parent_job_name: Optional[str]
+    :return: A Command object.
+    :rtype: ~azure.ai.ml.entities.Command
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_command_configurations.py
+            :start-after: [START command_function]
+            :end-before: [END command_function]
+            :language: python
+            :dedent: 8
+            :caption: Creating a Command Job using the command() builder method.
+    """
+    # pylint: disable=too-many-locals
+    inputs = inputs or {}
+    outputs = outputs or {}
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+
+    component = kwargs.pop("component", None)
+    if component is None:
+        component = CommandComponent(
+            name=name,
+            tags=tags,
+            code=code,
+            command=command,
+            environment=environment,
+            display_name=display_name,
+            description=description,
+            inputs=component_inputs,
+            outputs=component_outputs,
+            distribution=distribution,
+            environment_variables=environment_variables,
+            _source=ComponentSource.BUILDER,
+            is_deterministic=is_deterministic,
+            **kwargs,
+        )
+    command_obj = Command(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        properties=properties,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        compute=compute,
+        inputs=job_inputs,
+        outputs=job_outputs,
+        identity=identity,
+        distribution=distribution,
+        environment=environment,
+        environment_variables=environment_variables,
+        services=services,
+        parent_job_name=parent_job_name,
+        **kwargs,
+    )
+
+    if (
+        locations is not None
+        or instance_count is not None
+        or instance_type is not None
+        or docker_args is not None
+        or shm_size is not None
+    ):
+        command_obj.set_resources(
+            locations=locations,
+            instance_count=instance_count,
+            instance_type=instance_type,
+            docker_args=docker_args,
+            shm_size=shm_size,
+        )
+
+    if timeout is not None:
+        command_obj.set_limits(timeout=timeout)
+
+    if job_tier is not None or priority is not None:
+        command_obj.set_queue_settings(job_tier=job_tier, priority=priority)
+
+    return command_obj
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/condition_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/condition_node.py
new file mode 100644
index 00000000..5a5ad58b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/condition_node.py
@@ -0,0 +1,146 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, List, Optional
+
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._utils.utils import is_data_binding_expression
+from azure.ai.ml.constants._component import ControlFlowType
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.pipeline._io import InputOutputBase
+from azure.ai.ml.entities._validation import MutableValidationResult
+
+
+class ConditionNode(ControlFlowNode):
+    """Conditional node in the pipeline.
+
+    Please do not directly use this class.
+
+    :param condition: The condition for the conditional node.
+    :type condition: Any
+    :param true_block: The list of nodes to execute when the condition is true.
+    :type true_block: List[~azure.ai.ml.entities._builders.BaseNode]
+    :param false_block: The list of nodes to execute when the condition is false.
+    :type false_block: List[~azure.ai.ml.entities._builders.BaseNode]
+    """
+
+    def __init__(
+        self, condition: Any, *, true_block: Optional[List] = None, false_block: Optional[List] = None, **kwargs: Any
+    ) -> None:
+        kwargs.pop("type", None)
+        super(ConditionNode, self).__init__(type=ControlFlowType.IF_ELSE, **kwargs)
+        self.condition = condition
+        if true_block and not isinstance(true_block, list):
+            true_block = [true_block]
+        self._true_block = true_block
+        if false_block and not isinstance(false_block, list):
+            false_block = [false_block]
+        self._false_block = false_block
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+        from azure.ai.ml._schema.pipeline.condition_node import ConditionNodeSchema
+
+        return ConditionNodeSchema(context=context)
+
+    @classmethod
+    def _from_rest_object(cls, obj: dict) -> "ConditionNode":
+        return cls(**obj)
+
+    @classmethod
+    def _create_instance_from_schema_dict(cls, loaded_data: Dict) -> "ConditionNode":
+        """Create a condition node instance from schema parsed dict.
+
+        :param loaded_data: The loaded data
+        :type loaded_data: Dict
+        :return: The ConditionNode node
+        :rtype: ConditionNode
+        """
+        return cls(**loaded_data)
+
+    @property
+    def true_block(self) -> Optional[List]:
+        """Get the list of nodes to execute when the condition is true.
+
+        :return: The list of nodes to execute when the condition is true.
+        :rtype: List[~azure.ai.ml.entities._builders.BaseNode]
+        """
+        return self._true_block
+
+    @property
+    def false_block(self) -> Optional[List]:
+        """Get the list of nodes to execute when the condition is false.
+
+        :return: The list of nodes to execute when the condition is false.
+        :rtype: List[~azure.ai.ml.entities._builders.BaseNode]
+        """
+        return self._false_block
+
+    def _customized_validate(self) -> MutableValidationResult:
+        return self._validate_params()
+
+    def _validate_params(self) -> MutableValidationResult:
+        # pylint disable=protected-access
+        validation_result = self._create_empty_validation_result()
+        if not isinstance(self.condition, (str, bool, InputOutputBase)):
+            validation_result.append_error(
+                yaml_path="condition",
+                message=f"'condition' of dsl.condition node must be an instance of "
+                f"{str}, {bool} or {InputOutputBase}, got {type(self.condition)}.",
+            )
+
+        # Check if output is a control output.
+        # pylint: disable=protected-access
+        if isinstance(self.condition, InputOutputBase) and self.condition._meta is not None:
+            # pylint: disable=protected-access
+            output_definition = self.condition._meta
+            if output_definition is not None and not output_definition._is_primitive_type:
+                validation_result.append_error(
+                    yaml_path="condition",
+                    message=f"'condition' of dsl.condition node must be primitive type "
+                    f"with value 'True', got {output_definition._is_primitive_type}",
+                )
+
+        # check if condition is valid binding
+        if isinstance(self.condition, str) and not is_data_binding_expression(
+            self.condition, ["parent"], is_singular=False
+        ):
+            error_tail = "for example, ${{parent.jobs.xxx.outputs.output}}"
+            validation_result.append_error(
+                yaml_path="condition",
+                message=f"'condition' of dsl.condition has invalid binding expression: {self.condition}, {error_tail}",
+            )
+
+        error_msg = (
+            "{!r} of dsl.condition node must be an instance of " f"{BaseNode}, {AutoMLJob} or {str}," "got {!r}."
+        )
+        blocks = self.true_block if self.true_block else []
+        for block in blocks:
+            if block is not None and not isinstance(block, (BaseNode, AutoMLJob, str)):
+                validation_result.append_error(
+                    yaml_path="true_block", message=error_msg.format("true_block", type(block))
+                )
+        blocks = self.false_block if self.false_block else []
+        for block in blocks:
+            if block is not None and not isinstance(block, (BaseNode, AutoMLJob, str)):
+                validation_result.append_error(
+                    yaml_path="false_block", message=error_msg.format("false_block", type(block))
+                )
+
+        # check if true/false block is valid binding
+        for name, blocks in {"true_block": self.true_block, "false_block": self.false_block}.items():  # type: ignore
+            blocks = blocks if blocks else []
+            for block in blocks:
+                if block is None or not isinstance(block, str):
+                    continue
+                error_tail = "for example, ${{parent.jobs.xxx}}"
+                if not is_data_binding_expression(block, ["parent", "jobs"], is_singular=False):
+                    validation_result.append_error(
+                        yaml_path=name,
+                        message=f"'{name}' of dsl.condition has invalid binding expression: {block}, {error_tail}",
+                    )
+
+        return validation_result
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/control_flow_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/control_flow_node.py
new file mode 100644
index 00000000..c757a1e4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/control_flow_node.py
@@ -0,0 +1,170 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+import re
+import uuid
+from abc import ABC
+from typing import Any, Dict, Union, cast  # pylint: disable=unused-import
+
+from marshmallow import ValidationError
+
+from azure.ai.ml._utils.utils import is_data_binding_expression
+from azure.ai.ml.constants._common import CommonYamlFields
+from azure.ai.ml.constants._component import ComponentSource, ControlFlowType
+from azure.ai.ml.entities._mixins import YamlTranslatableMixin
+from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from .._util import convert_ordered_dict_to_dict
+from .base_node import BaseNode
+
+module_logger = logging.getLogger(__name__)
+
+
+# ControlFlowNode did not inherit from BaseNode since it doesn't have inputs/outputs like other nodes.
+class ControlFlowNode(YamlTranslatableMixin, PathAwareSchemaValidatableMixin, ABC):
+    """Base class for control flow node in the pipeline.
+
+    Please do not directly use this class.
+
+    :param kwargs: Additional keyword arguments.
+    :type kwargs: Dict[str, Union[Any]]
+    """
+
+    def __init__(self, **kwargs: Any) -> None:
+        # TODO(1979547): refactor this
+        _source = kwargs.pop("_source", None)
+        self._source = _source if _source else ComponentSource.DSL
+        _from_component_func = kwargs.pop("_from_component_func", False)
+        self._type = kwargs.get("type", None)
+        self._instance_id = str(uuid.uuid4())
+        self.name = None
+        if _from_component_func:
+            # add current control flow node in pipeline stack for dsl scenario and remove the body from the pipeline
+            # stack.
+            self._register_in_current_pipeline_component_builder()
+
+    @property
+    def type(self) -> Any:
+        """Get the type of the control flow node.
+
+        :return: The type of the control flow node.
+        :rtype: self._type
+        """
+        return self._type
+
+    def _to_dict(self) -> Dict:
+        return dict(self._dump_for_validation())
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:  # pylint: disable=unused-argument
+        """Convert self to a rest object for remote call.
+
+        :return: The rest object
+        :rtype: dict
+        """
+        rest_obj = self._to_dict()
+        rest_obj["_source"] = self._source
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    def _register_in_current_pipeline_component_builder(self) -> None:
+        """Register this node in current pipeline component builder by adding self to a global stack."""
+        from azure.ai.ml.dsl._pipeline_component_builder import _add_component_to_current_definition_builder
+
+        _add_component_to_current_definition_builder(self)  # type: ignore[arg-type]
+
+    @classmethod
+    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+        return ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.PIPELINE,
+        )
+
+
+class LoopNode(ControlFlowNode, ABC):
+    """Base class for loop node in the pipeline.
+
+    Please do not directly use this class.
+
+    :param body: The body of the loop node.
+    :type body: ~azure.ai.ml.entities._builders.BaseNode
+    :param kwargs: Additional keyword arguments.
+    :type kwargs: Dict[str, Union[Any]]
+    """
+
+    def __init__(self, *, body: BaseNode, **kwargs: Any) -> None:
+        self._body = body
+        super(LoopNode, self).__init__(**kwargs)
+        # always set the referenced control flow node instance id to the body.
+        self.body._set_referenced_control_flow_node_instance_id(self._instance_id)
+
+    @property
+    def body(self) -> BaseNode:
+        """Get the body of the loop node.
+
+        :return: The body of the loop node.
+        :rtype: ~azure.ai.ml.entities._builders.BaseNode
+        """
+        return self._body
+
+    _extra_body_types = None
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        from .command import Command
+        from .pipeline import Pipeline
+
+        enable_body_type = (Command, Pipeline)
+        if cls._extra_body_types is not None:
+            enable_body_type = enable_body_type + cls._extra_body_types
+        return {
+            "body": enable_body_type,
+        }
+
+    @classmethod
+    def _get_body_from_pipeline_jobs(cls, pipeline_jobs: Dict[str, BaseNode], body_name: str) -> BaseNode:
+        # Get body object from pipeline job list.
+        if body_name not in pipeline_jobs:
+            raise ValidationError(
+                message=f'Cannot find the do-while loop body "{body_name}" in the pipeline.',
+                target=ErrorTarget.PIPELINE,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+        return pipeline_jobs[body_name]
+
+    def _validate_body(self) -> MutableValidationResult:
+        # pylint: disable=protected-access
+        validation_result = self._create_empty_validation_result()
+
+        if self._instance_id != self.body._referenced_control_flow_node_instance_id:
+            # When the body is used in another loop node record the error message in validation result.
+            validation_result.append_error("body", "The body of loop node cannot be promoted as another loop again.")
+        return validation_result
+
+    def _get_body_binding_str(self) -> str:
+        return "${{parent.jobs.%s}}" % self.body.name
+
+    @staticmethod
+    def _get_data_binding_expression_value(expression: str, regex: str) -> str:
+        try:
+            if is_data_binding_expression(expression):
+                return str(re.findall(regex, expression)[0])
+
+            return expression
+        except Exception:  # pylint: disable=W0718
+            module_logger.warning("Cannot get the value from data binding expression %s.", expression)
+            return expression
+
+    @staticmethod
+    def _is_loop_node_dict(obj: Any) -> bool:
+        return obj.get(CommonYamlFields.TYPE, None) in [ControlFlowType.DO_WHILE, ControlFlowType.PARALLEL_FOR]
+
+    @classmethod
+    def _from_rest_object(cls, obj: dict, pipeline_jobs: dict) -> "LoopNode":
+        from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory
+
+        node_type = obj.get(CommonYamlFields.TYPE, None)
+        load_from_rest_obj_func = pipeline_node_factory.get_load_from_rest_object_func(_type=node_type)
+        return load_from_rest_obj_func(obj, pipeline_jobs)  # type: ignore
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py
new file mode 100644
index 00000000..83e88a48
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py
@@ -0,0 +1,575 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._restclient.v2022_10_01_preview.models import JobBase
+from azure.ai.ml._schema.job.data_transfer_job import (
+    DataTransferCopyJobSchema,
+    DataTransferExportJobSchema,
+    DataTransferImportJobSchema,
+)
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AssetTypes
+from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._component.datatransfer_component import (
+    DataTransferComponent,
+    DataTransferCopyComponent,
+    DataTransferExportComponent,
+    DataTransferImportComponent,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
+from azure.ai.ml.entities._job.data_transfer.data_transfer_job import (
+    DataTransferCopyJob,
+    DataTransferExportJob,
+    DataTransferImportJob,
+)
+from azure.ai.ml.entities._validation.core import MutableValidationResult
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from ..._schema import PathAwareSchema
+from .._job.pipeline._io import NodeOutput
+from .._util import convert_ordered_dict_to_dict, load_from_dict, validate_attribute_type
+from .base_node import BaseNode
+
+module_logger = logging.getLogger(__name__)
+
+
+def _build_source_sink(io_dict: Optional[Union[Dict, Database, FileSystem]]) -> Optional[Union[Database, FileSystem]]:
+    if io_dict is None:
+        return io_dict
+    if isinstance(io_dict, (Database, FileSystem)):
+        component_io = io_dict
+    else:
+        if isinstance(io_dict, dict):
+            data_type = io_dict.pop("type", None)
+            if data_type == ExternalDataType.DATABASE:
+                component_io = Database(**io_dict)
+            elif data_type == ExternalDataType.FILE_SYSTEM:
+                component_io = FileSystem(**io_dict)
+            else:
+                msg = "Type in source or sink only support {} and {}, currently got {}."
+                raise ValidationException(
+                    message=msg.format(
+                        ExternalDataType.DATABASE,
+                        ExternalDataType.FILE_SYSTEM,
+                        data_type,
+                    ),
+                    no_personal_data_message=msg.format(
+                        ExternalDataType.DATABASE,
+                        ExternalDataType.FILE_SYSTEM,
+                        "data_type",
+                    ),
+                    target=ErrorTarget.DATA_TRANSFER_JOB,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
+        else:
+            msg = "Source or sink only support dict, Database and FileSystem"
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.DATA_TRANSFER_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+    return component_io
+
+
+class DataTransfer(BaseNode):
+    """Base class for data transfer node, used for data transfer component version consumption.
+
+    You should not instantiate this class directly.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferCopyComponent, DataTransferImportComponent],
+        compute: Optional[str] = None,
+        inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        **kwargs: Any,
+    ):
+        # resolve normal dict to dict[str, JobService]
+        kwargs.pop("type", None)
+        super().__init__(
+            type=NodeType.DATA_TRANSFER,
+            inputs=inputs,
+            outputs=outputs,
+            component=component,
+            compute=compute,
+            **kwargs,
+        )
+
+    @property
+    def component(self) -> Union[str, DataTransferComponent]:
+        res: Union[str, DataTransferComponent] = self._component
+        return res
+
+    @classmethod
+    def _load_from_rest_job(cls, obj: JobBase) -> "DataTransfer":
+        # Todo: need update rest api
+        raise NotImplementedError("Not support submit standalone job for now")
+
+    @classmethod
+    def _get_supported_outputs_types(cls) -> Tuple:
+        return str, Output
+
+    def _build_inputs(self) -> Dict:
+        inputs = super(DataTransfer, self)._build_inputs()
+        built_inputs = {}
+        # Validate and remove non-specified inputs
+        for key, value in inputs.items():
+            if value is not None:
+                built_inputs[key] = value
+
+        return built_inputs
+
+
+@experimental
+class DataTransferCopy(DataTransfer):
+    """Base class for data transfer copy node.
+
+    You should not instantiate this class directly. Instead, you should
+    create from builder function: copy_data.
+
+    :param component: Id or instance of the data transfer component/job to be run for the step
+    :type component: DataTransferCopyComponent
+    :param inputs: Inputs to the data transfer.
+    :type inputs: Dict[str, Union[NodeOutput, Input, str]]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, Output, dict]]
+    :param name: Name of the data transfer.
+    :type name: str
+    :param description: Description of the data transfer.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param display_name: Display name of the job.
+    :type display_name: str
+    :param experiment_name:  Name of the experiment the job will be created under,
+        if None is provided, default will be set to current directory name.
+    :type experiment_name: str
+    :param compute: The compute target the job runs on.
+    :type compute: str
+    :param data_copy_mode: data copy mode in copy task, possible value is "merge_with_overwrite", "fail_if_conflict".
+    :type data_copy_mode: str
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferCopy cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferCopyComponent],
+        compute: Optional[str] = None,
+        inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        data_copy_mode: Optional[str] = None,
+        **kwargs: Any,
+    ):
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        super().__init__(
+            inputs=inputs,
+            outputs=outputs,
+            component=component,
+            compute=compute,
+            **kwargs,
+        )
+        # init mark for _AttrDict
+        self._init = True
+        self.task = DataTransferTaskType.COPY_DATA
+        self.data_copy_mode = data_copy_mode
+        is_component = isinstance(component, DataTransferCopyComponent)
+        if is_component:
+            _component: DataTransferCopyComponent = cast(DataTransferCopyComponent, component)
+            self.task = _component.task or self.task
+            self.data_copy_mode = _component.data_copy_mode or self.data_copy_mode
+        self._init = False
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, DataTransferCopyComponent),
+        }
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import DataTransferCopySchema
+
+        return DataTransferCopySchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["type", "task", "data_copy_mode"]
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        for key, value in {
+            "componentId": self._get_component_id(),
+            "data_copy_mode": self.data_copy_mode,
+        }.items():
+            if value is not None:
+                rest_obj[key] = value
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> Any:
+        from .data_transfer_func import copy_data
+
+        loaded_data = load_from_dict(DataTransferCopyJobSchema, data, context, additional_message, **kwargs)
+        data_transfer_job = copy_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return data_transfer_job
+
+    def _to_job(self) -> DataTransferCopyJob:
+        return DataTransferCopyJob(
+            experiment_name=self.experiment_name,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            status=self.status,
+            inputs=self._job_inputs,
+            outputs=self._job_outputs,
+            services=self.services,
+            compute=self.compute,
+            data_copy_mode=self.data_copy_mode,
+        )
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> "DataTransferCopy":
+        """Call DataTransferCopy as a function will return a new instance each time.
+
+        :return: A DataTransferCopy node
+        :rtype: DataTransferCopy
+        """
+        if isinstance(self._component, Component):
+            # call this to validate inputs
+            node: DataTransferCopy = self._component(*args, **kwargs)
+            # merge inputs
+            for name, original_input in self.inputs.items():
+                if name not in kwargs:
+                    # use setattr here to make sure owner of input won't change
+                    setattr(node.inputs, name, original_input._data)
+                    node._job_inputs[name] = original_input._data
+                # get outputs
+            for name, original_output in self.outputs.items():
+                # use setattr here to make sure owner of input won't change
+                if not isinstance(original_output, str):
+                    setattr(node.outputs, name, original_output._data)
+            self._refine_optional_inputs_with_no_value(node, kwargs)
+            # set default values: compute, environment_variables, outputs
+            node._name = self.name
+            node.compute = self.compute
+            node.tags = self.tags
+            # Pass through the display name only if the display name is not system generated.
+            node.display_name = self.display_name if self.display_name != self.name else None
+            return node
+        msg = "copy_data can be called as a function only when referenced component is {}, currently got {}."
+        raise ValidationException(
+            message=msg.format(type(Component), self._component),
+            no_personal_data_message=msg.format(type(Component), "self._component"),
+            target=ErrorTarget.DATA_TRANSFER_JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+
+
+@experimental
+class DataTransferImport(DataTransfer):
+    """Base class for data transfer import node.
+
+    You should not instantiate this class directly. Instead, you should
+    create from builder function: import_data.
+
+    :param component: Id of the data transfer built in component to be run for the step
+    :type component: str
+    :param source: The data source of file system or database
+    :type source: Union[Dict, Database, FileSystem]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, Output, dict]]
+    :param name: Name of the data transfer.
+    :type name: str
+    :param description: Description of the data transfer.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param display_name: Display name of the job.
+    :type display_name: str
+    :param experiment_name:  Name of the experiment the job will be created under,
+        if None is provided, default will be set to current directory name.
+    :type experiment_name: str
+    :param compute: The compute target the job runs on.
+    :type compute: str
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferImport cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferImportComponent],
+        compute: Optional[str] = None,
+        source: Optional[Union[Dict, Database, FileSystem]] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        **kwargs: Any,
+    ):
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        super(DataTransferImport, self).__init__(
+            component=component,
+            outputs=outputs,
+            compute=compute,
+            **kwargs,
+        )
+        # init mark for _AttrDict
+        self._init = True
+        self.task = DataTransferTaskType.IMPORT_DATA
+        is_component = isinstance(component, DataTransferImportComponent)
+        if is_component:
+            _component: DataTransferImportComponent = cast(DataTransferImportComponent, component)
+            self.task = _component.task or self.task
+        self.source = _build_source_sink(source)
+        self._init = False
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, DataTransferImportComponent),
+        }
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import DataTransferImportSchema
+
+        return DataTransferImportSchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["type", "task", "source"]
+
+    def _customized_validate(self) -> MutableValidationResult:
+        result = super()._customized_validate()
+        if self.source is None:
+            result.append_error(
+                yaml_path="source",
+                message="Source is a required field for import data task in DataTransfer job",
+            )
+        if len(self.outputs) != 1 or list(self.outputs.keys())[0] != "sink":
+            result.append_error(
+                yaml_path="outputs.sink",
+                message="Outputs field only support one output called sink in import task",
+            )
+        if (
+            "sink" in self.outputs
+            and not isinstance(self.outputs["sink"], str)
+            and isinstance(self.outputs["sink"]._data, Output)
+        ):
+            sink_output = self.outputs["sink"]._data
+            if self.source is not None:
+
+                if (self.source.type == ExternalDataType.DATABASE and sink_output.type != AssetTypes.MLTABLE) or (
+                    self.source.type == ExternalDataType.FILE_SYSTEM and sink_output.type != AssetTypes.URI_FOLDER
+                ):
+                    result.append_error(
+                        yaml_path="outputs.sink.type",
+                        message="Outputs field only support type {} for {} and {} for {}".format(
+                            AssetTypes.MLTABLE,
+                            ExternalDataType.DATABASE,
+                            AssetTypes.URI_FOLDER,
+                            ExternalDataType.FILE_SYSTEM,
+                        ),
+                    )
+        return result
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        for key, value in {
+            "componentId": self._get_component_id(),
+        }.items():
+            if value is not None:
+                rest_obj[key] = value
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferImport":
+        from .data_transfer_func import import_data
+
+        loaded_data = load_from_dict(DataTransferImportJobSchema, data, context, additional_message, **kwargs)
+        data_transfer_job: DataTransferImport = import_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return data_transfer_job
+
+    def _to_job(self) -> DataTransferImportJob:
+        return DataTransferImportJob(
+            experiment_name=self.experiment_name,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            status=self.status,
+            source=self.source,
+            outputs=self._job_outputs,
+            services=self.services,
+            compute=self.compute,
+        )
+
+
+@experimental
+class DataTransferExport(DataTransfer):
+    """Base class for data transfer export node.
+
+    You should not instantiate this class directly. Instead, you should
+    create from builder function: export_data.
+
+    :param component: Id of the data transfer built in component to be run for the step
+    :type component: str
+    :param sink: The sink of external data and databases.
+    :type sink: Union[Dict, Database, FileSystem]
+    :param inputs: Mapping of input data bindings used in the job.
+    :type inputs: Dict[str, Union[NodeOutput, Input, str, Input]]
+    :param name: Name of the data transfer.
+    :type name: str
+    :param description: Description of the data transfer.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param display_name: Display name of the job.
+    :type display_name: str
+    :param experiment_name:  Name of the experiment the job will be created under,
+        if None is provided, default will be set to current directory name.
+    :type experiment_name: str
+    :param compute: The compute target the job runs on.
+    :type compute: str
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferExport cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferCopyComponent, DataTransferImportComponent],
+        compute: Optional[str] = None,
+        sink: Optional[Union[Dict, Database, FileSystem]] = None,
+        inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
+        **kwargs: Any,
+    ):
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        super(DataTransferExport, self).__init__(
+            component=component,
+            inputs=inputs,
+            compute=compute,
+            **kwargs,
+        )
+        # init mark for _AttrDict
+        self._init = True
+        self.task = DataTransferTaskType.EXPORT_DATA
+        is_component = isinstance(component, DataTransferExportComponent)
+        if is_component:
+            _component: DataTransferExportComponent = cast(DataTransferExportComponent, component)
+            self.task = _component.task or self.task
+        self.sink = sink
+        self._init = False
+
+    @property
+    def sink(self) -> Optional[Union[Dict, Database, FileSystem]]:
+        """The sink of external data and databases.
+
+        :return: The sink of external data and databases.
+        :rtype: Union[None, Database, FileSystem]
+        """
+        return self._sink
+
+    @sink.setter
+    def sink(self, value: Union[Dict, Database, FileSystem]) -> None:
+        self._sink = _build_source_sink(value)
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, DataTransferExportComponent),
+        }
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import DataTransferExportSchema
+
+        return DataTransferExportSchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["type", "task", "sink"]
+
+    def _customized_validate(self) -> MutableValidationResult:
+        result = super()._customized_validate()
+        if self.sink is None:
+            result.append_error(
+                yaml_path="sink",
+                message="Sink is a required field for export data task in DataTransfer job",
+            )
+        if len(self.inputs) != 1 or list(self.inputs.keys())[0] != "source":
+            result.append_error(
+                yaml_path="inputs.source",
+                message="Inputs field only support one input called source in export task",
+            )
+        if "source" in self.inputs and isinstance(self.inputs["source"]._data, Input):
+            source_input = self.inputs["source"]._data
+            if self.sink is not None and not isinstance(self.sink, Dict):
+                if (self.sink.type == ExternalDataType.DATABASE and source_input.type != AssetTypes.URI_FILE) or (
+                    self.sink.type == ExternalDataType.FILE_SYSTEM and source_input.type != AssetTypes.URI_FOLDER
+                ):
+                    result.append_error(
+                        yaml_path="inputs.source.type",
+                        message="Inputs field only support type {} for {} and {} for {}".format(
+                            AssetTypes.URI_FILE,
+                            ExternalDataType.DATABASE,
+                            AssetTypes.URI_FOLDER,
+                            ExternalDataType.FILE_SYSTEM,
+                        ),
+                    )
+
+        return result
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        for key, value in {
+            "componentId": self._get_component_id(),
+        }.items():
+            if value is not None:
+                rest_obj[key] = value
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferExport":
+        from .data_transfer_func import export_data
+
+        loaded_data = load_from_dict(DataTransferExportJobSchema, data, context, additional_message, **kwargs)
+        data_transfer_job: DataTransferExport = export_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return data_transfer_job
+
+    def _to_job(self) -> DataTransferExportJob:
+        return DataTransferExportJob(
+            experiment_name=self.experiment_name,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            status=self.status,
+            sink=self.sink,
+            inputs=self._job_inputs,
+            services=self.services,
+            compute=self.compute,
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer_func.py
new file mode 100644
index 00000000..423c125b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer_func.py
@@ -0,0 +1,335 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+from typing import Any, Callable, Dict, Optional, Tuple, Union
+
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import AssetTypes, LegacyAssetTypes
+from azure.ai.ml.constants._component import ComponentSource, DataTransferBuiltinComponentUri, ExternalDataType
+from azure.ai.ml.entities._builders.base_node import pipeline_node_decorator
+from azure.ai.ml.entities._component.datatransfer_component import DataTransferCopyComponent
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
+from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
+from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException
+
+from .data_transfer import DataTransferCopy, DataTransferExport, DataTransferImport, _build_source_sink
+
+SUPPORTED_INPUTS = [
+    LegacyAssetTypes.PATH,
+    AssetTypes.URI_FILE,
+    AssetTypes.URI_FOLDER,
+    AssetTypes.CUSTOM_MODEL,
+    AssetTypes.MLFLOW_MODEL,
+    AssetTypes.MLTABLE,
+    AssetTypes.TRITON_MODEL,
+]
+
+
+def _parse_input(input_value: Union[Input, dict, str, PipelineInput, NodeOutput]) -> Tuple:
+    component_input = None
+    job_input: Union[Input, dict, str, PipelineInput, NodeOutput] = ""
+
+    if isinstance(input_value, Input):
+        component_input = Input(**input_value._to_dict())
+        input_type = input_value.type
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value._to_dict())
+    elif isinstance(input_value, dict):
+        # if user provided dict, we try to parse it to Input.
+        # for job input, only parse for path type
+        input_type = input_value.get("type", None)
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value)
+        component_input = Input(**input_value)
+    elif isinstance(input_value, str):
+        # Input bindings
+        component_input = ComponentTranslatableMixin._to_input_builder_function(input_value)
+        job_input = input_value
+    elif isinstance(input_value, (PipelineInput, NodeOutput)):
+        data: Any = None
+        # datatransfer node can accept PipelineInput/NodeOutput for export task.
+        if input_value._data is None or isinstance(input_value._data, Output):
+            data = Input(type=input_value.type, mode=input_value.mode)
+        else:
+            data = input_value._data
+        component_input, _ = _parse_input(data)
+        job_input = input_value
+    else:
+        msg = (
+            f"Unsupported input type: {type(input_value)}, only Input, dict, str, PipelineInput and NodeOutput are "
+            f"supported."
+        )
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+    return component_input, job_input
+
+
+def _parse_output(output_value: Union[Output, Dict]) -> Tuple:
+    component_output = None
+    job_output: Union[Output, Dict] = {}
+
+    if isinstance(output_value, Output):
+        component_output = Output(**output_value._to_dict())
+        job_output = Output(**output_value._to_dict())
+    elif not output_value:
+        # output value can be None or empty dictionary
+        # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder
+        component_output = ComponentTranslatableMixin._to_output(output_value)
+        job_output = output_value
+    elif isinstance(output_value, dict):  # When output value is a non-empty dictionary
+        job_output = Output(**output_value)
+        component_output = Output(**output_value)
+    elif isinstance(output_value, str):  # When output is passed in from pipeline job yaml
+        job_output = output_value
+    else:
+        msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+    return component_output, job_output
+
+
+def _parse_inputs_outputs(io_dict: Optional[Dict], parse_func: Callable) -> Tuple[Dict, Dict]:
+    component_io_dict, job_io_dict = {}, {}
+    if io_dict:
+        for key, val in io_dict.items():
+            component_io, job_io = parse_func(val)
+            component_io_dict[key] = component_io
+            job_io_dict[key] = job_io
+    return component_io_dict, job_io_dict
+
+
+@experimental
+def copy_data(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    compute: Optional[str] = None,
+    inputs: Optional[Dict] = None,
+    outputs: Optional[Dict] = None,
+    is_deterministic: bool = True,
+    data_copy_mode: Optional[str] = None,
+    **kwargs: Any,
+) -> DataTransferCopy:
+    """Create a DataTransferCopy object which can be used inside dsl.pipeline as a function.
+
+    :keyword name: The name of the job.
+    :paramtype name: str
+    :keyword description: Description of the job.
+    :paramtype description: str
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+    :paramtype tags: dict[str, str]
+    :keyword display_name: Display name of the job.
+    :paramtype display_name: str
+    :keyword experiment_name:  Name of the experiment the job will be created under.
+    :paramtype experiment_name: str
+    :keyword compute: The compute resource the job runs on.
+    :paramtype compute: str
+    :keyword inputs: Mapping of inputs data bindings used in the job.
+    :paramtype inputs: dict
+    :keyword outputs: Mapping of outputs data bindings used in the job.
+    :paramtype outputs: dict
+    :keyword is_deterministic: Specify whether the command will return same output given same input.
+        If a command (component) is deterministic, when use it as a node/step in a pipeline,
+        it will reuse results from a previous submitted job in current workspace which has same inputs and settings.
+        In this case, this step will not use any compute resource.
+        Default to be True, specify is_deterministic=False if you would like to avoid such reuse behavior.
+    :paramtype is_deterministic: bool
+    :keyword data_copy_mode: data copy mode in copy task, possible value is "merge_with_overwrite", "fail_if_conflict".
+    :paramtype data_copy_mode: str
+    :return: A DataTransferCopy object.
+    :rtype: ~azure.ai.ml.entities._component.datatransfer_component.DataTransferCopyComponent
+    """
+    inputs = inputs or {}
+    outputs = outputs or {}
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+    component = kwargs.pop("component", None)
+    if component is None:
+        component = DataTransferCopyComponent(
+            name=name,
+            tags=tags,
+            display_name=display_name,
+            description=description,
+            inputs=component_inputs,
+            outputs=component_outputs,
+            data_copy_mode=data_copy_mode,
+            _source=ComponentSource.BUILDER,
+            is_deterministic=is_deterministic,
+            **kwargs,
+        )
+    data_transfer_copy_obj = DataTransferCopy(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        compute=compute,
+        inputs=job_inputs,
+        outputs=job_outputs,
+        data_copy_mode=data_copy_mode,
+        **kwargs,
+    )
+    return data_transfer_copy_obj
+
+
+@experimental
+@pipeline_node_decorator
+def import_data(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    compute: Optional[str] = None,
+    source: Optional[Union[Dict, Database, FileSystem]] = None,
+    outputs: Optional[Dict] = None,
+    **kwargs: Any,
+) -> DataTransferImport:
+    """Create a DataTransferImport object which can be used inside dsl.pipeline.
+
+    :keyword name: The name of the job.
+    :paramtype name: str
+    :keyword description: Description of the job.
+    :paramtype description: str
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+    :paramtype tags: dict[str, str]
+    :keyword display_name: Display name of the job.
+    :paramtype display_name: str
+    :keyword experiment_name: Name of the experiment the job will be created under.
+    :paramtype experiment_name: str
+    :keyword compute: The compute resource the job runs on.
+    :paramtype compute: str
+    :keyword source: The data source of file system or database.
+    :paramtype source: Union[Dict, ~azure.ai.ml.entities._inputs_outputs.external_data.Database,
+        ~azure.ai.ml.entities._inputs_outputs.external_data.FileSystem]
+    :keyword outputs: Mapping of outputs data bindings used in the job.
+        The default will be an output port with the key "sink" and type "mltable".
+    :paramtype outputs: dict
+    :return: A DataTransferImport object.
+    :rtype: ~azure.ai.ml.entities._job.pipeline._component_translatable.DataTransferImport
+    """
+    source = _build_source_sink(source)
+    outputs = outputs or {"sink": Output(type=AssetTypes.MLTABLE)}
+    # # job inputs can not be None
+    # job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    _, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+    component = kwargs.pop("component", None)
+    update_source = False
+    if component is None:
+        if source and source.type == ExternalDataType.DATABASE:
+            component = DataTransferBuiltinComponentUri.IMPORT_DATABASE
+        else:
+            component = DataTransferBuiltinComponentUri.IMPORT_FILE_SYSTEM
+        update_source = True
+
+    data_transfer_import_obj = DataTransferImport(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        compute=compute,
+        source=source,
+        outputs=job_outputs,
+        **kwargs,
+    )
+    if update_source:
+        data_transfer_import_obj._source = ComponentSource.BUILTIN
+
+    return data_transfer_import_obj
+
+
+@experimental
+@pipeline_node_decorator
+def export_data(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    compute: Optional[str] = None,
+    sink: Optional[Union[Dict, Database, FileSystem]] = None,
+    inputs: Optional[Dict] = None,
+    **kwargs: Any,
+) -> DataTransferExport:
+    """Create a DataTransferExport object which can be used inside dsl.pipeline.
+
+    :keyword name: The name of the job.
+    :paramtype name: str
+    :keyword description: Description of the job.
+    :paramtype description: str
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated.
+    :paramtype tags: dict[str, str]
+    :keyword display_name: Display name of the job.
+    :paramtype display_name: str
+    :keyword experiment_name: Name of the experiment the job will be created under.
+    :paramtype experiment_name: str
+    :keyword compute: The compute resource the job runs on.
+    :paramtype compute: str
+    :keyword sink: The sink of external data and databases.
+    :paramtype sink: Union[
+        Dict,
+        ~azure.ai.ml.entities._inputs_outputs.external_data.Database,
+        ~azure.ai.ml.entities._inputs_outputs.external_data.FileSystem]
+    :keyword inputs: Mapping of inputs data bindings used in the job.
+    :paramtype inputs: dict
+    :return: A DataTransferExport object.
+    :rtype: ~azure.ai.ml.entities._job.pipeline._component_translatable.DataTransferExport
+    :raises ValidationException: If sink is not provided or exporting file system is not supported.
+    """
+    sink = _build_source_sink(sink)
+    _, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component = kwargs.pop("component", None)
+    update_source = False
+    if component is None:
+        if sink and sink.type == ExternalDataType.DATABASE:
+            component = DataTransferBuiltinComponentUri.EXPORT_DATABASE
+        else:
+            msg = "Sink is a required field for export data task and we don't support exporting file system for now."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.JOB,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+        update_source = True
+
+    data_transfer_export_obj = DataTransferExport(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        compute=compute,
+        sink=sink,
+        inputs=job_inputs,
+        **kwargs,
+    )
+    if update_source:
+        data_transfer_export_obj._source = ComponentSource.BUILTIN
+
+    return data_transfer_export_obj
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/do_while.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/do_while.py
new file mode 100644
index 00000000..ecfd51ca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/do_while.py
@@ -0,0 +1,357 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+from typing import Any, Dict, Optional, Union
+
+from typing_extensions import Literal
+
+from azure.ai.ml._schema.pipeline.control_flow_job import DoWhileSchema
+from azure.ai.ml.constants._component import DO_WHILE_MAX_ITERATION, ControlFlowType
+from azure.ai.ml.entities._job.job_limits import DoWhileJobLimits
+from azure.ai.ml.entities._job.pipeline._io import InputOutputBase, NodeInput, NodeOutput
+from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+from azure.ai.ml.entities._validation import MutableValidationResult
+
+from .._util import load_from_dict, validate_attribute_type
+from .base_node import BaseNode
+from .control_flow_node import LoopNode
+from .pipeline import Pipeline
+
+module_logger = logging.getLogger(__name__)
+
+
+class DoWhile(LoopNode):
+    """Do-while loop node in the pipeline job. By specifying the loop body and loop termination condition in this class,
+    a job-level do while loop can be implemented. It will be initialized when calling dsl.do_while or when loading the
+    pipeline yml containing do_while node. Please do not manually initialize this class.
+
+    :param body: Pipeline job for the do-while loop body.
+    :type body: ~azure.ai.ml.entities._builders.pipeline.Pipeline
+    :param condition: Boolean type control output of body as do-while loop condition.
+    :type condition: ~azure.ai.ml.entities.Output
+    :param mapping: Output-Input mapping for each round of the do-while loop.
+        Key is the last round output of the body. Value is the input port for the current body.
+    :type mapping: dict[Union[str, ~azure.ai.ml.entities.Output],
+        Union[str, ~azure.ai.ml.entities.Input, list]]
+    :param limits: Limits in running the do-while node.
+    :type limits: Union[dict, ~azure.ai.ml.entities._job.job_limits.DoWhileJobLimits]
+    :raises ValidationError: If the initialization parameters are not of valid types.
+    """
+
+    def __init__(
+        self,
+        *,
+        body: Union[Pipeline, BaseNode],
+        condition: Optional[Union[str, NodeInput, NodeOutput]],
+        mapping: Dict,
+        limits: Optional[Union[dict, DoWhileJobLimits]] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        kwargs.pop("type", None)
+        super(DoWhile, self).__init__(
+            type=ControlFlowType.DO_WHILE,
+            body=body,
+            **kwargs,
+        )
+
+        # init mark for _AttrDict
+        self._init = True
+        self._mapping = mapping or {}
+        self._condition = condition
+        self._limits = limits
+        self._init = False
+
+    @property
+    def mapping(self) -> Dict:
+        """Get the output-input mapping for each round of the do-while loop.
+
+        :return: Output-Input mapping for each round of the do-while loop.
+        :rtype: dict[Union[str, ~azure.ai.ml.entities.Output],
+            Union[str, ~azure.ai.ml.entities.Input, list]]
+        """
+        return self._mapping
+
+    @property
+    def condition(self) -> Optional[Union[str, NodeInput, NodeOutput]]:
+        """Get the boolean type control output of the body as the do-while loop condition.
+
+        :return: Control output of the body as the do-while loop condition.
+        :rtype: ~azure.ai.ml.entities.Output
+        """
+        return self._condition
+
+    @property
+    def limits(self) -> Union[Dict, DoWhileJobLimits, None]:
+        """Get the limits in running the do-while node.
+
+        :return: Limits in running the do-while node.
+        :rtype: Union[dict, ~azure.ai.ml.entities._job.job_limits.DoWhileJobLimits]
+        """
+        return self._limits
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            **super(DoWhile, cls)._attr_type_map(),
+            "mapping": dict,
+            "limits": (dict, DoWhileJobLimits),
+        }
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DoWhile":
+        loaded_data = load_from_dict(DoWhileSchema, data, context, additional_message, **kwargs)
+
+        return cls(**loaded_data)
+
+    @classmethod
+    def _get_port_obj(
+        cls, body: BaseNode, port_name: str, is_input: bool = True, validate_port: bool = True
+    ) -> Union[str, NodeInput, NodeOutput]:
+        if is_input:
+            port = body.inputs.get(port_name, None)
+        else:
+            port = body.outputs.get(port_name, None)
+        if port is None:
+            if validate_port:
+                raise cls._create_validation_error(
+                    message=f"Cannot find {port_name} in do_while loop body {'inputs' if is_input else 'outputs'}.",
+                    no_personal_data_message=f"Miss port in do_while loop body {'inputs' if is_input else 'outputs'}.",
+                )
+            return port_name
+
+        res: Union[str, NodeInput, NodeOutput] = port
+        return res
+
+    @classmethod
+    def _create_instance_from_schema_dict(
+        cls, pipeline_jobs: Dict[str, BaseNode], loaded_data: Dict, validate_port: bool = True
+    ) -> "DoWhile":
+        """Create a do_while instance from schema parsed dict.
+
+        :param pipeline_jobs: The pipeline jobs
+        :type pipeline_jobs: Dict[str, BaseNode]
+        :param loaded_data: The loaded data
+        :type loaded_data: Dict
+        :param validate_port: Whether to raise if inputs/outputs are not present. Defaults to True
+        :type validate_port: bool
+        :return: The DoWhile node
+        :rtype: DoWhile
+        """
+
+        # Get body object from pipeline job list.
+        body_name = cls._get_data_binding_expression_value(loaded_data.pop("body"), regex=r"\{\{.*\.jobs\.(.*)\}\}")
+        body = cls._get_body_from_pipeline_jobs(pipeline_jobs, body_name)
+
+        # Convert mapping key-vault to input/output object
+        mapping = {}
+        for k, v in loaded_data.pop("mapping", {}).items():
+            output_name = cls._get_data_binding_expression_value(k, regex=r"\{\{.*\.%s\.outputs\.(.*)\}\}" % body_name)
+            input_names = v if isinstance(v, list) else [v]
+            input_names = [
+                cls._get_data_binding_expression_value(item, regex=r"\{\{.*\.%s\.inputs\.(.*)\}\}" % body_name)
+                for item in input_names
+            ]
+            mapping[output_name] = [cls._get_port_obj(body, item, validate_port=validate_port) for item in input_names]
+
+        limits = loaded_data.pop("limits", None)
+
+        if "condition" in loaded_data:
+            # Convert condition to output object
+            condition_name = cls._get_data_binding_expression_value(
+                loaded_data.pop("condition"), regex=r"\{\{.*\.%s\.outputs\.(.*)\}\}" % body_name
+            )
+            condition_value = cls._get_port_obj(body, condition_name, is_input=False, validate_port=validate_port)
+        else:
+            condition_value = None
+
+        do_while_instance = DoWhile(
+            body=body,
+            mapping=mapping,
+            condition=condition_value,
+            **loaded_data,
+        )
+        do_while_instance.set_limits(**limits)
+
+        return do_while_instance
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> DoWhileSchema:
+        return DoWhileSchema(context=context)
+
+    @classmethod
+    def _from_rest_object(cls, obj: dict, pipeline_jobs: dict) -> "DoWhile":
+        # pylint: disable=protected-access
+
+        obj = BaseNode._from_rest_object_to_init_params(obj)
+        return cls._create_instance_from_schema_dict(pipeline_jobs, obj, validate_port=False)
+
+    def set_limits(
+        self,
+        *,
+        max_iteration_count: int,
+        # pylint: disable=unused-argument
+        **kwargs: Any,
+    ) -> None:
+        """
+        Set the maximum iteration count for the do-while job.
+
+        The range of the iteration count is (0, 1000].
+
+        :keyword max_iteration_count: The maximum iteration count for the do-while job.
+        :paramtype max_iteration_count: int
+        """
+        if isinstance(self.limits, DoWhileJobLimits):
+            self.limits._max_iteration_count = max_iteration_count  # pylint: disable=protected-access
+        else:
+            self._limits = DoWhileJobLimits(max_iteration_count=max_iteration_count)
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = self._validate_loop_condition()
+        validation_result.merge_with(self._validate_body())
+        validation_result.merge_with(self._validate_do_while_limit())
+        validation_result.merge_with(self._validate_body_output_mapping())
+        return validation_result
+
+    def _validate_port(
+        self,
+        port: Union[str, NodeInput, NodeOutput],
+        node_ports: Dict[str, Union[NodeInput, NodeOutput]],
+        port_type: Literal["input", "output"],
+        yaml_path: str,
+    ) -> MutableValidationResult:
+        """Validate input/output port is exist in the dowhile body.
+
+        :param port: Either:
+          * The name of an input or output
+          * An input object
+          * An output object
+        :type port: Union[str, NodeInput, NodeOutput],
+        :param node_ports: The node input/outputs
+        :type node_ports: Union[Dict[str, Union[NodeInput, NodeOutput]]]
+        :param port_type: The port type
+        :type port_type: Literal["input", "output"],
+        :param yaml_path: The yaml path
+        :type yaml_path: str,
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        validation_result = self._create_empty_validation_result()
+        if isinstance(port, str):
+            port_obj = node_ports.get(port, None)
+        else:
+            port_obj = port
+        if (
+            port_obj is not None
+            and port_obj._owner is not None  # pylint: disable=protected-access
+            and not isinstance(port_obj._owner, PipelineJob)  # pylint: disable=protected-access
+            and port_obj._owner._instance_id != self.body._instance_id  # pylint: disable=protected-access
+        ):
+            # Check the port owner is dowhile body.
+            validation_result.append_error(
+                yaml_path=yaml_path,
+                message=(
+                    f"{port_obj._port_name} is the {port_type} of {port_obj._owner.name}, "  # pylint: disable=protected-access
+                    f"dowhile only accept {port_type} of the body: {self.body.name}."
+                ),
+            )
+        elif port_obj is None or port_obj._port_name not in node_ports:  # pylint: disable=protected-access
+            # Check port is exist in dowhile body.
+            validation_result.append_error(
+                yaml_path=yaml_path,
+                message=(
+                    f"The {port_type} of mapping {port_obj._port_name if port_obj else port} does not "  # pylint: disable=protected-access
+                    f"exist in {self.body.name} {port_type}, existing {port_type}: {node_ports.keys()}"
+                ),
+            )
+        return validation_result
+
+    def _validate_loop_condition(self) -> MutableValidationResult:
+        # pylint: disable=protected-access
+        validation_result = self._create_empty_validation_result()
+        if self.condition is not None:
+            # Check condition exists in dowhile body.
+            validation_result.merge_with(
+                self._validate_port(self.condition, self.body.outputs, port_type="output", yaml_path="condition")
+            )
+            if validation_result.passed:
+                # Check condition is a control output.
+                condition_name = self.condition if isinstance(self.condition, str) else self.condition._port_name
+                if not self.body._outputs[condition_name]._is_primitive_type:
+                    validation_result.append_error(
+                        yaml_path="condition",
+                        message=(
+                            f"{condition_name} is not a control output and is not primitive type. "
+                            "The condition of dowhile must be the control output or primitive type of the body."
+                        ),
+                    )
+        return validation_result
+
+    def _validate_do_while_limit(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        if isinstance(self.limits, DoWhileJobLimits):
+            if not self.limits or self.limits.max_iteration_count is None:
+                return validation_result
+            if isinstance(self.limits.max_iteration_count, InputOutputBase):
+                validation_result.append_error(
+                    yaml_path="limit.max_iteration_count",
+                    message="The max iteration count cannot be linked with an primitive type input.",
+                )
+            elif self.limits.max_iteration_count > DO_WHILE_MAX_ITERATION or self.limits.max_iteration_count < 0:
+                validation_result.append_error(
+                    yaml_path="limit.max_iteration_count",
+                    message=f"The max iteration count cannot be less than 0 or larger than {DO_WHILE_MAX_ITERATION}.",
+                )
+        return validation_result
+
+    def _validate_body_output_mapping(self) -> MutableValidationResult:
+        # pylint disable=protected-access
+        validation_result = self._create_empty_validation_result()
+        if not isinstance(self.mapping, dict):
+            validation_result.append_error(
+                yaml_path="mapping", message=f"Mapping expects a dict type but passes in a {type(self.mapping)} type."
+            )
+        else:
+            # Record the mapping relationship between input and output
+            input_output_mapping: Dict = {}
+            # Validate mapping input&output should come from while body
+            for output, inputs in self.mapping.items():
+                # pylint: disable=protected-access
+                output_name = output if isinstance(output, str) else output._port_name
+                validate_results = self._validate_port(
+                    output, self.body.outputs, port_type="output", yaml_path="mapping"
+                )
+                if validate_results.passed:
+                    is_primitive_output = self.body._outputs[output_name]._is_primitive_type
+                    inputs = inputs if isinstance(inputs, list) else [inputs]
+                    for item in inputs:
+                        input_validate_results = self._validate_port(
+                            item, self.body.inputs, port_type="input", yaml_path="mapping"
+                        )
+                        validation_result.merge_with(input_validate_results)
+                        # pylint: disable=protected-access
+                        input_name = item if isinstance(item, str) else item._port_name
+                        input_output_mapping[input_name] = input_output_mapping.get(input_name, []) + [output_name]
+                        is_primitive_type = self.body._inputs[input_name]._meta._is_primitive_type
+
+                        if input_validate_results.passed and not is_primitive_output and is_primitive_type:
+                            validate_results.append_error(
+                                yaml_path="mapping",
+                                message=(
+                                    f"{output_name} is a non-primitive type output and {input_name} "
+                                    "is a primitive input. Non-primitive type output cannot be connected "
+                                    "to an a primitive type input."
+                                ),
+                            )
+
+                validation_result.merge_with(validate_results)
+            # Validate whether input is linked to multiple outputs
+            for _input, outputs in input_output_mapping.items():
+                if len(outputs) > 1:
+                    validation_result.append_error(
+                        yaml_path="mapping", message=f"Input {_input} has been linked to multiple outputs {outputs}."
+                    )
+        return validation_result
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/fl_scatter_gather.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/fl_scatter_gather.py
new file mode 100644
index 00000000..0ad6b0e2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/fl_scatter_gather.py
@@ -0,0 +1,886 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import re
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from azure.ai.ml import Output
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._schema.pipeline.control_flow_job import FLScatterGatherSchema
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import AssetTypes
+from azure.ai.ml.dsl import pipeline
+from azure.ai.ml.dsl._do_while import do_while
+from azure.ai.ml.entities._assets.federated_learning_silo import FederatedLearningSilo
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+from azure.ai.ml.entities._builders.pipeline import Pipeline
+from azure.ai.ml.entities._component.command_component import CommandComponent
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._inputs_outputs.input import Input
+from azure.ai.ml.entities._job.pipeline._io.mixin import NodeIOMixin
+from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+from azure.ai.ml.entities._util import convert_ordered_dict_to_dict
+from azure.ai.ml.entities._validation import MutableValidationResult
+
+from .subcomponents import create_scatter_output_table
+
+# TODO 2293610: add support for more types of outputs besides uri_folder and mltable
+# Likely types that ought to be mergeable: string, int, uri_file
+MERGE_COMPONENT_MAPPING = {
+    "mltable": create_scatter_output_table,
+    "uri_folder": create_scatter_output_table,
+}
+
+
+ANCHORABLE_OUTPUT_TYPES = {AssetTypes.MLTABLE, AssetTypes.URI_FOLDER}
+
+ANCHORING_PATH_ROOT = "root"
+
+
+# big TODO: For some reason, surfacing this file in  __init__.py causes
+#  a circular import exception on the first attempted import
+# In notebooks, the second import succeeds, but then causes a silent failure where the
+# MLDesigner component created by the subcomponents.create_scatter_output_table function
+# will produce a ComponentExecutor object instead of the actual component.
+# TODO 2293541: Add telemetry of some sort
+# pylint: disable=too-many-instance-attributes
+class FLScatterGather(ControlFlowNode, NodeIOMixin):
+    """A node which creates a federated learning scatter-gather loop as a pipeline subgraph.
+    Intended for use inside a pipeline job. This is initialized when calling
+    `dsl.fl_scatter_gather()` or when loading a serialized version of this node from YAML.
+    Please do not manually initialize this class.
+
+    :param silo_configs: List of federated learning silo configurations.
+    :type silo_configs: List[~azure.ai.ml.entities._assets.federated_learning_silo.FederatedLearningSilo]
+    :param silo_component: Component representing the silo for federated learning.
+    :type silo_component: ~azure.ai.ml.entities.Component
+    :param aggregation_component: Component representing the aggregation step.
+    :type aggregation_component: ~azure.ai.ml.entities.Component
+    :param aggregation_compute: The compute resource for the aggregation step.
+    :type aggregation_compute: str
+    :param aggregation_datastore: The datastore for the aggregation step.
+    :type aggregation_datastore: str
+    :param shared_silo_kwargs: Keyword arguments shared across all silos.
+    :type shared_silo_kwargs: dict
+    :param aggregation_kwargs: Keyword arguments specific to the aggregation step.
+    :type aggregation_kwargs: dict
+    :param silo_to_aggregation_argument_map: Mapping of silo to aggregation arguments.
+    :type silo_to_aggregation_argument_map: dict
+    :param aggregation_to_silo_argument_map: Mapping of aggregation to silo arguments.
+    :type aggregation_to_silo_argument_map: dict
+    :param max_iterations: The maximum number of iterations for the scatter-gather loop.
+    :type max_iterations: int
+    :param create_default_mappings_if_needed: Whether to create default argument mappings if needed.
+    :type create_default_mappings_if_needed: bool
+    """
+
+    # See node class for input descriptions, no point maintaining
+    # double descriptions between a wrapper its interior.
+    def __init__(
+        self,
+        *,
+        silo_configs: List[FederatedLearningSilo],
+        silo_component: Component,
+        aggregation_component: Component,
+        aggregation_compute: Optional[str] = None,
+        aggregation_datastore: Optional[str] = None,
+        shared_silo_kwargs: Optional[Dict] = None,
+        aggregation_kwargs: Optional[Dict] = None,
+        silo_to_aggregation_argument_map: Optional[Dict] = None,
+        aggregation_to_silo_argument_map: Optional[Dict] = None,
+        max_iterations: int = 1,
+        create_default_mappings_if_needed: bool = False,
+        **kwargs: Any,
+    ) -> None:
+        # auto-create X_to_Y_argument_map values if allowed and needed.
+        if create_default_mappings_if_needed:
+            (
+                silo_to_aggregation_argument_map,
+                aggregation_to_silo_argument_map,
+            ) = FLScatterGather._try_create_default_mappings(
+                silo_component,
+                aggregation_component,
+                silo_to_aggregation_argument_map,
+                aggregation_to_silo_argument_map,
+            )
+
+        # input validation.
+        FLScatterGather.validate_inputs(
+            silo_configs=silo_configs,
+            silo_component=silo_component,
+            aggregation_component=aggregation_component,
+            shared_silo_kwargs=shared_silo_kwargs,
+            aggregation_compute=aggregation_compute,
+            aggregation_datastore=aggregation_datastore,
+            aggregation_kwargs=aggregation_kwargs,
+            silo_to_aggregation_argument_map=silo_to_aggregation_argument_map,
+            aggregation_to_silo_argument_map=aggregation_to_silo_argument_map,
+            max_iterations=max_iterations,
+        )
+
+        # store inputs
+        self.silo_configs = silo_configs
+        self.aggregation_compute = aggregation_compute
+        self.aggregation_datastore = aggregation_datastore
+        self.silo_component = silo_component
+        self.aggregation_component = aggregation_component
+        self.shared_silo_kwargs = shared_silo_kwargs
+        self.aggregation_kwargs = aggregation_kwargs
+        self.silo_to_aggregation_argument_map = silo_to_aggregation_argument_map
+        self.aggregation_to_silo_argument_map = aggregation_to_silo_argument_map
+        self.max_iterations = max_iterations
+        self._init = True  # Needed by parent class to work properly
+
+        self.scatter_gather_graph = self.scatter_gather()
+
+        # set SG node flag for telemetry
+        self.scatter_gather_graph.properties["azureml.telemetry.attribution"] = "FederatedLearningSGJobFlag"
+        self.scatter_gather_graph._to_rest_object()
+
+        # set output to final aggregation step's output
+        self._outputs = self.scatter_gather_graph.outputs
+        super(FLScatterGather, self).__init__(
+            type=JobType.COMPONENT,
+            component=None,
+            inputs=None,
+            outputs=self.scatter_gather_graph.outputs,
+            name=None,
+            display_name=None,
+            description=None,
+            tags=None,
+            properties=None,
+            comment=None,
+            compute=None,
+            experiment_name=None,
+        )
+
+    def scatter_gather(self) -> PipelineJob:
+        """Executes the scatter-gather loop by creating and executing a pipeline subgraph.
+        Returns the outputs of the final aggregation step.
+
+        :return: Outputs of the final aggregation step.
+        :rtype: list[~azure.ai.ml.Output]
+        """
+
+        @pipeline(
+            func=None,
+            name="Scatter gather",
+            description="It includes all steps that need to be executed in silo and aggregation",
+        )
+        # pylint: disable-next=docstring-missing-return,docstring-missing-rtype
+        def scatter_gather_iteration_body(**silo_inputs: Input) -> PipelineJob:
+            """
+                Performs a scatter-gather iteration by running copies of the silo step on different
+            computes/datstores according to this node's silo configs. The outputs of these
+            silo components are then merged by an internal helper component. The merged values
+            are then inputted into the user-provided aggregation component. Returns the executed aggregation component.
+
+            Kwargs are a dictionary of names and Inputs to be injected into each executed silo step. This dictionary is
+            merged with silo-specific inputs before each executed.
+            """
+
+            silo_outputs = []
+            # TODO 2293586 replace this for-loop with a parallel-for node
+            for silo_config in self.silo_configs:
+                silo_inputs.update(silo_config.inputs)
+                executed_silo_component = self.silo_component(**silo_inputs)
+                for v, k in executed_silo_component.inputs.items():
+                    if v in silo_config.inputs and k.type == "uri_folder":
+                        k.mode = "ro_mount"
+                FLScatterGather._anchor_step(
+                    pipeline_step=executed_silo_component,
+                    compute=silo_config.compute,
+                    internal_datastore=silo_config.datastore,
+                    orchestrator_datastore=self.aggregation_datastore,
+                )
+                # add to silo outputs list
+                silo_outputs.append(executed_silo_component)
+
+            # produce internal argument-merging components and record them in local subgraph
+            merge_comp_mapping = self._inject_merge_components(silo_outputs)
+
+            # produce aggregate step inputs by merging static kwargs and mapped arguments from
+            # internal merge components
+            agg_inputs: Dict = {}
+            if self.aggregation_kwargs is not None:
+                agg_inputs.update(self.aggregation_kwargs)
+            internal_merge_outputs = {
+                self._get_aggregator_input_name(k): v.outputs.aggregated_output for k, v in merge_comp_mapping.items()
+            }
+            agg_inputs.update(internal_merge_outputs)
+
+            # run the user aggregation step
+            executed_aggregation_component = self.aggregation_component(**agg_inputs)
+            # Set mode of aggregated mltable inputs as eval mount to allow files referenced within the table
+            # to be accessible by the component
+            for name, agg_input in executed_aggregation_component.inputs.items():
+                if (
+                    self.silo_to_aggregation_argument_map is not None
+                    and name in self.silo_to_aggregation_argument_map.values()
+                    and agg_input.type == "mltable"
+                ):
+                    agg_input.mode = "eval_download"
+
+            # Anchor both the internal merge components and the user-supplied aggregation step
+            # to the aggregation compute and datastore
+            if self.aggregation_compute is not None and self.aggregation_datastore is not None:
+                # internal merge component is also siloed to wherever the aggregation component lives.
+                for executed_merge_component in merge_comp_mapping.values():
+                    FLScatterGather._anchor_step(
+                        pipeline_step=executed_merge_component,
+                        compute=self.aggregation_compute,
+                        internal_datastore=self.aggregation_datastore,
+                        orchestrator_datastore=self.aggregation_datastore,
+                    )
+                FLScatterGather._anchor_step(
+                    pipeline_step=executed_aggregation_component,
+                    compute=self.aggregation_compute,
+                    internal_datastore=self.aggregation_datastore,
+                    orchestrator_datastore=self.aggregation_datastore,
+                )
+            res: PipelineJob = executed_aggregation_component.outputs
+            return res
+
+        @pipeline(func=None, name="Scatter gather graph")
+        # pylint: disable-next=docstring-missing-return,docstring-missing-rtype
+        def create_scatter_gather_graph() -> PipelineJob:
+            """
+            Creates a scatter-gather graph by executing the scatter_gather_iteration_body
+            function in a do-while loop. The loop terminates when the user-supplied
+            termination condition is met.
+            """
+
+            silo_inputs: Dict = {}
+            if self.shared_silo_kwargs is not None:
+                # Start with static inputs
+                silo_inputs.update(self.shared_silo_kwargs)
+
+            # merge in inputs passed in from previous iteration's aggregate step)
+            if self.aggregation_to_silo_argument_map is not None:
+                silo_inputs.update({v: None for v in self.aggregation_to_silo_argument_map.values()})
+
+            scatter_gather_body = scatter_gather_iteration_body(**silo_inputs)
+
+            # map aggregation outputs to scatter inputs
+            if self.aggregation_to_silo_argument_map is not None:
+                do_while_mapping = {
+                    k: getattr(scatter_gather_body.inputs, v) for k, v in self.aggregation_to_silo_argument_map.items()
+                }
+
+            do_while(
+                body=scatter_gather_body,  # type: ignore[arg-type]
+                mapping=do_while_mapping,  # pylint: disable=possibly-used-before-assignment
+                max_iteration_count=self.max_iterations,
+            )
+            res_scatter: PipelineJob = scatter_gather_body.outputs  # type: ignore[assignment]
+            return res_scatter
+
+        res: PipelineJob = create_scatter_gather_graph()
+        return res
+
+    @classmethod
+    def _get_fl_datastore_path(
+        cls,
+        datastore_name: Optional[str],
+        output_name: str,
+        unique_id: str = "${{name}}",
+        iteration_num: Optional[int] = None,
+    ) -> str:
+        """Construct a path string using the inputted values. The important aspect is that this produces a
+        path with a specified datastore.
+
+        :param datastore_name: The datastore to use in the constructed path.
+        :type datastore_name: str
+        :param output_name: The name of the output value that this path is assumed to belong to.
+            Is injected into the path.
+        :type output_name: str
+        :param unique_id: An additional string to inject if needed. Defaults to ${{name}}, which is the
+            output name again.
+        :type unique_id: str
+        :param iteration_num: The iteration number of the current scatter-gather iteration.
+            If set, inject this into the resulting path string.
+        :type iteration_num: Optional[int]
+        :return: A data path string containing the various aforementioned inputs.
+        :rtype: str
+
+        """
+        data_path = f"azureml://datastores/{datastore_name}/paths/federated_learning/{output_name}/{unique_id}/"
+        if iteration_num:
+            data_path += f"iteration_{iteration_num}/"
+        return data_path
+
+    @classmethod
+    def _check_datastore(cls, path: str, expected_datastore: Optional[str]) -> bool:
+        """Perform a simple regex check to try determine if the datastore in the inputted path string
+        matches the expected_datastore.
+
+
+        :param path: An output pathstring.
+        :type path: str
+        :param expected_datastore: A datastore name.
+        :type expected_datastore: str
+        :return: Whether or not the expected_datastore was found in the path at the expected location.
+        :rtype: bool
+        """
+        match = re.match("(.*datastore/)([^/]*)(/.*)", path)
+        if match:
+            groups = match.groups()
+            if groups[1] == expected_datastore:
+                return True
+        return False
+
+    @classmethod
+    def _check_or_set_datastore(
+        cls,
+        name: str,
+        output: Output,
+        target_datastore: Optional[str],
+        iteration_num: Optional[int] = None,
+    ) -> MutableValidationResult:
+        """Tries to assign output.path to a value which includes the target_datastore if it's not already
+        set. If the output's path is already set, return a warning if it doesn't match the target_datastore.
+
+        :param name: The name of the output to modify
+        :type name: str
+        :param output: The output object to examine and potentially change the datastore of.
+        :type output: Output
+        :param target_datastore: The name of the datastore to try applying to the output
+        :type target_datastore: str
+        :param iteration_num: the current iteration in the scatter gather loop. If set, include this in the generated
+            path.
+        :type iteration_num: Optional[int]
+        :return: A validation result containing any problems that arose. Contains a warning if the examined output
+            already contains a datastore that does not match 'target_datastore'.
+        :rtype: MutableValidationResult
+        """
+        validation_result = cls._create_empty_validation_result()
+        if not hasattr(output, "path") or not output.path:
+            output.path = cls._get_fl_datastore_path(target_datastore, name, iteration_num=iteration_num)
+        # Double check the path's datastore leads to the target if it's already set.
+        elif not cls._check_datastore(output.path, target_datastore):
+            validation_result.append_warning(
+                yaml_path=name,
+                message=f"Output '{name}' has an undetermined datastore, or a datstore"
+                + f" that does not match the expected datastore for this output, which is '{target_datastore}'."
+                + " Make sure this is intended.",
+            )
+        return validation_result
+
+    # TODO 2293705: Add anchoring for more resource types.
+    @classmethod
+    def _anchor_step(
+        cls,
+        pipeline_step: Union[Pipeline, CommandComponent],
+        compute: str,
+        internal_datastore: str,
+        orchestrator_datastore: Optional[str],
+        iteration: Optional[int] = 0,
+        _path: str = "root",
+    ) -> MutableValidationResult:
+        """Take a pipeline step and recursively enforces the right compute/datastore config.
+
+        :param pipeline_step: a step to anchor
+        :type pipeline_step: Union[Pipeline, CommandComponent]
+        :param compute: name of the compute target
+        :type compute: str
+        :param internal_datastore: The name of the datastore that should be used for internal output anchoring.
+        :type internal_datastore: str
+        :param orchestrator_datastore: The name of the orchestrator/aggregation datastore that should be used for
+            'real' output anchoring.
+        :type orchestrator_datastore: str
+        :param iteration: The current iteration number in the scatter gather loop. Defaults to 0.
+        :type iteration: Optional[int]
+        :param _path: for recursive anchoring, codes the "path" inside the pipeline for messaging
+        :type _path: str
+        :return: A validation result containing any issues that were uncovered during anchoring. This function adds
+            warnings when outputs already have assigned paths which don't contain the expected datastore.
+        :rtype: MutableValidationResult
+        """
+
+        validation_result = cls._create_empty_validation_result()
+
+        # Current step is a pipeline, which means we need to inspect its steps (jobs) and
+        # potentially anchor those as well.
+        if pipeline_step.type == "pipeline":
+            if hasattr(pipeline_step, "component"):
+                # Current step is probably not the root of the graph
+                # its outputs should be anchored to the internal_datastore.
+                for name, output in pipeline_step.outputs.items():
+                    if not isinstance(output, str):
+                        if output.type in ANCHORABLE_OUTPUT_TYPES:
+                            validation_result.merge_with(
+                                cls._check_or_set_datastore(
+                                    name=name,
+                                    output=output,
+                                    target_datastore=orchestrator_datastore,
+                                    iteration_num=iteration,
+                                )
+                            )
+
+                # then we need to anchor the internal component of this step
+                # The outputs of this sub-component are a deep copy of the outputs of this step
+                # This is dangerous, and we need to make sure they both use the same datastore,
+                # so we keep datastore types identical across this recursive call.
+                cls._anchor_step(
+                    pipeline_step.component,  # type: ignore
+                    compute,
+                    internal_datastore=internal_datastore,
+                    orchestrator_datastore=orchestrator_datastore,
+                    _path=f"{_path}.component",
+                )
+
+            else:
+                # This is a pipeline step with multiple jobs beneath it.
+                # Anchor its outputs...
+                for name, output in pipeline_step.outputs.items():
+                    if not isinstance(output, str):
+                        if output.type in ANCHORABLE_OUTPUT_TYPES:
+                            validation_result.merge_with(
+                                cls._check_or_set_datastore(
+                                    name=name,
+                                    output=output,
+                                    target_datastore=orchestrator_datastore,
+                                    iteration_num=iteration,
+                                )
+                            )
+                # ...then recursively anchor each job inside the pipeline
+                if not isinstance(pipeline_step, CommandComponent):
+                    for job_key in pipeline_step.jobs:
+                        job = pipeline_step.jobs[job_key]
+                        # replace orchestrator with internal datastore, jobs components
+                        # should either use the local datastore
+                        # or have already had their outputs re-assigned.
+                        cls._anchor_step(
+                            job,
+                            compute,
+                            internal_datastore=internal_datastore,
+                            orchestrator_datastore=internal_datastore,
+                            _path=f"{_path}.jobs.{job_key}",
+                        )
+
+        elif pipeline_step.type == "command":
+            # if the current step is a command component
+            # make sure the compute corresponds to the silo
+            if not isinstance(pipeline_step, CommandComponent) and pipeline_step.compute is None:
+                pipeline_step.compute = compute
+            # then anchor each of the job's outputs
+            for name, output in pipeline_step.outputs.items():
+                if not isinstance(output, str):
+                    if output.type in ANCHORABLE_OUTPUT_TYPES:
+                        validation_result.merge_with(
+                            cls._check_or_set_datastore(
+                                name=name,
+                                output=output,
+                                target_datastore=orchestrator_datastore,
+                                iteration_num=iteration,
+                            )
+                        )
+        else:
+            # TODO revisit this and add support for anchoring more things
+            raise NotImplementedError(f"under path={_path}: step type={pipeline_step.type} is not supported")
+
+        return validation_result
+
+    # Making this a class method allows for easier, isolated testing, and allows careful
+    # users to call this as a pre-init step.
+    # TODO: Might be worth migrating this to a schema validation class, but out of scope for now.
+    # pylint: disable=too-many-statements,too-many-branches, too-many-locals
+    @classmethod
+    def validate_inputs(
+        cls,
+        *,
+        silo_configs: List[FederatedLearningSilo],
+        silo_component: Component,
+        aggregation_component: Component,
+        shared_silo_kwargs: Optional[Dict],
+        aggregation_compute: Optional[str],
+        aggregation_datastore: Optional[str],
+        aggregation_kwargs: Optional[Dict],
+        silo_to_aggregation_argument_map: Optional[Dict],
+        aggregation_to_silo_argument_map: Optional[Dict],
+        max_iterations: int,
+        raise_error: bool = False,
+    ) -> MutableValidationResult:
+        """Validates the inputs for the scatter-gather node.
+
+        :keyword silo_configs: List of federated learning silo configurations.
+        :paramtype silo_configs: List[~azure.ai.ml.entities._assets.federated_learning_silo.FederatedLearningSilo]
+        :keyword silo_component: Component representing the silo for federated learning.
+        :paramtype silo_component: ~azure.ai.ml.entities.Component
+        :keyword aggregation_component: Component representing the aggregation step.
+        :paramtype aggregation_component: ~azure.ai.ml.entities.Component
+        :keyword shared_silo_kwargs: Keyword arguments shared across all silos.
+        :paramtype shared_silo_kwargs: Dict
+        :keyword aggregation_compute: The compute resource for the aggregation step.
+        :paramtype aggregation_compute: str
+        :keyword aggregation_datastore: The datastore for the aggregation step.
+        :paramtype aggregation_datastore: str
+        :keyword aggregation_kwargs: Keyword arguments specific to the aggregation step.
+        :paramtype aggregation_kwargs: Dict
+        :keyword silo_to_aggregation_argument_map: Mapping of silo to aggregation arguments.
+        :paramtype silo_to_aggregation_argument_map: Dict
+        :keyword aggregation_to_silo_argument_map: Mapping of aggregation to silo arguments.
+        :paramtype aggregation_to_silo_argument_map: Dict
+        :keyword max_iterations: The maximum number of iterations for the scatter-gather loop.
+        :paramtype max_iterations: int
+        :keyword raise_error: Whether to raise an exception if validation fails. Defaults to False.
+        :paramtype raise_error: bool
+        :return: The validation result.
+        :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult
+        """
+        validation_result = cls._create_empty_validation_result()
+
+        # saved values for validation later on
+        silo_inputs = None
+        silo_outputs = None
+        agg_inputs = None
+        agg_outputs = None
+        # validate silo component
+        if silo_component is None:
+            validation_result.append_error(
+                yaml_path="silo_component",
+                message="silo_component is a required argument for the scatter gather node.",
+            )
+        else:
+            # ensure that silo component has both inputs and outputs
+            if not hasattr(silo_component, "inputs"):
+                validation_result.append_error(
+                    yaml_path="silo_component",
+                    message="silo_component is missing 'inputs' attribute;"
+                    + "it does not appear to be a valid component that can be used in a scatter-gather loop.",
+                )
+            else:
+                silo_inputs = silo_component.inputs
+            if not hasattr(silo_component, "outputs"):
+                validation_result.append_error(
+                    yaml_path="silo_component",
+                    message="silo_component is missing 'outputs' attribute;"
+                    + "it does not appear to be a valid component that can be used in a scatter-gather loop.",
+                )
+            else:
+                silo_outputs = silo_component.outputs
+        # validate aggregation component
+        if aggregation_component is None:
+            validation_result.append_error(
+                yaml_path="aggregation_component",
+                message="aggregation_component is a required argument for the scatter gather node.",
+            )
+        else:
+            # ensure that aggregation component has both inputs and outputs
+            if not hasattr(aggregation_component, "inputs"):
+                validation_result.append_error(
+                    yaml_path="aggregation_component",
+                    message="aggregation_component is missing 'inputs' attribute;"
+                    + "it does not appear to be a valid component that can be used in a scatter-gather loop.",
+                )
+            else:
+                agg_inputs = aggregation_component.inputs
+            if not hasattr(aggregation_component, "outputs"):
+                validation_result.append_error(
+                    yaml_path="aggregation_component",
+                    message="aggregation_component is missing 'outputs' attribute;"
+                    + " it does not appear to be a valid component that can be used in a scatter-gather loop.",
+                )
+            else:
+                agg_outputs = aggregation_component.outputs
+
+        # validate silos configs
+        if silo_configs is None:
+            validation_result.append_error(
+                yaml_path="silo_configs",
+                message="silo_configs is a required argument for the scatter gather node.",
+            )
+        elif len(silo_configs) == 0:
+            validation_result.append_error(
+                yaml_path="silo_configs",
+                message="silo_configs cannot be an empty list.",
+            )
+        else:
+            first_silo = silo_configs[0]
+            expected_inputs: List = []
+            if hasattr(first_silo, "inputs"):
+                expected_inputs = first_silo.inputs.keys()  # type: ignore
+            num_expected_inputs = len(expected_inputs)
+            # pylint: disable=consider-using-enumerate
+            for i in range(len(silo_configs)):
+                silo = silo_configs[i]
+                if not hasattr(silo, "compute"):
+                    validation_result.append_error(
+                        yaml_path="silo_configs",
+                        message=f"Silo at index {i} in silo_configs is missing its compute value.",
+                    )
+                if not hasattr(silo, "datastore"):
+                    validation_result.append_error(
+                        yaml_path="silo_configs",
+                        message=f"Silo at index {i} in silo_configs is missing its datastore value.",
+                    )
+                silo_input_len = 0
+                if hasattr(silo, "inputs"):
+                    silo_input_len = len(silo.inputs)
+                    # if inputs exist, make sure the inputs names are consistent across silo configs
+                    for expected_input_name in expected_inputs:
+                        if expected_input_name not in silo.inputs:
+                            validation_result.append_error(
+                                yaml_path="silo_configs",
+                                message=f"Silo at index {i} has is missing inputs named '{expected_input_name}',"
+                                + "which was listed in the first silo config. "
+                                + "Silos must have consistent inputs names.",
+                            )
+                if silo_input_len != num_expected_inputs:
+                    validation_result.append_error(
+                        yaml_path="silo_configs",
+                        message=f"Silo at index {i} has {silo_input_len} inputs, but the first silo established that"
+                        + f"each silo would have {num_expected_inputs} silo-specific inputs.",
+                    )
+
+        # Make sure both aggregation overrides are set, or not
+        if aggregation_datastore is None and aggregation_compute is not None:
+            validation_result.append_error(
+                yaml_path="aggregation_datastore",
+                message="aggregation_datastore cannot be unset if aggregation_compute is set.",
+            )
+        elif aggregation_datastore is not None and aggregation_compute is None:
+            validation_result.append_error(
+                yaml_path="aggregation_compute",
+                message="aggregation_compute cannot be unset if aggregation_datastore is set.",
+            )
+
+        # validate component kwargs, ensuring that the relevant components contain the specified inputs
+        if shared_silo_kwargs is None:
+            validation_result.append_error(
+                yaml_path="shared_silo_kwargs",
+                message="shared_silo_kwargs should never be None. Input an empty dictionary instead.",
+            )
+        elif silo_inputs is not None:
+            for k in shared_silo_kwargs.keys():
+                if k not in silo_inputs:
+                    validation_result.append_error(
+                        yaml_path="shared_silo_kwargs",
+                        message=f"shared_silo_kwargs keyword {k} not listed in silo_component's inputs",
+                    )
+        if aggregation_kwargs is None:
+            validation_result.append_error(
+                yaml_path="aggregation_kwargs",
+                message="aggregation_kwargs should never be None. Input an empty dictionary instead.",
+            )
+        elif silo_inputs is not None:
+            for k in aggregation_kwargs.keys():
+                if agg_inputs is not None and k not in agg_inputs:
+                    validation_result.append_error(
+                        yaml_path="aggregation_kwargs",
+                        message=f"aggregation_kwargs keyword {k} not listed in aggregation_component's inputs",
+                    )
+
+        # validate that argument mappings leverage inputs and outputs that actually exist
+        if aggregation_to_silo_argument_map is None:
+            validation_result.append_error(
+                yaml_path="aggregation_to_silo_argument_map",
+                message="aggregation_to_silo_argument_map should never be None. Input an empty dictionary instead.",
+            )
+        elif silo_inputs is not None and agg_outputs is not None:
+            for k, v in aggregation_to_silo_argument_map.items():
+                if k not in agg_outputs:
+                    validation_result.append_error(
+                        yaml_path="aggregation_to_silo_argument_map",
+                        message=f"aggregation_to_silo_argument_map key {k} "
+                        + "is not a known output of the aggregation component.",
+                    )
+                if v not in silo_inputs:
+                    validation_result.append_error(
+                        yaml_path="aggregation_to_silo_argument_map",
+                        message=f"aggregation_to_silo_argument_map value {v} "
+                        + "is not a known input of the silo component.",
+                    )
+        # and check the other mapping
+        if silo_to_aggregation_argument_map is None:
+            validation_result.append_error(
+                yaml_path="silo_to_aggregation_argument_map",
+                message="silo_to_aggregation_argument_map should never be None. "
+                + "Input an empty dictionary instead.",
+            )
+        elif agg_inputs is not None and silo_outputs is not None:
+            for k, v in silo_to_aggregation_argument_map.items():
+                if k not in silo_outputs:
+                    validation_result.append_error(
+                        yaml_path="silo_to_aggregation_argument_map",
+                        message=f"silo_to_aggregation_argument_map key {k }"
+                        + " is not a known output of the silo component.",
+                    )
+                if v not in agg_inputs:
+                    validation_result.append_error(
+                        yaml_path="silo_to_aggregation_argument_map",
+                        message=f"silo_to_aggregation_argument_map value {v}"
+                        + " is not a known input of the aggregation component.",
+                    )
+
+        if max_iterations < 1:
+            validation_result.append_error(
+                yaml_path="max_iterations",
+                message=f"max_iterations must be a positive value, not '{max_iterations}'.",
+            )
+
+        return cls._try_raise(validation_result, raise_error=raise_error)
+
+    @classmethod
+    def _custom_fl_data_path(
+        cls,
+        datastore_name: str,
+        output_name: str,
+        unique_id: str = "${{name}}",
+        iteration_num: str = "${{iteration_num}}",
+    ) -> str:
+        """Produces a path to store the data during FL training.
+
+        :param datastore_name: name of the Azure ML datastore
+        :type datastore_name: str
+        :param output_name: a name unique to this output
+        :type output_name: str
+        :param unique_id: a unique id for the run (default: inject run id with ${{name}})
+        :type unique_id: str
+        :param iteration_num: an iteration number if relevant
+        :type iteration_num: str
+        :return: direct url to the data path to store the data
+        :rtype: str
+        """
+        data_path = f"azureml://datastores/{datastore_name}/paths/federated_learning/{output_name}/{unique_id}/"
+        if iteration_num is not None:
+            data_path += f"iteration_{iteration_num}/"
+
+        return data_path
+
+    def _get_aggregator_input_name(self, silo_output_name: str) -> Optional[str]:
+        """Retrieves the aggregator input name
+
+        :param silo_output_name: The silo output name
+        :type silo_output_name: str
+        :return:
+            * Returns aggregator input name that maps to silo_output.
+            * Returns None if silo_output_name not in silo_to_aggregation_argument_map
+        :rtype: Optional[str]
+        """
+        if self.silo_to_aggregation_argument_map is None:
+            return None
+
+        return self.silo_to_aggregation_argument_map.get(silo_output_name)
+
+    @classmethod
+    def _try_create_default_mappings(
+        cls,
+        silo_comp: Optional[Component],
+        agg_comp: Optional[Component],
+        silo_agg_map: Optional[Dict],
+        agg_silo_map: Optional[Dict],
+    ) -> Tuple[Optional[Dict], Optional[Dict]]:
+        """
+        This function tries to produce dictionaries that link the silo and aggregation
+        components' outputs to the other's inputs.
+        The mapping only occurs for inputted mappings that are None, otherwise
+        the inputted mapping is returned unchanged.
+        These auto-generated mappings are naive, and simply maps all outputs of a component that have a
+        identically-named input in the other component.
+
+        This function does nothing if either inputted component is None. This function will also do nothing
+        for a given mapping if either of the relevant inputs or outputs are None (but not empty).
+
+        Example inputs:
+            silo_comp.inputs = {"silo_input" : value }
+            silo_comp.outputs =  {"c" : ..., "silo_output2" : ... }
+            agg_comp.inputs = {"silo_output1" : ... }
+            agg_comp.outputs = {"agg_output" : ... }
+            silo_agg_map = None
+            agg_silo_map = {}
+
+        Example returns:
+            {"silo_output1" : "silo_output1"}, {}
+
+        :param silo_comp: The silo component
+        :type silo_comp: Optional[Component]
+        :param agg_comp: The aggregation component
+        :type agg_comp: Optional[Component]
+        :param silo_agg_map: Mapping of silo to aggregation arguments.
+        :type silo_agg_map: Optional[Dict]
+        :param agg_silo_map: Mapping of aggregation to silo arguments.
+        :type agg_silo_map: Optional[Dict]
+        :return: Returns a tuple of the potentially modified silo to aggregation mapping, followed by the aggregation
+            to silo mapping.
+        :rtype: Tuple[Optional[Dict], Optional[Dict]]
+        """
+        if silo_comp is None or agg_comp is None:
+            return silo_agg_map, agg_silo_map
+        if silo_agg_map is None and silo_comp.outputs is not None and agg_comp.inputs is not None:
+            silo_agg_map = {output: output for output in silo_comp.outputs.keys() if output in agg_comp.inputs}
+        if agg_silo_map is None:
+            agg_silo_map = {output: output for output in agg_comp.outputs.keys() if output in silo_comp.inputs}
+        return silo_agg_map, agg_silo_map
+
+    @staticmethod
+    # pylint: disable-next=docstring-missing-rtype
+    def _get_merge_component(output_type: str) -> Any:
+        """Gets the merge component to be used based on type of output
+
+        :param output_type: The output type
+        :type output_type: str
+        :return: The merge component
+        """
+        return MERGE_COMPONENT_MAPPING[output_type]
+
+    def _inject_merge_components(self, executed_silo_components: Any) -> Dict:
+        """Add a merge component for each silo output in the silo_to_aggregation_argument_map.
+            These merge components act as a mediator between the user silo and aggregation steps, reducing
+            the variable number of silo outputs into a single input for the aggergation step.
+
+        :param executed_silo_components: A list of executed silo steps to extract outputs from.
+        :type executed_silo_components:
+        :return: A mapping from silo output names to the corresponding newly created and executed merge component
+        :rtype: dict
+        """
+        executed_component = executed_silo_components[0]
+
+        merge_comp_mapping = {}
+        if self.silo_to_aggregation_argument_map is not None:
+            for (
+                silo_output_argument_name,
+                _,
+            ) in self.silo_to_aggregation_argument_map.items():
+                merge_comp = self._get_merge_component(executed_component.outputs[silo_output_argument_name].type)
+                merge_component_inputs = {
+                    silo_output_argument_name
+                    + "_silo_"
+                    + str(i): executed_silo_components[i].outputs[silo_output_argument_name]
+                    for i in range(0, len(executed_silo_components))
+                }
+                executed_merge_component = merge_comp(**merge_component_inputs)
+                for input_obj in executed_merge_component.inputs.values():
+                    input_obj.mode = "direct"
+                for output_obj in executed_merge_component.outputs.values():
+                    output_obj.type = "mltable"
+                merge_comp_mapping.update({silo_output_argument_name: executed_merge_component})
+
+        return merge_comp_mapping
+
+    # boilerplate functions - largely copied from other node builders
+
+    @property
+    def outputs(self) -> Dict[str, Union[str, Output]]:
+        """Get the outputs of the scatter-gather node.
+
+        :return: The outputs of the scatter-gather node.
+        :rtype: Dict[str, Union[str, ~azure.ai.ml.Output]]
+        """
+        return self._outputs
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+        return FLScatterGatherSchema(context=context)
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        """Convert self to a rest object for remote call.
+
+        :return: The rest object
+        :rtype: dict
+        """
+        rest_node = super(FLScatterGather, self)._to_rest_object(**kwargs)
+        rest_node.update({"outputs": self._to_rest_outputs()})
+        # TODO: Bug Item number: 2897665
+        res: dict = convert_ordered_dict_to_dict(rest_node)  # type: ignore
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_func.py
new file mode 100644
index 00000000..c9ecabd8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_func.py
@@ -0,0 +1,93 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+from typing import Any, Dict, Optional
+
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities._component.import_component import ImportComponent
+from azure.ai.ml.entities._inputs_outputs import Output
+from azure.ai.ml.entities._job.import_job import ImportSource
+
+from .command_func import _parse_input, _parse_inputs_outputs, _parse_output
+from .import_node import Import
+
+
+def import_job(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    source: Optional[ImportSource] = None,
+    output: Optional[Output] = None,
+    is_deterministic: bool = True,
+    **kwargs: Any,
+) -> Import:
+    """Create an Import object which can be used inside dsl.pipeline as a function
+    and can also be created as a standalone import job.
+
+    :keyword name: Name of the import job or component created.
+    :paramtype name: str
+    :keyword description: A friendly description of the import.
+    :paramtype description: str
+    :keyword tags: Tags to be attached to this import.
+    :paramtype tags: Dict
+    :keyword display_name: A friendly name.
+    :paramtype display_name: str
+    :keyword experiment_name: Name of the experiment the job will be created under.
+        If None is provided, the default will be set to the current directory name.
+        Will be ignored as a pipeline step.
+    :paramtype experiment_name: str
+    :keyword source: Input source parameters used by this import.
+    :paramtype source: ~azure.ai.ml.entities._job.import_job.ImportSource
+    :keyword output: The output of this import.
+    :paramtype output: ~azure.ai.ml.entities.Output
+    :keyword is_deterministic: Specify whether the command will return the same output given the same input.
+        If a command (component) is deterministic, when used as a node/step in a pipeline,
+        it will reuse results from a previously submitted job in the current workspace
+        which has the same inputs and settings.
+        In this case, this step will not use any compute resource.
+        Defaults to True.
+    :paramtype is_deterministic: bool
+    :returns: The Import object.
+    :rtype: ~azure.ai.ml.entities._builders.import_node.Import
+    """
+    inputs = source._to_job_inputs() if source else kwargs.pop("inputs")
+    outputs = {"output": output} if output else kwargs.pop("outputs")
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+
+    component = kwargs.pop("component", None)
+
+    if component is None:
+        component = ImportComponent(
+            name=name,
+            tags=tags,
+            display_name=display_name,
+            description=description,
+            source=component_inputs,
+            output=component_outputs["output"],
+            _source=ComponentSource.BUILDER,
+            is_deterministic=is_deterministic,
+            **kwargs,
+        )
+
+    import_obj = Import(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        inputs=job_inputs,
+        outputs=job_outputs,
+        **kwargs,
+    )
+
+    return import_obj
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_node.py
new file mode 100644
index 00000000..144753d5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_node.py
@@ -0,0 +1,205 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple, Type, Union
+
+from marshmallow import Schema
+
+from azure.ai.ml._restclient.v2022_02_01_preview.models import CommandJob as RestCommandJob
+from azure.ai.ml._restclient.v2022_02_01_preview.models import JobBaseData
+from azure.ai.ml._schema.job.import_job import ImportJobSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.constants._component import ComponentSource, NodeType
+from azure.ai.ml.constants._compute import ComputeType
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._component.import_component import ImportComponent
+from azure.ai.ml.entities._inputs_outputs import Output
+from azure.ai.ml.entities._job._input_output_helpers import from_rest_data_outputs, from_rest_inputs_to_dataset_literal
+from azure.ai.ml.entities._job.import_job import ImportJob, ImportSource
+from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException
+
+from ..._schema import PathAwareSchema
+from .._inputs_outputs import Output
+from .._util import convert_ordered_dict_to_dict, load_from_dict, validate_attribute_type
+from .base_node import BaseNode
+
+module_logger = logging.getLogger(__name__)
+
+
+class Import(BaseNode):
+    """Base class for import node, used for import component version consumption.
+
+    You should not instantiate this class directly. Instead, you should
+    create from a builder function.
+
+    :param component: Id or instance of the import component/job to be run for the step.
+    :type component: ~azure.ai.ml.entities._component.import_component.ImportComponent
+    :param inputs: Input parameters to the import.
+    :type inputs: Dict[str, str]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, ~azure.ai.ml.entities.Output]]
+    :param name: Name of the import.
+    :type name: str
+    :param description: Description of the import.
+    :type description: str
+    :param display_name: Display name of the job.
+    :type display_name: str
+    :param experiment_name: Name of the experiment the job will be created under,
+        if None is provided, the default will be set to the current directory name.
+    :type experiment_name: str
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, ImportComponent],
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        kwargs.pop("type", None)
+        kwargs.pop("compute", None)
+
+        self._parameters = kwargs.pop("parameters", {})
+        BaseNode.__init__(
+            self,
+            type=NodeType.IMPORT,
+            inputs=inputs,
+            outputs=outputs,
+            component=component,
+            compute=ComputeType.ADF,
+            **kwargs,
+        )
+
+    @classmethod
+    def _get_supported_inputs_types(cls) -> Type[str]:
+        # import source parameters type, connection, query, path are always str
+        return str
+
+    @classmethod
+    def _get_supported_outputs_types(cls) -> Tuple:
+        return str, Output
+
+    @property
+    def component(self) -> Union[str, ImportComponent]:
+        res: Union[str, ImportComponent] = self._component
+        return res
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, ImportComponent),
+        }
+
+    def _to_job(self) -> ImportJob:
+        return ImportJob(
+            id=self.id,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            experiment_name=self.experiment_name,
+            status=self.status,
+            source=ImportSource._from_job_inputs(self._job_inputs),
+            output=self._job_outputs.get("output"),
+            creation_context=self.creation_context,
+        )
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return []
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj: dict = super()._to_rest_object(**kwargs)
+        rest_obj.update(
+            convert_ordered_dict_to_dict(
+                {
+                    "componentId": self._get_component_id(),
+                }
+            )
+        )
+        return rest_obj
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Import":
+        from .import_func import import_job
+
+        loaded_data = load_from_dict(ImportJobSchema, data, context, additional_message, **kwargs)
+
+        _import_job: Import = import_job(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return _import_job
+
+    @classmethod
+    def _load_from_rest_job(cls, obj: JobBaseData) -> "Import":
+        from .import_func import import_job
+
+        rest_command_job: RestCommandJob = obj.properties
+        inputs = from_rest_inputs_to_dataset_literal(rest_command_job.inputs)
+        outputs = from_rest_data_outputs(rest_command_job.outputs)
+
+        _import_job: Import = import_job(
+            name=obj.name,
+            display_name=rest_command_job.display_name,
+            description=rest_command_job.description,
+            experiment_name=rest_command_job.experiment_name,
+            status=rest_command_job.status,
+            creation_context=obj.system_data,
+            inputs=inputs,
+            output=outputs["output"] if "output" in outputs else None,
+        )
+        _import_job._id = obj.id
+        if isinstance(_import_job.component, ImportComponent):
+            _import_job.component._source = (
+                ComponentSource.REMOTE_WORKSPACE_JOB
+            )  # This is used by pipeline job telemetries.
+
+        return _import_job
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import ImportSchema
+
+        return ImportSchema(context=context)
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> "Import":
+        """Call Import as a function will return a new instance each time.
+
+        :return: An Import node.
+        :rtype: Import
+        """
+        if isinstance(self._component, Component):
+            # call this to validate inputs
+            node: Import = self._component(*args, **kwargs)
+            # merge inputs
+            for name, original_input in self.inputs.items():
+                if name not in kwargs:
+                    # use setattr here to make sure owner of input won't change
+                    setattr(node.inputs, name, original_input._data)
+                    node._job_inputs[name] = original_input._data
+                # get outputs
+            for name, original_output in self.outputs.items():
+                # use setattr here to make sure owner of input won't change
+                if not isinstance(original_output, str):
+                    setattr(node.outputs, name, original_output._data)
+            self._refine_optional_inputs_with_no_value(node, kwargs)
+            # set default values: compute, environment_variables, outputs
+            node._name = self.name
+            node.compute = self.compute
+            node.tags = self.tags
+            # Pass through the display name only if the display name is not system generated.
+            node.display_name = self.display_name if self.display_name != self.name else None
+            return node
+        msg = "Import can be called as a function only when referenced component is {}, currently got {}."
+        raise ValidationException(
+            message=msg.format(type(Component), self._component),
+            no_personal_data_message=msg.format(type(Component), "self._component"),
+            target=ErrorTarget.COMMAND_JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel.py
new file mode 100644
index 00000000..db1de797
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel.py
@@ -0,0 +1,551 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import copy
+import json
+import logging
+import os
+import re
+from enum import Enum
+from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
+
+from marshmallow import INCLUDE, Schema
+
+from azure.ai.ml._schema.core.fields import NestedField, UnionField
+from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+    _BaseJobIdentityConfiguration,
+)
+from azure.ai.ml.entities._job.job import Job
+from azure.ai.ml.entities._job.parallel.run_function import RunFunction
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput
+from azure.ai.ml.exceptions import MlException
+
+from ..._schema import PathAwareSchema
+from ..._utils.utils import is_data_binding_expression
+from ...constants._common import ARM_ID_PREFIX
+from ...constants._component import NodeType
+from .._component.component import Component
+from .._component.flow import FlowComponent
+from .._component.parallel_component import ParallelComponent
+from .._inputs_outputs import Input, Output
+from .._job.job_resource_configuration import JobResourceConfiguration
+from .._job.parallel.parallel_job import ParallelJob
+from .._job.parallel.parallel_task import ParallelTask
+from .._job.parallel.retry_settings import RetrySettings
+from .._job.pipeline._io import NodeWithGroupInputMixin
+from .._util import convert_ordered_dict_to_dict, get_rest_dict_for_node_attrs, validate_attribute_type
+from .base_node import BaseNode
+
+module_logger = logging.getLogger(__name__)
+
+
+class Parallel(BaseNode, NodeWithGroupInputMixin):  # pylint: disable=too-many-instance-attributes
+    """Base class for parallel node, used for parallel component version consumption.
+
+    You should not instantiate this class directly. Instead, you should
+    create from builder function: parallel.
+
+    :param component: Id or instance of the parallel component/job to be run for the step
+    :type component: ~azure.ai.ml.entities._component.parallel_component.parallelComponent
+    :param name: Name of the parallel
+    :type name: str
+    :param description: Description of the commad
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated
+    :type tags: dict[str, str]
+    :param properties: The job property dictionary
+    :type properties: dict[str, str]
+    :param display_name: Display name of the job
+    :type display_name: str
+    :param retry_settings: Parallel job run failed retry
+    :type retry_settings: BatchRetrySettings
+    :param logging_level: A string of the logging level name
+    :type logging_level: str
+    :param max_concurrency_per_instance: The max parallellism that each compute instance has
+    :type max_concurrency_per_instance: int
+    :param error_threshold: The number of item processing failures should be ignored
+    :type error_threshold: int
+    :param mini_batch_error_threshold: The number of mini batch processing failures should be ignored
+    :type mini_batch_error_threshold: int
+    :param task: The parallel task
+    :type task: ParallelTask
+    :param mini_batch_size: For FileDataset input, this field is the number of files
+                            a user script can process in one run() call.
+                            For TabularDataset input, this field is the approximate size of data
+                            the user script can process in one run() call.
+                            Example values are 1024, 1024KB, 10MB, and 1GB. (optional, default value is 10 files
+                            for FileDataset and 1MB for TabularDataset.)
+                            This value could be set through PipelineParameter
+    :type mini_batch_size: str
+    :param partition_keys: The keys used to partition dataset into mini-batches. If specified,
+                           the data with the same key will be partitioned into the same mini-batch.
+                           If both partition_keys and mini_batch_size are specified,
+                           the partition keys will take effect.
+                           The input(s) must be partitioned dataset(s),
+                           and the partition_keys must be a subset of the keys of every input dataset for this to work.
+    :keyword identity: The identity that the command job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        dict[str, str],
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :type partition_keys: List
+    :param input_data: The input data
+    :type input_data: str
+    :param inputs: Inputs of the component/job
+    :type inputs: dict
+    :param outputs: Outputs of the component/job
+    :type outputs: dict
+    """
+
+    # pylint: disable=too-many-statements
+    def __init__(
+        self,
+        *,
+        component: Union[ParallelComponent, str],
+        compute: Optional[str] = None,
+        inputs: Optional[Dict[str, Union[NodeOutput, Input, str, bool, int, float, Enum]]] = None,
+        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
+        retry_settings: Optional[Union[RetrySettings, Dict[str, str]]] = None,
+        logging_level: Optional[str] = None,
+        max_concurrency_per_instance: Optional[int] = None,
+        error_threshold: Optional[int] = None,
+        mini_batch_error_threshold: Optional[int] = None,
+        input_data: Optional[str] = None,
+        task: Optional[Union[ParallelTask, RunFunction, Dict]] = None,
+        partition_keys: Optional[List] = None,
+        mini_batch_size: Optional[Union[str, int]] = None,
+        resources: Optional[JobResourceConfiguration] = None,
+        environment_variables: Optional[Dict] = None,
+        identity: Optional[
+            Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration, Dict]
+        ] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        kwargs.pop("type", None)
+
+        if isinstance(component, FlowComponent):
+            # make input definition fit actual inputs for flow component
+            with component._inputs._fit_inputs(inputs):  # type: ignore[attr-defined]
+                BaseNode.__init__(
+                    self,
+                    type=NodeType.PARALLEL,
+                    component=component,
+                    inputs=inputs,
+                    outputs=outputs,
+                    compute=compute,
+                    **kwargs,
+                )
+        else:
+            BaseNode.__init__(
+                self,
+                type=NodeType.PARALLEL,
+                component=component,
+                inputs=inputs,
+                outputs=outputs,
+                compute=compute,
+                **kwargs,
+            )
+        # init mark for _AttrDict
+        self._init = True
+
+        self._task = task
+
+        if (
+            mini_batch_size is not None
+            and not isinstance(mini_batch_size, int)
+            and not is_data_binding_expression(mini_batch_size)
+        ):
+            """Convert str to int."""  # pylint: disable=pointless-string-statement
+            pattern = re.compile(r"^\d+([kKmMgG][bB])*$")
+            if not pattern.match(mini_batch_size):
+                raise ValueError(r"Parameter mini_batch_size must follow regex rule ^\d+([kKmMgG][bB])*$")
+
+            try:
+                mini_batch_size = int(mini_batch_size)
+            except ValueError as e:
+                if not isinstance(mini_batch_size, int):
+                    unit = mini_batch_size[-2:].lower()
+                    if unit == "kb":
+                        mini_batch_size = int(mini_batch_size[0:-2]) * 1024
+                    elif unit == "mb":
+                        mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024
+                    elif unit == "gb":
+                        mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 * 1024
+                    else:
+                        raise ValueError("mini_batch_size unit must be kb, mb or gb") from e
+
+        self.mini_batch_size = mini_batch_size
+        self.partition_keys = partition_keys
+        self.input_data = input_data
+        self._retry_settings = retry_settings
+        self.logging_level = logging_level
+        self.max_concurrency_per_instance = max_concurrency_per_instance
+        self.error_threshold = error_threshold
+        self.mini_batch_error_threshold = mini_batch_error_threshold
+        self._resources = resources
+        self.environment_variables = {} if environment_variables is None else environment_variables
+        self._identity = identity
+        if isinstance(self.component, ParallelComponent):
+            self.resources = cast(JobResourceConfiguration, self.resources) or cast(
+                JobResourceConfiguration, copy.deepcopy(self.component.resources)
+            )
+            # TODO: Bug Item number: 2897665
+            self.retry_settings = self.retry_settings or copy.deepcopy(self.component.retry_settings)  # type: ignore
+            self.input_data = self.input_data or self.component.input_data
+            self.max_concurrency_per_instance = (
+                self.max_concurrency_per_instance or self.component.max_concurrency_per_instance
+            )
+            self.mini_batch_error_threshold = (
+                self.mini_batch_error_threshold or self.component.mini_batch_error_threshold
+            )
+            self.mini_batch_size = self.mini_batch_size or self.component.mini_batch_size
+            self.partition_keys = self.partition_keys or copy.deepcopy(self.component.partition_keys)
+
+            if not self.task:
+                self.task = self.component.task
+                # task.code is based on self.component.base_path
+                self._base_path = self.component.base_path
+
+        self._init = False
+
+    @classmethod
+    def _get_supported_outputs_types(cls) -> Tuple:
+        return str, Output
+
+    @property
+    def retry_settings(self) -> RetrySettings:
+        """Get the retry settings for the parallel job.
+
+        :return: The retry settings for the parallel job.
+        :rtype: ~azure.ai.ml.entities._job.parallel.retry_settings.RetrySettings
+        """
+        return self._retry_settings  # type: ignore
+
+    @retry_settings.setter
+    def retry_settings(self, value: Union[RetrySettings, Dict]) -> None:
+        """Set the retry settings for the parallel job.
+
+        :param value: The retry settings for the parallel job.
+        :type value: ~azure.ai.ml.entities._job.parallel.retry_settings.RetrySettings or dict
+        """
+        if isinstance(value, dict):
+            value = RetrySettings(**value)
+        self._retry_settings = value
+
+    @property
+    def resources(self) -> Optional[JobResourceConfiguration]:
+        """Get the resource configuration for the parallel job.
+
+        :return: The resource configuration for the parallel job.
+        :rtype: ~azure.ai.ml.entities._job.job_resource_configuration.JobResourceConfiguration
+        """
+        return self._resources
+
+    @resources.setter
+    def resources(self, value: Union[JobResourceConfiguration, Dict]) -> None:
+        """Set the resource configuration for the parallel job.
+
+        :param value: The resource configuration for the parallel job.
+        :type value: ~azure.ai.ml.entities._job.job_resource_configuration.JobResourceConfiguration or dict
+        """
+        if isinstance(value, dict):
+            value = JobResourceConfiguration(**value)
+        self._resources = value
+
+    @property
+    def identity(
+        self,
+    ) -> Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration, Dict]]:
+        """The identity that the job will use while running on compute.
+
+        :return: The identity that the job will use while running on compute.
+        :rtype: Optional[Union[~azure.ai.ml.ManagedIdentityConfiguration, ~azure.ai.ml.AmlTokenConfiguration,
+            ~azure.ai.ml.UserIdentityConfiguration]]
+        """
+        return self._identity
+
+    @identity.setter
+    def identity(
+        self,
+        value: Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration, None],
+    ) -> None:
+        """Sets the identity that the job will use while running on compute.
+
+        :param value: The identity that the job will use while running on compute.
+        :type value: Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration,
+            ~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration]
+        """
+        if isinstance(value, dict):
+            identity_schema = UnionField(
+                [
+                    NestedField(ManagedIdentitySchema, unknown=INCLUDE),
+                    NestedField(AMLTokenIdentitySchema, unknown=INCLUDE),
+                    NestedField(UserIdentitySchema, unknown=INCLUDE),
+                ]
+            )
+            value = identity_schema._deserialize(value=value, attr=None, data=None)
+        self._identity = value
+
+    @property
+    def component(self) -> Union[str, ParallelComponent]:
+        """Get the component of the parallel job.
+
+        :return: The component of the parallel job.
+        :rtype: str or ~azure.ai.ml.entities._component.parallel_component.ParallelComponent
+        """
+        res: Union[str, ParallelComponent] = self._component
+        return res
+
+    @property
+    def task(self) -> Optional[ParallelTask]:
+        """Get the parallel task.
+
+        :return: The parallel task.
+        :rtype: ~azure.ai.ml.entities._job.parallel.parallel_task.ParallelTask
+        """
+        return self._task  # type: ignore
+
+    @task.setter
+    def task(self, value: Union[ParallelTask, Dict]) -> None:
+        """Set the parallel task.
+
+        :param value: The parallel task.
+        :type value: ~azure.ai.ml.entities._job.parallel.parallel_task.ParallelTask or dict
+        """
+        # base path should be reset if task is set via sdk
+        self._base_path: Optional[Union[str, os.PathLike]] = None
+        if isinstance(value, dict):
+            value = ParallelTask(**value)
+        self._task = value
+
+    def _set_base_path(self, base_path: Optional[Union[str, os.PathLike]]) -> None:
+        if self._base_path:
+            return
+        super(Parallel, self)._set_base_path(base_path)
+
+    def set_resources(
+        self,
+        *,
+        instance_type: Optional[Union[str, List[str]]] = None,
+        instance_count: Optional[int] = None,
+        properties: Optional[Dict] = None,
+        docker_args: Optional[str] = None,
+        shm_size: Optional[str] = None,
+        # pylint: disable=unused-argument
+        **kwargs: Any,
+    ) -> None:
+        """Set the resources for the parallel job.
+
+        :keyword instance_type: The instance type or a list of instance types used as supported by the compute target.
+        :paramtype instance_type: Union[str, List[str]]
+        :keyword instance_count: The number of instances or nodes used by the compute target.
+        :paramtype instance_count: int
+        :keyword properties: The property dictionary for the resources.
+        :paramtype properties: dict
+        :keyword docker_args: Extra arguments to pass to the Docker run command.
+        :paramtype docker_args: str
+        :keyword shm_size: Size of the Docker container's shared memory block.
+        :paramtype shm_size: str
+        """
+        if self.resources is None:
+            self.resources = JobResourceConfiguration()
+
+        if instance_type is not None:
+            self.resources.instance_type = instance_type
+        if instance_count is not None:
+            self.resources.instance_count = instance_count
+        if properties is not None:
+            self.resources.properties = properties
+        if docker_args is not None:
+            self.resources.docker_args = docker_args
+        if shm_size is not None:
+            self.resources.shm_size = shm_size
+
+        # Save the resources to internal component as well, otherwise calling sweep() will loose the settings
+        if isinstance(self.component, Component):
+            self.component.resources = self.resources
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, ParallelComponent, FlowComponent),
+            "retry_settings": (dict, RetrySettings),
+            "resources": (dict, JobResourceConfiguration),
+            "task": (dict, ParallelTask),
+            "logging_level": str,
+            "max_concurrency_per_instance": (str, int),
+            "error_threshold": (str, int),
+            "mini_batch_error_threshold": (str, int),
+            "environment_variables": dict,
+        }
+
+    def _to_job(self) -> ParallelJob:
+        return ParallelJob(
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            properties=self.properties,
+            compute=self.compute,
+            resources=self.resources,
+            partition_keys=self.partition_keys,
+            mini_batch_size=self.mini_batch_size,
+            task=self.task,
+            retry_settings=self.retry_settings,
+            input_data=self.input_data,
+            logging_level=self.logging_level,
+            identity=self.identity,
+            max_concurrency_per_instance=self.max_concurrency_per_instance,
+            error_threshold=self.error_threshold,
+            mini_batch_error_threshold=self.mini_batch_error_threshold,
+            environment_variables=self.environment_variables,
+            inputs=self._job_inputs,
+            outputs=self._job_outputs,
+        )
+
+    def _parallel_attr_to_dict(self, attr: str, base_type: Type) -> dict:
+        # Convert parallel attribute to dict
+        rest_attr = {}
+        parallel_attr = getattr(self, attr)
+        if parallel_attr is not None:
+            if isinstance(parallel_attr, base_type):
+                rest_attr = parallel_attr._to_dict()
+            elif isinstance(parallel_attr, dict):
+                rest_attr = parallel_attr
+            else:
+                msg = f"Expecting {base_type} for {attr}, got {type(parallel_attr)} instead."
+                raise MlException(message=msg, no_personal_data_message=msg)
+        # TODO: Bug Item number: 2897665
+        res: dict = convert_ordered_dict_to_dict(rest_attr)  # type: ignore
+        return res
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return [
+            "type",
+            "resources",
+            "error_threshold",
+            "mini_batch_error_threshold",
+            "environment_variables",
+            "max_concurrency_per_instance",
+            "task",
+            "input_data",
+        ]
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj: Dict = super(Parallel, self)._to_rest_object(**kwargs)
+        rest_obj.update(
+            convert_ordered_dict_to_dict(
+                {
+                    "componentId": self._get_component_id(),
+                    "retry_settings": get_rest_dict_for_node_attrs(self.retry_settings),
+                    "logging_level": self.logging_level,
+                    "mini_batch_size": self.mini_batch_size,
+                    "partition_keys": (
+                        json.dumps(self.partition_keys) if self.partition_keys is not None else self.partition_keys
+                    ),
+                    "identity": get_rest_dict_for_node_attrs(self.identity),
+                    "resources": get_rest_dict_for_node_attrs(self.resources),
+                }
+            )
+        )
+        return rest_obj
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: dict) -> Dict:
+        obj = super()._from_rest_object_to_init_params(obj)
+        # retry_settings
+        if "retry_settings" in obj and obj["retry_settings"]:
+            obj["retry_settings"] = RetrySettings._from_dict(obj["retry_settings"])
+
+        if "task" in obj and obj["task"]:
+            obj["task"] = ParallelTask._from_dict(obj["task"])
+            task_code = obj["task"].code
+            task_env = obj["task"].environment
+            # remove azureml: prefix in code and environment which is added in _to_rest_object
+            if task_code and isinstance(task_code, str) and task_code.startswith(ARM_ID_PREFIX):
+                obj["task"].code = task_code[len(ARM_ID_PREFIX) :]
+            if task_env and isinstance(task_env, str) and task_env.startswith(ARM_ID_PREFIX):
+                obj["task"].environment = task_env[len(ARM_ID_PREFIX) :]
+
+        if "resources" in obj and obj["resources"]:
+            obj["resources"] = JobResourceConfiguration._from_rest_object(obj["resources"])
+
+        if "partition_keys" in obj and obj["partition_keys"]:
+            obj["partition_keys"] = json.dumps(obj["partition_keys"])
+        if "identity" in obj and obj["identity"]:
+            obj["identity"] = _BaseJobIdentityConfiguration._from_rest_object(obj["identity"])
+        return obj
+
+    def _build_inputs(self) -> Dict:
+        inputs = super(Parallel, self)._build_inputs()
+        built_inputs = {}
+        # Validate and remove non-specified inputs
+        for key, value in inputs.items():
+            if value is not None:
+                built_inputs[key] = value
+        return built_inputs
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import ParallelSchema
+
+        return ParallelSchema(context=context)
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> "Parallel":
+        """Call Parallel as a function will return a new instance each time.
+
+        :return: A Parallel node
+        :rtype: Parallel
+        """
+        if isinstance(self._component, Component):
+            # call this to validate inputs
+            node: Parallel = self._component(*args, **kwargs)
+            # merge inputs
+            for name, original_input in self.inputs.items():
+                if name not in kwargs:
+                    # use setattr here to make sure owner of input won't change
+                    setattr(node.inputs, name, original_input._data)
+                # get outputs
+            for name, original_output in self.outputs.items():
+                # use setattr here to make sure owner of input won't change
+                if not isinstance(original_output, str):
+                    setattr(node.outputs, name, original_output._data)
+            self._refine_optional_inputs_with_no_value(node, kwargs)
+            # set default values: compute, environment_variables, outputs
+            node._name = self.name
+            node.compute = self.compute
+            node.tags = self.tags
+            node.display_name = self.display_name
+            node.mini_batch_size = self.mini_batch_size
+            node.partition_keys = self.partition_keys
+            node.logging_level = self.logging_level
+            node.max_concurrency_per_instance = self.max_concurrency_per_instance
+            node.error_threshold = self.error_threshold
+            # deep copy for complex object
+            node.retry_settings = copy.deepcopy(self.retry_settings)
+            node.input_data = self.input_data
+            node.task = copy.deepcopy(self.task)
+            node._base_path = self.base_path
+            node.resources = copy.deepcopy(self.resources)
+            node.environment_variables = copy.deepcopy(self.environment_variables)
+            node.identity = copy.deepcopy(self.identity)
+            return node
+        msg = f"Parallel can be called as a function only when referenced component is {type(Component)}, \
+            currently got {self._component}."
+        raise MlException(message=msg, no_personal_data_message=msg)
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
+        raise NotImplementedError()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_for.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_for.py
new file mode 100644
index 00000000..1e888f50
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_for.py
@@ -0,0 +1,362 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import json
+import os
+from typing import Any, Dict, Optional, Union
+
+from azure.ai.ml import Input, Output
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._schema.pipeline.control_flow_job import ParallelForSchema
+from azure.ai.ml._utils.utils import is_data_binding_expression
+from azure.ai.ml.constants import AssetTypes
+from azure.ai.ml.constants._component import ComponentParameterTypes, ControlFlowType
+from azure.ai.ml.entities import Component, Pipeline
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.control_flow_node import LoopNode
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
+from azure.ai.ml.entities._job.pipeline._io.mixin import NodeIOMixin
+from azure.ai.ml.entities._util import convert_ordered_dict_to_dict, validate_attribute_type
+from azure.ai.ml.entities._validation import MutableValidationResult
+from azure.ai.ml.exceptions import UserErrorException
+
+
+class ParallelFor(LoopNode, NodeIOMixin):
+    """Parallel for loop node in the pipeline job. By specifying the loop body and aggregated items, a job-level
+    parallel for loop can be implemented. It will be initialized when calling dsl.parallel_for or when loading the
+    pipeline yml containing parallel_for node. Please do not manually initialize this class.
+
+    :param body: Pipeline job for the parallel for loop body.
+    :type body: ~azure.ai.ml.entities.Pipeline
+    :param items: The loop body's input which will bind to the loop node.
+    :type items: typing.Union[list, dict, str, ~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
+        ~azure.ai.ml.entities._job.pipeline._io.PipelineInput]
+    :param max_concurrency: Maximum number of concurrent iterations to run. All loop body nodes will be executed
+        in parallel if not specified.
+    :type max_concurrency: int
+    """
+
+    OUT_TYPE_MAPPING = {
+        AssetTypes.URI_FILE: AssetTypes.MLTABLE,
+        AssetTypes.URI_FOLDER: AssetTypes.MLTABLE,
+        AssetTypes.MLTABLE: AssetTypes.MLTABLE,
+        AssetTypes.MLFLOW_MODEL: AssetTypes.MLTABLE,
+        AssetTypes.TRITON_MODEL: AssetTypes.MLTABLE,
+        AssetTypes.CUSTOM_MODEL: AssetTypes.MLTABLE,
+        # legacy path support
+        "path": AssetTypes.MLTABLE,
+        ComponentParameterTypes.NUMBER: ComponentParameterTypes.STRING,
+        ComponentParameterTypes.STRING: ComponentParameterTypes.STRING,
+        ComponentParameterTypes.BOOLEAN: ComponentParameterTypes.STRING,
+        ComponentParameterTypes.INTEGER: ComponentParameterTypes.STRING,
+    }
+
+    def __init__(
+        self,
+        *,
+        body: "Pipeline",
+        items: Union[list, dict, str, PipelineInput, NodeOutput],
+        max_concurrency: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        kwargs.pop("type", None)
+        super(ParallelFor, self).__init__(
+            type=ControlFlowType.PARALLEL_FOR,
+            body=body,
+            **kwargs,
+        )
+        # loop body is incomplete in submission time, so won't validate required inputs
+        self.body._validate_required_input_not_provided = False
+        self._outputs: dict = {}
+
+        actual_outputs = kwargs.get("outputs", {})
+        # parallel for node shares output meta with body
+        try:
+            outputs = self.body._component.outputs
+            # transform body outputs to aggregate types when available
+            self._outputs = self._build_outputs_dict(
+                outputs=actual_outputs, output_definition_dict=self._convert_output_meta(outputs)
+            )
+        except AttributeError:
+            # when body output not available, create default output builder without meta
+            self._outputs = self._build_outputs_dict(outputs=actual_outputs)
+
+        self._items = items
+
+        self.max_concurrency = max_concurrency
+
+    @property
+    def outputs(self) -> Dict[str, Union[str, Output]]:
+        """Get the outputs of the parallel for loop.
+
+        :return: The dictionary containing the outputs of the parallel for loop.
+        :rtype: dict[str, Union[str, ~azure.ai.ml.Output]]
+        """
+        return self._outputs
+
+    @property
+    def items(self) -> Union[list, dict, str, PipelineInput, NodeOutput]:
+        """Get the loop body's input which will bind to the loop node.
+
+        :return: The input for the loop body.
+        :rtype: typing.Union[list, dict, str, ~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
+            ~azure.ai.ml.entities._job.pipeline._io.PipelineInput]
+        """
+        return self._items
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+        return ParallelForSchema(context=context)
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            **super(ParallelFor, cls)._attr_type_map(),
+            "items": (dict, list, str, PipelineInput, NodeOutput),
+        }
+
+    @classmethod
+    # pylint: disable-next=docstring-missing-param
+    def _to_rest_item(cls, item: dict) -> dict:
+        """Convert item to rest object.
+
+        :return: The rest object
+        :rtype: dict
+        """
+        primitive_inputs, asset_inputs = {}, {}
+        # validate item
+        for key, val in item.items():
+            if isinstance(val, Input):
+                asset_inputs[key] = val
+            elif isinstance(val, (PipelineInput, NodeOutput)):
+                # convert binding object to string
+                primitive_inputs[key] = str(val)
+            else:
+                primitive_inputs[key] = val
+        return {
+            # asset type inputs will be converted to JobInput dict:
+            # {"asset_param": {"uri": "xxx", "job_input_type": "uri_file"}}
+            **cls._input_entity_to_rest_inputs(input_entity=asset_inputs),
+            # primitive inputs has primitive type value like this
+            # {"int_param": 1}
+            **primitive_inputs,
+        }
+
+    @classmethod
+    # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+    def _to_rest_items(cls, items: Union[list, dict, str, NodeOutput, PipelineInput]) -> str:
+        """Convert items to rest object."""
+        # validate items.
+        cls._validate_items(items=items, raise_error=True, body_component=None)
+        result: str = ""
+        # convert items to rest object
+        if isinstance(items, list):
+            rest_items_list = [cls._to_rest_item(item=i) for i in items]
+            result = json.dumps(rest_items_list)
+        elif isinstance(items, dict):
+            rest_items_dict = {k: cls._to_rest_item(item=v) for k, v in items.items()}
+            result = json.dumps(rest_items_dict)
+        elif isinstance(items, (NodeOutput, PipelineInput)):
+            result = str(items)
+        elif isinstance(items, str):
+            result = items
+        else:
+            raise UserErrorException("Unsupported items type: {}".format(type(items)))
+        return result
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        """Convert self to a rest object for remote call.
+
+        :return: The rest object
+        :rtype: dict
+        """
+        rest_node = super(ParallelFor, self)._to_rest_object(**kwargs)
+        # convert items to rest object
+        rest_items = self._to_rest_items(items=self.items)
+        rest_node.update({"items": rest_items, "outputs": self._to_rest_outputs()})
+        # TODO: Bug Item number: 2897665
+        res: dict = convert_ordered_dict_to_dict(rest_node)  # type: ignore
+        return res
+
+    @classmethod
+    # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+    def _from_rest_item(cls, rest_item: Any) -> Dict:
+        """Convert rest item to item."""
+        primitive_inputs, asset_inputs = {}, {}
+        for key, val in rest_item.items():
+            if isinstance(val, dict) and val.get("job_input_type"):
+                asset_inputs[key] = val
+            else:
+                primitive_inputs[key] = val
+        return {**cls._from_rest_inputs(inputs=asset_inputs), **primitive_inputs}
+
+    @classmethod
+    # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+    def _from_rest_items(cls, rest_items: str) -> Union[dict, list, str]:
+        """Convert items from rest object."""
+        try:
+            items = json.loads(rest_items)
+        except json.JSONDecodeError:
+            # return original items when failed to load
+            return rest_items
+        if isinstance(items, list):
+            return [cls._from_rest_item(rest_item=i) for i in items]
+        if isinstance(items, dict):
+            return {k: cls._from_rest_item(rest_item=v) for k, v in items.items()}
+        return rest_items
+
+    @classmethod
+    def _from_rest_object(cls, obj: dict, pipeline_jobs: dict) -> "ParallelFor":
+        # pylint: disable=protected-access
+        obj = BaseNode._from_rest_object_to_init_params(obj)
+        obj["items"] = cls._from_rest_items(rest_items=obj.get("items", ""))
+        return cls._create_instance_from_schema_dict(pipeline_jobs=pipeline_jobs, loaded_data=obj)
+
+    @classmethod
+    def _create_instance_from_schema_dict(cls, pipeline_jobs: Dict, loaded_data: Dict, **kwargs: Any) -> "ParallelFor":
+        body_name = cls._get_data_binding_expression_value(loaded_data.pop("body"), regex=r"\{\{.*\.jobs\.(.*)\}\}")
+
+        loaded_data["body"] = cls._get_body_from_pipeline_jobs(pipeline_jobs=pipeline_jobs, body_name=body_name)
+        return cls(**loaded_data, **kwargs)
+
+    def _convert_output_meta(self, outputs: Dict[str, Union[NodeOutput, Output]]) -> Dict[str, Output]:
+        """Convert output meta to aggregate types.
+
+        :param outputs: Output meta
+        :type outputs: Dict[str, Union[NodeOutput, Output]]
+        :return: Dictionary of aggregate types
+        :rtype: Dict[str, Output]
+        """
+        # pylint: disable=protected-access
+        aggregate_outputs = {}
+        for name, output in outputs.items():
+            if output.type in self.OUT_TYPE_MAPPING:
+                new_type = self.OUT_TYPE_MAPPING[output.type]
+            else:
+                # when loop body introduces some new output type, this will be raised as a reminder to support is in
+                # parallel for
+                raise UserErrorException(
+                    "Referencing output with type {} is not supported in parallel_for node.".format(output.type)
+                )
+            if isinstance(output, NodeOutput):
+                output = output._to_job_output()  # type: ignore
+            if isinstance(output, Output):
+                out_dict = output._to_dict()
+                out_dict["type"] = new_type
+                resolved_output = Output(**out_dict)
+            else:
+                resolved_output = Output(type=new_type)
+            aggregate_outputs[name] = resolved_output
+        return aggregate_outputs
+
+    def _customized_validate(self) -> MutableValidationResult:
+        """Customized validation for parallel for node.
+
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        # pylint: disable=protected-access
+        validation_result = self._validate_body()
+        validation_result.merge_with(
+            self._validate_items(items=self.items, raise_error=False, body_component=self.body._component)
+        )
+        return validation_result
+
+    @classmethod
+    def _validate_items(
+        cls,
+        items: Union[list, dict, str, NodeOutput, PipelineInput],
+        raise_error: bool = True,
+        body_component: Optional[Union[str, Component]] = None,
+    ) -> MutableValidationResult:
+        validation_result = cls._create_empty_validation_result()
+        if items is not None:
+            if isinstance(items, str):
+                # TODO: remove the validation
+                # try to deserialize str if it's a json string
+                try:
+                    items = json.loads(items)
+                except json.JSONDecodeError as e:
+                    if not is_data_binding_expression(items, ["parent"]):
+                        validation_result.append_error(
+                            yaml_path="items",
+                            message=f"Items is neither a valid JSON string due to {e} or a binding string.",
+                        )
+            if isinstance(items, dict):
+                # Validate dict keys
+                items = list(items.values())
+            if isinstance(items, list):
+                if len(items) > 0:
+                    cls._validate_items_list(items, validation_result, body_component=body_component)
+                else:
+                    validation_result.append_error(yaml_path="items", message="Items is an empty list/dict.")
+        else:
+            validation_result.append_error(
+                yaml_path="items",
+                message="Items is required for parallel_for node",
+            )
+        return cls._try_raise(validation_result, raise_error=raise_error)
+
+    @classmethod
+    def _validate_items_list(
+        cls,
+        items: list,
+        validation_result: MutableValidationResult,
+        body_component: Optional[Union[str, Component]] = None,
+    ) -> None:
+        meta: dict = {}
+        # all items have to be dict and have matched meta
+        for item in items:
+            # item has to be dict
+            # Note: item can be empty dict when loop_body don't have foreach inputs.
+            if not isinstance(item, dict):
+                validation_result.append_error(
+                    yaml_path="items",
+                    message=f"Items has to be list/dict of dict as value, " f"but got {type(item)} for {item}.",
+                )
+            else:
+                # item has to have matched meta
+                if meta.keys() != item.keys():
+                    if not meta.keys():
+                        meta = item
+                    else:
+                        msg = f"Items should have same keys with body inputs, but got {item.keys()} and {meta.keys()}."
+                        validation_result.append_error(yaml_path="items", message=msg)
+                # items' keys should appear in body's inputs
+                if isinstance(body_component, Component) and (not item.keys() <= body_component.inputs.keys()):
+                    msg = f"Item {item} got unmatched inputs with loop body component inputs {body_component.inputs}."
+                    validation_result.append_error(yaml_path="items", message=msg)
+                # validate item value type
+                cls._validate_item_value_type(item=item, validation_result=validation_result)
+
+    @classmethod
+    def _validate_item_value_type(cls, item: dict, validation_result: MutableValidationResult) -> None:
+        supported_types = (Input, str, bool, int, float, PipelineInput)
+        for _, val in item.items():
+            if not isinstance(val, supported_types):
+                validation_result.append_error(
+                    yaml_path="items",
+                    message="Unsupported type {} in parallel_for items. Supported types are: {}".format(
+                        type(val), supported_types
+                    ),
+                )
+            if isinstance(val, Input):
+                cls._validate_input_item_value(entry=val, validation_result=validation_result)
+
+    @classmethod
+    def _validate_input_item_value(cls, entry: Input, validation_result: MutableValidationResult) -> None:
+        if not isinstance(entry, Input):
+            return
+        if not entry.path:
+            validation_result.append_error(
+                yaml_path="items",
+                message=f"Input path not provided for {entry}.",
+            )
+        if isinstance(entry.path, str) and os.path.exists(entry.path):
+            validation_result.append_error(
+                yaml_path="items",
+                message=f"Local file input {entry} is not supported, please create it as a dataset.",
+            )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py
new file mode 100644
index 00000000..a8f08d1e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py
@@ -0,0 +1,285 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from typing import Any, Dict, List, Optional, Union
+
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities._component.parallel_component import ParallelComponent
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+)
+from azure.ai.ml.entities._deployment.deployment_settings import BatchRetrySettings
+from azure.ai.ml.entities._job.parallel.run_function import RunFunction
+
+from .command_func import _parse_input, _parse_inputs_outputs, _parse_output
+from .parallel import Parallel
+
+
+def parallel_run_function(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    properties: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    compute: Optional[str] = None,
+    retry_settings: Optional[BatchRetrySettings] = None,
+    environment_variables: Optional[Dict] = None,
+    logging_level: Optional[str] = None,
+    max_concurrency_per_instance: Optional[int] = None,
+    error_threshold: Optional[int] = None,
+    mini_batch_error_threshold: Optional[int] = None,
+    task: Optional[RunFunction] = None,
+    mini_batch_size: Optional[str] = None,
+    partition_keys: Optional[List] = None,
+    input_data: Optional[str] = None,
+    inputs: Optional[Dict] = None,
+    outputs: Optional[Dict] = None,
+    instance_count: Optional[int] = None,
+    instance_type: Optional[str] = None,
+    docker_args: Optional[str] = None,
+    shm_size: Optional[str] = None,
+    identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None,
+    is_deterministic: bool = True,
+    **kwargs: Any,
+) -> Parallel:
+    """Create a Parallel object which can be used inside dsl.pipeline as a function and can also be created as a
+    standalone parallel job.
+
+    For an example of using ParallelRunStep, see the notebook
+    https://aka.ms/parallel-example-notebook
+
+    .. note::
+
+        To use parallel_run_function:
+
+        * Create a :class:`azure.ai.ml.entities._builders.Parallel` object to specify how parallel run is performed,
+          with parameters to control batch size,number of nodes per compute target, and a
+          reference to your custom Python script.
+
+        * Build pipeline with the parallel object as a function. defines inputs and
+          outputs for the step.
+
+        * Sumbit the pipeline to run.
+
+    .. code:: python
+
+        from azure.ai.ml import Input, Output, parallel
+
+        parallel_run = parallel_run_function(
+            name="batch_score_with_tabular_input",
+            display_name="Batch Score with Tabular Dataset",
+            description="parallel component for batch score",
+            inputs=dict(
+                job_data_path=Input(
+                    type=AssetTypes.MLTABLE,
+                    description="The data to be split and scored in parallel",
+                ),
+                score_model=Input(
+                    type=AssetTypes.URI_FOLDER, description="The model for batch score."
+                ),
+            ),
+            outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
+            input_data="${{inputs.job_data_path}}",
+            max_concurrency_per_instance=2,  # Optional, default is 1
+            mini_batch_size="100",  # optional
+            mini_batch_error_threshold=5,  # Optional, allowed failed count on mini batch items, default is -1
+            logging_level="DEBUG",  # Optional, default is INFO
+            error_threshold=5,  # Optional, allowed failed count totally, default is -1
+            retry_settings=dict(max_retries=2, timeout=60),  # Optional
+            task=RunFunction(
+                code="./src",
+                entry_script="tabular_batch_inference.py",
+                environment=Environment(
+                    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
+                    conda_file="./src/environment_parallel.yml",
+                ),
+                program_arguments="--model ${{inputs.score_model}}",
+                append_row_to="${{outputs.job_output_path}}",  # Optional, if not set, summary_only
+            ),
+        )
+
+    :keyword name: Name of the parallel job or component created.
+    :paramtype name: str
+    :keyword description: A friendly description of the parallel.
+    :paramtype description: str
+    :keyword tags: Tags to be attached to this parallel.
+    :paramtype tags: Dict
+    :keyword properties: The asset property dictionary.
+    :paramtype properties: Dict
+    :keyword display_name: A friendly name.
+    :paramtype display_name: str
+    :keyword experiment_name: Name of the experiment the job will be created under,
+                            if None is provided, default will be set to current directory name.
+                            Will be ignored as a pipeline step.
+    :paramtype experiment_name: str
+    :keyword compute: The name of the compute where the parallel job is executed (will not be used
+                    if the parallel is used as a component/function).
+    :paramtype compute: str
+    :keyword retry_settings: Parallel component run failed retry
+    :paramtype retry_settings: ~azure.ai.ml.entities._deployment.deployment_settings.BatchRetrySettings
+    :keyword environment_variables: A dictionary of environment variables names and values.
+                                  These environment variables are set on the process
+                                  where user script is being executed.
+    :paramtype environment_variables: Dict[str, str]
+    :keyword logging_level: A string of the logging level name, which is defined in 'logging'.
+                          Possible values are 'WARNING', 'INFO', and 'DEBUG'. (optional, default value is 'INFO'.)
+                          This value could be set through PipelineParameter.
+    :paramtype logging_level: str
+    :keyword max_concurrency_per_instance: The max parallellism that each compute instance has.
+    :paramtype max_concurrency_per_instance: int
+    :keyword error_threshold: The number of record failures for Tabular Dataset and file failures for File Dataset
+                            that should be ignored during processing.
+                            If the error count goes above this value, then the job will be aborted.
+                            Error threshold is for the entire input rather
+                            than the individual mini-batch sent to run() method.
+                            The range is [-1, int.max]. -1 indicates ignore all failures during processing
+    :paramtype error_threshold: int
+    :keyword mini_batch_error_threshold: The number of mini batch processing failures should be ignored
+    :paramtype mini_batch_error_threshold: int
+    :keyword task: The parallel task
+    :paramtype task: ~azure.ai.ml.entities._job.parallel.run_function.RunFunction
+    :keyword mini_batch_size: For FileDataset input,
+                            this field is the number of files a user script can process in one run() call.
+                            For TabularDataset input, this field is the approximate size of data
+                            the user script can process in one run() call.
+                            Example values are 1024, 1024KB, 10MB, and 1GB.
+                            (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.)
+                            This value could be set through PipelineParameter.
+    :paramtype mini_batch_size: str
+    :keyword partition_keys: The keys used to partition dataset into mini-batches. If specified,
+                           the data with the same key will be partitioned into the same mini-batch.
+                           If both partition_keys and mini_batch_size are specified,
+                           the partition keys will take effect.
+                           The input(s) must be partitioned dataset(s),
+                           and the partition_keys must be a subset of the keys of every input dataset for this to work
+    :paramtype partition_keys: List
+    :keyword input_data: The input data.
+    :paramtype input_data: str
+    :keyword inputs: A dict of inputs used by this parallel.
+    :paramtype inputs: Dict
+    :keyword outputs: The outputs of this parallel
+    :paramtype outputs: Dict
+    :keyword instance_count: Optional number of instances or nodes used by the compute target.
+                           Defaults to 1
+    :paramtype instance_count: int
+    :keyword instance_type: Optional type of VM used as supported by the compute target..
+    :paramtype instance_type: str
+    :keyword docker_args: Extra arguments to pass to the Docker run command.
+                        This would override any parameters that have already been set by the system,
+                        or in this section.
+                        This parameter is only supported for Azure ML compute types.
+    :paramtype docker_args: str
+    :keyword shm_size: Size of the docker container's shared memory block.
+                     This should be in the format of (number)(unit) where number as to be greater than 0
+                     and the unit can be one of b(bytes), k(kilobytes), m(megabytes), or g(gigabytes).
+    :paramtype shm_size: str
+    :keyword identity: Identity that PRS job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :keyword is_deterministic: Specify whether the parallel will return same output given same input.
+                             If a parallel (component) is deterministic, when use it as a node/step in a pipeline,
+                             it will reuse results from a previous submitted job in current workspace
+                             which has same inputs and settings.
+                             In this case, this step will not use any compute resource. Defaults to True,
+                             specify is_deterministic=False if you would like to avoid such reuse behavior,
+                             defaults to True.
+    :paramtype is_deterministic: bool
+    :return: The parallel node
+    :rtype: ~azure.ai.ml._builders.parallel.Parallel
+    """
+    # pylint: disable=too-many-locals
+    inputs = inputs or {}
+    outputs = outputs or {}
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+
+    component = kwargs.pop("component", None)
+
+    if component is None:
+        if task is None:
+            component = ParallelComponent(
+                base_path=os.getcwd(),  # base path should be current folder
+                name=name,
+                tags=tags,
+                code=None,
+                display_name=display_name,
+                description=description,
+                inputs=component_inputs,
+                outputs=component_outputs,
+                retry_settings=retry_settings,  # type: ignore[arg-type]
+                logging_level=logging_level,
+                max_concurrency_per_instance=max_concurrency_per_instance,
+                error_threshold=error_threshold,
+                mini_batch_error_threshold=mini_batch_error_threshold,
+                task=task,
+                mini_batch_size=mini_batch_size,
+                partition_keys=partition_keys,
+                input_data=input_data,
+                _source=ComponentSource.BUILDER,
+                is_deterministic=is_deterministic,
+                **kwargs,
+            )
+        else:
+            component = ParallelComponent(
+                base_path=os.getcwd(),  # base path should be current folder
+                name=name,
+                tags=tags,
+                code=task.code,
+                display_name=display_name,
+                description=description,
+                inputs=component_inputs,
+                outputs=component_outputs,
+                retry_settings=retry_settings,  # type: ignore[arg-type]
+                logging_level=logging_level,
+                max_concurrency_per_instance=max_concurrency_per_instance,
+                error_threshold=error_threshold,
+                mini_batch_error_threshold=mini_batch_error_threshold,
+                task=task,
+                mini_batch_size=mini_batch_size,
+                partition_keys=partition_keys,
+                input_data=input_data,
+                _source=ComponentSource.BUILDER,
+                is_deterministic=is_deterministic,
+                **kwargs,
+            )
+
+    parallel_obj = Parallel(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        properties=properties,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        compute=compute,
+        inputs=job_inputs,
+        outputs=job_outputs,
+        identity=identity,
+        environment_variables=environment_variables,
+        retry_settings=retry_settings,  # type: ignore[arg-type]
+        logging_level=logging_level,
+        max_concurrency_per_instance=max_concurrency_per_instance,
+        error_threshold=error_threshold,
+        mini_batch_error_threshold=mini_batch_error_threshold,
+        task=task,
+        mini_batch_size=mini_batch_size,
+        partition_keys=partition_keys,
+        input_data=input_data,
+        **kwargs,
+    )
+
+    if instance_count is not None or instance_type is not None or docker_args is not None or shm_size is not None:
+        parallel_obj.set_resources(
+            instance_count=instance_count, instance_type=instance_type, docker_args=docker_args, shm_size=shm_size
+        )
+
+    return parallel_obj
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py
new file mode 100644
index 00000000..188d9044
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py
@@ -0,0 +1,225 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+from enum import Enum
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml.entities._component.component import Component, NodeType
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.job import Job
+from azure.ai.ml.entities._validation import MutableValidationResult
+
+from ..._schema import PathAwareSchema
+from .._job.pipeline.pipeline_job_settings import PipelineJobSettings
+from .._util import convert_ordered_dict_to_dict, copy_output_setting, validate_attribute_type
+from .base_node import BaseNode
+
+if TYPE_CHECKING:
+    from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+
+module_logger = logging.getLogger(__name__)
+
+
+class Pipeline(BaseNode):
+    """Base class for pipeline node, used for pipeline component version consumption. You should not instantiate this
+    class directly. Instead, you should use @pipeline decorator to create a pipeline node.
+
+    :param component: Id or instance of the pipeline component/job to be run for the step.
+    :type component: Union[~azure.ai.ml.entities.Component, str]
+    :param inputs: Inputs of the pipeline node.
+    :type inputs: Optional[Dict[str, Union[
+                                    ~azure.ai.ml.entities.Input,
+                                    str, bool, int, float, Enum, "Input"]]].
+    :param outputs: Outputs of the pipeline node.
+    :type outputs: Optional[Dict[str, Union[str, ~azure.ai.ml.entities.Output, "Output"]]]
+    :param settings: Setting of pipeline node, only taking effect for root pipeline job.
+    :type settings: Optional[~azure.ai.ml.entities._job.pipeline.pipeline_job_settings.PipelineJobSettings]
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[Component, str],
+        inputs: Optional[
+            Dict[
+                str,
+                Union[
+                    Input,
+                    str,
+                    bool,
+                    int,
+                    float,
+                    Enum,
+                    "Input",
+                ],
+            ]
+        ] = None,
+        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
+        settings: Optional[PipelineJobSettings] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        kwargs.pop("type", None)
+
+        BaseNode.__init__(
+            self,
+            type=NodeType.PIPELINE,
+            component=component,
+            inputs=inputs,
+            outputs=outputs,
+            **kwargs,
+        )
+        # copy pipeline component output's setting to node level
+        self._copy_pipeline_component_out_setting_to_node()
+        self._settings: Optional[PipelineJobSettings] = None
+        self.settings = settings
+
+    @property
+    def component(self) -> Optional[Union[str, Component]]:
+        """Id or instance of the pipeline component/job to be run for the step.
+
+        :return: Id or instance of the pipeline component/job.
+        :rtype: Union[str, ~azure.ai.ml.entities.Component]
+        """
+        res: Union[str, Component] = self._component
+        return res
+
+    @property
+    def settings(self) -> Optional[PipelineJobSettings]:
+        """Settings of the pipeline.
+
+        Note: settings is available only when create node as a job.
+            i.e. ml_client.jobs.create_or_update(node).
+
+        :return: Settings of the pipeline.
+        :rtype: ~azure.ai.ml.entities.PipelineJobSettings
+        """
+        if self._settings is None:
+            self._settings = PipelineJobSettings()
+        return self._settings
+
+    @settings.setter
+    def settings(self, value: Union[PipelineJobSettings, Dict]) -> None:
+        """Set the settings of the pipeline.
+
+        :param value: The settings of the pipeline.
+        :type value: Union[~azure.ai.ml.entities.PipelineJobSettings, dict]
+        :raises TypeError: If the value is not an instance of PipelineJobSettings or a dict.
+        """
+        if value is not None:
+            if isinstance(value, PipelineJobSettings):
+                # since PipelineJobSettings inherit _AttrDict, we need add this branch to distinguish with dict
+                pass
+            elif isinstance(value, dict):
+                value = PipelineJobSettings(**value)
+            else:
+                raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value)))
+        self._settings = value
+
+    @classmethod
+    def _get_supported_inputs_types(cls) -> None:
+        # Return None here to skip validation,
+        # as input could be custom class object(parameter group).
+        return None
+
+    @property
+    def _skip_required_compute_missing_validation(self) -> bool:
+        return True
+
+    @classmethod
+    def _get_skip_fields_in_schema_validation(cls) -> List[str]:
+        # pipeline component must be a file reference when loading from yaml,
+        # so the created object can't pass schema validation.
+        return ["component"]
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        # Use local import to avoid recursive reference as BaseNode is imported in PipelineComponent.
+        from azure.ai.ml.entities import PipelineComponent
+
+        return {
+            "component": (str, PipelineComponent),
+        }
+
+    def _to_job(self) -> "PipelineJob":
+        from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
+
+        return PipelineJob(
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            properties=self.properties,
+            # Filter None out to avoid case below failed with conflict keys check:
+            # group: None (user not specified)
+            # group.xx: 1 (user specified
+            inputs={k: v for k, v in self._job_inputs.items() if v},
+            outputs=self._job_outputs,
+            component=self.component,
+            settings=self.settings,
+        )
+
+    def _customized_validate(self) -> MutableValidationResult:
+        """Check unsupported settings when use as a node.
+
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        # Note: settings is not supported on node,
+        # jobs.create_or_update(node) will call node._to_job() at first,
+        # thus won't reach here.
+        # pylint: disable=protected-access
+        from azure.ai.ml.entities import PipelineComponent
+
+        validation_result = super(Pipeline, self)._customized_validate()
+        ignored_keys = PipelineComponent._check_ignored_keys(self)
+        if ignored_keys:
+            validation_result.append_warning(message=f"{ignored_keys} ignored on node {self.name!r}.")
+        if isinstance(self.component, PipelineComponent):
+            validation_result.merge_with(self.component._customized_validate())
+        return validation_result
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj: Dict = super()._to_rest_object(**kwargs)
+        rest_obj.update(
+            convert_ordered_dict_to_dict(
+                {
+                    "componentId": self._get_component_id(),
+                }
+            )
+        )
+        return rest_obj
+
+    def _build_inputs(self) -> Dict:
+        inputs = super(Pipeline, self)._build_inputs()
+        built_inputs = {}
+        # Validate and remove non-specified inputs
+        for key, value in inputs.items():
+            if value is not None:
+                built_inputs[key] = value
+        return built_inputs
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline.pipeline_component import PipelineSchema
+
+        return PipelineSchema(context=context)
+
+    def _copy_pipeline_component_out_setting_to_node(self) -> None:
+        """Copy pipeline component output's setting to node level."""
+        from azure.ai.ml.entities import PipelineComponent
+        from azure.ai.ml.entities._job.pipeline._io import NodeOutput
+
+        if not isinstance(self.component, PipelineComponent):
+            return
+        for key, val in self.component.outputs.items():
+            node_output = cast(NodeOutput, self.outputs.get(key))
+            copy_output_setting(source=val, target=node_output)
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
+        raise NotImplementedError()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark.py
new file mode 100644
index 00000000..e72f1334
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark.py
@@ -0,0 +1,663 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, too-many-instance-attributes
+
+import copy
+import logging
+import re
+from enum import Enum
+from os import PathLike, path
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+
+from marshmallow import INCLUDE, Schema
+
+from ..._restclient.v2023_04_01_preview.models import JobBase as JobBaseData
+from ..._restclient.v2023_04_01_preview.models import SparkJob as RestSparkJob
+from ..._schema import NestedField, PathAwareSchema, UnionField
+from ..._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
+from ..._schema.job.parameterized_spark import CONF_KEY_MAP
+from ..._schema.job.spark_job import SparkJobSchema
+from ..._utils.utils import is_url
+from ...constants._common import (
+    ARM_ID_PREFIX,
+    BASE_PATH_CONTEXT_KEY,
+    REGISTRY_URI_FORMAT,
+    SPARK_ENVIRONMENT_WARNING_MESSAGE,
+)
+from ...constants._component import NodeType
+from ...constants._job.job import SparkConfKey
+from ...entities._assets import Environment
+from ...entities._component.component import Component
+from ...entities._component.spark_component import SparkComponent
+from ...entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+    _BaseJobIdentityConfiguration,
+)
+from ...entities._inputs_outputs import Input, Output
+from ...entities._job._input_output_helpers import (
+    from_rest_data_outputs,
+    from_rest_inputs_to_dataset_literal,
+    validate_inputs_for_args,
+)
+from ...entities._job.spark_job import SparkJob
+from ...entities._job.spark_job_entry import SparkJobEntryType
+from ...entities._job.spark_resource_configuration import SparkResourceConfiguration
+from ...entities._validation import MutableValidationResult
+from ...exceptions import ErrorCategory, ErrorTarget, ValidationException
+from .._job.pipeline._io import NodeOutput
+from .._job.spark_helpers import (
+    _validate_compute_or_resources,
+    _validate_input_output_mode,
+    _validate_spark_configurations,
+)
+from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin
+from .._util import convert_ordered_dict_to_dict, get_rest_dict_for_node_attrs, load_from_dict, validate_attribute_type
+from .base_node import BaseNode
+
+module_logger = logging.getLogger(__name__)
+
+
+class Spark(BaseNode, SparkJobEntryMixin):
+    """Base class for spark node, used for spark component version consumption.
+
+    You should not instantiate this class directly. Instead, you should
+    create it from the builder function: spark.
+
+    :param component: The ID or instance of the Spark component or job to be run during the step.
+    :type component: Union[str, ~azure.ai.ml.entities.SparkComponent]
+    :param identity: The identity that the Spark job will use while running on compute.
+    :type identity: Union[Dict[str, str],
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration
+
+    ]
+
+    :param driver_cores: The number of cores to use for the driver process, only in cluster mode.
+    :type driver_cores: int
+    :param driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit
+        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :type driver_memory: str
+    :param executor_cores: The number of cores to use on each executor.
+    :type executor_cores: int
+    :param executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit
+        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :type executor_memory: str
+    :param executor_instances: The initial number of executors.
+    :type executor_instances: int
+    :param dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
+        executors registered with this application up and down based on the workload.
+    :type dynamic_allocation_enabled: bool
+    :param dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation
+        is enabled.
+    :type dynamic_allocation_min_executors: int
+    :param dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation
+        is enabled.
+    :type dynamic_allocation_max_executors: int
+    :param conf: A dictionary with pre-defined Spark configurations key and values.
+    :type conf: Dict[str, str]
+    :param inputs: A mapping of input names to input data sources used in the job.
+    :type inputs: Dict[str, Union[
+        str,
+        bool,
+        int,
+        float,
+        Enum,
+        ~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
+        ~azure.ai.ml.Input
+
+    ]]
+
+    :param outputs: A mapping of output names to output data sources used in the job.
+    :type outputs: Dict[str, Union[str, ~azure.ai.ml.Output]]
+    :param args: The arguments for the job.
+    :type args: str
+    :param compute: The compute resource the job runs on.
+    :type compute: str
+    :param resources: The compute resource configuration for the job.
+    :type resources: Union[Dict, ~azure.ai.ml.entities.SparkResourceConfiguration]
+    :param entry: The file or class entry point.
+    :type entry: Dict[str, str]
+    :param py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps.
+    :type py_files: List[str]
+    :param jars: The list of .JAR files to include on the driver and executor classpaths.
+    :type jars: List[str]
+    :param files: The list of files to be placed in the working directory of each executor.
+    :type files: List[str]
+    :param archives: The list of archives to be extracted into the working directory of each executor.
+    :type archives: List[str]
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, SparkComponent],
+        identity: Optional[
+            Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
+        ] = None,
+        driver_cores: Optional[Union[int, str]] = None,
+        driver_memory: Optional[str] = None,
+        executor_cores: Optional[Union[int, str]] = None,
+        executor_memory: Optional[str] = None,
+        executor_instances: Optional[Union[int, str]] = None,
+        dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
+        dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
+        dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
+        conf: Optional[Dict[str, str]] = None,
+        inputs: Optional[
+            Dict[
+                str,
+                Union[
+                    NodeOutput,
+                    Input,
+                    str,
+                    bool,
+                    int,
+                    float,
+                    Enum,
+                    "Input",
+                ],
+            ]
+        ] = None,
+        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
+        compute: Optional[str] = None,
+        resources: Optional[Union[Dict, SparkResourceConfiguration]] = None,
+        entry: Union[Dict[str, str], SparkJobEntry, None] = None,
+        py_files: Optional[List[str]] = None,
+        jars: Optional[List[str]] = None,
+        files: Optional[List[str]] = None,
+        archives: Optional[List[str]] = None,
+        args: Optional[str] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        kwargs.pop("type", None)
+
+        BaseNode.__init__(
+            self, type=NodeType.SPARK, inputs=inputs, outputs=outputs, component=component, compute=compute, **kwargs
+        )
+
+        # init mark for _AttrDict
+        self._init = True
+        SparkJobEntryMixin.__init__(self, entry=entry)
+        self.conf = conf
+        self.driver_cores = driver_cores
+        self.driver_memory = driver_memory
+        self.executor_cores = executor_cores
+        self.executor_memory = executor_memory
+        self.executor_instances = executor_instances
+        self.dynamic_allocation_enabled = dynamic_allocation_enabled
+        self.dynamic_allocation_min_executors = dynamic_allocation_min_executors
+        self.dynamic_allocation_max_executors = dynamic_allocation_max_executors
+
+        is_spark_component = isinstance(component, SparkComponent)
+        if is_spark_component:
+            # conf is dict and we need copy component conf here, otherwise node conf setting will affect component
+            # setting
+            _component = cast(SparkComponent, component)
+            self.conf = self.conf or copy.copy(_component.conf)
+            self.driver_cores = self.driver_cores or _component.driver_cores
+            self.driver_memory = self.driver_memory or _component.driver_memory
+            self.executor_cores = self.executor_cores or _component.executor_cores
+            self.executor_memory = self.executor_memory or _component.executor_memory
+            self.executor_instances = self.executor_instances or _component.executor_instances
+            self.dynamic_allocation_enabled = self.dynamic_allocation_enabled or _component.dynamic_allocation_enabled
+            self.dynamic_allocation_min_executors = (
+                self.dynamic_allocation_min_executors or _component.dynamic_allocation_min_executors
+            )
+            self.dynamic_allocation_max_executors = (
+                self.dynamic_allocation_max_executors or _component.dynamic_allocation_max_executors
+            )
+        if self.executor_instances is None and str(self.dynamic_allocation_enabled).lower() == "true":
+            self.executor_instances = self.dynamic_allocation_min_executors
+        # When create standalone job or pipeline job, following fields will always get value from component or get
+        # default None, because we will not pass those fields to Spark. But in following cases, we expect to get
+        # correct value from spark._from_rest_object() and then following fields will get from their respective
+        # keyword arguments.
+        # 1. when we call regenerated_spark_node=Spark._from_rest_object(spark_node._to_rest_object()) in local test,
+        # we expect regenerated_spark_node and spark_node are identical.
+        # 2.when get created remote job through Job._from_rest_object(result) in job operation where component is an
+        # arm_id, we expect get remote returned values.
+        # 3.when we load a remote job, component now is an arm_id, we need get entry from node level returned from
+        # service
+        self.entry = _component.entry if is_spark_component else entry
+        self.py_files = _component.py_files if is_spark_component else py_files
+        self.jars = _component.jars if is_spark_component else jars
+        self.files = _component.files if is_spark_component else files
+        self.archives = _component.archives if is_spark_component else archives
+        self.args = _component.args if is_spark_component else args
+        self.environment: Any = _component.environment if is_spark_component else None
+
+        self.resources = resources
+        self.identity = identity
+        self._swept = False
+        self._init = False
+
+    @classmethod
+    def _get_supported_outputs_types(cls) -> Tuple:
+        return str, Output
+
+    @property
+    def component(self) -> Union[str, SparkComponent]:
+        """The ID or instance of the Spark component or job to be run during the step.
+
+        :rtype: ~azure.ai.ml.entities.SparkComponent
+        """
+        res: Union[str, SparkComponent] = self._component
+        return res
+
+    @property
+    def resources(self) -> Optional[Union[Dict, SparkResourceConfiguration]]:
+        """The compute resource configuration for the job.
+
+        :rtype: ~azure.ai.ml.entities.SparkResourceConfiguration
+        """
+        return self._resources  # type: ignore
+
+    @resources.setter
+    def resources(self, value: Optional[Union[Dict, SparkResourceConfiguration]]) -> None:
+        """Sets the compute resource configuration for the job.
+
+        :param value: The compute resource configuration for the job.
+        :type value: Union[Dict[str, str], ~azure.ai.ml.entities.SparkResourceConfiguration]
+        """
+        if isinstance(value, dict):
+            value = SparkResourceConfiguration(**value)
+        self._resources = value
+
+    @property
+    def identity(
+        self,
+    ) -> Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]:
+        """The identity that the Spark job will use while running on compute.
+
+        :rtype: Union[~azure.ai.ml.entities.ManagedIdentityConfiguration, ~azure.ai.ml.entities.AmlTokenConfiguration,
+            ~azure.ai.ml.entities.UserIdentityConfiguration]
+        """
+        # If there is no identity from CLI/SDK input: for jobs running on synapse compute (MLCompute Clusters), the
+        # managed identity is the default; for jobs running on clusterless, the user identity should be the default,
+        # otherwise use user input identity.
+        if self._identity is None:
+            if self.compute is not None:
+                return ManagedIdentityConfiguration()
+            if self.resources is not None:
+                return UserIdentityConfiguration()
+        return self._identity
+
+    @identity.setter
+    def identity(
+        self,
+        value: Union[Dict[str, str], ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration],
+    ) -> None:
+        """Sets the identity that the Spark job will use while running on compute.
+
+        :param value: The identity that the Spark job will use while running on compute.
+        :type value: Union[Dict[str, str], ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+            ~azure.ai.ml.entities.AmlTokenConfiguration, ~azure.ai.ml.entities.UserIdentityConfiguration]
+        """
+        if isinstance(value, dict):
+            identify_schema = UnionField(
+                [
+                    NestedField(ManagedIdentitySchema, unknown=INCLUDE),
+                    NestedField(AMLTokenIdentitySchema, unknown=INCLUDE),
+                    NestedField(UserIdentitySchema, unknown=INCLUDE),
+                ]
+            )
+            value = identify_schema._deserialize(value=value, attr=None, data=None)
+        self._identity = value
+
+    @property
+    def code(self) -> Optional[Union[str, PathLike]]:
+        """The local or remote path pointing at source code.
+
+        :rtype: Union[str, PathLike]
+        """
+        if isinstance(self.component, Component):
+            _code: Optional[Union[str, PathLike]] = self.component.code
+            return _code
+        return None
+
+    @code.setter
+    def code(self, value: str) -> None:
+        """Sets the source code to be used for the job.
+
+        :param value: The local or remote path pointing at source code.
+        :type value: Union[str, PathLike]
+        """
+        if isinstance(self.component, Component):
+            self.component.code = value
+        else:
+            msg = "Can't set code property for a registered component {}"
+            raise ValidationException(
+                message=msg.format(self.component),
+                no_personal_data_message=msg.format(self.component),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: dict) -> Dict:
+        obj = super()._from_rest_object_to_init_params(obj)
+
+        if "resources" in obj and obj["resources"]:
+            obj["resources"] = SparkResourceConfiguration._from_rest_object(obj["resources"])
+
+        if "identity" in obj and obj["identity"]:
+            obj["identity"] = _BaseJobIdentityConfiguration._from_rest_object(obj["identity"])
+
+        if "entry" in obj and obj["entry"]:
+            obj["entry"] = SparkJobEntry._from_rest_object(obj["entry"])
+        if "conf" in obj and obj["conf"]:
+            # get conf setting value from conf
+            for field_name, _ in CONF_KEY_MAP.items():
+                value = obj["conf"].get(field_name, None)
+                if value is not None:
+                    obj[field_name] = value
+
+        return obj
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Spark":
+        from .spark_func import spark
+
+        loaded_data = load_from_dict(SparkJobSchema, data, context, additional_message, **kwargs)
+        spark_job: Spark = spark(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return spark_job
+
+    @classmethod
+    def _load_from_rest_job(cls, obj: JobBaseData) -> "Spark":
+        from .spark_func import spark
+
+        rest_spark_job: RestSparkJob = obj.properties
+        rest_spark_conf = copy.copy(rest_spark_job.conf) or {}
+
+        spark_job: Spark = spark(
+            name=obj.name,
+            id=obj.id,
+            entry=SparkJobEntry._from_rest_object(rest_spark_job.entry),
+            display_name=rest_spark_job.display_name,
+            description=rest_spark_job.description,
+            tags=rest_spark_job.tags,
+            properties=rest_spark_job.properties,
+            experiment_name=rest_spark_job.experiment_name,
+            services=rest_spark_job.services,
+            status=rest_spark_job.status,
+            creation_context=obj.system_data,
+            code=rest_spark_job.code_id,
+            compute=rest_spark_job.compute_id,
+            environment=rest_spark_job.environment_id,
+            identity=(
+                _BaseJobIdentityConfiguration._from_rest_object(rest_spark_job.identity)
+                if rest_spark_job.identity
+                else None
+            ),
+            args=rest_spark_job.args,
+            conf=rest_spark_conf,
+            driver_cores=rest_spark_conf.get(
+                SparkConfKey.DRIVER_CORES, None
+            ),  # copy fields from conf into the promote attribute in spark
+            driver_memory=rest_spark_conf.get(SparkConfKey.DRIVER_MEMORY, None),
+            executor_cores=rest_spark_conf.get(SparkConfKey.EXECUTOR_CORES, None),
+            executor_memory=rest_spark_conf.get(SparkConfKey.EXECUTOR_MEMORY, None),
+            executor_instances=rest_spark_conf.get(SparkConfKey.EXECUTOR_INSTANCES, None),
+            dynamic_allocation_enabled=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None),
+            dynamic_allocation_min_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None),
+            dynamic_allocation_max_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None),
+            resources=SparkResourceConfiguration._from_rest_object(rest_spark_job.resources),
+            inputs=from_rest_inputs_to_dataset_literal(rest_spark_job.inputs),
+            outputs=from_rest_data_outputs(rest_spark_job.outputs),
+        )
+        return spark_job
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            # hack: allow use InternalSparkComponent as component
+            # "component": (str, SparkComponent),
+            "environment": (str, Environment),
+            "resources": (dict, SparkResourceConfiguration),
+            "code": (str, PathLike),
+        }
+
+    @property
+    def _skip_required_compute_missing_validation(self) -> bool:
+        return self.resources is not None
+
+    def _to_job(self) -> SparkJob:
+        if isinstance(self.component, SparkComponent):
+            return SparkJob(
+                experiment_name=self.experiment_name,
+                name=self.name,
+                display_name=self.display_name,
+                description=self.description,
+                tags=self.tags,
+                code=self.component.code,
+                entry=self.entry,
+                py_files=self.py_files,
+                jars=self.jars,
+                files=self.files,
+                archives=self.archives,
+                identity=self.identity,
+                driver_cores=self.driver_cores,
+                driver_memory=self.driver_memory,
+                executor_cores=self.executor_cores,
+                executor_memory=self.executor_memory,
+                executor_instances=self.executor_instances,
+                dynamic_allocation_enabled=self.dynamic_allocation_enabled,
+                dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
+                dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
+                conf=self.conf,
+                environment=self.environment,
+                status=self.status,
+                inputs=self._job_inputs,
+                outputs=self._job_outputs,
+                services=self.services,
+                args=self.args,
+                compute=self.compute,
+                resources=self.resources,
+            )
+
+        return SparkJob(
+            experiment_name=self.experiment_name,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            code=self.component,
+            entry=self.entry,
+            py_files=self.py_files,
+            jars=self.jars,
+            files=self.files,
+            archives=self.archives,
+            identity=self.identity,
+            driver_cores=self.driver_cores,
+            driver_memory=self.driver_memory,
+            executor_cores=self.executor_cores,
+            executor_memory=self.executor_memory,
+            executor_instances=self.executor_instances,
+            dynamic_allocation_enabled=self.dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
+            conf=self.conf,
+            environment=self.environment,
+            status=self.status,
+            inputs=self._job_inputs,
+            outputs=self._job_outputs,
+            services=self.services,
+            args=self.args,
+            compute=self.compute,
+            resources=self.resources,
+        )
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import SparkSchema
+
+        return SparkSchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return [
+            "type",
+            "resources",
+            "py_files",
+            "jars",
+            "files",
+            "archives",
+            "identity",
+            "conf",
+            "args",
+        ]
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj: dict = super()._to_rest_object(**kwargs)
+        rest_obj.update(
+            convert_ordered_dict_to_dict(
+                {
+                    "componentId": self._get_component_id(),
+                    "identity": get_rest_dict_for_node_attrs(self.identity),
+                    "resources": get_rest_dict_for_node_attrs(self.resources),
+                    "entry": get_rest_dict_for_node_attrs(self.entry),
+                }
+            )
+        )
+        return rest_obj
+
+    def _build_inputs(self) -> dict:
+        inputs = super(Spark, self)._build_inputs()
+        built_inputs = {}
+        # Validate and remove non-specified inputs
+        for key, value in inputs.items():
+            if value is not None:
+                built_inputs[key] = value
+        return built_inputs
+
+    def _customized_validate(self) -> MutableValidationResult:
+        result = super()._customized_validate()
+        if (
+            isinstance(self.component, SparkComponent)
+            and isinstance(self.component._environment, Environment)
+            and self.component._environment.image is not None
+        ):
+            result.append_warning(
+                yaml_path="environment.image",
+                message=SPARK_ENVIRONMENT_WARNING_MESSAGE,
+            )
+        result.merge_with(self._validate_entry_exist())
+        result.merge_with(self._validate_fields())
+        return result
+
+    def _validate_entry_exist(self) -> MutableValidationResult:
+        is_remote_code = isinstance(self.code, str) and (
+            self.code.startswith("git+")
+            or self.code.startswith(REGISTRY_URI_FORMAT)
+            or self.code.startswith(ARM_ID_PREFIX)
+            or is_url(self.code)
+            or bool(self.CODE_ID_RE_PATTERN.match(self.code))
+        )
+        validation_result = self._create_empty_validation_result()
+        # validate whether component entry exists to ensure code path is correct, especially when code is default value
+        if self.code is None or is_remote_code or not isinstance(self.entry, SparkJobEntry):
+            # skip validate when code is not a local path or code is None, or self.entry is not SparkJobEntry object
+            pass
+        else:
+            if not path.isabs(self.code):
+                _component: SparkComponent = self.component  # type: ignore
+                code_path = Path(_component.base_path) / self.code
+                if code_path.exists():
+                    code_path = code_path.resolve().absolute()
+                else:
+                    validation_result.append_error(
+                        message=f"Code path {code_path} doesn't exist.", yaml_path="component.code"
+                    )
+                entry_path = code_path / self.entry.entry
+            else:
+                entry_path = Path(self.code) / self.entry.entry
+
+            if (
+                isinstance(self.entry, SparkJobEntry)
+                and self.entry.entry_type == SparkJobEntryType.SPARK_JOB_FILE_ENTRY
+            ):
+                if not entry_path.exists():
+                    validation_result.append_error(
+                        message=f"Entry {entry_path} doesn't exist.", yaml_path="component.entry"
+                    )
+        return validation_result
+
+    def _validate_fields(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        try:
+            _validate_compute_or_resources(self.compute, self.resources)
+        except ValidationException as e:
+            validation_result.append_error(message=str(e), yaml_path="resources")
+            validation_result.append_error(message=str(e), yaml_path="compute")
+
+        try:
+            _validate_input_output_mode(self.inputs, self.outputs)
+        except ValidationException as e:
+            msg = str(e)
+            m = re.match(r"(Input|Output) '(\w+)'", msg)
+            if m:
+                io_type, io_name = m.groups()
+                if io_type == "Input":
+                    validation_result.append_error(message=msg, yaml_path=f"inputs.{io_name}")
+                else:
+                    validation_result.append_error(message=msg, yaml_path=f"outputs.{io_name}")
+
+        try:
+            _validate_spark_configurations(self)
+        except ValidationException as e:
+            validation_result.append_error(message=str(e), yaml_path="conf")
+
+        try:
+            self._validate_entry()
+        except ValidationException as e:
+            validation_result.append_error(message=str(e), yaml_path="entry")
+
+        if self.args:
+            try:
+                validate_inputs_for_args(self.args, self.inputs)
+            except ValidationException as e:
+                validation_result.append_error(message=str(e), yaml_path="args")
+        return validation_result
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> "Spark":
+        """Call Spark as a function will return a new instance each time.
+
+        :return: A Spark object
+        :rtype: Spark
+        """
+        if isinstance(self._component, Component):
+            # call this to validate inputs
+            node: Spark = self._component(*args, **kwargs)
+            # merge inputs
+            for name, original_input in self.inputs.items():
+                if name not in kwargs:
+                    # use setattr here to make sure owner of input won't change
+                    setattr(node.inputs, name, original_input._data)
+                    node._job_inputs[name] = original_input._data
+                # get outputs
+            for name, original_output in self.outputs.items():
+                # use setattr here to make sure owner of output won't change
+                if not isinstance(original_output, str):
+                    setattr(node.outputs, name, original_output._data)
+            self._refine_optional_inputs_with_no_value(node, kwargs)
+            node._name = self.name
+            node.compute = self.compute
+            node.environment = copy.deepcopy(self.environment)
+            node.resources = copy.deepcopy(self.resources)
+            return node
+
+        msg = "Spark can be called as a function only when referenced component is {}, currently got {}."
+        raise ValidationException(
+            message=msg.format(type(Component), self._component),
+            no_personal_data_message=msg.format(type(Component), "self._component"),
+            target=ErrorTarget.SPARK_JOB,
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py
new file mode 100644
index 00000000..342f8c44
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py
@@ -0,0 +1,306 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, too-many-locals
+
+import os
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import AmlToken, ManagedIdentity, UserIdentity
+from azure.ai.ml.constants._common import AssetTypes
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities import Environment
+from azure.ai.ml.entities._component.spark_component import SparkComponent
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+from azure.ai.ml.entities._job.spark_job_entry import SparkJobEntry
+from azure.ai.ml.entities._job.spark_resource_configuration import SparkResourceConfiguration
+from azure.ai.ml.exceptions import ErrorTarget, ValidationException
+
+from .spark import Spark
+
+SUPPORTED_INPUTS = [AssetTypes.URI_FILE, AssetTypes.URI_FOLDER, AssetTypes.MLTABLE]
+
+
+def _parse_input(input_value: Union[Input, dict, str, bool, int, float]) -> Tuple:
+    component_input = None
+    job_input: Union[Input, dict, str, bool, int, float] = ""
+
+    if isinstance(input_value, Input):
+        component_input = Input(**input_value._to_dict())
+        input_type = input_value.type
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value._to_dict())
+    elif isinstance(input_value, dict):
+        # if user provided dict, we try to parse it to Input.
+        # for job input, only parse for path type
+        input_type = input_value.get("type", None)
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value)
+        component_input = Input(**input_value)
+    elif isinstance(input_value, (str, bool, int, float)):
+        # Input bindings are not supported
+        component_input = ComponentTranslatableMixin._to_input_builder_function(input_value)
+        job_input = input_value
+    else:
+        msg = f"Unsupported input type: {type(input_value)}, only Input, dict, str, bool, int and float are supported."
+        raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB)
+    return component_input, job_input
+
+
+def _parse_output(output_value: Union[Output, dict]) -> Tuple:
+    component_output = None
+    job_output: Union[Output, dict] = {}
+
+    if isinstance(output_value, Output):
+        component_output = Output(**output_value._to_dict())
+        job_output = Output(**output_value._to_dict())
+    elif not output_value:
+        # output value can be None or empty dictionary
+        # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder
+        component_output = ComponentTranslatableMixin._to_output(output_value)
+        job_output = output_value
+    elif isinstance(output_value, dict):  # When output value is a non-empty dictionary
+        job_output = Output(**output_value)
+        component_output = Output(**output_value)
+    elif isinstance(output_value, str):  # When output is passed in from pipeline job yaml
+        job_output = output_value
+    else:
+        msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported."
+        raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB)
+    return component_output, job_output
+
+
+def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]:
+    component_io_dict, job_io_dict = {}, {}
+    if io_dict:
+        for key, val in io_dict.items():
+            component_io, job_io = parse_func(val)
+            component_io_dict[key] = component_io
+            job_io_dict[key] = job_io
+    return component_io_dict, job_io_dict
+
+
+def spark(
+    *,
+    experiment_name: Optional[str] = None,
+    name: Optional[str] = None,
+    display_name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    code: Optional[Union[str, os.PathLike]] = None,
+    entry: Union[Dict[str, str], SparkJobEntry, None] = None,
+    py_files: Optional[List[str]] = None,
+    jars: Optional[List[str]] = None,
+    files: Optional[List[str]] = None,
+    archives: Optional[List[str]] = None,
+    identity: Optional[Union[Dict[str, str], ManagedIdentity, AmlToken, UserIdentity]] = None,
+    driver_cores: Optional[int] = None,
+    driver_memory: Optional[str] = None,
+    executor_cores: Optional[int] = None,
+    executor_memory: Optional[str] = None,
+    executor_instances: Optional[int] = None,
+    dynamic_allocation_enabled: Optional[bool] = None,
+    dynamic_allocation_min_executors: Optional[int] = None,
+    dynamic_allocation_max_executors: Optional[int] = None,
+    conf: Optional[Dict[str, str]] = None,
+    environment: Optional[Union[str, Environment]] = None,
+    inputs: Optional[Dict] = None,
+    outputs: Optional[Dict] = None,
+    args: Optional[str] = None,
+    compute: Optional[str] = None,
+    resources: Optional[Union[Dict, SparkResourceConfiguration]] = None,
+    **kwargs: Any,
+) -> Spark:
+    """Creates a Spark object which can be used inside a dsl.pipeline function or used as a standalone Spark job.
+
+    :keyword experiment_name:  The name of the experiment the job will be created under.
+    :paramtype experiment_name: Optional[str]
+    :keyword name: The name of the job.
+    :paramtype name: Optional[str]
+    :keyword display_name: The job display name.
+    :paramtype display_name: Optional[str]
+    :keyword description: The description of the job. Defaults to None.
+    :paramtype description: Optional[str]
+    :keyword tags: The dictionary of tags for the job. Tags can be added, removed, and updated. Defaults to None.
+    :paramtype tags: Optional[dict[str, str]]
+    :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url
+        pointing to a remote location.
+    :type code: Optional[Union[str, os.PathLike]]
+    :keyword entry: The file or class entry point.
+    :paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]]
+    :keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps.
+        Defaults to None.
+    :paramtype py_files: Optional[List[str]]
+    :keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None.
+    :paramtype jars: Optional[List[str]]
+    :keyword files: The list of files to be placed in the working directory of each executor. Defaults to None.
+    :paramtype files: Optional[List[str]]
+    :keyword archives: The list of archives to be extracted into the working directory of each executor.
+        Defaults to None.
+    :paramtype archives: Optional[List[str]]
+    :keyword identity: The identity that the Spark job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        dict[str, str],
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
+    :paramtype driver_cores: Optional[int]
+    :keyword driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit
+        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype driver_memory: Optional[str]
+    :keyword executor_cores: The number of cores to use on each executor.
+    :paramtype executor_cores: Optional[int]
+    :keyword executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit
+        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype executor_memory: Optional[str]
+    :keyword executor_instances: The initial number of executors.
+    :paramtype executor_instances: Optional[int]
+    :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
+        executors registered with this application up and down based on the workload.
+    :paramtype dynamic_allocation_enabled: Optional[bool]
+    :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_min_executors: Optional[int]
+    :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_max_executors: Optional[int]
+    :keyword conf: A dictionary with pre-defined Spark configurations key and values. Defaults to None.
+    :paramtype conf: Optional[dict[str, str]]
+    :keyword environment: The Azure ML environment to run the job in.
+    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+    :keyword inputs: A mapping of input names to input data used in the job. Defaults to None.
+    :paramtype inputs: Optional[dict[str, ~azure.ai.ml.Input]]
+    :keyword outputs: A mapping of output names to output data used in the job. Defaults to None.
+    :paramtype outputs: Optional[dict[str, ~azure.ai.ml.Output]]
+    :keyword args: The arguments for the job.
+    :paramtype args: Optional[str]
+    :keyword compute: The compute resource the job runs on.
+    :paramtype compute: Optional[str]
+    :keyword resources: The compute resource configuration for the job.
+    :paramtype resources: Optional[Union[dict, ~azure.ai.ml.entities.SparkResourceConfiguration]]
+    :return: A Spark object.
+    :rtype: ~azure.ai.ml.entities.Spark
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_function_configuration_1]
+            :end-before: [END spark_function_configuration_1]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a SparkJob.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_function_configuration_2]
+            :end-before: [END spark_function_configuration_2]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a SparkJob.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_dsl_pipeline]
+            :end-before: [END spark_dsl_pipeline]
+            :language: python
+            :dedent: 8
+            :caption: Building a Spark pipeline using the DSL pipeline decorator
+    """
+
+    inputs = inputs or {}
+    outputs = outputs or {}
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+    component = kwargs.pop("component", None)
+
+    if component is None:
+        component = SparkComponent(
+            name=name,
+            display_name=display_name,
+            tags=tags,
+            description=description,
+            code=code,
+            entry=entry,
+            py_files=py_files,
+            jars=jars,
+            files=files,
+            archives=archives,
+            driver_cores=driver_cores,
+            driver_memory=driver_memory,
+            executor_cores=executor_cores,
+            executor_memory=executor_memory,
+            executor_instances=executor_instances,
+            dynamic_allocation_enabled=dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=dynamic_allocation_max_executors,
+            conf=conf,
+            environment=environment,
+            inputs=component_inputs,
+            outputs=component_outputs,
+            args=args,
+            _source=ComponentSource.BUILDER,
+            **kwargs,
+        )
+    if isinstance(component, SparkComponent):
+        spark_obj = Spark(
+            experiment_name=experiment_name,
+            name=name,
+            display_name=display_name,
+            tags=tags,
+            description=description,
+            component=component,
+            identity=identity,
+            driver_cores=driver_cores,
+            driver_memory=driver_memory,
+            executor_cores=executor_cores,
+            executor_memory=executor_memory,
+            executor_instances=executor_instances,
+            dynamic_allocation_enabled=dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=dynamic_allocation_max_executors,
+            conf=conf,
+            inputs=job_inputs,
+            outputs=job_outputs,
+            compute=compute,
+            resources=resources,
+            **kwargs,
+        )
+    else:
+        # when we load a remote job, component now is an arm_id, we need get entry from node level returned from
+        # service
+        spark_obj = Spark(
+            experiment_name=experiment_name,
+            name=name,
+            display_name=display_name,
+            tags=tags,
+            description=description,
+            component=component,
+            identity=identity,
+            driver_cores=driver_cores,
+            driver_memory=driver_memory,
+            executor_cores=executor_cores,
+            executor_memory=executor_memory,
+            executor_instances=executor_instances,
+            dynamic_allocation_enabled=dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=dynamic_allocation_max_executors,
+            conf=conf,
+            inputs=job_inputs,
+            outputs=job_outputs,
+            compute=compute,
+            resources=resources,
+            entry=entry,
+            py_files=py_files,
+            jars=jars,
+            files=files,
+            archives=archives,
+            args=args,
+            **kwargs,
+        )
+    return spark_obj
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/subcomponents.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/subcomponents.py
new file mode 100644
index 00000000..9b9ed5d2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/subcomponents.py
@@ -0,0 +1,59 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# This file contains mldesigner decorator-produced components
+# that are used within node constructors. Keep imports and
+# general complexity in this file to a minimum.
+
+from typing import List
+
+from mldesigner import Output, command_component
+
+from azure.ai.ml.constants._common import DefaultOpenEncoding
+
+
+def save_mltable_yaml(path: str, mltable_paths: List[str]) -> None:
+    """Save MLTable YAML.
+
+    :param path: The path to save the MLTable YAML file.
+    :type path: str
+    :param mltable_paths: List of paths to be included in the MLTable.
+    :type mltable_paths: List[str]
+    :raises ValueError: If the given path points to a file.
+    """
+    import os
+
+    path = os.path.abspath(path)
+
+    if os.path.isfile(path):
+        raise ValueError(f"The given path {path} points to a file.")
+
+    if not os.path.exists(path):
+        os.makedirs(path, exist_ok=True)
+
+    save_path = os.path.join(path, "MLTable")
+    # Do not touch - this is MLTable syntax that is needed to mount these paths
+    # To the MLTable's inputs
+    mltable_file_content = "\n".join(["paths:"] + [f"- folder : {path}" for path in mltable_paths])
+
+    with open(save_path, "w", encoding=DefaultOpenEncoding.WRITE) as f:
+        f.write(mltable_file_content)
+
+
+# TODO 2293610: add support for more types of outputs besides uri_folder and mltable
+@command_component()
+def create_scatter_output_table(aggregated_output: Output(type="mltable"), **kwargs: str) -> Output:  # type: ignore
+    """Create scatter output table.
+
+    This function is used by the FL scatter gather node to reduce a dynamic number of silo outputs
+    into a single input for the user-supplied aggregation step.
+
+    :param aggregated_output: The aggregated output MLTable.
+    :type aggregated_output: ~mldesigner.Output(type="mltable")
+
+    Keyword arguments represent input names and URI folder paths.
+    """
+    # kwargs keys are inputs names (ex: silo_output_silo_1)
+    # values are uri_folder paths
+    save_mltable_yaml(aggregated_output, list(kwargs.values()))
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/sweep.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/sweep.py
new file mode 100644
index 00000000..603babbe
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/sweep.py
@@ -0,0 +1,454 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+import pydash
+from marshmallow import EXCLUDE, Schema
+
+from azure.ai.ml._schema._sweep.sweep_fields_provider import EarlyTerminationField
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.constants._job.sweep import SearchSpace
+from azure.ai.ml.entities._component.command_component import CommandComponent
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.job_limits import SweepJobLimits
+from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
+from azure.ai.ml.entities._job.pipeline._io import NodeInput
+from azure.ai.ml.entities._job.queue_settings import QueueSettings
+from azure.ai.ml.entities._job.sweep.early_termination_policy import (
+    BanditPolicy,
+    EarlyTerminationPolicy,
+    MedianStoppingPolicy,
+    TruncationSelectionPolicy,
+)
+from azure.ai.ml.entities._job.sweep.objective import Objective
+from azure.ai.ml.entities._job.sweep.parameterized_sweep import ParameterizedSweep
+from azure.ai.ml.entities._job.sweep.sampling_algorithm import SamplingAlgorithm
+from azure.ai.ml.entities._job.sweep.search_space import (
+    Choice,
+    LogNormal,
+    LogUniform,
+    Normal,
+    QLogNormal,
+    QLogUniform,
+    QNormal,
+    QUniform,
+    Randint,
+    SweepDistribution,
+    Uniform,
+)
+from azure.ai.ml.exceptions import ErrorTarget, UserErrorException, ValidationErrorType, ValidationException
+from azure.ai.ml.sweep import SweepJob
+
+from ..._restclient.v2022_10_01.models import ComponentVersion
+from ..._schema import PathAwareSchema
+from ..._schema._utils.data_binding_expression import support_data_binding_expression_for_fields
+from ..._utils.utils import camel_to_snake
+from .base_node import BaseNode
+
+module_logger = logging.getLogger(__name__)
+
+
+class Sweep(ParameterizedSweep, BaseNode):
+    """Base class for sweep node.
+
+    This class should not be instantiated directly. Instead, it should be created via the builder function: sweep.
+
+    :param trial: The ID or instance of the command component or job to be run for the step.
+    :type trial: Union[~azure.ai.ml.entities.CommandComponent, str]
+    :param compute: The compute definition containing the compute information for the step.
+    :type compute: str
+    :param limits: The limits for the sweep node.
+    :type limits: ~azure.ai.ml.sweep.SweepJobLimits
+    :param sampling_algorithm: The sampling algorithm to use to sample inside the search space.
+        Accepted values are: "random", "grid", or "bayesian".
+    :type sampling_algorithm: str
+    :param objective: The objective used to determine the target run with the local optimal
+        hyperparameter in search space.
+    :type objective: ~azure.ai.ml.sweep.Objective
+    :param early_termination_policy: The early termination policy of the sweep node.
+    :type early_termination_policy: Union[
+
+        ~azure.mgmt.machinelearningservices.models.BanditPolicy,
+        ~azure.mgmt.machinelearningservices.models.MedianStoppingPolicy,
+        ~azure.mgmt.machinelearningservices.models.TruncationSelectionPolicy
+
+    ]
+
+    :param search_space: The hyperparameter search space to run trials in.
+    :type search_space: Dict[str, Union[
+
+        ~azure.ai.ml.entities.Choice,
+        ~azure.ai.ml.entities.LogNormal,
+        ~azure.ai.ml.entities.LogUniform,
+        ~azure.ai.ml.entities.Normal,
+        ~azure.ai.ml.entities.QLogNormal,
+        ~azure.ai.ml.entities.QLogUniform,
+        ~azure.ai.ml.entities.QNormal,
+        ~azure.ai.ml.entities.QUniform,
+        ~azure.ai.ml.entities.Randint,
+        ~azure.ai.ml.entities.Uniform
+
+    ]]
+
+    :param inputs: Mapping of input data bindings used in the job.
+    :type inputs: Dict[str, Union[
+
+        ~azure.ai.ml.Input,
+
+        str,
+        bool,
+        int,
+        float
+
+    ]]
+
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, ~azure.ai.ml.Output]]
+    :param identity: The identity that the training job will use while running on compute.
+    :type identity: Union[
+
+        ~azure.ai.ml.ManagedIdentityConfiguration,
+        ~azure.ai.ml.AmlTokenConfiguration,
+        ~azure.ai.ml.UserIdentityConfiguration
+
+    ]
+
+    :param queue_settings: The queue settings for the job.
+    :type queue_settings: ~azure.ai.ml.entities.QueueSettings
+    :param resources: Compute Resource configuration for the job.
+    :type resources: Optional[Union[dict, ~azure.ai.ml.entities.ResourceConfiguration]]
+    """
+
+    def __init__(
+        self,
+        *,
+        trial: Optional[Union[CommandComponent, str]] = None,
+        compute: Optional[str] = None,
+        limits: Optional[SweepJobLimits] = None,
+        sampling_algorithm: Optional[Union[str, SamplingAlgorithm]] = None,
+        objective: Optional[Objective] = None,
+        early_termination: Optional[
+            Union[BanditPolicy, MedianStoppingPolicy, TruncationSelectionPolicy, EarlyTerminationPolicy, str]
+        ] = None,
+        search_space: Optional[
+            Dict[
+                str,
+                Union[
+                    Choice, LogNormal, LogUniform, Normal, QLogNormal, QLogUniform, QNormal, QUniform, Randint, Uniform
+                ],
+            ]
+        ] = None,
+        inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        identity: Optional[
+            Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
+        ] = None,
+        queue_settings: Optional[QueueSettings] = None,
+        resources: Optional[Union[dict, JobResourceConfiguration]] = None,
+        **kwargs: Any,
+    ) -> None:
+        # TODO: get rid of self._job_inputs, self._job_outputs once we have general Input
+        self._job_inputs, self._job_outputs = inputs, outputs
+
+        kwargs.pop("type", None)
+        BaseNode.__init__(
+            self,
+            type=NodeType.SWEEP,
+            component=trial,
+            inputs=inputs,
+            outputs=outputs,
+            compute=compute,
+            **kwargs,
+        )
+        # init mark for _AttrDict
+        self._init = True
+        ParameterizedSweep.__init__(
+            self,
+            sampling_algorithm=sampling_algorithm,
+            objective=objective,
+            limits=limits,
+            early_termination=early_termination,
+            search_space=search_space,
+            queue_settings=queue_settings,
+            resources=resources,
+        )
+
+        self.identity: Any = identity
+        self._init = False
+
+    @property
+    def trial(self) -> CommandComponent:
+        """The ID or instance of the command component or job to be run for the step.
+
+        :rtype: ~azure.ai.ml.entities.CommandComponent
+        """
+        res: CommandComponent = self._component
+        return res
+
+    @property
+    def search_space(
+        self,
+    ) -> Optional[
+        Dict[
+            str,
+            Union[Choice, LogNormal, LogUniform, Normal, QLogNormal, QLogUniform, QNormal, QUniform, Randint, Uniform],
+        ]
+    ]:
+        """Dictionary of the hyperparameter search space.
+
+        Each key is the name of a hyperparameter and its value is the parameter expression.
+
+        :rtype: Dict[str, Union[~azure.ai.ml.entities.Choice, ~azure.ai.ml.entities.LogNormal,
+            ~azure.ai.ml.entities.LogUniform, ~azure.ai.ml.entities.Normal, ~azure.ai.ml.entities.QLogNormal,
+            ~azure.ai.ml.entities.QLogUniform, ~azure.ai.ml.entities.QNormal, ~azure.ai.ml.entities.QUniform,
+            ~azure.ai.ml.entities.Randint, ~azure.ai.ml.entities.Uniform]]
+        """
+        return self._search_space
+
+    @search_space.setter
+    def search_space(self, values: Dict[str, Dict[str, Union[str, int, float, dict]]]) -> None:
+        """Sets the search space for the sweep job.
+
+        :param values: The search space to set.
+        :type values: Dict[str, Dict[str, Union[str, int, float, dict]]]
+        """
+        search_space: Dict = {}
+        for name, value in values.items():
+            # If value is a SearchSpace object, directly pass it to job.search_space[name]
+            search_space[name] = self._value_type_to_class(value) if isinstance(value, dict) else value
+        self._search_space = search_space
+
+    @classmethod
+    def _value_type_to_class(cls, value: Any) -> Dict:
+        value_type = value["type"]
+        search_space_dict = {
+            SearchSpace.CHOICE: Choice,
+            SearchSpace.RANDINT: Randint,
+            SearchSpace.LOGNORMAL: LogNormal,
+            SearchSpace.NORMAL: Normal,
+            SearchSpace.LOGUNIFORM: LogUniform,
+            SearchSpace.UNIFORM: Uniform,
+            SearchSpace.QLOGNORMAL: QLogNormal,
+            SearchSpace.QNORMAL: QNormal,
+            SearchSpace.QLOGUNIFORM: QLogUniform,
+            SearchSpace.QUNIFORM: QUniform,
+        }
+
+        res: dict = search_space_dict[value_type](**value)
+        return res
+
+    @classmethod
+    def _get_supported_inputs_types(cls) -> Tuple:
+        supported_types = super()._get_supported_inputs_types() or ()
+        return (
+            SweepDistribution,
+            *supported_types,
+        )
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Sweep":
+        raise NotImplementedError("Sweep._load_from_dict is not supported")
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return [
+            "limits",
+            "sampling_algorithm",
+            "objective",
+            "early_termination",
+            "search_space",
+            "queue_settings",
+            "resources",
+        ]
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj: dict = super(Sweep, self)._to_rest_object(**kwargs)
+        # hack: ParameterizedSweep.early_termination is not allowed to be None
+        for key in ["early_termination"]:
+            if key in rest_obj and rest_obj[key] is None:
+                del rest_obj[key]
+
+        # hack: only early termination policy does not follow yaml schema now, should be removed after server-side made
+        # the change
+        if "early_termination" in rest_obj:
+            _early_termination: EarlyTerminationPolicy = self.early_termination  # type: ignore
+            rest_obj["early_termination"] = _early_termination._to_rest_object().as_dict()
+
+        rest_obj.update(
+            {
+                "type": self.type,
+                "trial": self._get_trial_component_rest_obj(),
+            }
+        )
+        return rest_obj
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: dict) -> Dict:
+        obj = super()._from_rest_object_to_init_params(obj)
+
+        # hack: only early termination policy does not follow yaml schema now, should be removed after server-side made
+        # the change
+        if "early_termination" in obj and "policy_type" in obj["early_termination"]:
+            # can't use _from_rest_object here, because obj is a dict instead of an EarlyTerminationPolicy rest object
+            obj["early_termination"]["type"] = camel_to_snake(obj["early_termination"].pop("policy_type"))
+
+        # TODO: use cls._get_schema() to load from rest object
+        from azure.ai.ml._schema._sweep.parameterized_sweep import ParameterizedSweepSchema
+
+        schema = ParameterizedSweepSchema(context={BASE_PATH_CONTEXT_KEY: "./"})
+        support_data_binding_expression_for_fields(schema, ["type", "component", "trial"])
+
+        base_sweep = schema.load(obj, unknown=EXCLUDE, partial=True)
+        for key, value in base_sweep.items():
+            obj[key] = value
+
+        # trial
+        trial_component_id = pydash.get(obj, "trial.componentId", None)
+        obj["trial"] = trial_component_id  # check this
+
+        return obj
+
+    def _get_trial_component_rest_obj(self) -> Union[Dict, ComponentVersion, None]:
+        # trial component to rest object is different from usual component
+        trial_component_id = self._get_component_id()
+        if trial_component_id is None:
+            return None
+        if isinstance(trial_component_id, str):
+            return {"componentId": trial_component_id}
+        if isinstance(trial_component_id, CommandComponent):
+            return trial_component_id._to_rest_object()
+        raise UserErrorException(f"invalid trial in sweep node {self.name}: {str(self.trial)}")
+
+    def _to_job(self) -> SweepJob:
+        command = self.trial.command
+        if self.search_space is not None:
+            for key, _ in self.search_space.items():
+                if command is not None:
+                    # Double curly brackets to escape
+                    command = command.replace(f"${{{{inputs.{key}}}}}", f"${{{{search_space.{key}}}}}")
+
+        # TODO: raise exception when the trial is a pre-registered component
+        if command != self.trial.command and isinstance(self.trial, CommandComponent):
+            self.trial.command = command
+
+        return SweepJob(
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            properties=self.properties,
+            tags=self.tags,
+            experiment_name=self.experiment_name,
+            trial=self.trial,
+            compute=self.compute,
+            sampling_algorithm=self.sampling_algorithm,
+            search_space=self.search_space,
+            limits=self.limits,
+            early_termination=self.early_termination,  # type: ignore[arg-type]
+            objective=self.objective,
+            inputs=self._job_inputs,
+            outputs=self._job_outputs,
+            identity=self.identity,
+            queue_settings=self.queue_settings,
+            resources=self.resources,
+        )
+
+    @classmethod
+    def _get_component_attr_name(cls) -> str:
+        return "trial"
+
+    def _build_inputs(self) -> Dict:
+        inputs = super(Sweep, self)._build_inputs()
+        built_inputs = {}
+        # Validate and remove non-specified inputs
+        for key, value in inputs.items():
+            if value is not None:
+                built_inputs[key] = value
+
+        return built_inputs
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline.component_job import SweepSchema
+
+        return SweepSchema(context=context)
+
+    @classmethod
+    def _get_origin_inputs_and_search_space(cls, built_inputs: Optional[Dict[str, NodeInput]]) -> Tuple:
+        """Separate mixed true inputs & search space definition from inputs of
+        this node and return them.
+
+        Input will be restored to Input/LiteralInput before returned.
+
+        :param built_inputs: The built inputs
+        :type built_inputs: Optional[Dict[str, NodeInput]]
+        :return: A tuple of the inputs and search space
+        :rtype: Tuple[
+                Dict[str, Union[Input, str, bool, int, float]],
+                Dict[str, SweepDistribution],
+            ]
+        """
+        search_space: Dict = {}
+        inputs: Dict = {}
+        if built_inputs is not None:
+            for input_name, input_obj in built_inputs.items():
+                if isinstance(input_obj, NodeInput):
+                    if isinstance(input_obj._data, SweepDistribution):
+                        search_space[input_name] = input_obj._data
+                    else:
+                        inputs[input_name] = input_obj._data
+                else:
+                    msg = "unsupported built input type: {}: {}"
+                    raise ValidationException(
+                        message=msg.format(input_name, type(input_obj)),
+                        no_personal_data_message=msg.format("[input_name]", type(input_obj)),
+                        target=ErrorTarget.SWEEP_JOB,
+                        error_type=ValidationErrorType.INVALID_VALUE,
+                    )
+        return inputs, search_space
+
+    def _is_input_set(self, input_name: str) -> bool:
+        if super(Sweep, self)._is_input_set(input_name):
+            return True
+        return self.search_space is not None and input_name in self.search_space
+
+    def __setattr__(self, key: Any, value: Any) -> None:
+        super(Sweep, self).__setattr__(key, value)
+        if key == "early_termination" and isinstance(self.early_termination, BanditPolicy):
+            # only one of slack_amount and slack_factor can be specified but default value is 0.0.
+            # Need to keep track of which one is null.
+            if self.early_termination.slack_amount == 0.0:
+                self.early_termination.slack_amount = None  # type: ignore[assignment]
+            if self.early_termination.slack_factor == 0.0:
+                self.early_termination.slack_factor = None  # type: ignore[assignment]
+
+    @property
+    def early_termination(self) -> Optional[Union[str, EarlyTerminationPolicy]]:
+        """The early termination policy for the sweep job.
+
+        :rtype: Union[str, ~azure.ai.ml.sweep.BanditPolicy, ~azure.ai.ml.sweep.MedianStoppingPolicy,
+            ~azure.ai.ml.sweep.TruncationSelectionPolicy]
+        """
+        return self._early_termination
+
+    @early_termination.setter
+    def early_termination(self, value: Optional[Union[str, EarlyTerminationPolicy]]) -> None:
+        """Sets the early termination policy for the sweep job.
+
+        :param value: The early termination policy for the sweep job.
+        :type value: Union[~azure.ai.ml.sweep.BanditPolicy, ~azure.ai.ml.sweep.MedianStoppingPolicy,
+            ~azure.ai.ml.sweep.TruncationSelectionPolicy, dict[str, Union[str, float, int, bool]]]
+        """
+        if isinstance(value, dict):
+            early_termination_schema = EarlyTerminationField()
+            value = early_termination_schema._deserialize(value=value, attr=None, data=None)
+        self._early_termination = value  # type: ignore[assignment]