Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py                    5
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py      541
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py           42
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py                      297
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py         300
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py                 641
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py         171
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py    325
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py                      553
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py           96
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py        305
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py        529
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py           211
13 files changed, 4016 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py
new file mode 100644
index 00000000..fdf8caba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py
@@ -0,0 +1,5 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
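+# pkgutil.extend_path makes this behave as a namespace-style package, so azure.ai.ml.entities._component
+# can be extended by other distributions found on sys.path.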
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py
new file mode 100644
index 00000000..85f609ca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py
@@ -0,0 +1,541 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import json
+import os
+import shutil
+import tempfile
+import zipfile
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import contextmanager
+from multiprocessing import cpu_count
+from pathlib import Path
+from typing import Any, Dict, Generator, List, Optional, Tuple, Union
+
+from azure.ai.ml.constants._common import AzureDevopsArtifactsType
+from azure.ai.ml.entities._validation import MutableValidationResult, ValidationResultBuilder
+
+from ..._utils._artifact_utils import ArtifactCache
+from ..._utils._asset_utils import IgnoreFile, get_upload_files_from_folder
+from ..._utils.utils import is_concurrent_component_registration_enabled, is_private_preview_enabled
+from ...entities._util import _general_copy
+from .._assets import Code
+from .code import ComponentCodeMixin, ComponentIgnoreFile
+
+PLACEHOLDER_FILE_NAME = "_placeholder_spec.yaml"
+
+
+class AdditionalIncludes:
+    """Initialize the AdditionalIncludes object.
+
+    :param origin_code_value: The origin code value.
+    :type origin_code_value: Optional[str]
+    :param base_path: The base path for origin code path and additional include configs.
+    :type base_path: Path
+    :param configs: The additional include configs.
+    :type configs: List[Union[str, dict]]
+    """
+
+    def __init__(
+        self,
+        *,
+        origin_code_value: Optional[str],
+        base_path: Path,
+        configs: Optional[List[Union[str, dict]]] = None,
+    ) -> None:
+        self._base_path = base_path
+        self._origin_code_value = origin_code_value
+        self._origin_configs = configs
+
+    @property
+    def origin_configs(self) -> List:
+        """The origin additional include configs.
+        Artifact additional include configs haven't been resolved in this property.
+
+        :return: The origin additional include configs.
+        :rtype: List[Union[str, dict]]
+        """
+        return self._origin_configs or []
+
+    @property
+    def resolved_code_path(self) -> Union[None, Path]:
+        """The resolved origin code path based on base path; returns None if the code path is not specified.
+        We shouldn't change this property name given it's referenced in mldesigner.
+
+        :return: The resolved origin code path.
+        :rtype: Union[None, Path]
+        """
+        if self._origin_code_value is None:
+            return None
+        if os.path.isabs(self._origin_code_value):
+            return Path(self._origin_code_value)
+        return (self.base_path / self._origin_code_value).resolve()
+
+    @property
+    def base_path(self) -> Path:
+        """Base path for origin code path and additional include configs.
+
+        :return: The base path.
+        :rtype: Path
+        """
+        return self._base_path
+
+    @property
+    def with_includes(self) -> bool:
+        """Whether the additional include configs have been provided.
+
+        :return: True if additional include configs have been provided, False otherwise.
+        :rtype: bool
+        """
+        return len(self.origin_configs) != 0
+
+    @classmethod
+    def _get_artifacts_by_config(cls, artifact_config: Dict[str, str]) -> Union[str, os.PathLike]:
+        # config key existence has been validated in _validate_additional_include_config
+        res: Union[str, os.PathLike] = ArtifactCache().get(
+            organization=artifact_config.get("organization", None),
+            project=artifact_config.get("project", None),
+            feed=artifact_config["feed"],
+            name=artifact_config["name"],
+            version=artifact_config["version"],
+            scope=artifact_config.get("scope", "organization"),
+            resolve=True,
+        )
+        return res
+
+    def _validate_additional_include_config(
+        self, additional_include_config: Union[Dict, str]
+    ) -> MutableValidationResult:
+        validation_result = ValidationResultBuilder.success()
+        if (
+            isinstance(additional_include_config, dict)
+            and additional_include_config.get("type") == AzureDevopsArtifactsType.ARTIFACT
+        ):
+            # for artifact additional include, we validate the required fields in config but won't validate the
+            # artifact content to avoid downloading it in validation stage
+            # note that runtime error will be thrown when loading the artifact
+            for item in ["feed", "name", "version"]:
+                if item not in additional_include_config:
+                    # TODO: add yaml path after we support list index in yaml path
+                    validation_result.append_error(
+                        "{} is required for artifacts config but got {}.".format(
+                            item, json.dumps(additional_include_config)
+                        )
+                    )
+        elif isinstance(additional_include_config, str):
+            validation_result.merge_with(self._validate_local_additional_include_config(additional_include_config))
+        else:
+            validation_result.append_error(
+                message=f"Unexpected format in additional_includes, {additional_include_config}"
+            )
+        return validation_result
+
+    @classmethod
+    def _resolve_artifact_additional_include_config(
+        cls, artifact_additional_include_config: Dict[str, str]
+    ) -> List[Tuple[str, str]]:
+        """Resolve an artifact additional include config into a list of (local_path, config_info) tuples.
+
+        The configured artifact will be downloaded to a local path first; config_info will be in the following format:
+        %name%:%version% in %feed%
+
+        :param artifact_additional_include_config: Additional include config for an artifact
+        :type artifact_additional_include_config: Dict[str, str]
+        :return: A list of 2-tuples of local_path and config_info
+        :rtype: List[Tuple[str, str]]
+        """
+        result = []
+        # Note that we don't validate the artifact config here, since it has already been validated in
+        # _validate_additional_include_config
+        artifact_path = cls._get_artifacts_by_config(artifact_additional_include_config)
+        for item in os.listdir(artifact_path):
+            config_info = (
+                f"{artifact_additional_include_config['name']}:{artifact_additional_include_config['version']} in "
+                f"{artifact_additional_include_config['feed']}"
+            )
+            result.append((os.path.join(artifact_path, item), config_info))
+        return result
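+    # For illustration of _resolve_artifact_additional_include_config (hypothetical values, not from the
+    # original source): a config like {"feed": "my-feed", "name": "my-package", "version": "1.0.0"} resolves
+    # to tuples such as ("<artifact-cache>/my-package/<item>", "my-package:1.0.0 in my-feed"),
+    # one per top-level item in the downloaded artifact folder.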
+
+    def _resolve_artifact_additional_include_configs(
+        self, artifact_additional_includes_configs: List[Dict[str, str]]
+    ) -> List:
+        additional_include_info_tuples = []
+        # Unlike component registration, artifact downloading is a pure download process, so we can use
+        # more threads to speed up the downloading process.
+        # We use 5 threads per CPU core plus 5 extra threads, and the max number of threads is 64.
+        num_threads = min(64, (int(cpu_count()) * 5) + 5)
+        if (
+            len(artifact_additional_includes_configs) > 1
+            and is_concurrent_component_registration_enabled()
+            and is_private_preview_enabled()
+        ):
+            with ThreadPoolExecutor(max_workers=num_threads) as executor:
+                all_artifact_pairs_itr = executor.map(
+                    self._resolve_artifact_additional_include_config, artifact_additional_includes_configs
+                )
+
+            for artifact_pairs in all_artifact_pairs_itr:
+                additional_include_info_tuples.extend(artifact_pairs)
+        else:
+            all_artifact_pairs_list = list(
+                map(self._resolve_artifact_additional_include_config, artifact_additional_includes_configs)
+            )
+
+            for artifact_pairs in all_artifact_pairs_list:
+                additional_include_info_tuples.extend(artifact_pairs)
+
+        return additional_include_info_tuples
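+    # Worked example for the thread-count formula above (assumed 8-core machine, not from the original
+    # source): num_threads = min(64, 8 * 5 + 5) = 45; machines with 12 or more cores are capped at 64 threads.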
+
+    @staticmethod
+    def _copy(src: Path, dst: Path, *, ignore_file: Optional[Any] = None) -> None:
+        if ignore_file and ignore_file.is_file_excluded(src):
+            return
+        if not src.exists():
+            raise ValueError(f"Path {src} does not exist.")
+        if src.is_file():
+            _general_copy(src, dst)
+        if src.is_dir():
+            # TODO: should we cover empty folder?
+            # use os.walk to replace shutil.copytree, which may raise FileExistsError
+            # for same folder, the expected behavior is merging
+            # ignore will be also applied during this process
+            for name in src.glob("*"):
+                if ignore_file is not None:
+                    AdditionalIncludes._copy(name, dst / name.name, ignore_file=ignore_file.merge(name))
+
+    @staticmethod
+    def _is_folder_to_compress(path: Path) -> bool:
+        """Check if the additional include needs to compress corresponding folder as a zip.
+
+        For example, given additional include /mnt/c/hello.zip
+          1) if a file named /mnt/c/hello.zip already exists, return False (simply copy)
+          2) if a folder named /mnt/c/hello exists, return True (compress as a zip and copy)
+
+        :param path: Given path in additional include.
+        :type path: Path
+        :return: Whether the path needs to be compressed as a zip file.
+        :rtype: bool
+        """
+        if path.suffix != ".zip":
+            return False
+        # if zip file exists, simply copy as other additional includes
+        if path.exists():
+            return False
+        # remove .zip suffix and check whether the folder exists
+        stem_path = path.parent / path.stem
+        return stem_path.is_dir()
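+    # Illustrative behavior of _is_folder_to_compress (hypothetical paths, not from the original source):
+    #     Path("/mnt/c/hello.zip") -> False if the file /mnt/c/hello.zip exists (copied as-is),
+    #     True if only the folder /mnt/c/hello exists, and False for any non-.zip path.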
+
+    def _resolve_folder_to_compress(self, include: str, dst_path: Path, ignore_file: IgnoreFile) -> None:
+        """Resolve a zip additional include by compressing the corresponding folder.
+
+        :param include: The path, relative to :attr:`AdditionalIncludes.base_path`, to zip
+        :type include: str
+        :param dst_path: The path to write the zipfile to
+        :type dst_path: Path
+        :param ignore_file: The ignore file to use to filter files
+        :type ignore_file: IgnoreFile
+        """
+        zip_additional_include = (self.base_path / include).resolve()
+        folder_to_zip = zip_additional_include.parent / zip_additional_include.stem
+        zip_file = dst_path / zip_additional_include.name
+        with zipfile.ZipFile(zip_file, "w") as zf:
+            zf.write(folder_to_zip, os.path.relpath(folder_to_zip, folder_to_zip.parent))  # write root in zip
+            paths = [path for path, _ in get_upload_files_from_folder(folder_to_zip, ignore_file=ignore_file)]
+            # sort the paths to make sure the zip file (namelist) is deterministic
+            for path in sorted(paths):
+                zf.write(path, os.path.relpath(path, folder_to_zip.parent))
+
+    def _get_resolved_additional_include_configs(self) -> List[str]:
+        """
+        Resolve additional include configs to a list of local paths and return them.
+
+        Additional includes is a list of include files, covering both local paths and Azure DevOps Artifacts.
+        The YAML format of additional_includes looks like this:
+            additional_includes:
+             - your/local/path
+             - type: artifact
+               organization: devops_organization
+               project: devops_project
+               feed: artifacts_feed_name
+               name: universal_package_name
+               version: package_version
+               scope: scope_type
+        Artifact packages will be downloaded from DevOps to local paths in this function, and the artifact
+        configs are translated into the local paths of the downloaded artifacts;
+        local path configs are returned directly.
+        If there are conflicts among artifacts, a runtime error will be raised. Note that we won't check for
+        conflicts between artifacts and local paths, or among local paths. The reasons are:
+        1. There can be an ignore_file in local paths, which makes it hard to check for conflicts and may lead to
+        breaking changes;
+        2. Conflicts among artifacts are more likely to happen, since a user may refer to 2 artifacts of the same name
+        but with different version and feed.
+        3. According to the current design, folders in local paths will be merged, while artifact conflicts can be
+        identified by folder name conflicts and are not allowed.
+
+        :return additional_includes: Path list of additional_includes
+        :rtype additional_includes: List[str]
+        """
+        additional_include_configs_in_local_path = []
+
+        artifact_additional_include_configs = []
+        for additional_include_config in self.origin_configs:
+            if isinstance(additional_include_config, str):
+                # add local additional include configs directly
+                additional_include_configs_in_local_path.append(additional_include_config)
+            else:
+                # artifact additional include config will be downloaded and resolved to a local path later
+                # note that there is no more validation for artifact additional include config here, since it has
+                # already been validated in _validate_additional_include_config
+                artifact_additional_include_configs.append(additional_include_config)
+
+        artifact_additional_include_info_tuples = self._resolve_artifact_additional_include_configs(
+            artifact_additional_include_configs
+        )
+        additional_include_configs_in_local_path.extend(
+            local_path for local_path, _ in artifact_additional_include_info_tuples
+        )
+
+        # check file conflicts among artifact package
+        # given this is not in validate stage, we will raise error if there are conflict files
+        conflict_files: dict = defaultdict(set)
+        for local_path, config_info in artifact_additional_include_info_tuples:
+            file_name = Path(local_path).name
+            conflict_files[file_name].add(config_info)
+
+        conflict_files = {k: v for k, v in conflict_files.items() if len(v) > 1}
+        if conflict_files:
+            raise RuntimeError(f"There are conflict files in additional include: {conflict_files}")
+
+        return additional_include_configs_in_local_path
+
+    def _validate_local_additional_include_config(
+        self, local_path: str, config_info: Optional[str] = None
+    ) -> MutableValidationResult:
+        """Validate local additional include config.
+
+        Note that we will check for file conflicts between each local additional include and the origin code, but
+        won't check for file conflicts among local additional includes for now.
+
+        :param local_path: The local path
+        :type local_path: str
+        :param config_info: The config info
+        :type config_info: Optional[str]
+        :return: The validation result.
+        :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult
+        """
+        validation_result = ValidationResultBuilder.success()
+        include_path = self.base_path / local_path
+        # if additional include has not supported characters, resolve will fail and raise OSError
+        try:
+            src_path = include_path.resolve()
+        except OSError:
+            # no need to include potential yaml file name in error message as it will be covered by
+            # validation message construction.
+            error_msg = (
+                f"Failed to resolve additional include " f"{config_info or local_path} " f"based on {self.base_path}."
+            )
+            validation_result.append_error(message=error_msg)
+            return validation_result
+
+        if not src_path.exists() and not self._is_folder_to_compress(src_path):
+            error_msg = f"Unable to find additional include {config_info or local_path}"
+            validation_result.append_error(message=error_msg)
+            return validation_result
+
+        if len(src_path.parents) == 0:
+            error_msg = "Root directory is not supported for additional includes."
+            validation_result.append_error(message=error_msg)
+            return validation_result
+
+        dst_path = Path(self.resolved_code_path) / src_path.name if self.resolved_code_path else None
+        if dst_path:
+            if dst_path.is_symlink():
+                # if destination path is symbolic link, check if it points to the same file/folder as source path
+                if dst_path.resolve() != src_path.resolve():
+                    error_msg = f"A symbolic link already exists for additional include {config_info or local_path}."
+                    validation_result.append_error(message=error_msg)
+                    return validation_result
+            elif dst_path.exists():
+                error_msg = f"A file already exists for additional include {config_info or local_path}."
+                validation_result.append_error(message=error_msg)
+        return validation_result
+
+    def validate(self) -> MutableValidationResult:
+        """Validate the AdditionalIncludes object.
+
+        :return: The validation result.
+        :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult
+        """
+        validation_result = ValidationResultBuilder.success()
+        for additional_include_config in self.origin_configs:
+            validation_result.merge_with(self._validate_additional_include_config(additional_include_config))
+        return validation_result
+
+    def _copy_origin_code(self, target_path: Path) -> ComponentIgnoreFile:
+        """Copy origin code to target path.
+
+        :param target_path: The destination to copy to
+        :type target_path: Path
+        :return: The component ignore file for the origin path
+        :rtype: ComponentIgnoreFile
+        """
+        # code can be either a file or a folder; since additional includes exist, it needs to be copied to a temporary folder
+        if self.resolved_code_path is None:
+            # if additional include configs exist but no origin code path, return a dummy ignore file
+            return ComponentIgnoreFile(
+                self.base_path,
+            )
+
+        if Path(self.resolved_code_path).is_file():
+            # use a dummy ignore file to save base path
+            root_ignore_file = ComponentIgnoreFile(
+                Path(self.resolved_code_path).parent,
+                skip_ignore_file=True,
+            )
+            self._copy(
+                Path(self.resolved_code_path),
+                target_path / Path(self.resolved_code_path).name,
+                ignore_file=root_ignore_file,
+            )
+        else:
+            # current implementation of ignore file is based on absolute path, so it cannot be shared
+            root_ignore_file = ComponentIgnoreFile(self.resolved_code_path)
+            self._copy(self.resolved_code_path, target_path, ignore_file=root_ignore_file)
+        return root_ignore_file
+
+    @contextmanager
+    def merge_local_code_and_additional_includes(self) -> Generator:
+        """Merge code and potential additional includes into a temporary folder and return the absolute path of it.
+
+        If no additional includes are specified, just return the absolute path of the original code path.
+        If no original code path is specified, return None.
+
+        :return: The absolute path of the merged code and additional includes.
+        :rtype: Path
+        """
+        if not self.with_includes:
+            if self.resolved_code_path is None:
+                yield None
+            else:
+                yield self.resolved_code_path.absolute()
+            return
+
+        # for now, the upload path of a code asset will include the folder name of the code path (the name of the
+        # folder, or the parent name of a file). For example, if the code path is /mnt/c/code-a, the upload path will
+        # be xxx/code-a, which means the upload path would change every time, since we merge additional includes into
+        # a temp folder. To avoid this, we copy the code path to a child folder with a fixed name under the temp
+        # folder, and that child folder is then used in the upload path.
+        # This issue shouldn't impact users as there is a separate asset existence check before uploading.
+        # We still make this change because:
+        # 1. We would otherwise always need to record the asset twice, as the upload path changes on first-time upload
+        # 2. This will improve the stability of the code asset existence check - AssetNotChanged check in
+        #    BlobStorageClient will be a backup check
+        tmp_folder_path = Path(tempfile.mkdtemp(), "code_with_additional_includes")
+        tmp_folder_path.mkdir(parents=True, exist_ok=True)
+
+        root_ignore_file = self._copy_origin_code(tmp_folder_path)
+
+        # resolve additional includes
+        base_path = self.base_path
+        # additional includes from artifact will be downloaded to a temp local path on calling
+        # self.includes, so no need to add specific logic for artifact
+
+        # TODO: skip ignored files defined in code when copying additional includes
+        # copy additional includes disregarding ignore files as current ignore file implementation
+        # is based on absolute path, which is not suitable for additional includes
+        for additional_include_local_path in self._get_resolved_additional_include_configs():
+            src_path = Path(additional_include_local_path)
+            if not src_path.is_absolute():
+                src_path = (base_path / additional_include_local_path).resolve()
+            dst_path = (tmp_folder_path / src_path.name).resolve()
+
+            root_ignore_file.rebase(src_path.parent)
+            if self._is_folder_to_compress(src_path):
+                self._resolve_folder_to_compress(
+                    additional_include_local_path,
+                    Path(tmp_folder_path),
+                    # actual src path is without .zip suffix
+                    ignore_file=root_ignore_file.merge(src_path.parent / src_path.stem),
+                )
+                # early continue as the folder is compressed as a zip file
+                continue
+
+            # no need to check if src_path exists as it is already validated
+            if src_path.is_file():
+                self._copy(src_path, dst_path, ignore_file=root_ignore_file)
+            elif src_path.is_dir():
+                self._copy(
+                    src_path,
+                    dst_path,
+                    # root ignore file on parent + ignore file on src_path
+                    ignore_file=root_ignore_file.merge(src_path),
+                )
+            else:
+                raise ValueError(f"Unable to find additional include {additional_include_local_path}.")
+        try:
+            yield tmp_folder_path.absolute()
+
+        finally:
+            # clean up tmp folder as it can be very disk space consuming
+            shutil.rmtree(tmp_folder_path, ignore_errors=True)
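+    # Rough usage sketch of the context manager above (hypothetical paths/configs, not from the original source):
+    #     includes = AdditionalIncludes(
+    #         origin_code_value="./src",
+    #         base_path=Path("."),
+    #         configs=["../shared/utils.py", "helpers.zip"],
+    #     )
+    #     with includes.merge_local_code_and_additional_includes() as merged_path:
+    #         ...  # merged_path is a temp folder containing ./src plus the resolved includes
+    #     # the temp folder is removed once the context manager exits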
+
+
+class AdditionalIncludesMixin(ComponentCodeMixin):
+    @classmethod
+    def _get_additional_includes_field_name(cls) -> str:
+        """Get the field name for additional includes.
+
+        :return: The field name
+        :rtype: str
+        """
+        return "additional_includes"
+
+    def _get_all_additional_includes_configs(self) -> List:
+        return getattr(self, self._get_additional_includes_field_name(), [])
+
+    def _append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+        self, base_validation_result: Optional[MutableValidationResult] = None
+    ) -> bool:
+        is_reliable: bool = super()._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+            base_validation_result
+        )
+        additional_includes_obj = self._generate_additional_includes_obj()
+
+        if base_validation_result is not None:
+            base_validation_result.merge_with(
+                additional_includes_obj.validate(), field_name=self._get_additional_includes_field_name()
+            )
+        # if additional includes is specified, origin code will be merged with additional includes into a temp folder
+        # before registered as a code asset, so origin code value is not reliable for local path validation
+        if additional_includes_obj.with_includes:
+            return False
+        return is_reliable
+
+    def _generate_additional_includes_obj(self) -> AdditionalIncludes:
+        return AdditionalIncludes(
+            base_path=self._get_base_path_for_code(),
+            configs=self._get_all_additional_includes_configs(),
+            origin_code_value=self._get_origin_code_in_str(),
+        )
+
+    @contextmanager
+    def _try_build_local_code(self) -> Generator:
+        """Build final code when origin code is a local code.
+
+        Will merge code path with additional includes into a temp folder if additional includes is specified.
+
+        :return: The built Code object
+        :rtype: Iterable[Optional[Code]]
+        """
+        # will try to merge code and additional includes even if code is None
+        tmp_code_dir: Any
+        with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir:
+            if tmp_code_dir is None:
+                yield None
+            else:
+                yield Code(
+                    base_path=self._get_base_path_for_code(),
+                    path=tmp_code_dir,
+                    ignore_file=ComponentIgnoreFile(tmp_code_dir),
+                )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py
new file mode 100644
index 00000000..3e7be727
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py
@@ -0,0 +1,42 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Optional
+
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._schema.component.automl_component import AutoMLComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.entities._component.component import Component
+
+
+class AutoMLComponent(Component):
+    """AutoML component entity, used to define an automl component.
+
+    AutoML Component is only used "internally" for the scenarios that need it. The AutoML Component schema is
+    not intended for end users, so it is not exposed to them and has no public documentation.
+
+    :param task: Task of the component.
+    :type task: str
+    """
+
+    def __init__(
+        self,
+        *,
+        task: Optional[str] = None,
+        **kwargs: Any,
+    ) -> None:
+        kwargs[COMPONENT_TYPE] = NodeType.AUTOML
+        super(AutoMLComponent, self).__init__(**kwargs)
+        self._task = task
+
+    @property
+    def task(self) -> Optional[str]:
+        """Returns task of the component."""
+        return self._task
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+        return AutoMLComponentSchema(context=context)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py
new file mode 100644
index 00000000..1f838bec
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py
@@ -0,0 +1,297 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from contextlib import contextmanager
+from enum import Enum
+from pathlib import Path
+from typing import Any, Generator, List, Optional, Union
+
+from azure.ai.ml._utils._arm_id_utils import is_ARM_id_for_resource, is_registry_id_for_resource
+from azure.ai.ml._utils._asset_utils import IgnoreFile, get_ignore_file
+from azure.ai.ml._utils.utils import is_private_preview_enabled
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AzureMLResourceType
+from azure.ai.ml.entities._assets import Code
+from azure.ai.ml.entities._validation import MutableValidationResult
+
+
+class ComponentIgnoreFile(IgnoreFile):
+    _COMPONENT_CODE_IGNORES = ["__pycache__"]
+    """Component-specific ignore file used for ignoring files in a component directory.
+
+    :param directory_path: The directory path for the ignore file.
+    :type directory_path: Union[str, Path]
+    :param additional_includes_file_name: Name of the additional includes file in the root directory to be ignored.
+    :type additional_includes_file_name: str
+    :param skip_ignore_file: Whether to skip the ignore file, defaults to False.
+    :type skip_ignore_file: bool
+    :param extra_ignore_list: List of additional ignore files to be considered during file exclusion.
+    :type extra_ignore_list: List[~azure.ai.ml._utils._asset_utils.IgnoreFile]
+    :raises ValueError: If additional include file is not found.
+    :return: The ComponentIgnoreFile object.
+    :rtype: ComponentIgnoreFile
+    """
+
+    def __init__(
+        self,
+        directory_path: Union[str, Path],
+        *,
+        additional_includes_file_name: Optional[str] = None,
+        skip_ignore_file: bool = False,
+        extra_ignore_list: Optional[List[IgnoreFile]] = None,
+    ):
+        self._base_path: Union[str, Path] = Path(directory_path)
+        self._extra_ignore_list: List[IgnoreFile] = extra_ignore_list or []
+        # only the additional include file in root directory is ignored
+        # additional include files in subdirectories are not processed so keep them
+        self._additional_includes_file_name = additional_includes_file_name
+        # note: the parameter changes to directory path in this class, rather than file path
+        file_path = None if skip_ignore_file else get_ignore_file(directory_path).path
+        super(ComponentIgnoreFile, self).__init__(file_path=file_path)
+
+    def exists(self) -> bool:
+        """Check if the ignore file exists.
+
+        :return: True
+        :rtype: bool
+        """
+        return True
+
+    @property
+    def base_path(self) -> Union[str, Path]:
+        """Get the base path of the ignore file.
+
+        :return: The base path.
+        :rtype: Path
+        """
+        # for component ignore file, the base path can be different from file.parent
+        return self._base_path
+
+    def rebase(self, directory_path: Union[str, Path]) -> "ComponentIgnoreFile":
+        """Rebase the ignore file to a new directory.
+
+        :param directory_path: The new directory path.
+        :type directory_path: Union[str, Path]
+        :return: The rebased ComponentIgnoreFile object.
+        :rtype: ComponentIgnoreFile
+        """
+        self._base_path = directory_path
+        return self
+
+    def is_file_excluded(self, file_path: Union[str, Path]) -> bool:
+        """Check if a file should be excluded based on the ignore file rules.
+
+        :param file_path: The file path.
+        :type file_path: Union[str, Path]
+        :return: True if the file should be excluded, False otherwise.
+        :rtype: bool
+        """
+        if self._additional_includes_file_name and self._get_rel_path(file_path) == self._additional_includes_file_name:
+            return True
+        for ignore_file in self._extra_ignore_list:
+            if ignore_file.is_file_excluded(file_path):
+                return True
+        res: bool = super(ComponentIgnoreFile, self).is_file_excluded(file_path)
+        return res
+
+    def merge(self, other_path: Path) -> "ComponentIgnoreFile":
+        """Merge the ignore list from another ComponentIgnoreFile object.
+
+        :param other_path: The path of the other ignore file.
+        :type other_path: Path
+        :return: The merged ComponentIgnoreFile object.
+        :rtype: ComponentIgnoreFile
+        """
+        if other_path.is_file():
+            return self
+        return ComponentIgnoreFile(other_path, extra_ignore_list=self._extra_ignore_list + [self])
+
+    def _get_ignore_list(self) -> List[str]:
+        """Retrieves the list of ignores from ignore file
+
+        Override to add custom ignores.
+
+        :return: The ignore rules
+        :rtype: List[str]
+        """
+        if not super(ComponentIgnoreFile, self).exists():
+            return self._COMPONENT_CODE_IGNORES
+        res: list = super(ComponentIgnoreFile, self)._get_ignore_list() + self._COMPONENT_CODE_IGNORES
+        return res
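+    # Rough usage sketch (hypothetical paths, not from the original source):
+    #     ignore_file = ComponentIgnoreFile("./my_component")
+    #     ignore_file.is_file_excluded("./my_component/__pycache__/cached.pyc")  # True, via _COMPONENT_CODE_IGNORES
+    #     ignore_file.merge(Path("./my_component/sub"))  # for a subdirectory, chains both ignore lists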
+
+
+class CodeType(Enum):
+    """Code type."""
+
+    LOCAL = "local"
+    NONE = "none"
+    GIT = "git"
+    ARM_ID = "arm_id"
+    UNKNOWN = "unknown"
+
+
+def _get_code_type(origin_code_value: Optional[str]) -> CodeType:
+    if origin_code_value is None:
+        return CodeType.NONE
+    if not isinstance(origin_code_value, str):
+        # note that:
+        # 1. Code & CodeOperation are not public for now
+        # 2. AnonymousCodeSchema is not within CodeField
+        # 3. Code will be returned as an arm id as an attribute of a component when getting a component from remote
+        # So origin_code_value should never be a Code object, or an exception will be raised
+        # in validation stage.
+        return CodeType.UNKNOWN
+    if is_ARM_id_for_resource(origin_code_value, AzureMLResourceType.CODE) or is_registry_id_for_resource(
+        origin_code_value
+    ):
+        return CodeType.ARM_ID
+    if origin_code_value.startswith("git+"):
+        return CodeType.GIT
+    return CodeType.LOCAL
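+# Illustrative classification sketch for _get_code_type (hypothetical values, not from the original source):
+#     _get_code_type(None)                                   -> CodeType.NONE
+#     _get_code_type("./src")                                -> CodeType.LOCAL
+#     _get_code_type("git+https://github.com/org/repo.git")  -> CodeType.GIT
+#     a workspace/registry code ARM id                       -> CodeType.ARM_ID
+#     a non-string value (e.g. a Code object)                -> CodeType.UNKNOWN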
+
+
+class ComponentCodeMixin:
+    """Mixin class for components that have local files as part of the component. Those local files will be uploaded
+    to blob storage and then referenced as a code asset by ARM id. In the docstrings below, we refer to those local
+    files as "code".
+
+    The major interfaces of this mixin are self._customized_code_validate and self._build_code.
+    self._customized_code_validate returns a validation result indicating whether the code is valid.
+    self._build_code returns a temporary Code object for server-side code asset creation.
+    """
+
+    def _get_base_path_for_code(self) -> Path:
+        """Get base path for additional includes.
+
+        :return: The base path
+        :rtype: Path
+        """
+        if hasattr(self, BASE_PATH_CONTEXT_KEY):
+            return Path(getattr(self, BASE_PATH_CONTEXT_KEY))
+        raise NotImplementedError(
+            "Component must have a base_path attribute to use ComponentCodeMixin. "
+            "Please set base_path in __init__ or override _get_base_path_for_code."
+        )
+
+    @classmethod
+    def _get_code_field_name(cls) -> str:
+        """Get the field name for code.
+
+        Will be used to get origin code value by default and will be used as field name of validation diagnostics.
+
+        :return: Code field name
+        :rtype: str
+        """
+        return "code"
+
+    def _get_origin_code_value(self) -> Union[str, os.PathLike, None]:
+        """Get origin code value.
+        Origin code value is either an absolute path or a path relative to the base path if it's a local path.
+        Additional includes are only supported for component types with a code attribute. The origin code path will be
+        copied to a temp folder along with additional includes to form the new code content.
+        """
+        return getattr(self, self._get_code_field_name(), None)
+
+    def _fill_back_code_value(self, value: str) -> None:
+        """Fill resolved code value back to the component.
+
+        :param value: resolved code value
+        :type value: str
+        :return: no return
+        :rtype: None
+        """
+        return setattr(self, self._get_code_field_name(), value)
+
+    def _get_origin_code_in_str(self) -> Optional[str]:
+        """Get origin code value in str to simplify following logic."""
+        origin_code_value = self._get_origin_code_value()
+        if origin_code_value is None:
+            return None
+        if isinstance(origin_code_value, Path):
+            return origin_code_value.as_posix()
+        return str(origin_code_value)
+
+    def _append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+        self, base_validation_result: Optional[MutableValidationResult] = None
+    ) -> bool:
+        """Append diagnostics from customized validation logic to the base validation result and check if origin code
+        value is valid for path validation.
+
+        For customized validation logic, this method shouldn't cover validation logic that duplicates schema
+        validation, such as the local code existence check.
+        As for the check: since "code" contains the file dependencies of a component, other fields may depend on those
+        files. However, the origin code value may not be reliable for validating those fields. For example:
+        1. the origin code value can be a remote git path or an ARM id of a code asset.
+        2. some file operations may be done during build_code, which makes the final code content different from what
+        we can get from the origin code value.
+        So we use this function to check whether the origin code value is reliable for further local path validation.
+
+        :param base_validation_result: base validation result to append diagnostics to.
+        :type base_validation_result: MutableValidationResult
+        :return: whether origin code value is reliable for further local path validation.
+        :rtype: bool
+        """
+        # If private features are enabled and the component has a code value of type str, we need to check
+        # whether it is a valid git path. Otherwise, we should throw a ValidationError
+        # saying that the code value is not valid
+        code_type = _get_code_type(self._get_origin_code_in_str())
+        if code_type == CodeType.GIT and not is_private_preview_enabled():
+            if base_validation_result is not None:
+                base_validation_result.append_error(
+                    message="Not a valid code value: git paths are not supported.",
+                    yaml_path=self._get_code_field_name(),
+                )
+        return code_type == CodeType.LOCAL
+
+    @contextmanager
+    def _build_code(self) -> Generator:
+        """Create a Code object if necessary based on origin code value and yield it.
+
+        :return: If built code is the same as its origin value, do nothing and yield None.
+           Otherwise, yield a Code object pointing to the code.
+        :rtype: Iterable[Optional[Code]]
+        """
+        origin_code_value = self._get_origin_code_in_str()
+        code_type = _get_code_type(origin_code_value)
+
+        if code_type == CodeType.GIT:
+            # git also need to be resolved into arm id
+            yield Code(path=origin_code_value)
+        elif code_type in [CodeType.LOCAL, CodeType.NONE]:
+            code: Any
+            # false-positive by pylint, hence disable it
+            # (https://github.com/pylint-dev/pylint/blob/main/doc/data/messages
+            # /c/contextmanager-generator-missing-cleanup/details.rst)
+            with self._try_build_local_code() as code:  # pylint:disable=contextmanager-generator-missing-cleanup
+                yield code
+        else:
+            # arm id, None and unknown need no extra resolution
+            yield None
+
+    @contextmanager
+    def _try_build_local_code(self) -> Generator:
+        """Extract the logic of _build_code for local code for further override.
+
+        :return: The Code object if it could be constructed, None otherwise
+        :rtype: Iterable[Optional[Code]]
+        """
+        origin_code_value = self._get_origin_code_in_str()
+        if origin_code_value is None:
+            yield None
+        else:
+            base_path = self._get_base_path_for_code()
+            absolute_path: Union[str, Path] = (
+                origin_code_value if os.path.isabs(origin_code_value) else base_path / origin_code_value
+            )
+
+            yield Code(
+                base_path=base_path,
+                path=origin_code_value,
+                ignore_file=ComponentIgnoreFile(absolute_path),
+            )
+
+    def _with_local_code(self) -> bool:
+        # TODO: remove this method after we have a better way to do this judge in cache_utils
+        origin_code_value = self._get_origin_code_in_str()
+        code_type = _get_code_type(origin_code_value)
+        return code_type == CodeType.LOCAL
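+    # Rough sketch of how a component subclass is expected to use this mixin (assumption, not from the
+    # original source):
+    #     with component._build_code() as code:
+    #         if code is not None:
+    #             ...  # register/upload the temporary Code object; None means no extra resolution is needed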
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py
new file mode 100644
index 00000000..9bdcd3d1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py
@@ -0,0 +1,300 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from typing import Any, Dict, List, Optional, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.command_component import CommandComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.entities._assets import Environment
+from azure.ai.ml.entities._job.distribution import (
+    DistributionConfiguration,
+    MpiDistribution,
+    PyTorchDistribution,
+    RayDistribution,
+    TensorFlowDistribution,
+)
+from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
+from azure.ai.ml.entities._job.parameterized_command import ParameterizedCommand
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+from ..._restclient.v2022_10_01.models import ComponentVersion
+from ..._schema import PathAwareSchema
+from ..._utils.utils import get_all_data_binding_expressions, parse_args_description_from_docstring
+from .._util import convert_ordered_dict_to_dict, validate_attribute_type
+from .._validation import MutableValidationResult
+from ._additional_includes import AdditionalIncludesMixin
+from .component import Component
+
+# pylint: disable=protected-access
+
+
+class CommandComponent(Component, ParameterizedCommand, AdditionalIncludesMixin):
+    """Command component version, used to define a Command Component or Job.
+
+    :keyword name: The name of the Command job or component.
+    :paramtype name: Optional[str]
+    :keyword version: The version of the Command job or component.
+    :paramtype version: Optional[str]
+    :keyword description: The description of the component. Defaults to None.
+    :paramtype description: Optional[str]
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
+    :paramtype tags: Optional[dict]
+    :keyword display_name: The display name of the component.
+    :paramtype display_name: Optional[str]
+    :keyword command: The command to be executed.
+    :paramtype command: Optional[str]
+    :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url pointing
+        to a remote location.
+    :paramtype code: Optional[str]
+    :keyword environment: The environment that the job will run in.
+    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+    :keyword distribution: The configuration for distributed jobs. Defaults to None.
+    :paramtype distribution: Optional[Union[~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
+        ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]]
+    :keyword resources: The compute resource configuration for the command.
+    :paramtype resources: Optional[~azure.ai.ml.entities.JobResourceConfiguration]
+    :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
+    :paramtype inputs: Optional[dict[str, Union[
+        ~azure.ai.ml.Input,
+        str,
+        bool,
+        int,
+        float,
+        Enum,
+        ]]]
+    :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
+    :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
+    :keyword instance_count: The number of instances or nodes to be used by the compute target. Defaults to 1.
+    :paramtype instance_count: Optional[int]
+    :keyword is_deterministic: Specifies whether the Command will return the same output given the same input.
+        Defaults to True. When True, if a Command (component) is deterministic and has been run before in the
+        current workspace with the same input and settings, it will reuse results from a previous submitted job
+        when used as a node or step in a pipeline. In that scenario, no compute resources will be used.
+    :paramtype is_deterministic: Optional[bool]
+    :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
+    :paramtype additional_includes: Optional[List[str]]
+    :keyword properties: The job property dictionary. Defaults to None.
+    :paramtype properties: Optional[dict[str, str]]
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if CommandComponent cannot be successfully validated.
+        Details will be provided in the error message.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_command_configurations.py
+            :start-after: [START command_component_definition]
+            :end-before: [END command_component_definition]
+            :language: python
+            :dedent: 8
+            :caption: Creating a CommandComponent.
+    """
+
+    def __init__(
+        self,
+        *,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        display_name: Optional[str] = None,
+        command: Optional[str] = None,
+        code: Optional[Union[str, os.PathLike]] = None,
+        environment: Optional[Union[str, Environment]] = None,
+        distribution: Optional[
+            Union[
+                Dict,
+                MpiDistribution,
+                TensorFlowDistribution,
+                PyTorchDistribution,
+                RayDistribution,
+                DistributionConfiguration,
+            ]
+        ] = None,
+        resources: Optional[JobResourceConfiguration] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        instance_count: Optional[int] = None,  # promoted property from resources.instance_count
+        is_deterministic: bool = True,
+        additional_includes: Optional[List] = None,
+        properties: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        kwargs[COMPONENT_TYPE] = NodeType.COMMAND
+
+        # Component backend doesn't support environment_variables yet,
+        # this is to support the case of CommandComponent being the trial of
+        # a SweepJob, where environment_variables is stored as part of trial
+        environment_variables = kwargs.pop("environment_variables", None)
+        super().__init__(
+            name=name,
+            version=version,
+            description=description,
+            tags=tags,
+            display_name=display_name,
+            inputs=inputs,
+            outputs=outputs,
+            is_deterministic=is_deterministic,
+            properties=properties,
+            **kwargs,
+        )
+
+        # No validation on the value passed here because in a pipeline job, required code & environment may be absent
+        # and filled in later with job defaults.
+        self.command = command
+        self.code = code
+        self.environment_variables = environment_variables
+        self.environment = environment
+        self.resources = resources  # type: ignore[assignment]
+        self.distribution = distribution
+
+        # check mutual exclusivity of promoted properties
+        if self.resources is not None and instance_count is not None:
+            msg = "instance_count and resources are mutually exclusive"
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.COMPONENT,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        self.instance_count = instance_count
+        self.additional_includes = additional_includes or []
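+    # Illustrative construction sketch (hypothetical names, paths and environment, not from the original source):
+    #     CommandComponent(
+    #         name="train_model",
+    #         command="python train.py --lr ${{inputs.learning_rate}} --out ${{outputs.model}}",
+    #         code="./src",
+    #         environment="azureml:my-training-env:1",
+    #         inputs={"learning_rate": Input(type="number")},
+    #         outputs={"model": Output(type="uri_folder")},
+    #         additional_includes=["../shared/utils.py"],
+    #     )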
+
+    def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+        """Convert the component content into an ordered dict for sorted YAML dumping.
+
+        :return: The ordered dict
+        :rtype: Dict
+        """
+
+        obj: dict = super()._to_ordered_dict_for_yaml_dump()
+        # a dict dumped based on the schema will convert code to an absolute path, while we want to keep its original value
+        if self.code and isinstance(self.code, str):
+            obj["code"] = self.code
+        return obj
+
+    @property
+    def instance_count(self) -> Optional[int]:
+        """The number of instances or nodes to be used by the compute target.
+
+        :return: The number of instances or nodes.
+        :rtype: int
+        """
+        return self.resources.instance_count if self.resources and not isinstance(self.resources, dict) else None
+
+    @instance_count.setter
+    def instance_count(self, value: int) -> None:
+        """Sets the number of instances or nodes to be used by the compute target.
+
+        :param value: The number of instances or nodes to be used by the compute target. Defaults to 1.
+        :type value: int
+        """
+        if not value:
+            return
+        if not self.resources:
+            self.resources = JobResourceConfiguration(instance_count=value)
+        else:
+            if not isinstance(self.resources, dict):
+                self.resources.instance_count = value
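+    # For example (sketch, not from the original source): setting instance_count when resources is unset creates
+    # JobResourceConfiguration(instance_count=value); when resources is already a JobResourceConfiguration,
+    # only its instance_count field is updated, and dict-typed resources are left untouched.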
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "environment": (str, Environment),
+            "environment_variables": dict,
+            "resources": (dict, JobResourceConfiguration),
+            "code": (str, os.PathLike),
+        }
+
+    def _to_dict(self) -> Dict:
+        return cast(
+            dict, convert_ordered_dict_to_dict({**self._other_parameter, **super(CommandComponent, self)._to_dict()})
+        )
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # put it here as distribution is shared by some components, e.g. command
+        distribution = obj.properties.component_spec.pop("distribution", None)
+        init_kwargs: dict = super()._from_rest_object_to_init_params(obj)
+        if distribution:
+            init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution)
+        return init_kwargs
+
+    def _get_environment_id(self) -> Union[str, None]:
+        # Return environment id of environment
+        # handle case when environment is defined inline
+        if isinstance(self.environment, Environment):
+            _id: Optional[str] = self.environment.id
+            return _id
+        return self.environment
+
+    # region SchemaValidatableMixin
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return CommandComponentSchema(context=context)
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = super(CommandComponent, self)._customized_validate()
+        self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
+        validation_result.merge_with(self._validate_command())
+        validation_result.merge_with(self._validate_early_available_output())
+        return validation_result
+
+    def _validate_command(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        # command
+        if self.command:
+            invalid_expressions = []
+            for data_binding_expression in get_all_data_binding_expressions(self.command, is_singular=False):
+                if not self._is_valid_data_binding_expression(data_binding_expression):
+                    invalid_expressions.append(data_binding_expression)
+
+            if invalid_expressions:
+                validation_result.append_error(
+                    yaml_path="command",
+                    message="Invalid data binding expression: {}".format(", ".join(invalid_expressions)),
+                )
+        return validation_result
+
+    def _validate_early_available_output(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        for name, output in self.outputs.items():
+            if output.early_available is True and output._is_primitive_type is not True:
+                msg = (
+                    f"Early available output {name!r} requires output is primitive type, "
+                    f"got {output._is_primitive_type!r}."
+                )
+                validation_result.append_error(message=msg, yaml_path=f"outputs.{name}")
+        return validation_result
+
+    def _is_valid_data_binding_expression(self, data_binding_expression: str) -> bool:
+        current_obj: Any = self
+        for item in data_binding_expression.split("."):
+            if hasattr(current_obj, item):
+                current_obj = getattr(current_obj, item)
+            else:
+                try:
+                    current_obj = current_obj[item]
+                except Exception:  # pylint: disable=W0718
+                    return False
+        return True
+
+    # endregion
+
+    @classmethod
+    def _parse_args_description_from_docstring(cls, docstring: str) -> Dict:
+        res: dict = parse_args_description_from_docstring(docstring)
+        return res
+
+    def __str__(self) -> str:
+        try:
+            toYaml: str = self._to_yaml()
+            return toYaml
+        except BaseException:  # pylint: disable=W0718
+            toStr: str = super(CommandComponent, self).__str__()
+            return toStr
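+
+
+# Editor's note: the sketch below is illustrative only and is not part of the
+# original source. It assumes CommandComponent accepts the "command" and
+# "environment" keywords matching the attributes listed in _attr_type_map above
+# (the environment name is hypothetical), and demonstrates the instance_count
+# setter: assigning a value when no resources are configured wraps it in a
+# JobResourceConfiguration.
+def _example_instance_count() -> None:  # pragma: no cover - illustrative only
+    component = CommandComponent(
+        name="hello_world",
+        command="echo hello",
+        environment="azureml:my-environment:1",
+    )
+    component.instance_count = 2
+    # The setter above created JobResourceConfiguration(instance_count=2).
+    assert isinstance(component.resources, JobResourceConfiguration)
+    assert component.resources.instance_count == 2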
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py
new file mode 100644
index 00000000..c02a3a33
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py
@@ -0,0 +1,641 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import re
+import uuid
+from os import PathLike
+from pathlib import Path
+from typing import IO, TYPE_CHECKING, Any, AnyStr, Callable, Dict, Iterable, Optional, Tuple, Union
+
+from marshmallow import INCLUDE
+
+from ..._restclient.v2024_01_01_preview.models import (
+    ComponentContainer,
+    ComponentContainerProperties,
+    ComponentVersion,
+    ComponentVersionProperties,
+)
+from ..._schema import PathAwareSchema
+from ..._schema.component import ComponentSchema
+from ..._utils.utils import dump_yaml_to_file, hash_dict
+from ...constants._common import (
+    ANONYMOUS_COMPONENT_NAME,
+    BASE_PATH_CONTEXT_KEY,
+    PARAMS_OVERRIDE_KEY,
+    REGISTRY_URI_FORMAT,
+    SOURCE_PATH_CONTEXT_KEY,
+    CommonYamlFields,
+    SchemaUrl,
+)
+from ...constants._component import ComponentSource, IOConstants, NodeType
+from ...entities._assets.asset import Asset
+from ...entities._inputs_outputs import Input, Output
+from ...entities._mixins import LocalizableMixin, TelemetryMixin, YamlTranslatableMixin
+from ...entities._system_data import SystemData
+from ...entities._util import find_type_in_override
+from ...entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin, RemoteValidatableMixin
+from ...exceptions import ErrorCategory, ErrorTarget, ValidationException
+from .._inputs_outputs import GroupInput
+
+if TYPE_CHECKING:
+    from ...entities.builders import BaseNode
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use id/type as argument name
+
+
+COMPONENT_PLACEHOLDER = "COMPONENT_PLACEHOLDER"
+
+
+class Component(
+    Asset,
+    RemoteValidatableMixin,
+    TelemetryMixin,
+    YamlTranslatableMixin,
+    PathAwareSchemaValidatableMixin,
+    LocalizableMixin,
+):
+    """Base class for component version, used to define a component. Can't be instantiated directly.
+
+    :param name: Name of the resource.
+    :type name: str
+    :param version: Version of the resource.
+    :type version: str
+    :param id: Global ID of the resource, Azure Resource Manager ID.
+    :type id: str
+    :param type: Type of the component. Currently only 'command' is supported.
+    :type type: str
+    :param description: Description of the resource.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict
+    :param properties: Internal use only.
+    :type properties: dict
+    :param display_name: Display name of the component.
+    :type display_name: str
+    :param is_deterministic: Whether the component is deterministic. Defaults to True.
+    :type is_deterministic: bool
+    :param inputs: Inputs of the component.
+    :type inputs: dict
+    :param outputs: Outputs of the component.
+    :type outputs: dict
+    :param yaml_str: The YAML string of the component.
+    :type yaml_str: str
+    :param _schema: Schema of the component.
+    :type _schema: str
+    :param creation_context: Creation metadata of the component.
+    :type creation_context: ~azure.ai.ml.entities.SystemData
+    :param kwargs: Additional parameters for the component.
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    # pylint: disable=too-many-instance-attributes
+    def __init__(
+        self,
+        *,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        id: Optional[str] = None,
+        type: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        display_name: Optional[str] = None,
+        is_deterministic: bool = True,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        yaml_str: Optional[str] = None,
+        _schema: Optional[str] = None,
+        creation_context: Optional[SystemData] = None,
+        **kwargs: Any,
+    ) -> None:
+        self.latest_version = None
+        self._intellectual_property = kwargs.pop("intellectual_property", None)
+        # Set this before super().__init__ because _auto_increment_version's value may change when the asset initializes its version.
+        self._auto_increment_version = kwargs.pop("auto_increment", False)
+        # Get source from id first, then kwargs.
+        self._source = (
+            self._resolve_component_source_from_id(id) if id else kwargs.pop("_source", ComponentSource.CLASS)
+        )
+        # use ANONYMOUS_COMPONENT_NAME instead of guid
+        is_anonymous = kwargs.pop("is_anonymous", False)
+        if not name and version is None:
+            name = ANONYMOUS_COMPONENT_NAME
+            version = "1"
+            is_anonymous = True
+
+        super().__init__(
+            name=name,
+            version=version,
+            id=id,
+            description=description,
+            tags=tags,
+            properties=properties,
+            creation_context=creation_context,
+            is_anonymous=is_anonymous,
+            base_path=kwargs.pop(BASE_PATH_CONTEXT_KEY, None),
+            source_path=kwargs.pop(SOURCE_PATH_CONTEXT_KEY, None),
+        )
+        # Store kwargs in self._other_parameter instead of popping them to the super class, so the component
+        # can carry extra fields not defined in the current schema.
+
+        inputs = inputs if inputs else {}
+        outputs = outputs if outputs else {}
+
+        self.name = name
+        self._schema = _schema
+        self._type = type
+        self._display_name = display_name
+        self._is_deterministic = is_deterministic
+        self._inputs = self._build_io(inputs, is_input=True)
+        self._outputs = self._build_io(outputs, is_input=False)
+        # Store original yaml
+        self._yaml_str = yaml_str
+        self._other_parameter = kwargs
+
+    @property
+    def _func(self) -> Callable[..., "BaseNode"]:
+        from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function
+
+        # validate input/output names before creating component function
+        validation_result = self._validate_io_names(self.inputs)
+        validation_result.merge_with(self._validate_io_names(self.outputs))
+        self._try_raise(validation_result)
+
+        res: Callable = _generate_component_function(self)
+        return res
+
+    @property
+    def type(self) -> Optional[str]:
+        """Type of the component, default is 'command'.
+
+        :return: Type of the component.
+        :rtype: str
+        """
+        return self._type
+
+    @property
+    def display_name(self) -> Optional[str]:
+        """Display name of the component.
+
+        :return: Display name of the component.
+        :rtype: str
+        """
+        return self._display_name
+
+    @display_name.setter
+    def display_name(self, custom_display_name: str) -> None:
+        """Set display_name of the component.
+
+        :param custom_display_name: The new display name
+        :type custom_display_name: str
+        """
+        self._display_name = custom_display_name
+
+    @property
+    def is_deterministic(self) -> Optional[bool]:
+        """Whether the component is deterministic.
+
+        :return: Whether the component is deterministic
+        :rtype: bool
+        """
+        return self._is_deterministic
+
+    @property
+    def inputs(self) -> Dict:
+        """Inputs of the component.
+
+        :return: Inputs of the component.
+        :rtype: dict
+        """
+        res: dict = self._inputs
+        return res
+
+    @property
+    def outputs(self) -> Dict:
+        """Outputs of the component.
+
+        :return: Outputs of the component.
+        :rtype: dict
+        """
+        return self._outputs
+
+    @property
+    def version(self) -> Optional[str]:
+        """Version of the component.
+
+        :return: Version of the component.
+        :rtype: str
+        """
+        return self._version
+
+    @version.setter
+    def version(self, value: str) -> None:
+        """Set the version of the component.
+
+        :param value: The version of the component.
+        :type value: str
+        """
+        if value:
+            if not isinstance(value, str):
+                msg = f"Component version must be a string, not type {type(value)}."
+                raise ValidationException(
+                    message=msg,
+                    target=ErrorTarget.COMPONENT,
+                    no_personal_data_message=msg,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+        self._version = value
+        self._auto_increment_version = self.name and not self._version
+
+    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+        """Dump the component content into a file in yaml format.
+
+        :param dest: The destination to receive this component's content.
+            Must be either a path to a local file, or an already-open file stream.
+            If dest is a file path, a new file will be created,
+            and an exception is raised if the file exists.
+            If dest is an open file, the file will be written to directly,
+            and an exception will be raised if the file is not writable.
+        :type dest: Union[PathLike, str, IO[AnyStr]]
+        """
+        path = kwargs.pop("path", None)
+        yaml_serialized = self._to_dict()
+        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
+    @staticmethod
+    def _resolve_component_source_from_id(  # pylint: disable=docstring-type-do-not-use-class
+        id: Optional[Union["Component", str]],
+    ) -> Any:
+        """Resolve the component source from id.
+
+        :param id: The component ID
+        :type id: Optional[str]
+        :return: The component source
+        :rtype: Literal[
+            ComponentSource.CLASS,
+            ComponentSource.REMOTE_REGISTRY,
+            ComponentSource.REMOTE_WORKSPACE_COMPONENT
+        ]
+        """
+        if id is None:
+            return ComponentSource.CLASS
+        # Consider workspace source the default, since the azureml: prefix
+        # is removed for ARM versioned ids.
+        return (
+            ComponentSource.REMOTE_REGISTRY
+            if not isinstance(id, Component) and id.startswith(REGISTRY_URI_FORMAT)
+            else ComponentSource.REMOTE_WORKSPACE_COMPONENT
+        )
+
+    @classmethod
+    def _validate_io_names(cls, io_names: Iterable[str], raise_error: bool = False) -> MutableValidationResult:
+        """Validate input/output names, raise exception if invalid.
+
+        :param io_names: The names to validate
+        :type io_names: Iterable[str]
+        :param raise_error: Whether to raise if validation fails. Defaults to False
+        :type raise_error: bool
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        validation_result = cls._create_empty_validation_result()
+        lower2original_kwargs: dict = {}
+
+        for name in io_names:
+            if re.match(IOConstants.VALID_KEY_PATTERN, name) is None:
+                msg = "{!r} is not a valid parameter name, must be composed letters, numbers, and underscores."
+                validation_result.append_error(message=msg.format(name), yaml_path=f"inputs.{name}")
+            # validate name conflict
+            lower_key = name.lower()
+            if lower_key in lower2original_kwargs:
+                msg = "Invalid component input names {!r} and {!r}, which are equal ignore case."
+                validation_result.append_error(
+                    message=msg.format(name, lower2original_kwargs[lower_key]), yaml_path=f"inputs.{name}"
+                )
+            else:
+                lower2original_kwargs[lower_key] = name
+        return cls._try_raise(validation_result, raise_error=raise_error)
+
+    @classmethod
+    def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool) -> Dict:
+        component_io: dict = {}
+        for name, port in io_dict.items():
+            if is_input:
+                component_io[name] = port if isinstance(port, Input) else Input(**port)
+            else:
+                component_io[name] = port if isinstance(port, Output) else Output(**port)
+
+        if is_input:
+            # Restore flattened parameters to group
+            res: dict = GroupInput.restore_flattened_inputs(component_io)
+            return res
+        return component_io
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+        return ComponentSchema(context=context)
+
+    @classmethod
+    def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+        return ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.COMPONENT,
+        )
+
+    @classmethod
+    def _is_flow(cls, data: Any) -> bool:
+        _schema = data.get(CommonYamlFields.SCHEMA, None)
+
+        if _schema and _schema in [SchemaUrl.PROMPTFLOW_FLOW, SchemaUrl.PROMPTFLOW_RUN]:
+            return True
+        return False
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "Component":
+        data = data or {}
+        params_override = params_override or []
+        base_path = Path(yaml_path).parent if yaml_path else Path("./")
+
+        type_in_override = find_type_in_override(params_override)
+
+        # type_in_override > type_in_yaml > default (command)
+        if type_in_override is None:
+            type_in_override = data.get(CommonYamlFields.TYPE, None)
+        if type_in_override is None and cls._is_flow(data):
+            type_in_override = NodeType.FLOW_PARALLEL
+        if type_in_override is None:
+            type_in_override = NodeType.COMMAND
+        data[CommonYamlFields.TYPE] = type_in_override
+
+        from azure.ai.ml.entities._component.component_factory import component_factory
+
+        create_instance_func, _ = component_factory.get_create_funcs(
+            data,
+            for_load=True,
+        )
+        new_instance: Component = create_instance_func()
+        # specific keys must be popped before loading with schema using kwargs
+        init_kwargs = {
+            "yaml_str": kwargs.pop("yaml_str", None),
+            "_source": kwargs.pop("_source", ComponentSource.YAML_COMPONENT),
+        }
+        init_kwargs.update(
+            new_instance._load_with_schema(  # pylint: disable=protected-access
+                data,
+                context={
+                    BASE_PATH_CONTEXT_KEY: base_path,
+                    SOURCE_PATH_CONTEXT_KEY: yaml_path,
+                    PARAMS_OVERRIDE_KEY: params_override,
+                },
+                unknown=INCLUDE,
+                raise_original_exception=True,
+                **kwargs,
+            )
+        )
+        # Set base path separately to avoid doing this in post load, as return types of post load are not unified,
+        # could be object or dict.
+        # base_path in context can be changed in loading, so we use original base_path here.
+        init_kwargs[BASE_PATH_CONTEXT_KEY] = base_path.absolute()
+        if yaml_path:
+            init_kwargs[SOURCE_PATH_CONTEXT_KEY] = Path(yaml_path).absolute().as_posix()
+        # TODO: Bug Item number: 2883415
+        new_instance.__init__(  # type: ignore
+            **init_kwargs,
+        )
+        return new_instance
+
+    @classmethod
+    def _from_container_rest_object(cls, component_container_rest_object: ComponentContainer) -> "Component":
+        component_container_details: ComponentContainerProperties = component_container_rest_object.properties
+        component = Component(
+            id=component_container_rest_object.id,
+            name=component_container_rest_object.name,
+            description=component_container_details.description,
+            creation_context=SystemData._from_rest_object(component_container_rest_object.system_data),
+            tags=component_container_details.tags,
+            properties=component_container_details.properties,
+            type=NodeType._CONTAINER,
+            # Set this field to None as it holds a default of True in __init__.
+            is_deterministic=None,  # type: ignore[arg-type]
+        )
+        component.latest_version = component_container_details.latest_version
+        return component
+
+    @classmethod
+    def _from_rest_object(cls, obj: ComponentVersion) -> "Component":
+        # TODO: Remove in PuP with native import job/component type support in MFE/Designer
+        # Convert command component back to import component private preview
+        component_spec = obj.properties.component_spec
+        if component_spec[CommonYamlFields.TYPE] == NodeType.COMMAND and component_spec["command"] == NodeType.IMPORT:
+            component_spec[CommonYamlFields.TYPE] = NodeType.IMPORT
+            component_spec["source"] = component_spec.pop("inputs")
+            component_spec["output"] = component_spec.pop("outputs")["output"]
+
+        # shouldn't block serialization when name is not valid
+        # maybe override serialization method for name field?
+        from azure.ai.ml.entities._component.component_factory import component_factory
+
+        create_instance_func, _ = component_factory.get_create_funcs(obj.properties.component_spec, for_load=True)
+
+        instance: Component = create_instance_func()
+        # TODO: Bug Item number: 2883415
+        instance.__init__(**instance._from_rest_object_to_init_params(obj))  # type: ignore
+        return instance
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # Objects from REST data contain _source; delete it.
+        if "_source" in obj.properties.component_spec:
+            del obj.properties.component_spec["_source"]
+
+        rest_component_version = obj.properties
+        _type = rest_component_version.component_spec[CommonYamlFields.TYPE]
+
+        # inputs/outputs will be parsed by instance._build_io in instance's __init__
+        inputs = rest_component_version.component_spec.pop("inputs", {})
+        # parse String -> string, Integer -> integer, etc
+        for _input in inputs.values():
+            _input["type"] = Input._map_from_rest_type(_input["type"])
+        outputs = rest_component_version.component_spec.pop("outputs", {})
+
+        origin_name = rest_component_version.component_spec[CommonYamlFields.NAME]
+        rest_component_version.component_spec[CommonYamlFields.NAME] = ANONYMOUS_COMPONENT_NAME
+        init_kwargs = cls._load_with_schema(
+            rest_component_version.component_spec, context={BASE_PATH_CONTEXT_KEY: Path.cwd()}, unknown=INCLUDE
+        )
+        init_kwargs.update(
+            {
+                "id": obj.id,
+                "is_anonymous": rest_component_version.is_anonymous,
+                "creation_context": obj.system_data,
+                "inputs": inputs,
+                "outputs": outputs,
+                "name": origin_name,
+            }
+        )
+
+        # Remove empty values, because some properties only apply to specific components, e.g. distribution for command.
+        # Note that there is an issue where environment == {} is always True, so isinstance is used here.
+        return {k: v for k, v in init_kwargs.items() if v is not None and not (isinstance(v, dict) and not v)}
+
+    def _get_anonymous_hash(self) -> str:
+        """Return the hash of anonymous component.
+
+        Anonymous Components (same code and interface) will have same hash.
+
+        :return: The component hash
+        :rtype: str
+        """
+        # omit version since anonymous component's version is random guid
+        # omit name since name doesn't impact component's uniqueness
+        return self._get_component_hash(keys_to_omit=["name", "id", "version"])
+
+    def _get_component_hash(self, keys_to_omit: Optional[Iterable[str]] = None) -> str:
+        """Return the hash of component.
+
+        :param keys_to_omit: An iterable of keys to omit when computing the component hash
+        :type keys_to_omit: Optional[Iterable[str]]
+        :return: The component hash
+        :rtype: str
+        """
+        component_interface_dict = self._to_dict()
+        res: str = hash_dict(component_interface_dict, keys_to_omit=keys_to_omit)
+        return res
+
+    @classmethod
+    def _get_resource_type(cls) -> str:
+        return "Microsoft.MachineLearningServices/workspaces/components/versions"
+
+    def _get_resource_name_version(self) -> Tuple:
+        version: Optional[str] = None
+        if not self.version and not self._auto_increment_version:
+            version = str(uuid.uuid4())
+        else:
+            version = self.version
+        return self.name or ANONYMOUS_COMPONENT_NAME, version
+
+    def _validate(self, raise_error: Optional[bool] = False) -> MutableValidationResult:
+        origin_name = self.name
+        # skip name validation for anonymous component as ANONYMOUS_COMPONENT_NAME will be used in component creation
+        if self._is_anonymous:
+            self.name = ANONYMOUS_COMPONENT_NAME
+        try:
+            return super()._validate(raise_error)
+        finally:
+            self.name = origin_name
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = super(Component, self)._customized_validate()
+
+        # validate inputs names
+        validation_result.merge_with(self._validate_io_names(self.inputs, raise_error=False))
+        validation_result.merge_with(self._validate_io_names(self.outputs, raise_error=False))
+
+        return validation_result
+
+    def _get_anonymous_component_name_version(self) -> Tuple:
+        return ANONYMOUS_COMPONENT_NAME, self._get_anonymous_hash()
+
+    def _get_rest_name_version(self) -> Tuple:
+        if self._is_anonymous:
+            return self._get_anonymous_component_name_version()
+        return self.name, self.version
+
+    def _to_rest_object(self) -> ComponentVersion:
+        component = self._to_dict()
+
+        # TODO: Remove in PuP with native import job/component type support in MFE/Designer
+        # Convert import component to command component private preview
+        if component.get(CommonYamlFields.TYPE, None) == NodeType.IMPORT:
+            component[CommonYamlFields.TYPE] = NodeType.COMMAND
+            component["inputs"] = component.pop("source")
+            component["outputs"] = dict({"output": component.pop("output")})
+            # method _to_dict() will remove empty keys
+            if "tags" not in component:
+                component["tags"] = {}
+            component["tags"]["component_type_overwrite"] = NodeType.IMPORT
+            component["command"] = NodeType.IMPORT
+
+        # add source type to component rest object
+        component["_source"] = self._source
+        if self._intellectual_property:
+            # Hack while full pass-through support is being worked on for IPP fields.
+            component.pop("intellectual_property")
+            component["intellectualProperty"] = self._intellectual_property._to_rest_object().serialize()
+        properties = ComponentVersionProperties(
+            component_spec=component,
+            description=self.description,
+            is_anonymous=self._is_anonymous,
+            properties=dict(self.properties) if self.properties else {},
+            tags=self.tags,
+        )
+        result = ComponentVersion(properties=properties)
+        if self._is_anonymous:
+            result.name = ANONYMOUS_COMPONENT_NAME
+        else:
+            result.name = self.name
+            result.properties.properties["client_component_hash"] = self._get_component_hash(keys_to_omit=["version"])
+        return result
+
+    def _to_dict(self) -> Dict:
+        # Replace the name of $schema to schema.
+        component_schema_dict: dict = self._dump_for_validation()
+        component_schema_dict.pop(BASE_PATH_CONTEXT_KEY, None)
+
+        # TODO: handle other_parameters and remove override from subclass
+        return component_schema_dict
+
+    def _localize(self, base_path: str) -> None:
+        """Called on an asset got from service to clean up remote attributes like id, creation_context, etc. and update
+        base_path.
+
+        :param base_path: The base_path
+        :type base_path: str
+        """
+        if not getattr(self, "id", None):
+            raise ValueError("Only remote asset can be localize but got a {} without id.".format(type(self)))
+        self._id = None
+        self._creation_context = None
+        self._base_path = base_path
+
+    def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict:
+        # Note: is_anonymous is not reliable here; create_or_update will log is_anonymous from its parameter.
+        is_anonymous = self.name is None or ANONYMOUS_COMPONENT_NAME in self.name
+        return {"type": self.type, "source": self._source, "is_anonymous": is_anonymous}
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> "BaseNode":
+        """Call ComponentVersion as a function and get a Component object.
+
+        :return: The component object
+        :rtype: BaseNode
+        """
+        if args:
+            # raise clear error message for unsupported positional args
+            if self._func._has_parameters:  # type: ignore
+                _error = f"got {args} for {self.name}"
+                msg = (
+                    f"Component function doesn't support positional arguments, {_error}. "  # type: ignore
+                    f"Please use keyword arguments like: {self._func._func_calling_example}."
+                )
+            else:
+                msg = (
+                    "Component function doesn't has any parameters, "
+                    f"please make sure component {self.name} has inputs. "
+                )
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.COMPONENT,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        return self._func(*args, **kwargs)  # pylint: disable=not-callable
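+
+
+# Editor's note: the sketch below is illustrative only and is not part of the
+# original source. It shows the load/dump round trip implemented above:
+# Component._load defaults the type to "command" when none is given, picks the
+# concrete subclass via component_factory, and dump() serializes the component
+# back to YAML. The spec contents and file names are hypothetical, and the exact
+# required fields depend on the schema of the resolved component type.
+def _example_load_and_dump() -> None:  # pragma: no cover - illustrative only
+    spec = {
+        "name": "hello_world",
+        "version": "1",
+        "command": "echo hello",
+        "environment": "azureml:my-environment:1",
+    }
+    component = Component._load(data=spec, yaml_path="./hello_world.yml")
+    component.dump("./hello_world.dumped.yml")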
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py
new file mode 100644
index 00000000..012dd260
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py
@@ -0,0 +1,171 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+from typing import Any, Callable, Dict, Optional, Tuple
+
+from marshmallow import Schema
+
+from ..._restclient.v2022_10_01.models import ComponentVersion
+from ..._utils.utils import is_internal_component_data
+from ...constants._common import SOURCE_PATH_CONTEXT_KEY
+from ...constants._component import DataTransferTaskType, NodeType
+from ...entities._component.automl_component import AutoMLComponent
+from ...entities._component.command_component import CommandComponent
+from ...entities._component.component import Component
+from ...entities._component.datatransfer_component import (
+    DataTransferCopyComponent,
+    DataTransferExportComponent,
+    DataTransferImportComponent,
+)
+from ...entities._component.import_component import ImportComponent
+from ...entities._component.parallel_component import ParallelComponent
+from ...entities._component.pipeline_component import PipelineComponent
+from ...entities._component.spark_component import SparkComponent
+from ...entities._util import get_type_from_spec
+from .flow import FlowComponent
+
+
+class _ComponentFactory:
+    """A class to create component instances from yaml dict or rest objects without hard-coded type check."""
+
+    def __init__(self) -> None:
+        self._create_instance_funcs: Dict = {}
+        self._create_schema_funcs: Dict = {}
+
+        self.register_type(
+            _type=NodeType.PARALLEL,
+            create_instance_func=lambda: ParallelComponent.__new__(ParallelComponent),
+            create_schema_func=ParallelComponent._create_schema_for_validation,
+        )
+        self.register_type(
+            _type=NodeType.COMMAND,
+            create_instance_func=lambda: CommandComponent.__new__(CommandComponent),
+            create_schema_func=CommandComponent._create_schema_for_validation,
+        )
+        self.register_type(
+            _type=NodeType.IMPORT,
+            create_instance_func=lambda: ImportComponent.__new__(ImportComponent),
+            create_schema_func=ImportComponent._create_schema_for_validation,
+        )
+        self.register_type(
+            _type=NodeType.PIPELINE,
+            create_instance_func=lambda: PipelineComponent.__new__(PipelineComponent),
+            create_schema_func=PipelineComponent._create_schema_for_validation,
+        )
+        self.register_type(
+            _type=NodeType.AUTOML,
+            create_instance_func=lambda: AutoMLComponent.__new__(AutoMLComponent),
+            create_schema_func=AutoMLComponent._create_schema_for_validation,
+        )
+        self.register_type(
+            _type=NodeType.SPARK,
+            create_instance_func=lambda: SparkComponent.__new__(SparkComponent),
+            create_schema_func=SparkComponent._create_schema_for_validation,
+        )
+        self.register_type(
+            _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.COPY_DATA]),
+            create_instance_func=lambda: DataTransferCopyComponent.__new__(DataTransferCopyComponent),
+            create_schema_func=DataTransferCopyComponent._create_schema_for_validation,
+        )
+
+        self.register_type(
+            _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.IMPORT_DATA]),
+            create_instance_func=lambda: DataTransferImportComponent.__new__(DataTransferImportComponent),
+            create_schema_func=DataTransferImportComponent._create_schema_for_validation,
+        )
+
+        self.register_type(
+            _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.EXPORT_DATA]),
+            create_instance_func=lambda: DataTransferExportComponent.__new__(DataTransferExportComponent),
+            create_schema_func=DataTransferExportComponent._create_schema_for_validation,
+        )
+
+        self.register_type(
+            _type=NodeType.FLOW_PARALLEL,
+            create_instance_func=lambda: FlowComponent.__new__(FlowComponent),
+            create_schema_func=FlowComponent._create_schema_for_validation,
+        )
+
+    def get_create_funcs(
+        self, yaml_spec: dict, for_load: bool = False
+    ) -> Tuple[Callable[..., Component], Callable[[Any], Schema]]:
+        """Get registered functions to create an instance and its corresponding schema for the given type.
+
+        :param yaml_spec: The YAML specification.
+        :type yaml_spec: dict
+        :param for_load: Whether the function is called for loading a component. Defaults to False.
+        :type for_load: bool
+        :return: A tuple containing the create_instance_func and create_schema_func.
+        :rtype: tuple
+        """
+
+        _type = get_type_from_spec(yaml_spec, valid_keys=self._create_instance_funcs)
+        # SparkComponent and InternalSparkComponent share the same type name, but they are different types.
+        if for_load and is_internal_component_data(yaml_spec, raise_if_not_enabled=True) and _type == NodeType.SPARK:
+            from azure.ai.ml._internal._schema.node import NodeType as InternalNodeType
+
+            _type = InternalNodeType.SPARK
+
+        create_instance_func = self._create_instance_funcs[_type]
+        create_schema_func = self._create_schema_funcs[_type]
+        return create_instance_func, create_schema_func
+
+    def register_type(
+        self,
+        _type: str,
+        create_instance_func: Callable[..., Component],
+        create_schema_func: Callable[[Any], Schema],
+    ) -> None:
+        """Register a new component type.
+
+        :param _type: The type name of the component.
+        :type _type: str
+        :param create_instance_func: A function to create an instance of the component.
+        :type create_instance_func: Callable[..., ~azure.ai.ml.entities.Component]
+        :param create_schema_func: A function to create a schema for the component.
+        :type create_schema_func: Callable[[Any], Schema]
+        """
+        self._create_instance_funcs[_type] = create_instance_func
+        self._create_schema_funcs[_type] = create_schema_func
+
+    @classmethod
+    def load_from_dict(cls, *, data: Dict, context: Dict, _type: Optional[str] = None, **kwargs: Any) -> Component:
+        """Load a component from a YAML dict.
+
+        :keyword data: The YAML dict.
+        :paramtype data: dict
+        :keyword context: The context of the YAML dict.
+        :paramtype context: dict
+        :keyword _type: The type name of the component. When None, it will be inferred from the YAML dict.
+        :paramtype _type: str
+        :return: The loaded component.
+        :rtype: ~azure.ai.ml.entities.Component
+        """
+
+        return Component._load(
+            data=data,
+            yaml_path=context.get(SOURCE_PATH_CONTEXT_KEY, None),
+            params_override=[{"type": _type}] if _type is not None else [],
+            **kwargs,
+        )
+
+    @classmethod
+    def load_from_rest(cls, *, obj: ComponentVersion, _type: Optional[str] = None) -> Component:
+        """Load a component from a REST object.
+
+        :keyword obj: The REST object.
+        :paramtype obj: ComponentVersion
+        :keyword _type: The type name of the component. When None, it will be inferred from the REST object.
+        :paramtype _type: str
+        :return: The loaded component.
+        :rtype: ~azure.ai.ml.entities.Component
+        """
+        if _type is not None:
+            obj.properties.component_spec["type"] = _type
+        return Component._from_rest_object(obj)
+
+
+component_factory = _ComponentFactory()
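+
+
+# Editor's note: the sketch below is illustrative only and is not part of the
+# original source. It mirrors the built-in registrations above to show how an
+# additional component type could be wired into the factory; the "my_custom"
+# type name and MyCustomComponent class are hypothetical.
+def _example_register_custom_type() -> None:  # pragma: no cover - illustrative only
+    class MyCustomComponent(CommandComponent):
+        """Hypothetical component type that reuses the command schema."""
+
+    component_factory.register_type(
+        _type="my_custom",
+        create_instance_func=lambda: MyCustomComponent.__new__(MyCustomComponent),
+        create_schema_func=MyCustomComponent._create_schema_for_validation,
+    )
+    # The factory can now resolve creation functions for the new type.
+    create_instance, _ = component_factory.get_create_funcs({"type": "my_custom"})
+    assert isinstance(create_instance(), MyCustomComponent)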
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py
new file mode 100644
index 00000000..e71712ab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py
@@ -0,0 +1,325 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from pathlib import Path
+from typing import Any, Dict, NoReturn, Optional, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.data_transfer_component import (
+    DataTransferCopyComponentSchema,
+    DataTransferExportComponentSchema,
+    DataTransferImportComponentSchema,
+)
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, COMPONENT_TYPE, AssetTypes
+from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType
+from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
+from azure.ai.ml.entities._inputs_outputs.output import Output
+from azure.ai.ml.entities._validation.core import MutableValidationResult
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from ..._schema import PathAwareSchema
+from .._util import convert_ordered_dict_to_dict, validate_attribute_type
+from .component import Component
+
+
+class DataTransferComponent(Component):
+    """DataTransfer component version, used to define a data transfer component.
+
+    :param task: Task type in the data transfer component. Possible values are "copy_data",
+                 "import_data", and "export_data".
+    :type task: str
+    :param inputs: Mapping of input data bindings used in the job.
+    :type inputs: dict
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: dict
+    :param kwargs: Additional parameters for the data transfer component.
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        task: Optional[str] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        kwargs[COMPONENT_TYPE] = NodeType.DATA_TRANSFER
+        # Set default base path
+        if BASE_PATH_CONTEXT_KEY not in kwargs:
+            kwargs[BASE_PATH_CONTEXT_KEY] = Path(".")
+
+        super().__init__(
+            inputs=inputs,
+            outputs=outputs,
+            **kwargs,
+        )
+        self._task = task
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {}
+
+    @property
+    def task(self) -> Optional[str]:
+        """Task type of the component.
+
+        :return: Task type of the component.
+        :rtype: str
+        """
+        return self._task
+
+    def _to_dict(self) -> Dict:
+        return cast(
+            dict,
+            convert_ordered_dict_to_dict({**self._other_parameter, **super(DataTransferComponent, self)._to_dict()}),
+        )
+
+    def __str__(self) -> str:
+        try:
+            _toYaml: str = self._to_yaml()
+            return _toYaml
+        except BaseException:  # pylint: disable=W0718
+            _toStr: str = super(DataTransferComponent, self).__str__()
+            return _toStr
+
+    @classmethod
+    def _build_source_sink(cls, io_dict: Union[Dict, Database, FileSystem]) -> Union[Database, FileSystem]:
+        component_io: Union[Database, FileSystem] = Database()
+
+        if isinstance(io_dict, Database):
+            component_io = Database()
+        elif isinstance(io_dict, FileSystem):
+            component_io = FileSystem()
+        else:
+            if isinstance(io_dict, dict):
+                data_type = io_dict.pop("type", None)
+                if data_type == ExternalDataType.DATABASE:
+                    component_io = Database()
+                elif data_type == ExternalDataType.FILE_SYSTEM:
+                    component_io = FileSystem()
+                else:
+                    msg = "Type in source or sink only support {} and {}, currently got {}."
+                    raise ValidationException(
+                        message=msg.format(
+                            ExternalDataType.DATABASE,
+                            ExternalDataType.FILE_SYSTEM,
+                            data_type,
+                        ),
+                        no_personal_data_message=msg.format(
+                            ExternalDataType.DATABASE,
+                            ExternalDataType.FILE_SYSTEM,
+                            "data_type",
+                        ),
+                        target=ErrorTarget.COMPONENT,
+                        error_category=ErrorCategory.USER_ERROR,
+                        error_type=ValidationErrorType.INVALID_VALUE,
+                    )
+            else:
+                msg = "Source or sink only support dict, Database and FileSystem"
+                raise ValidationException(
+                    message=msg,
+                    no_personal_data_message=msg,
+                    target=ErrorTarget.COMPONENT,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
+
+        return component_io
+
+
+@experimental
+class DataTransferCopyComponent(DataTransferComponent):
+    """DataTransfer copy component version, used to define a data transfer copy component.
+
+    :param data_copy_mode: Data copy mode in the copy task.
+                           Possible values are "merge_with_overwrite" and "fail_if_conflict".
+    :type data_copy_mode: str
+    :param inputs: Mapping of input data bindings used in the job.
+    :type inputs: dict
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: dict
+    :param kwargs: Additional parameters for the data transfer copy component.
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        data_copy_mode: Optional[str] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        kwargs["task"] = DataTransferTaskType.COPY_DATA
+        super().__init__(
+            inputs=inputs,
+            outputs=outputs,
+            **kwargs,
+        )
+
+        self._data_copy_mode = data_copy_mode
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return DataTransferCopyComponentSchema(context=context)
+
+    @property
+    def data_copy_mode(self) -> Optional[str]:
+        """Data copy mode of the component.
+
+        :return: Data copy mode of the component.
+        :rtype: str
+        """
+        return self._data_copy_mode
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = super(DataTransferCopyComponent, self)._customized_validate()
+        validation_result.merge_with(self._validate_input_output_mapping())
+        return validation_result
+
+    def _validate_input_output_mapping(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        inputs_count = len(self.inputs)
+        outputs_count = len(self.outputs)
+        if outputs_count != 1:
+            msg = "Only support single output in {}, but there're {} outputs."
+            validation_result.append_error(
+                message=msg.format(DataTransferTaskType.COPY_DATA, outputs_count),
+                yaml_path="outputs",
+            )
+        else:
+            input_type = None
+            output_type = None
+            if inputs_count == 1:
+                for _, input_data in self.inputs.items():
+                    input_type = input_data.type
+                for _, output_data in self.outputs.items():
+                    output_type = output_data.type
+                if input_type is None or output_type is None or input_type != output_type:
+                    msg = "Input type {} doesn't exactly match with output type {} in task {}"
+                    validation_result.append_error(
+                        message=msg.format(input_type, output_type, DataTransferTaskType.COPY_DATA),
+                        yaml_path="outputs",
+                    )
+            elif inputs_count > 1:
+                for _, output_data in self.outputs.items():
+                    output_type = output_data.type
+                if output_type is None or output_type != AssetTypes.URI_FOLDER:
+                    msg = "output type {} need to be {} in task {}"
+                    validation_result.append_error(
+                        message=msg.format(
+                            output_type,
+                            AssetTypes.URI_FOLDER,
+                            DataTransferTaskType.COPY_DATA,
+                        ),
+                        yaml_path="outputs",
+                    )
+            else:
+                msg = "Inputs must be set in task {}."
+                validation_result.append_error(
+                    message=msg.format(DataTransferTaskType.COPY_DATA),
+                    yaml_path="inputs",
+                )
+        return validation_result
+
+
+@experimental
+class DataTransferImportComponent(DataTransferComponent):
+    """DataTransfer import component version, used to define a data transfer import component.
+
+    :param source: The data source of the file system or database.
+    :type source: dict
+    :param outputs: Mapping of output data bindings used in the job.
+                    Default value is an output port with the key "sink" and the type "mltable".
+    :type outputs: dict
+    :param kwargs: Additional parameters for the data transfer import component.
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        source: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        outputs = outputs or {"sink": Output(type=AssetTypes.MLTABLE)}
+        kwargs["task"] = DataTransferTaskType.IMPORT_DATA
+        super().__init__(
+            outputs=outputs,
+            **kwargs,
+        )
+
+        source = source if source else {}
+        self.source = self._build_source_sink(source)
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return DataTransferImportComponentSchema(context=context)
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:
+        """Call ComponentVersion as a function and get a Component object."""
+
+        msg = "DataTransfer component is not callable for import task."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.COMPONENT,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+
+@experimental
+class DataTransferExportComponent(DataTransferComponent):
+    """DataTransfer export component version, used to define a data transfer export component.
+
+    :param sink: The sink of external data and databases.
+    :type sink: Union[Dict, Database, FileSystem]
+    :param inputs: Mapping of input data bindings used in the job.
+    :type inputs: dict
+    :param kwargs: Additional parameters for the data transfer export component.
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        inputs: Optional[Dict] = None,
+        sink: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        kwargs["task"] = DataTransferTaskType.EXPORT_DATA
+        super().__init__(
+            inputs=inputs,
+            **kwargs,
+        )
+
+        sink = sink if sink else {}
+        self.sink = self._build_source_sink(sink)
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return DataTransferExportComponentSchema(context=context)
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:
+        """Call ComponentVersion as a function and get a Component object."""
+
+        msg = "DataTransfer component is not callable for export task."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.COMPONENT,
+            error_category=ErrorCategory.USER_ERROR,
+        )
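+
+
+# Editor's note: the sketch below is illustrative only and is not part of the
+# original source. It builds a copy component that satisfies
+# _validate_input_output_mapping above: exactly one output and, for a single
+# input, matching input and output types. The port names are hypothetical.
+def _example_copy_component() -> DataTransferCopyComponent:  # pragma: no cover
+    from azure.ai.ml.entities._inputs_outputs import Input
+
+    return DataTransferCopyComponent(
+        name="copy_uri_folder",
+        data_copy_mode="merge_with_overwrite",
+        inputs={"folder_in": Input(type=AssetTypes.URI_FOLDER)},
+        outputs={"folder_out": Output(type=AssetTypes.URI_FOLDER)},
+    )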
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py
new file mode 100644
index 00000000..e4ff06cc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py
@@ -0,0 +1,553 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import contextlib
+import json
+import os
+from collections import defaultdict
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Union
+
+import yaml  # type: ignore[import]
+from marshmallow import EXCLUDE, Schema, ValidationError
+
+from azure.ai.ml.constants._common import (
+    BASE_PATH_CONTEXT_KEY,
+    COMPONENT_TYPE,
+    PROMPTFLOW_AZUREML_OVERRIDE_KEY,
+    SOURCE_PATH_CONTEXT_KEY,
+    AssetTypes,
+    SchemaUrl,
+)
+from azure.ai.ml.constants._component import ComponentParameterTypes, NodeType
+
+from ..._restclient.v2022_10_01.models import ComponentVersion
+from ..._schema import PathAwareSchema
+from ..._schema.component.flow import FlowComponentSchema, FlowSchema, RunSchema
+from ...exceptions import ErrorCategory, ErrorTarget, ValidationException
+from .. import Environment
+from .._inputs_outputs import GroupInput, Input, Output
+from ._additional_includes import AdditionalIncludesMixin
+from .component import Component
+
+# avoid circular import error
+if TYPE_CHECKING:
+    from azure.ai.ml.entities._builders.parallel import Parallel
+
+# pylint: disable=protected-access
+
+
+class _FlowPortNames:
+    """Common yaml fields.
+
+    Common yaml fields are used to define the common fields in yaml files. It can be one of the following values: type,
+    name, $schema.
+    """
+
+    DATA = "data"
+    RUN_OUTPUTS = "run_outputs"
+    CONNECTIONS = "connections"
+
+    FLOW_OUTPUTS = "flow_outputs"
+    DEBUG_INFO = "debug_info"
+
+
+class _FlowComponentPortDict(dict):
+    def __init__(self, ports: Dict):
+        self._allow_update_item = True
+        super().__init__()
+        for input_port_name, input_port in ports.items():
+            self[input_port_name] = input_port
+        self._allow_update_item = False
+
+    def __setitem__(self, key: Any, value: Any) -> None:
+        if not self._allow_update_item:
+            raise RuntimeError("Ports of flow component are not editable.")
+        super().__setitem__(key, value)
+
+    def __delitem__(self, key: Any) -> None:
+        if not self._allow_update_item:
+            raise RuntimeError("Ports of flow component are not editable.")
+        super().__delitem__(key)
+
+
+class FlowComponentInputDict(_FlowComponentPortDict):
+    """Input port dictionary for FlowComponent, with fixed input ports."""
+
+    def __init__(self) -> None:
+        super().__init__(
+            {
+                _FlowPortNames.CONNECTIONS: GroupInput(values={}, _group_class=None),
+                _FlowPortNames.DATA: Input(type=AssetTypes.URI_FOLDER, optional=False),
+                _FlowPortNames.FLOW_OUTPUTS: Input(type=AssetTypes.URI_FOLDER, optional=True),
+            }
+        )
+
+    @contextlib.contextmanager
+    def _fit_inputs(self, inputs: Optional[Dict]) -> Generator:
+        """Add dynamic input ports to the input port dictionary.
+        Input ports of a flow component include:
+        1. data: required major uri_folder input
+        2. run_output: optional uri_folder input
+        3. connections.xxx.xxx: group of string parameters, first layer key can be any node name,
+           but we won't resolve the exact keys in SDK
+        4. xxx: input_mapping parameters, key can be any node name, but we won't resolve the exact keys in SDK
+
+        #3 is grouped into connections, which we expose as a fixed group input port.
+        #4 are dynamic input ports; they are added temporarily within this context manager and removed
+        when the context manager exits.
+
+        :param inputs: The dynamic input to fit.
+        :type inputs: Dict[str, Any]
+        :return: None
+        :rtype: None
+        """
+        dynamic_columns_mapping_keys = []
+        dynamic_connections_inputs = defaultdict(list)
+        from azure.ai.ml.entities._job.pipeline._io import _GroupAttrDict
+        from azure.ai.ml.entities._job.pipeline._io.mixin import flatten_dict
+
+        flattened_inputs = flatten_dict(inputs, _GroupAttrDict, allow_dict_fields=[_FlowPortNames.CONNECTIONS])
+
+        for flattened_input_key in flattened_inputs:
+            if flattened_input_key.startswith(f"{_FlowPortNames.CONNECTIONS}."):
+                if flattened_input_key.count(".") != 2:
+                    raise ValidationException(
+                        message="flattened connection input prot name must be "
+                        "in the format of connections.<node_name>.<port_name>, "
+                        "but got %s" % flattened_input_key,
+                        no_personal_data_message="flattened connection input prot name must be in the format of "
+                        "connections.<node_name>.<port_name>",
+                        target=ErrorTarget.COMPONENT,
+                        error_category=ErrorCategory.USER_ERROR,
+                    )
+                _, node_name, param_name = flattened_input_key.split(".")
+                dynamic_connections_inputs[node_name].append(param_name)
+                continue
+            if flattened_input_key not in self:
+                dynamic_columns_mapping_keys.append(flattened_input_key)
+
+        self._allow_update_item = True
+        for flattened_input_key in dynamic_columns_mapping_keys:
+            self[flattened_input_key] = Input(type=ComponentParameterTypes.STRING, optional=True)
+        if dynamic_connections_inputs:
+            self[_FlowPortNames.CONNECTIONS] = GroupInput(
+                values={
+                    node_name: GroupInput(
+                        values={
+                            parameter_name: Input(
+                                type=ComponentParameterTypes.STRING,
+                            )
+                            for parameter_name in param_names
+                        },
+                        _group_class=None,
+                    )
+                    for node_name, param_names in dynamic_connections_inputs.items()
+                },
+                _group_class=None,
+            )
+        self._allow_update_item = False
+
+        yield
+
+        self._allow_update_item = True
+        for flattened_input_key in dynamic_columns_mapping_keys:
+            del self[flattened_input_key]
+        self[_FlowPortNames.CONNECTIONS] = GroupInput(values={}, _group_class=None)
+        self._allow_update_item = False
+
+
+class FlowComponentOutputDict(_FlowComponentPortDict):
+    """Output port dictionary for FlowComponent, with fixed output ports."""
+
+    def __init__(self) -> None:
+        super().__init__(
+            {
+                _FlowPortNames.FLOW_OUTPUTS: Output(type=AssetTypes.URI_FOLDER),
+                _FlowPortNames.DEBUG_INFO: Output(type=AssetTypes.URI_FOLDER),
+            }
+        )
+
+
+class FlowComponent(Component, AdditionalIncludesMixin):
+    """Flow component version, used to define a Flow Component or Job.
+
+    :keyword name: The name of the Flow job or component.
+    :type name: Optional[str]
+    :keyword version: The version of the Flow job or component.
+    :type version: Optional[str]
+    :keyword description: The description of the component. Defaults to None.
+    :type description: Optional[str]
+    :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
+    :type tags: Optional[dict]
+    :keyword display_name: The display name of the component.
+    :type display_name: Optional[str]
+    :keyword flow: The path to the flow directory or flow definition file. Defaults to None, in which case the
+        base path of this component will be used as the flow directory.
+    :type flow: Optional[Union[str, Path]]
+    :keyword column_mapping: The column mapping for the flow. Defaults to None.
+    :type column_mapping: Optional[dict[str, str]]
+    :keyword variant: The variant of the flow. Defaults to None.
+    :type variant: Optional[str]
+    :keyword connections: The connections for the flow. Defaults to None.
+    :type connections: Optional[dict[str, dict[str, str]]]
+    :keyword environment_variables: The environment variables for the flow. Defaults to None.
+    :type environment_variables: Optional[dict[str, str]]
+    :keyword environment: The environment for the flow component. Defaults to None.
+    :type environment: Optional[Union[str, Environment]]
+    :keyword is_deterministic: Specifies whether the Flow will return the same output given the same input.
+        Defaults to True. When True, if a Flow (component) is deterministic and has been run before in the
+        current workspace with the same input and settings, it will reuse results from a previous submitted job
+        when used as a node or step in a pipeline. In that scenario, no compute resources will be used.
+    :type is_deterministic: Optional[bool]
+    :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
+    :type additional_includes: Optional[list[str]]
+    :keyword properties: The job property dictionary. Defaults to None.
+    :type properties: Optional[dict[str, str]]
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if FlowComponent cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        display_name: Optional[str] = None,
+        flow: Optional[Union[str, Path]] = None,
+        column_mapping: Optional[Dict[str, str]] = None,
+        variant: Optional[str] = None,
+        connections: Optional[Dict[str, Dict[str, str]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        environment: Optional[Union[str, Environment]] = None,
+        is_deterministic: bool = True,
+        additional_includes: Optional[List] = None,
+        properties: Optional[Dict] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        kwargs[COMPONENT_TYPE] = NodeType.FLOW_PARALLEL
+
+        # always use flow directory as base path
+        # Note: we suppose that there is no relative path in run.yaml other than flow.
+        #   If there are any, we will need to rebase them so that they have the same base path as attributes in
+        #   flow.dag.yaml
+        flow_dir, self._flow = self._get_flow_definition(
+            flow=flow,
+            base_path=kwargs.pop(BASE_PATH_CONTEXT_KEY, Path.cwd()),
+            source_path=kwargs.get(SOURCE_PATH_CONTEXT_KEY, None),
+        )
+        kwargs[BASE_PATH_CONTEXT_KEY] = flow_dir
+
+        super().__init__(
+            name=name or self._normalize_component_name(flow_dir.name),
+            version=version or "1",
+            description=description,
+            tags=tags,
+            display_name=display_name,
+            inputs={},
+            outputs={},
+            is_deterministic=is_deterministic,
+            properties=properties,
+            **kwargs,
+        )
+        self._environment = environment
+        self._column_mapping = column_mapping or {}
+        self._variant = variant
+        self._connections = connections or {}
+
+        self._inputs = FlowComponentInputDict()
+        self._outputs = FlowComponentOutputDict()
+
+        if flow:
+            # file existence has been checked in _get_flow_definition
+            # we don't need to rebase additional_includes as we have updated base_path
+            with open(Path(self.base_path, self._flow), "r", encoding="utf-8") as f:
+                flow_content = yaml.safe_load(f.read())
+            additional_includes = flow_content.get("additional_includes", None)
+            # environment variables in run.yaml have higher priority than those in flow.dag.yaml
+            self._environment_variables = flow_content.get("environment_variables", {})
+            self._environment_variables.update(environment_variables or {})
+        else:
+            self._environment_variables = environment_variables or {}
+
+        self._additional_includes = additional_includes or []
+
+        # unlike other Components, code is a private property in FlowComponent and
+        # will be used to store the arm id of the created code before constructing the rest object.
+        # We don't use self.flow directly because self.flow can be a path to the flow dag yaml file
+        # rather than a directory.
+        self._code_arm_id: Optional[str] = None
+
+    # region valid properties
+    @property
+    def flow(self) -> str:
+        """The path to the flow definition file relative to the flow directory.
+
+        :rtype: str
+        """
+        return self._flow
+
+    @property
+    def environment(self) -> Optional[Union[str, Environment]]:
+        """The environment for the flow component. Defaults to None.
+
+        :rtype: Optional[Union[str, Environment]]
+        """
+        return self._environment
+
+    @environment.setter
+    def environment(self, value: Union[str, Environment]) -> None:
+        """The environment for the flow component. Defaults to None.
+
+        :param value: The environment for the flow component.
+        :type value: Union[str, Environment]
+        """
+        self._environment = value
+
+    @property
+    def column_mapping(self) -> Dict[str, str]:
+        """The column mapping for the flow. Defaults to None.
+
+        :rtype: Dict[str, str]
+        """
+        return self._column_mapping
+
+    @column_mapping.setter
+    def column_mapping(self, value: Optional[Dict[str, str]]) -> None:
+        """
+        The column mapping for the flow. Defaults to None.
+
+        :param value: The column mapping for the flow.
+        :type value: Optional[Dict[str, str]]
+        """
+        self._column_mapping = value or {}
+
+    @property
+    def variant(self) -> Optional[str]:
+        """The variant of the flow. Defaults to None.
+
+        :rtype: Optional[str]
+        """
+        return self._variant
+
+    @variant.setter
+    def variant(self, value: Optional[str]) -> None:
+        """The variant of the flow. Defaults to None.
+
+        :param value: The variant of the flow.
+        :type value: Optional[str]
+        """
+        self._variant = value
+
+    @property
+    def connections(self) -> Dict[str, Dict[str, str]]:
+        """The connections for the flow. Defaults to None.
+
+        :rtype: Dict[str, Dict[str, str]]
+        """
+        return self._connections
+
+    @connections.setter
+    def connections(self, value: Optional[Dict[str, Dict[str, str]]]) -> None:
+        """
+        The connections for the flow. Defaults to None.
+
+        :param value: The connections for the flow.
+        :type value: Optional[Dict[str, Dict[str, str]]]
+        """
+        self._connections = value or {}
+
+    @property
+    def environment_variables(self) -> Dict[str, str]:
+        """The environment variables for the flow. Defaults to None.
+
+        :rtype: Dict[str, str]
+        """
+        return self._environment_variables
+
+    @environment_variables.setter
+    def environment_variables(self, value: Optional[Dict[str, str]]) -> None:
+        """The environment variables for the flow. Defaults to None.
+
+        :param value: The environment variables for the flow.
+        :type value: Optional[Dict[str, str]]
+        """
+        self._environment_variables = value or {}
+
+    @property
+    def additional_includes(self) -> List:
+        """A list of shared additional files to be included in the component. Defaults to None.
+
+        :rtype: List
+        """
+        return self._additional_includes
+
+    @additional_includes.setter
+    def additional_includes(self, value: Optional[List]) -> None:
+        """A list of shared additional files to be included in the component. Defaults to None.
+        All local additional includes should be relative to the flow directory.
+
+        :param value: A list of shared additional files to be included in the component.
+        :type value: Optional[List]
+        """
+        self._additional_includes = value or []
+
+    # endregion
+
+    @classmethod
+    def _normalize_component_name(cls, value: str) -> str:
+        return value.replace("-", "_")
+
+    # region Component
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        raise RuntimeError("FlowComponent does not support loading from REST object.")
+
+    def _to_rest_object(self) -> ComponentVersion:
+        rest_obj = super()._to_rest_object()
+        rest_obj.properties.component_spec["code"] = self._code_arm_id
+        rest_obj.properties.component_spec["flow_file_name"] = self._flow
+        return rest_obj
+
+    def _func(self, **kwargs: Any) -> "Parallel":  # pylint: disable=invalid-overridden-method
+        from azure.ai.ml.entities._builders.parallel import Parallel
+
+        with self._inputs._fit_inputs(kwargs):  # type: ignore[attr-defined]
+            # pylint: disable=not-callable
+            return super()._func(**kwargs)  # type: ignore
+
+    @classmethod
+    def _get_flow_definition(
+        cls,
+        base_path: Path,
+        *,
+        flow: Optional[Union[str, os.PathLike]] = None,
+        source_path: Optional[Union[str, os.PathLike]] = None,
+    ) -> Tuple[Path, str]:
+        """
+        Get the path to the flow directory and the file name of the flow dag yaml file.
+        If flow is not specified, we will assume that the source_path is the path to the flow dag yaml file.
+        If flow is specified, it can be either a path to the flow dag yaml file or a path to the flow directory.
+        If flow is a path to the flow directory, we will assume that the flow dag yaml file is named flow.dag.yaml.
+
+        :param base_path: The base path of the flow component.
+        :type base_path: Path
+        :keyword flow: The path to the flow directory or flow definition file. Defaults to None and base path of this
+            component will be used as flow directory.
+        :type flow: Optional[Union[str, Path]]
+        :keyword source_path: The source path of the flow component, should be path to the flow dag yaml file
+            if specified.
+        :type source_path: Optional[Union[str, os.PathLike]]
+        :return: The path to the flow directory and the file name of the flow dag yaml file.
+        :rtype: Tuple[Path, str]
+        """
+        flow_file_name = "flow.dag.yaml"
+
+        if flow is None and source_path is None:
+            raise cls._create_validation_error(
+                message="Either flow or source_path must be specified.",
+                no_personal_data_message="Either flow or source_path must be specified.",
+            )
+
+        if flow is None:
+            # Flow component must be created with a local yaml file, so no need to check if source_path exists
+            if isinstance(source_path, (os.PathLike, str)):
+                flow_file_name = os.path.basename(source_path)
+            return Path(base_path), flow_file_name
+
+        flow_path = Path(flow)
+        if not flow_path.is_absolute():
+            # if flow_path points to a symlink, we still use the parent of the symlink as origin code
+            flow_path = Path(base_path, flow)
+
+        if flow_path.is_dir() and (flow_path / flow_file_name).is_file():
+            return flow_path, flow_file_name
+
+        if flow_path.is_file():
+            return flow_path.parent, flow_path.name
+
+        raise cls._create_validation_error(
+            message="Flow path must be a directory containing flow.dag.yaml or a file, but got %s" % flow_path,
+            no_personal_data_message="Flow path must be a directory or a file",
+        )
+
+    # endregion
+
+    # region SchemaValidatableMixin
+    @classmethod
+    def _load_with_schema(
+        cls, data: Any, *, context: Optional[Any] = None, raise_original_exception: bool = False, **kwargs: Any
+    ) -> Any:
+        # FlowComponent should be loaded with FlowSchema or FlowRunSchema instead of FlowComponentSchema
+        context = context or {BASE_PATH_CONTEXT_KEY: Path.cwd()}
+        _schema = data.get("$schema", None)
+        if _schema == SchemaUrl.PROMPTFLOW_RUN:
+            schema = RunSchema(context=context)
+        elif _schema == SchemaUrl.PROMPTFLOW_FLOW:
+            schema = FlowSchema(context=context)
+        else:
+            raise cls._create_validation_error(
+                message="$schema must be specified correctly for loading component from flow, but got %s" % _schema,
+                no_personal_data_message="$schema must be specified for loading component from flow",
+            )
+
+        # unlike other components, we should ignore unknown fields in flow to keep init_params clean and avoid
+        # depending too heavily on the details of flow.dag.yaml & run.yaml
+        kwargs["unknown"] = EXCLUDE
+        try:
+            loaded_dict = schema.load(data, **kwargs)
+        except ValidationError as e:
+            if raise_original_exception:
+                raise e
+            msg = "Trying to load data with schema failed. Data:\n%s\nError: %s" % (
+                json.dumps(data, indent=4) if isinstance(data, dict) else data,
+                json.dumps(e.messages, indent=4),
+            )
+            raise cls._create_validation_error(
+                message=msg,
+                no_personal_data_message=str(e),
+            ) from e
+        loaded_dict.update(loaded_dict.pop(PROMPTFLOW_AZUREML_OVERRIDE_KEY, {}))
+        return loaded_dict
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return FlowComponentSchema(context=context)
+
+    # endregion
+
+    # region AdditionalIncludesMixin
+    def _get_origin_code_value(self) -> Union[str, os.PathLike, None]:
+        if self._code_arm_id:
+            return self._code_arm_id
+        res: Union[str, os.PathLike, None] = self.base_path
+        return res
+
+    def _fill_back_code_value(self, value: str) -> None:
+        self._code_arm_id = value
+
+    @contextlib.contextmanager
+    def _try_build_local_code(self) -> Generator:
+        # false-positive by pylint, hence disable it
+        # (https://github.com/pylint-dev/pylint/blob/main/doc/data/messages
+        # /c/contextmanager-generator-missing-cleanup/details.rst)
+        with super()._try_build_local_code() as code:  # pylint:disable=contextmanager-generator-missing-cleanup
+            if not code or not code.path:
+                yield code
+                return
+
+            if not (Path(code.path) / ".promptflow" / "flow.tools.json").is_file():
+                raise self._create_validation_error(
+                    message="Flow component must be created with a ./promptflow/flow.tools.json, "
+                    "please run `pf flow validate` to generate it or skip it in your ignore file.",
+                    no_personal_data_message="Flow component must be created with a ./promptflow/flow.tools.json, "
+                    "please run `pf flow validate` to generate it or skip it in your ignore file.",
+                )
+            # TODO: should we remove additional includes from flow.dag.yaml? for now we suppose it will be removed
+            #  by mldesigner compile if needed
+
+            yield code
+
+    # endregion
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py
new file mode 100644
index 00000000..13464a06
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py
@@ -0,0 +1,96 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from pathlib import Path
+from typing import Any, Dict, Optional, Union
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.import_component import ImportComponentSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+
+from ..._schema import PathAwareSchema
+from ..._utils.utils import parse_args_description_from_docstring
+from .._util import convert_ordered_dict_to_dict
+from .component import Component
+
+
+class ImportComponent(Component):
+    """Import component version, used to define an import component.
+
+    :param name: Name of the component.
+    :type name: str
+    :param version: Version of the component.
+    :type version: str
+    :param description: Description of the component.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict
+    :param display_name: Display name of the component.
+    :type display_name: str
+    :param source: Input source parameters of the component.
+    :type source: dict
+    :param output: Output of the component.
+    :type output: dict
+    :param is_deterministic: Whether the import component is deterministic. Defaults to True.
+    :type is_deterministic: bool
+    :param kwargs: Additional parameters for the import component.
+    """
+
+    def __init__(
+        self,
+        *,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        display_name: Optional[str] = None,
+        source: Optional[Dict] = None,
+        output: Optional[Dict] = None,
+        is_deterministic: bool = True,
+        **kwargs: Any,
+    ) -> None:
+        kwargs[COMPONENT_TYPE] = NodeType.IMPORT
+        # Set default base path
+        if BASE_PATH_CONTEXT_KEY not in kwargs:
+            kwargs[BASE_PATH_CONTEXT_KEY] = Path(".")
+
+        super().__init__(
+            name=name,
+            version=version,
+            description=description,
+            tags=tags,
+            display_name=display_name,
+            inputs=source,
+            outputs={"output": output} if output else None,
+            is_deterministic=is_deterministic,
+            **kwargs,
+        )
+
+        self.source = source
+        self.output = output
+
+    def _to_dict(self) -> Dict:
+        # TODO: Bug Item number: 2897665
+        res: Dict = convert_ordered_dict_to_dict(  # type: ignore
+            {**self._other_parameter, **super(ImportComponent, self)._to_dict()}
+        )
+        return res
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return ImportComponentSchema(context=context)
+
+    @classmethod
+    def _parse_args_description_from_docstring(cls, docstring: str) -> Dict:
+        res: dict = parse_args_description_from_docstring(docstring)
+        return res
+
+    def __str__(self) -> str:
+        try:
+            toYaml: str = self._to_yaml()
+            return toYaml
+        except BaseException:  # pylint: disable=W0718
+            toStr: str = super(ImportComponent, self).__str__()
+            return toStr
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py
new file mode 100644
index 00000000..3f29b1e1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py
@@ -0,0 +1,305 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import json
+import os
+import re
+from typing import Any, Dict, List, Optional, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion
+from azure.ai.ml._schema.component.parallel_component import ParallelComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
+from azure.ai.ml.entities._job.parallel.parallel_task import ParallelTask
+from azure.ai.ml.entities._job.parallel.parameterized_parallel import ParameterizedParallel
+from azure.ai.ml.entities._job.parallel.retry_settings import RetrySettings
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+from ..._schema import PathAwareSchema
+from .._util import validate_attribute_type
+from .._validation import MutableValidationResult
+from .code import ComponentCodeMixin
+from .component import Component
+
+
+class ParallelComponent(
+    Component, ParameterizedParallel, ComponentCodeMixin
+):  # pylint: disable=too-many-instance-attributes
+    """Parallel component version, used to define a parallel component.
+
+    :param name: Name of the component. Defaults to None
+    :type name: str
+    :param version: Version of the component. Defaults to None
+    :type version: str
+    :param description: Description of the component. Defaults to None
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None
+    :type tags: dict
+    :param display_name: Display name of the component. Defaults to None
+    :type display_name: str
+    :param retry_settings: Retry settings for a failed parallel component run. Defaults to None
+    :type retry_settings: RetrySettings
+    :param logging_level: A string of the logging level name. Defaults to None
+    :type logging_level: str
+    :param max_concurrency_per_instance: The max parallelism that each compute instance has. Defaults to None
+    :type max_concurrency_per_instance: int
+    :param error_threshold: The number of item processing failures that should be ignored. Defaults to None
+    :type error_threshold: int
+    :param mini_batch_error_threshold: The number of mini batch processing failures that should be ignored.
+        Defaults to None
+    :type mini_batch_error_threshold: int
+    :param task: The parallel task. Defaults to None
+    :type task: ParallelTask
+    :param mini_batch_size: For FileDataset input, this field is the number of files a user script can process
+        in one run() call. For TabularDataset input, this field is the approximate size of data the user script
+        can process in one run() call. Example values are 1024, 1024KB, 10MB, and 1GB.
+        (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
+        through PipelineParameter.
+    :type mini_batch_size: str
+    :param partition_keys: The keys used to partition the dataset into mini-batches. Defaults to None.
+        If specified, the data with the same key will be partitioned into the same mini-batch.
+        If both partition_keys and mini_batch_size are specified, partition_keys will take effect.
+        The input(s) must be partitioned dataset(s),
+        and the partition_keys must be a subset of the keys of every input dataset for this to work.
+    :type partition_keys: list
+    :param input_data: The input data. Defaults to None
+    :type input_data: str
+    :param resources: Compute Resource configuration for the component. Defaults to None
+    :type resources: Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration]
+    :param inputs: Inputs of the component. Defaults to None
+    :type inputs: dict
+    :param outputs: Outputs of the component. Defaults to None
+    :type outputs: dict
+    :param code: promoted property from task.code
+    :type code: str
+    :param instance_count: promoted property from resources.instance_count. Defaults to None
+    :type instance_count: int
+    :param is_deterministic: Whether the parallel component is deterministic. Defaults to True
+    :type is_deterministic: bool
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if ParallelComponent cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(  # pylint: disable=too-many-locals
+        self,
+        *,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict[str, Any]] = None,
+        display_name: Optional[str] = None,
+        retry_settings: Optional[RetrySettings] = None,
+        logging_level: Optional[str] = None,
+        max_concurrency_per_instance: Optional[int] = None,
+        error_threshold: Optional[int] = None,
+        mini_batch_error_threshold: Optional[int] = None,
+        task: Optional[ParallelTask] = None,
+        mini_batch_size: Optional[str] = None,
+        partition_keys: Optional[List] = None,
+        input_data: Optional[str] = None,
+        resources: Optional[JobResourceConfiguration] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        code: Optional[str] = None,  # promoted property from task.code
+        instance_count: Optional[int] = None,  # promoted property from resources.instance_count
+        is_deterministic: bool = True,
+        **kwargs: Any,
+    ):
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        kwargs[COMPONENT_TYPE] = NodeType.PARALLEL
+
+        super().__init__(
+            name=name,
+            version=version,
+            description=description,
+            tags=tags,
+            display_name=display_name,
+            inputs=inputs,
+            outputs=outputs,
+            is_deterministic=is_deterministic,
+            **kwargs,
+        )
+
+        # No validation on value passed here because in pipeline job, required code&environment maybe absent
+        # and fill in later with job defaults.
+        self.task = task
+        self.mini_batch_size: int = 0
+        self.partition_keys = partition_keys
+        self.input_data = input_data
+        self.retry_settings = retry_settings
+        self.logging_level = logging_level
+        self.max_concurrency_per_instance = max_concurrency_per_instance
+        self.error_threshold = error_threshold
+        self.mini_batch_error_threshold = mini_batch_error_threshold
+        self.resources = resources
+
+        # check mutual exclusivity of promoted properties
+        if self.resources is not None and instance_count is not None:
+            msg = "instance_count and resources are mutually exclusive"
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.COMPONENT,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        self.instance_count = instance_count
+        self.code = code
+
+        if mini_batch_size is not None:
+            # Convert str to int.
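+            # Illustrative conversions: "1024" -> 1024, "100KB" -> 102400, "10MB" -> 10485760, "1GB" -> 1073741824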
+            pattern = re.compile(r"^\d+([kKmMgG][bB])*$")
+            if not pattern.match(mini_batch_size):
+                raise ValueError(r"Parameter mini_batch_size must follow regex rule ^\d+([kKmMgG][bB])*$")
+
+            try:
+                self.mini_batch_size = int(mini_batch_size)
+            except ValueError as e:
+                unit = mini_batch_size[-2:].lower()
+                if unit == "kb":
+                    self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024
+                elif unit == "mb":
+                    self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024
+                elif unit == "gb":
+                    self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 * 1024
+                else:
+                    raise ValueError("mini_batch_size unit must be kb, mb or gb") from e
+
+    @property
+    def instance_count(self) -> Optional[int]:
+        """Return value of promoted property resources.instance_count.
+
+        :return: Value of resources.instance_count.
+        :rtype: Optional[int]
+        """
+        return self.resources.instance_count if self.resources and not isinstance(self.resources, dict) else None
+
+    @instance_count.setter
+    def instance_count(self, value: int) -> None:
+        """Set the value of the promoted property resources.instance_count.
+
+        :param value: The value to set for resources.instance_count.
+        :type value: int
+        """
+        if not value:
+            return
+        if not self.resources:
+            self.resources = JobResourceConfiguration(instance_count=value)
+        else:
+            if not isinstance(self.resources, dict):
+                self.resources.instance_count = value
+
+    @property
+    def code(self) -> Optional[str]:
+        """Return value of promoted property task.code, which is a local or
+        remote path pointing at source code.
+
+        :return: Value of task.code.
+        :rtype: Optional[str]
+        """
+        return self.task.code if self.task else None
+
+    @code.setter
+    def code(self, value: str) -> None:
+        """Set the value of the promoted property task.code.
+
+        :param value: The value to set for task.code.
+        :type value: str
+        """
+        if not value:
+            return
+        if not self.task:
+            self.task = ParallelTask(code=value)
+        else:
+            self.task.code = value
+
+    def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+        """Dump the component content into a sorted yaml string.
+
+        :return: The ordered dict
+        :rtype: Dict
+        """
+
+        obj: dict = super()._to_ordered_dict_for_yaml_dump()
+        # dict dumped based on schema will transfer code to an absolute path, while we want to keep its original value
+        if self.code and isinstance(self.code, str):
+            obj["task"]["code"] = self.code
+        return obj
+
+    @property
+    def environment(self) -> Optional[str]:
+        """Return value of promoted property task.environment, indicate the
+        environment that training job will run in.
+
+        :return: Value of task.environment.
+        :rtype: Optional[Environment, str]
+        """
+        if self.task:
+            return cast(Optional[str], self.task.environment)
+        return None
+
+    @environment.setter
+    def environment(self, value: str) -> None:
+        """Set the value of the promoted property task.environment.
+
+        :param value: The value to set for task.environment.
+        :type value: str
+        """
+        if not value:
+            return
+        if not self.task:
+            self.task = ParallelTask(environment=value)
+        else:
+            self.task.environment = value
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = super()._customized_validate()
+        self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
+        return validation_result
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "retry_settings": (dict, RetrySettings),
+            "task": (dict, ParallelTask),
+            "logging_level": str,
+            "max_concurrency_per_instance": int,
+            "input_data": str,
+            "error_threshold": int,
+            "mini_batch_error_threshold": int,
+            "code": (str, os.PathLike),
+            "resources": (dict, JobResourceConfiguration),
+        }
+
+    def _to_rest_object(self) -> ComponentVersion:
+        rest_object = super()._to_rest_object()
+        # schema requires a list while the backend accepts a json string
+        if self.partition_keys:
+            rest_object.properties.component_spec["partition_keys"] = json.dumps(self.partition_keys)
+        return rest_object
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # schema requires a list while the backend accepts a json string;
+        # update the rest obj in place as it will then be loaded via schema
+        partition_keys = obj.properties.component_spec.get("partition_keys", None)
+        if partition_keys:
+            obj.properties.component_spec["partition_keys"] = json.loads(partition_keys)
+        res: dict = super()._from_rest_object_to_init_params(obj)
+        return res
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return ParallelComponentSchema(context=context)
+
+    def __str__(self) -> str:
+        try:
+            toYaml: str = self._to_yaml()
+            return toYaml
+        except BaseException:  # pylint: disable=W0718
+            toStr: str = super(ParallelComponent, self).__str__()
+            return toStr
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py
new file mode 100644
index 00000000..229b714d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py
@@ -0,0 +1,529 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import json
+import logging
+import os
+import re
+import time
+import typing
+from collections import Counter
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from marshmallow import Schema
+
+from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._schema.pipeline.pipeline_component import PipelineComponentSchema
+from azure.ai.ml._utils._asset_utils import get_object_hash
+from azure.ai.ml._utils.utils import hash_dict, is_data_binding_expression
+from azure.ai.ml.constants._common import ARM_ID_PREFIX, ASSET_ARM_ID_REGEX_FORMAT, COMPONENT_TYPE
+from azure.ai.ml.constants._component import ComponentSource, NodeType
+from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
+from azure.ai.ml.entities._builders import BaseNode, Command
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode, LoopNode
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._inputs_outputs import GroupInput, Input
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe, try_get_non_arbitrary_attr
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression
+from azure.ai.ml.entities._validation import MutableValidationResult
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineComponent(Component):
+    """Pipeline component, currently used to store components in an azure.ai.ml.dsl.pipeline.
+
+    :param name: Name of the component.
+    :type name: str
+    :param version: Version of the component.
+    :type version: str
+    :param description: Description of the component.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict
+    :param display_name: Display name of the component.
+    :type display_name: str
+    :param inputs: Component inputs.
+    :type inputs: dict
+    :param outputs: Component outputs.
+    :type outputs: dict
+    :param jobs: A mapping from node name to node inside the pipeline definition.
+    :type jobs: Dict[str, ~azure.ai.ml.entities._builders.BaseNode]
+    :param is_deterministic: Whether the pipeline component is deterministic.
+    :type is_deterministic: bool
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if PipelineComponent cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        display_name: Optional[str] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        jobs: Optional[Dict[str, BaseNode]] = None,
+        is_deterministic: Optional[bool] = None,
+        **kwargs: Any,
+    ) -> None:
+        kwargs[COMPONENT_TYPE] = NodeType.PIPELINE
+        super().__init__(
+            name=name,
+            version=version,
+            description=description,
+            tags=tags,
+            display_name=display_name,
+            inputs=inputs,
+            outputs=outputs,
+            is_deterministic=is_deterministic,  # type: ignore[arg-type]
+            **kwargs,
+        )
+        self._jobs = self._process_jobs(jobs) if jobs else {}
+        # for telemetry
+        self._job_types, self._job_sources = self._get_job_type_and_source()
+        # Private support: create pipeline component from pipeline job
+        self._source_job_id = kwargs.pop("source_job_id", None)
+        # TODO: set anonymous hash for reuse
+
+    def _process_jobs(self, jobs: Dict[str, BaseNode]) -> Dict[str, BaseNode]:
+        """Process and validate jobs.
+
+        :param jobs: A map of node name to node
+        :type jobs: Dict[str, BaseNode]
+        :return: The processed jobs
+        :rtype: Dict[str, BaseNode]
+        """
+        # Remove swept Command
+        node_names_to_skip = []
+        for node_name, job_instance in jobs.items():
+            if isinstance(job_instance, Command) and job_instance._swept is True:
+                node_names_to_skip.append(node_name)
+
+        for key in node_names_to_skip:
+            del jobs[key]
+
+        # Set path and validate node type.
+        for _, job_instance in jobs.items():
+            if isinstance(job_instance, BaseNode):
+                job_instance._set_base_path(self.base_path)
+
+            if not isinstance(job_instance, (BaseNode, AutoMLJob, ControlFlowNode)):
+                msg = f"Not supported pipeline job type: {type(job_instance)}"
+                raise ValidationException(
+                    message=msg,
+                    no_personal_data_message=msg,
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+        return jobs
+
+    def _customized_validate(self) -> MutableValidationResult:
+        """Validate pipeline component structure.
+
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        validation_result = super(PipelineComponent, self)._customized_validate()
+
+        # Validate inputs
+        for input_name, input_value in self.inputs.items():
+            if input_value.type is None:
+                validation_result.append_error(
+                    yaml_path="inputs.{}".format(input_name),
+                    message="Parameter type unknown, please add type annotation or specify input default value.",
+                    error_code=ValidationErrorCode.PARAMETER_TYPE_UNKNOWN,
+                )
+
+        # Validate all nodes
+        for node_name, node in self.jobs.items():
+            if isinstance(node, BaseNode):
+                # Node inputs will be validated.
+                validation_result.merge_with(node._validate(), "jobs.{}".format(node_name))
+                if isinstance(node.component, Component):
+                    # Validate binding if not remote resource.
+                    validation_result.merge_with(self._validate_binding_inputs(node))
+            elif isinstance(node, AutoMLJob):
+                pass
+            elif isinstance(node, ControlFlowNode):
+                # Validate control flow node.
+                validation_result.merge_with(node._validate(), "jobs.{}".format(node_name))
+            else:
+                validation_result.append_error(
+                    yaml_path="jobs.{}".format(node_name),
+                    message=f"Not supported pipeline job type: {type(node)}",
+                )
+
+        return validation_result
+
+    def _validate_compute_is_set(self, *, parent_node_name: Optional[str] = None) -> MutableValidationResult:
+        """Validate compute in pipeline component.
+
+        This function will only be called from pipeline_job._validate_compute_is_set
+        when both of the pipeline_job.compute and pipeline_job.settings.default_compute is None.
+        Rules:
+        - For pipeline node: will call node._component._validate_compute_is_set to validate node compute in sub graph.
+        - For general node:
+            - If _skip_required_compute_missing_validation is True, validation will be skipped.
+            - All the rest of cases without compute will add compute not set error to validation result.
+
+        :keyword parent_node_name: The name of the parent node.
+        :type parent_node_name: Optional[str]
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+
+        # Note: do not put this into customized validate, as we would like call
+        # this from pipeline_job._validate_compute_is_set
+        validation_result = self._create_empty_validation_result()
+        no_compute_nodes = []
+        parent_node_name = parent_node_name if parent_node_name else ""
+        for node_name, node in self.jobs.items():
+            full_node_name = f"{parent_node_name}{node_name}.jobs."
+            if node.type == NodeType.PIPELINE and isinstance(node._component, PipelineComponent):
+                validation_result.merge_with(node._component._validate_compute_is_set(parent_node_name=full_node_name))
+                continue
+            if isinstance(node, BaseNode) and node._skip_required_compute_missing_validation:
+                continue
+            if has_attr_safe(node, "compute") and node.compute is None:
+                no_compute_nodes.append(node_name)
+
+        for node_name in no_compute_nodes:
+            validation_result.append_error(
+                yaml_path=f"jobs.{parent_node_name}{node_name}.compute",
+                message="Compute not set",
+            )
+        return validation_result
+
+    def _get_input_binding_dict(self, node: BaseNode) -> Tuple[dict, dict]:
+        """Return the input binding dict for each node.
+
+        :param node: The node
+        :type node: BaseNode
+        :return: A 2-tuple of (binding_dict, optional_binding_in_expression_dict)
+        :rtype: Tuple[dict, dict]
+        """
+        # pylint: disable=too-many-nested-blocks
+        binding_inputs = node._build_inputs()
+        # Collect binding relation dict {'pipeline_input': ['node_input']}
+        binding_dict: dict = {}
+        optional_binding_in_expression_dict: dict = {}
+        for component_input_name, component_binding_input in binding_inputs.items():
+            if isinstance(component_binding_input, PipelineExpression):
+                for pipeline_input_name in component_binding_input._inputs.keys():
+                    if pipeline_input_name not in self.inputs:
+                        continue
+                    if pipeline_input_name not in binding_dict:
+                        binding_dict[pipeline_input_name] = []
+                    binding_dict[pipeline_input_name].append(component_input_name)
+                    if pipeline_input_name not in optional_binding_in_expression_dict:
+                        optional_binding_in_expression_dict[pipeline_input_name] = []
+                    optional_binding_in_expression_dict[pipeline_input_name].append(pipeline_input_name)
+            else:
+                if isinstance(component_binding_input, Input):
+                    component_binding_input = component_binding_input.path
+                if is_data_binding_expression(component_binding_input, ["parent"]):
+                    # data binding may have more than one PipelineInput now
+                    for pipeline_input_name in PipelineExpression.parse_pipeline_inputs_from_data_binding(
+                        component_binding_input
+                    ):
+                        if pipeline_input_name not in self.inputs:
+                            continue
+                        if pipeline_input_name not in binding_dict:
+                            binding_dict[pipeline_input_name] = []
+                        binding_dict[pipeline_input_name].append(component_input_name)
+                        # for data binding expression "${{parent.inputs.pipeline_input}}", it should not be optional
+                        if len(component_binding_input.replace("${{parent.inputs." + pipeline_input_name + "}}", "")):
+                            if pipeline_input_name not in optional_binding_in_expression_dict:
+                                optional_binding_in_expression_dict[pipeline_input_name] = []
+                            optional_binding_in_expression_dict[pipeline_input_name].append(pipeline_input_name)
+        return binding_dict, optional_binding_in_expression_dict
+
+    def _validate_binding_inputs(self, node: BaseNode) -> MutableValidationResult:
+        """Validate pipeline binding inputs and return all used pipeline input names.
+
+        Mark an input as optional if all of its bindings are optional and optional is not set. Raise an error if a
+        pipeline input is optional but linked to required inputs.
+
+        :param node: The node to validate
+        :type node: BaseNode
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        component_definition_inputs = {}
+        # Add flattened group input into definition inputs.
+        # e.g. Add {'group_name.item': PipelineInput} for {'group_name': GroupInput}
+        for name, val in node.component.inputs.items():
+            if isinstance(val, GroupInput):
+                component_definition_inputs.update(val.flatten(group_parameter_name=name))
+            component_definition_inputs[name] = val
+        # Collect binding relation dict {'pipeline_input': ['node_input']}
+        validation_result = self._create_empty_validation_result()
+        binding_dict, optional_binding_in_expression_dict = self._get_input_binding_dict(node)
+
+        # Validate links required and optional
+        for pipeline_input_name, binding_inputs in binding_dict.items():
+            pipeline_input = self.inputs[pipeline_input_name]
+            required_bindings = []
+            for name in binding_inputs:
+                # not check optional/required for pipeline input used in pipeline expression
+                if name in optional_binding_in_expression_dict.get(pipeline_input_name, []):
+                    continue
+                if name in component_definition_inputs and component_definition_inputs[name].optional is not True:
+                    required_bindings.append(f"{node.name}.inputs.{name}")
+            if pipeline_input.optional is None and not required_bindings:
+                # Set input as optional if all binding is optional and optional not set.
+                pipeline_input.optional = True
+                pipeline_input._is_inferred_optional = True
+            elif pipeline_input.optional is True and required_bindings:
+                if pipeline_input._is_inferred_optional:
+                    # Change optional=True to None if is inferred by us
+                    pipeline_input.optional = None
+                else:
+                    # Raise exception if pipeline input is optional set by user but link to required inputs.
+                    validation_result.append_error(
+                        yaml_path="inputs.{}".format(pipeline_input._port_name),
+                        message=f"Pipeline optional Input binding to required inputs: {required_bindings}",
+                    )
+        return validation_result
+
+    def _get_job_type_and_source(self) -> Tuple[Dict[str, int], Dict[str, int]]:
+        """Get job types and sources for telemetry.
+
+        :return: A 2-tuple of
+          * A map of job type to the number of occurrences
+          * A map of job source to the number of occurrences
+        :rtype: Tuple[Dict[str, int], Dict[str, int]]
+        """
+        job_types: list = []
+        job_sources = []
+        for job in self.jobs.values():
+            job_types.append(job.type)
+            if isinstance(job, BaseNode):
+                job_sources.append(job._source)
+            elif isinstance(job, AutoMLJob):
+                # Consider all automl jobs to have the BUILDER source for now,
+                # as it's not easy to distinguish their source (yaml/builder).
+                job_sources.append(ComponentSource.BUILDER)
+            else:
+                # Fall back to CLASS
+                job_sources.append(ComponentSource.CLASS)
+        return dict(Counter(job_types)), dict(Counter(job_sources))
+
+    @property
+    def jobs(self) -> Dict[str, BaseNode]:
+        """Return a dictionary from component variable name to component object.
+
+        :return: Dictionary mapping component variable names to component objects.
+        :rtype: Dict[str, ~azure.ai.ml.entities._builders.BaseNode]
+        """
+        return self._jobs
+
+    def _get_anonymous_hash(self) -> str:
+        """Get anonymous hash for pipeline component.
+
+        :return: The anonymous hash of the pipeline component
+        :rtype: str
+        """
+        # ideally we should always use rest object to generate hash as it's the same as
+        # what we send to server-side, but changing the hash function will break reuse of
+        # existing components except for command component (hash result is the same for
+        # command component), so we just use rest object to generate hash for pipeline component,
+        # which doesn't have reuse issue.
+        component_interface_dict = self._to_rest_object().properties.component_spec
+        # Hash local inputs in pipeline component jobs
+        for job_name, job in self.jobs.items():
+            if getattr(job, "inputs", None):
+                for input_name, input_value in job.inputs.items():
+                    try:
+                        if (
+                            getattr(input_value, "_data", None)
+                            and isinstance(input_value._data, Input)
+                            and input_value.path
+                            and os.path.exists(input_value.path)
+                        ):
+                            start_time = time.time()
+                            component_interface_dict["jobs"][job_name]["inputs"][input_name]["content_hash"] = (
+                                get_object_hash(input_value.path)
+                            )
+                            module_logger.debug(
+                                "Takes %s seconds to calculate the content hash of local input %s",
+                                time.time() - start_time,
+                                input_value.path,
+                            )
+                    except ValidationException:
+                        pass
+        hash_value: str = hash_dict(
+            component_interface_dict,
+            keys_to_omit=[
+                # omit name since anonymous component will have same name
+                "name",
+                # omit _source since it doesn't impact component's uniqueness
+                "_source",
+                # omit id since it will be set after component is registered
+                "id",
+                # omit version since it will be set to this hash later
+                "version",
+            ],
+        )
+        return hash_value
+
+    @classmethod
+    def _load_from_rest_pipeline_job(cls, data: Dict) -> "PipelineComponent":
+        # TODO: refine this?
+        # Set type as None here to avoid schema validation failure
+        definition_inputs = {p: {"type": None} for p in data.get("inputs", {}).keys()}
+        definition_outputs = {p: {"type": None} for p in data.get("outputs", {}).keys()}
+        return PipelineComponent(
+            display_name=data.get("display_name"),
+            description=data.get("description"),
+            inputs=definition_inputs,
+            outputs=definition_outputs,
+            jobs=data.get("jobs"),
+            _source=ComponentSource.REMOTE_WORKSPACE_JOB,
+        )
+
+    @classmethod
+    def _resolve_sub_nodes(cls, rest_jobs: Dict) -> Dict:
+        from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory
+
+        sub_nodes = {}
+        if rest_jobs is None:
+            return sub_nodes
+        for node_name, node in rest_jobs.items():
+            # TODO: Remove this ad-hoc fix after unified arm id format in object
+            component_id = node.get("componentId", "")
+            if isinstance(component_id, str) and re.match(ASSET_ARM_ID_REGEX_FORMAT, component_id):
+                node["componentId"] = component_id[len(ARM_ID_PREFIX) :]
+            if not LoopNode._is_loop_node_dict(node):
+                # skip resolve LoopNode first since it may reference other nodes
+                # use node factory instead of BaseNode._from_rest_object here as AutoMLJob is not a BaseNode
+                sub_nodes[node_name] = pipeline_node_factory.load_from_rest_object(obj=node)
+        for node_name, node in rest_jobs.items():
+            if LoopNode._is_loop_node_dict(node):
+                # resolve LoopNode after all other nodes are resolved
+                sub_nodes[node_name] = pipeline_node_factory.load_from_rest_object(obj=node, pipeline_jobs=sub_nodes)
+        return sub_nodes
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return PipelineComponentSchema(context=context)
+
+    @classmethod
+    def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
+        # jobs validations are done in _customized_validate()
+        return ["jobs"]
+
+    @classmethod
+    def _check_ignored_keys(cls, obj: object) -> List[str]:
+        """Return ignored keys in obj as a pipeline component when its value be set.
+
+        :param obj: The object to examine
+        :type obj: object
+        :return: List of keys to ignore
+        :rtype: List[str]
+        """
+        examine_mapping = {
+            "compute": lambda val: val is not None,
+            "settings": lambda val: val is not None and any(v is not None for v in val._to_dict().values()),
+        }
+        # Use `try_get_non_arbitrary_attr` instead of `hasattr` or `getattr` directly to avoid adding new attrs.
+        return [k for k, has_set in examine_mapping.items() if has_set(try_get_non_arbitrary_attr(obj, k))]
+
+    def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict:
+        telemetry_values: dict = super()._get_telemetry_values()
+        telemetry_values.update(
+            {
+                "source": self._source,
+                "node_count": len(self.jobs),
+                "node_type": json.dumps(self._job_types),
+                "node_source": json.dumps(self._job_sources),
+            }
+        )
+        return telemetry_values
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # Pop jobs so they do not go through schema load
+        jobs = obj.properties.component_spec.pop("jobs", None)
+        init_params_dict: dict = super()._from_rest_object_to_init_params(obj)
+        if jobs:
+            try:
+                init_params_dict["jobs"] = PipelineComponent._resolve_sub_nodes(jobs)
+            except Exception as e:  # pylint: disable=W0718
+                # Skip parsing jobs if an error occurs.
+                # TODO: https://msdata.visualstudio.com/Vienna/_workitems/edit/2052262
+                module_logger.debug("Parse pipeline component jobs failed with: %s", e)
+        return init_params_dict
+
+    def _to_dict(self) -> Dict:
+        return {**self._other_parameter, **super()._to_dict()}
+
+    def _build_rest_component_jobs(self) -> Dict[str, dict]:
+        """Build pipeline component jobs to rest.
+
+        :return: A map of job name to rest objects
+        :rtype: Dict[str, dict]
+        """
+        # Build the jobs to dict
+        rest_component_jobs = {}
+        for job_name, job in self.jobs.items():
+            if isinstance(job, (BaseNode, ControlFlowNode)):
+                rest_node_dict = job._to_rest_object()
+            elif isinstance(job, AutoMLJob):
+                rest_node_dict = json.loads(json.dumps(job._to_dict(inside_pipeline=True)))
+            else:
+                msg = f"Non supported job type in Pipeline jobs: {type(job)}"
+                raise ValidationException(
+                    message=msg,
+                    no_personal_data_message=msg,
+                    target=ErrorTarget.PIPELINE,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+            rest_component_jobs[job_name] = rest_node_dict
+        return rest_component_jobs
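+    # NOTE (illustrative, not part of the original source): the returned map is keyed by node name,
+    # e.g. a command node "train" would serialize to something like {"train": {"type": "command", ...}}
+    # in the resulting component spec.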
+
+    def _to_rest_object(self) -> ComponentVersion:
+        """Check ignored keys and return rest object.
+
+        :return: The component version
+        :rtype: ComponentVersion
+        """
+        ignored_keys = self._check_ignored_keys(self)
+        if ignored_keys:
+            module_logger.warning("%s ignored on pipeline component %r.", ignored_keys, self.name)
+        component = self._to_dict()
+        # add source type to component rest object
+        component["_source"] = self._source
+        component["jobs"] = self._build_rest_component_jobs()
+        component["sourceJobId"] = self._source_job_id
+        if self._intellectual_property:
+            # hack while full pass-through support is being worked on for IPP fields
+            component.pop("intellectual_property")
+            component["intellectualProperty"] = self._intellectual_property._to_rest_object().serialize()
+        properties = ComponentVersionProperties(
+            component_spec=component,
+            description=self.description,
+            is_anonymous=self._is_anonymous,
+            properties=self.properties,
+            tags=self.tags,
+        )
+        result = ComponentVersion(properties=properties)
+        result.name = self.name
+        return result
+
+    def __str__(self) -> str:
+        try:
+            toYaml: str = self._to_yaml()
+            return toYaml
+        except BaseException:  # pylint: disable=W0718
+            toStr: str = super(PipelineComponent, self).__str__()
+            return toStr
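+
+# NOTE (illustrative usage sketch, not part of the original source): a pipeline component is
+# normally created and registered through the public API rather than these internals, e.g.
+# (assuming an authenticated MLClient named ml_client and a hypothetical YAML path):
+#
+#     from azure.ai.ml import load_component
+#
+#     pipeline_component = load_component(source="pipeline_component.yml")
+#     ml_client.components.create_or_update(pipeline_component)
+#
+# create_or_update() eventually serializes the component via _to_rest_object() defined above.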
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py
new file mode 100644
index 00000000..7da65fb6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py
@@ -0,0 +1,211 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from typing import Any, Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.spark_component import SparkComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.constants._job.job import RestSparkConfKey
+from azure.ai.ml.entities._assets import Environment
+from azure.ai.ml.entities._job.parameterized_spark import ParameterizedSpark
+
+from ..._schema import PathAwareSchema
+from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin
+from .._util import convert_ordered_dict_to_dict, validate_attribute_type
+from .._validation import MutableValidationResult
+from ._additional_includes import AdditionalIncludesMixin
+from .component import Component
+
+
+class SparkComponent(
+    Component, ParameterizedSpark, SparkJobEntryMixin, AdditionalIncludesMixin
+):  # pylint: disable=too-many-instance-attributes
+    """Spark component version, used to define a Spark Component or Job.
+
+    :keyword code: The source code to run the job. Can be a local path or an "http:", "https:", or "azureml:" URL
+        pointing to a remote location. Defaults to ".", indicating the current directory.
+    :paramtype code: Union[str, os.PathLike]
+    :keyword entry: The file or class entry point.
+    :paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]]
+    :keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps. Defaults to None.
+    :paramtype py_files: Optional[List[str]]
+    :keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None.
+    :paramtype jars: Optional[List[str]]
+    :keyword files: The list of files to be placed in the working directory of each executor. Defaults to None.
+    :paramtype files: Optional[List[str]]
+    :keyword archives: The list of archives to be extracted into the working directory of each executor.
+        Defaults to None.
+    :paramtype archives: Optional[List[str]]
+    :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
+    :paramtype driver_cores: Optional[int]
+    :keyword driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit
+        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype driver_memory: Optional[str]
+    :keyword executor_cores: The number of cores to use on each executor.
+    :paramtype executor_cores: Optional[int]
+    :keyword executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit
+        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype executor_memory: Optional[str]
+    :keyword executor_instances: The initial number of executors.
+    :paramtype executor_instances: Optional[int]
+    :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
+        executors registered with this application up and down based on the workload. Defaults to False.
+    :paramtype dynamic_allocation_enabled: Optional[bool]
+    :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_min_executors: Optional[int]
+    :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_max_executors: Optional[int]
+    :keyword conf: A dictionary with pre-defined Spark configurations key and values. Defaults to None.
+    :paramtype conf: Optional[dict[str, str]]
+    :keyword environment: The Azure ML environment to run the job in.
+    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+    :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
+    :paramtype inputs: Optional[dict[str, Union[
+        ~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
+        ~azure.ai.ml.Input,
+        str,
+        bool,
+        int,
+        float,
+        Enum,
+        ]]]
+    :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
+    :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
+    :keyword args: The arguments for the job. Defaults to None.
+    :paramtype args: Optional[str]
+    :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
+    :paramtype additional_includes: Optional[List[str]]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_component_definition]
+            :end-before: [END spark_component_definition]
+            :language: python
+            :dedent: 8
+            :caption: Creating SparkComponent.
+    """
+
+    def __init__(
+        self,
+        *,
+        code: Optional[Union[str, os.PathLike]] = ".",
+        entry: Optional[Union[Dict[str, str], SparkJobEntry]] = None,
+        py_files: Optional[List[str]] = None,
+        jars: Optional[List[str]] = None,
+        files: Optional[List[str]] = None,
+        archives: Optional[List[str]] = None,
+        driver_cores: Optional[Union[int, str]] = None,
+        driver_memory: Optional[str] = None,
+        executor_cores: Optional[Union[int, str]] = None,
+        executor_memory: Optional[str] = None,
+        executor_instances: Optional[Union[int, str]] = None,
+        dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
+        dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
+        dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
+        conf: Optional[Dict[str, str]] = None,
+        environment: Optional[Union[str, Environment]] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        args: Optional[str] = None,
+        additional_includes: Optional[List] = None,
+        **kwargs: Any,
+    ) -> None:
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+        kwargs[COMPONENT_TYPE] = NodeType.SPARK
+
+        super().__init__(
+            inputs=inputs,
+            outputs=outputs,
+            **kwargs,
+        )
+
+        self.code: Optional[Union[str, os.PathLike]] = code
+        self.entry = entry
+        self.py_files = py_files
+        self.jars = jars
+        self.files = files
+        self.archives = archives
+        self.conf = conf
+        self.environment = environment
+        self.args = args
+        self.additional_includes = additional_includes or []
+        # For a pipeline Spark job, we also allow users to set driver_cores, driver_memory, etc. by setting conf.
+        # If the root-level fields are not set by the user, we promote the conf settings to the root level to
+        # facilitate subsequent validation. This usually happens when to_component(SparkJob) or the builder
+        # function spark() is used as a node in the pipeline SDK.
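+        # For example (illustrative, not part of the original source): SparkComponent(conf={"spark.driver.cores": 2})
+        # with driver_cores left unset would populate self.driver_cores from the conf entry, assuming
+        # RestSparkConfKey.DRIVER_CORES corresponds to that Spark configuration key.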
+        conf = conf or {}
+        self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)
+        self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None)
+        self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None)
+        self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None)
+        self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None)
+        self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get(
+            RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None
+        )
+        self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get(
+            RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None
+        )
+        self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get(
+            RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None
+        )
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        return SparkComponentSchema(context=context)
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "environment": (str, Environment),
+            "code": (str, os.PathLike),
+        }
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = super()._customized_validate()
+        self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
+        return validation_result
+
+    def _to_dict(self) -> Dict:
+        # TODO: Bug Item number: 2897665
+        res: Dict = convert_ordered_dict_to_dict(  # type: ignore
+            {**self._other_parameter, **super(SparkComponent, self)._to_dict()}
+        )
+        return res
+
+    def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+        """Dump the component content into a sorted yaml string.
+
+        :return: The ordered dict
+        :rtype: Dict
+        """
+
+        obj: dict = super()._to_ordered_dict_for_yaml_dump()
+        # A dict dumped based on the schema converts code to an absolute path, while we want to keep its original value
+        if self.code and isinstance(self.code, str):
+            obj["code"] = self.code
+        return obj
+
+    def _get_environment_id(self) -> Union[str, None]:
+        # Return the environment id, handling the case where the environment is defined inline
+        if isinstance(self.environment, Environment):
+            res: Optional[str] = self.environment.id
+            return res
+        return self.environment
+
+    def __str__(self) -> str:
+        try:
+            toYaml: str = self._to_yaml()
+            return toYaml
+        except BaseException:  # pylint: disable=W0718
+            toStr: str = super(SparkComponent, self).__str__()
+            return toStr
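+
+# NOTE (illustrative usage sketch, not part of the original source, assuming the public
+# azure.ai.ml.entities surface): a Spark component can be constructed directly, e.g.:
+#
+#     from azure.ai.ml.entities import SparkComponent
+#
+#     component = SparkComponent(
+#         code="./src",                     # hypothetical local source folder
+#         entry={"file": "process.py"},     # hypothetical entry file
+#         driver_cores=1,
+#         driver_memory="2g",
+#         executor_cores=2,
+#         executor_memory="2g",
+#         executor_instances=2,
+#     )
+#
+# str(component) then renders the YAML produced by _to_yaml(), falling back to the default
+# __str__ if YAML serialization fails.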