Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py | 5
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py | 541
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py | 42
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py | 297
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py | 300
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py | 641
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py | 171
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py | 325
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py | 553
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py | 96
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py | 305
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py | 529
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py | 211
13 files changed, 4016 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py
new file mode 100644
index 00000000..fdf8caba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py
@@ -0,0 +1,5 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py
new file mode 100644
index 00000000..85f609ca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py
@@ -0,0 +1,541 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import json
+import os
+import shutil
+import tempfile
+import zipfile
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import contextmanager
+from multiprocessing import cpu_count
+from pathlib import Path
+from typing import Any, Dict, Generator, List, Optional, Tuple, Union
+
+from azure.ai.ml.constants._common import AzureDevopsArtifactsType
+from azure.ai.ml.entities._validation import MutableValidationResult, ValidationResultBuilder
+
+from ..._utils._artifact_utils import ArtifactCache
+from ..._utils._asset_utils import IgnoreFile, get_upload_files_from_folder
+from ..._utils.utils import is_concurrent_component_registration_enabled, is_private_preview_enabled
+from ...entities._util import _general_copy
+from .._assets import Code
+from .code import ComponentCodeMixin, ComponentIgnoreFile
+
+PLACEHOLDER_FILE_NAME = "_placeholder_spec.yaml"
+
+
+class AdditionalIncludes:
+ """Initialize the AdditionalIncludes object.
+
+ :param origin_code_value: The origin code value.
+ :type origin_code_value: Optional[str]
+ :param base_path: The base path for origin code path and additional include configs.
+ :type base_path: Path
+ :param configs: The additional include configs.
+ :type configs: List[Union[str, dict]]
+ """
+
+ def __init__(
+ self,
+ *,
+ origin_code_value: Optional[str],
+ base_path: Path,
+ configs: Optional[List[Union[str, dict]]] = None,
+ ) -> None:
+ self._base_path = base_path
+ self._origin_code_value = origin_code_value
+ self._origin_configs = configs
+
+ @property
+ def origin_configs(self) -> List:
+ """The origin additional include configs.
+ Artifact additional include configs haven't been resolved in this property.
+
+ :return: The origin additional include configs.
+ :rtype: List[Union[str, dict]]
+ """
+ return self._origin_configs or []
+
+ @property
+ def resolved_code_path(self) -> Union[None, Path]:
+ """The resolved origin code path based on base path, if code path is not specified, return None.
+ We shouldn't change this property name given it's referenced in mldesigner.
+
+ :return: The resolved origin code path.
+ :rtype: Union[None, Path]
+ """
+ if self._origin_code_value is None:
+ return None
+ if os.path.isabs(self._origin_code_value):
+ return Path(self._origin_code_value)
+ return (self.base_path / self._origin_code_value).resolve()
+
+ @property
+ def base_path(self) -> Path:
+ """Base path for origin code path and additional include configs.
+
+ :return: The base path.
+ :rtype: Path
+ """
+ return self._base_path
+
+ @property
+ def with_includes(self) -> bool:
+ """Whether the additional include configs have been provided.
+
+ :return: True if additional include configs have been provided, False otherwise.
+ :rtype: bool
+ """
+ return len(self.origin_configs) != 0
+
+ @classmethod
+ def _get_artifacts_by_config(cls, artifact_config: Dict[str, str]) -> Union[str, os.PathLike]:
+ # config key existence has been validated in _validate_additional_include_config
+ res: Union[str, os.PathLike] = ArtifactCache().get(
+ organization=artifact_config.get("organization", None),
+ project=artifact_config.get("project", None),
+ feed=artifact_config["feed"],
+ name=artifact_config["name"],
+ version=artifact_config["version"],
+ scope=artifact_config.get("scope", "organization"),
+ resolve=True,
+ )
+ return res
+
+ def _validate_additional_include_config(
+ self, additional_include_config: Union[Dict, str]
+ ) -> MutableValidationResult:
+ validation_result = ValidationResultBuilder.success()
+ if (
+ isinstance(additional_include_config, dict)
+ and additional_include_config.get("type") == AzureDevopsArtifactsType.ARTIFACT
+ ):
+ # for artifact additional include, we validate the required fields in config but won't validate the
+ # artifact content to avoid downloading it in validation stage
+ # note that runtime error will be thrown when loading the artifact
+ for item in ["feed", "name", "version"]:
+ if item not in additional_include_config:
+ # TODO: add yaml path after we support list index in yaml path
+ validation_result.append_error(
+ "{} are required for artifacts config but got {}.".format(
+ item, json.dumps(additional_include_config)
+ )
+ )
+ elif isinstance(additional_include_config, str):
+ validation_result.merge_with(self._validate_local_additional_include_config(additional_include_config))
+ else:
+ validation_result.append_error(
+ message=f"Unexpected format in additional_includes, {additional_include_config}"
+ )
+ return validation_result
+
+ @classmethod
+ def _resolve_artifact_additional_include_config(
+ cls, artifact_additional_include_config: Dict[str, str]
+ ) -> List[Tuple[str, str]]:
+ """Resolve an artifact additional include config into a list of (local_path, config_info) tuples.
+
+        The configured artifact will be downloaded to a local path first; the config_info will be in the format:
+ %name%:%version% in %feed%
+
+ :param artifact_additional_include_config: Additional include config for an artifact
+ :type artifact_additional_include_config: Dict[str, str]
+ :return: A list of 2-tuples of local_path and config_info
+ :rtype: List[Tuple[str, str]]
+ """
+ result = []
+ # Note that we don't validate the artifact config here, since it has already been validated in
+ # _validate_additional_include_config
+ artifact_path = cls._get_artifacts_by_config(artifact_additional_include_config)
+ for item in os.listdir(artifact_path):
+ config_info = (
+ f"{artifact_additional_include_config['name']}:{artifact_additional_include_config['version']} in "
+ f"{artifact_additional_include_config['feed']}"
+ )
+ result.append((os.path.join(artifact_path, item), config_info))
+ return result
+
+ def _resolve_artifact_additional_include_configs(
+ self, artifact_additional_includes_configs: List[Dict[str, str]]
+ ) -> List:
+ additional_include_info_tuples = []
+        # Unlike component registration, artifact downloading is a pure download process, so we can use
+        # more threads to speed up the downloading.
+ # We use 5 threads per CPU core plus 5 extra threads, and the max number of threads is 64.
+ num_threads = min(64, (int(cpu_count()) * 5) + 5)
+ if (
+ len(artifact_additional_includes_configs) > 1
+ and is_concurrent_component_registration_enabled()
+ and is_private_preview_enabled()
+ ):
+ with ThreadPoolExecutor(max_workers=num_threads) as executor:
+ all_artifact_pairs_itr = executor.map(
+ self._resolve_artifact_additional_include_config, artifact_additional_includes_configs
+ )
+
+ for artifact_pairs in all_artifact_pairs_itr:
+ additional_include_info_tuples.extend(artifact_pairs)
+ else:
+ all_artifact_pairs_list = list(
+ map(self._resolve_artifact_additional_include_config, artifact_additional_includes_configs)
+ )
+
+ for artifact_pairs in all_artifact_pairs_list:
+ additional_include_info_tuples.extend(artifact_pairs)
+
+ return additional_include_info_tuples
+
+ @staticmethod
+ def _copy(src: Path, dst: Path, *, ignore_file: Optional[Any] = None) -> None:
+ if ignore_file and ignore_file.is_file_excluded(src):
+ return
+ if not src.exists():
+ raise ValueError(f"Path {src} does not exist.")
+ if src.is_file():
+ _general_copy(src, dst)
+ if src.is_dir():
+ # TODO: should we cover empty folder?
+            # copy children one by one instead of using shutil.copytree, which may raise FileExistsError;
+            # for the same folder, the expected behavior is merging
+            # the ignore file will also be applied during this process
+ for name in src.glob("*"):
+ if ignore_file is not None:
+ AdditionalIncludes._copy(name, dst / name.name, ignore_file=ignore_file.merge(name))
+
+ @staticmethod
+ def _is_folder_to_compress(path: Path) -> bool:
+ """Check if the additional include needs to compress corresponding folder as a zip.
+
+ For example, given additional include /mnt/c/hello.zip
+ 1) if a file named /mnt/c/hello.zip already exists, return False (simply copy)
+ 2) if a folder named /mnt/c/hello exists, return True (compress as a zip and copy)
+
+ :param path: Given path in additional include.
+ :type path: Path
+ :return: If the path need to be compressed as a zip file.
+ :rtype: bool
+ """
+ if path.suffix != ".zip":
+ return False
+ # if zip file exists, simply copy as other additional includes
+ if path.exists():
+ return False
+ # remove .zip suffix and check whether the folder exists
+ stem_path = path.parent / path.stem
+ return stem_path.is_dir()
+
+ def _resolve_folder_to_compress(self, include: str, dst_path: Path, ignore_file: IgnoreFile) -> None:
+ """resolve the zip additional include, need to compress corresponding folder.
+
+ :param include: The path, relative to :attr:`AdditionalIncludes.base_path`, to zip
+ :type include: str
+ :param dst_path: The path to write the zipfile to
+ :type dst_path: Path
+ :param ignore_file: The ignore file to use to filter files
+ :type ignore_file: IgnoreFile
+ """
+ zip_additional_include = (self.base_path / include).resolve()
+ folder_to_zip = zip_additional_include.parent / zip_additional_include.stem
+ zip_file = dst_path / zip_additional_include.name
+ with zipfile.ZipFile(zip_file, "w") as zf:
+ zf.write(folder_to_zip, os.path.relpath(folder_to_zip, folder_to_zip.parent)) # write root in zip
+ paths = [path for path, _ in get_upload_files_from_folder(folder_to_zip, ignore_file=ignore_file)]
+ # sort the paths to make sure the zip file (namelist) is deterministic
+ for path in sorted(paths):
+ zf.write(path, os.path.relpath(path, folder_to_zip.parent))
+
+ def _get_resolved_additional_include_configs(self) -> List[str]:
+ """
+ Resolve additional include configs to a list of local_paths and return it.
+
+        Additional includes is a list of include files, covering both local paths and Azure DevOps Artifacts.
+ Yaml format of additional_includes looks like below:
+ additional_includes:
+ - your/local/path
+ - type: artifact
+ organization: devops_organization
+ project: devops_project
+ feed: artifacts_feed_name
+ name: universal_package_name
+ version: package_version
+ scope: scope_type
+        Artifact packages will be downloaded from DevOps in this function and replaced by the local paths of
+        the downloaded artifacts;
+        local paths will be returned directly.
+        If there are conflicts among artifacts, a runtime error will be raised. Note that we won't check
+        conflicts between artifacts and local paths, or conflicts among local paths. Reasons are:
+ 1. There can be ignore_file in local paths, which makes it hard to check the conflict and may lead to breaking
+ changes;
+ 2. Conflicts among artifacts are more likely to happen, since user may refer to 2 artifacts of the same name
+ but with different version & feed.
+ 3. According to current design, folders in local paths will be merged; while artifact conflicts can be
+ identified by folder name conflicts and are not allowed.
+
+        :return: Path list of additional_includes
+        :rtype: List[str]
+ """
+ additional_include_configs_in_local_path = []
+
+ artifact_additional_include_configs = []
+ for additional_include_config in self.origin_configs:
+ if isinstance(additional_include_config, str):
+ # add local additional include configs directly
+ additional_include_configs_in_local_path.append(additional_include_config)
+ else:
+ # artifact additional include config will be downloaded and resolved to a local path later
+ # note that there is no more validation for artifact additional include config here, since it has
+ # already been validated in _validate_additional_include_config
+ artifact_additional_include_configs.append(additional_include_config)
+
+ artifact_additional_include_info_tuples = self._resolve_artifact_additional_include_configs(
+ artifact_additional_include_configs
+ )
+ additional_include_configs_in_local_path.extend(
+ local_path for local_path, _ in artifact_additional_include_info_tuples
+ )
+
+ # check file conflicts among artifact package
+ # given this is not in validate stage, we will raise error if there are conflict files
+ conflict_files: dict = defaultdict(set)
+ for local_path, config_info in artifact_additional_include_info_tuples:
+ file_name = Path(local_path).name
+ conflict_files[file_name].add(config_info)
+
+ conflict_files = {k: v for k, v in conflict_files.items() if len(v) > 1}
+ if conflict_files:
+ raise RuntimeError(f"There are conflict files in additional include: {conflict_files}")
+
+ return additional_include_configs_in_local_path
+
+ def _validate_local_additional_include_config(
+ self, local_path: str, config_info: Optional[str] = None
+ ) -> MutableValidationResult:
+ """Validate local additional include config.
+
+        Note that we will check file conflicts between each local additional include and the origin code, but
+        won't check file conflicts among local additional includes for now.
+
+ :param local_path: The local path
+ :type local_path: str
+ :param config_info: The config info
+ :type config_info: Optional[str]
+ :return: The validation result.
+ :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult
+ """
+ validation_result = ValidationResultBuilder.success()
+ include_path = self.base_path / local_path
+ # if additional include has not supported characters, resolve will fail and raise OSError
+ try:
+ src_path = include_path.resolve()
+ except OSError:
+ # no need to include potential yaml file name in error message as it will be covered by
+ # validation message construction.
+ error_msg = (
+ f"Failed to resolve additional include " f"{config_info or local_path} " f"based on {self.base_path}."
+ )
+ validation_result.append_error(message=error_msg)
+ return validation_result
+
+ if not src_path.exists() and not self._is_folder_to_compress(src_path):
+ error_msg = f"Unable to find additional include {config_info or local_path}"
+ validation_result.append_error(message=error_msg)
+ return validation_result
+
+ if len(src_path.parents) == 0:
+ error_msg = "Root directory is not supported for additional includes."
+ validation_result.append_error(message=error_msg)
+ return validation_result
+
+ dst_path = Path(self.resolved_code_path) / src_path.name if self.resolved_code_path else None
+ if dst_path:
+ if dst_path.is_symlink():
+ # if destination path is symbolic link, check if it points to the same file/folder as source path
+ if dst_path.resolve() != src_path.resolve():
+ error_msg = f"A symbolic link already exists for additional include {config_info or local_path}."
+ validation_result.append_error(message=error_msg)
+ return validation_result
+ elif dst_path.exists():
+ error_msg = f"A file already exists for additional include {config_info or local_path}."
+ validation_result.append_error(message=error_msg)
+ return validation_result
+
+ def validate(self) -> MutableValidationResult:
+ """Validate the AdditionalIncludes object.
+
+ :return: The validation result.
+ :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult
+ """
+ validation_result = ValidationResultBuilder.success()
+ for additional_include_config in self.origin_configs:
+ validation_result.merge_with(self._validate_additional_include_config(additional_include_config))
+ return validation_result
+
+ def _copy_origin_code(self, target_path: Path) -> ComponentIgnoreFile:
+ """Copy origin code to target path.
+
+ :param target_path: The destination to copy to
+ :type target_path: Path
+ :return: The component ignore file for the origin path
+ :rtype: ComponentIgnoreFile
+ """
+        # code can be either a file or a folder; as additional includes exist, it needs to be copied to a temporary folder
+ if self.resolved_code_path is None:
+ # if additional include configs exist but no origin code path, return a dummy ignore file
+ return ComponentIgnoreFile(
+ self.base_path,
+ )
+
+ if Path(self.resolved_code_path).is_file():
+ # use a dummy ignore file to save base path
+ root_ignore_file = ComponentIgnoreFile(
+ Path(self.resolved_code_path).parent,
+ skip_ignore_file=True,
+ )
+ self._copy(
+ Path(self.resolved_code_path),
+ target_path / Path(self.resolved_code_path).name,
+ ignore_file=root_ignore_file,
+ )
+ else:
+ # current implementation of ignore file is based on absolute path, so it cannot be shared
+ root_ignore_file = ComponentIgnoreFile(self.resolved_code_path)
+ self._copy(self.resolved_code_path, target_path, ignore_file=root_ignore_file)
+ return root_ignore_file
+
+ @contextmanager
+ def merge_local_code_and_additional_includes(self) -> Generator:
+ """Merge code and potential additional includes into a temporary folder and return the absolute path of it.
+
+ If no additional includes are specified, just return the absolute path of the original code path.
+ If no original code path is specified, return None.
+
+ :return: The absolute path of the merged code and additional includes.
+ :rtype: Path
+ """
+ if not self.with_includes:
+ if self.resolved_code_path is None:
+ yield None
+ else:
+ yield self.resolved_code_path.absolute()
+ return
+
+ # for now, upload path of a code asset will include the folder name of the code path (name of folder or
+ # parent name of file). For example, if code path is /mnt/c/code-a, upload path will be xxx/code-a
+ # which means that the upload path will change every time as we will merge additional includes into a temp
+ # folder. To avoid this, we will copy the code path to a child folder with a fixed name under the temp folder,
+ # then the child folder will be used in upload path.
+ # This issue shouldn't impact users as there is a separate asset existence check before uploading.
+ # We still make this change as:
+        # 1. Otherwise the asset would always need to be recorded twice, as the upload path changes on the first upload
+ # 2. This will improve the stability of the code asset existence check - AssetNotChanged check in
+ # BlobStorageClient will be a backup check
+ tmp_folder_path = Path(tempfile.mkdtemp(), "code_with_additional_includes")
+ tmp_folder_path.mkdir(parents=True, exist_ok=True)
+
+ root_ignore_file = self._copy_origin_code(tmp_folder_path)
+
+ # resolve additional includes
+ base_path = self.base_path
+ # additional includes from artifact will be downloaded to a temp local path on calling
+ # self.includes, so no need to add specific logic for artifact
+
+ # TODO: skip ignored files defined in code when copying additional includes
+ # copy additional includes disregarding ignore files as current ignore file implementation
+ # is based on absolute path, which is not suitable for additional includes
+ for additional_include_local_path in self._get_resolved_additional_include_configs():
+ src_path = Path(additional_include_local_path)
+ if not src_path.is_absolute():
+ src_path = (base_path / additional_include_local_path).resolve()
+ dst_path = (tmp_folder_path / src_path.name).resolve()
+
+ root_ignore_file.rebase(src_path.parent)
+ if self._is_folder_to_compress(src_path):
+ self._resolve_folder_to_compress(
+ additional_include_local_path,
+ Path(tmp_folder_path),
+ # actual src path is without .zip suffix
+ ignore_file=root_ignore_file.merge(src_path.parent / src_path.stem),
+ )
+ # early continue as the folder is compressed as a zip file
+ continue
+
+ # no need to check if src_path exists as it is already validated
+ if src_path.is_file():
+ self._copy(src_path, dst_path, ignore_file=root_ignore_file)
+ elif src_path.is_dir():
+ self._copy(
+ src_path,
+ dst_path,
+ # root ignore file on parent + ignore file on src_path
+ ignore_file=root_ignore_file.merge(src_path),
+ )
+ else:
+ raise ValueError(f"Unable to find additional include {additional_include_local_path}.")
+ try:
+ yield tmp_folder_path.absolute()
+
+ finally:
+ # clean up tmp folder as it can be very disk space consuming
+ shutil.rmtree(tmp_folder_path, ignore_errors=True)
+
+
+class AdditionalIncludesMixin(ComponentCodeMixin):
+ @classmethod
+ def _get_additional_includes_field_name(cls) -> str:
+ """Get the field name for additional includes.
+
+ :return: The field name
+ :rtype: str
+ """
+ return "additional_includes"
+
+ def _get_all_additional_includes_configs(self) -> List:
+ return getattr(self, self._get_additional_includes_field_name(), [])
+
+ def _append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+ self, base_validation_result: Optional[MutableValidationResult] = None
+ ) -> bool:
+ is_reliable: bool = super()._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+ base_validation_result
+ )
+ additional_includes_obj = self._generate_additional_includes_obj()
+
+ if base_validation_result is not None:
+ base_validation_result.merge_with(
+ additional_includes_obj.validate(), field_name=self._get_additional_includes_field_name()
+ )
+ # if additional includes is specified, origin code will be merged with additional includes into a temp folder
+ # before registered as a code asset, so origin code value is not reliable for local path validation
+ if additional_includes_obj.with_includes:
+ return False
+ return is_reliable
+
+ def _generate_additional_includes_obj(self) -> AdditionalIncludes:
+ return AdditionalIncludes(
+ base_path=self._get_base_path_for_code(),
+ configs=self._get_all_additional_includes_configs(),
+ origin_code_value=self._get_origin_code_in_str(),
+ )
+
+ @contextmanager
+ def _try_build_local_code(self) -> Generator:
+ """Build final code when origin code is a local code.
+
+ Will merge code path with additional includes into a temp folder if additional includes is specified.
+
+ :return: The built Code object
+ :rtype: Iterable[Optional[Code]]
+ """
+ # will try to merge code and additional includes even if code is None
+ tmp_code_dir: Any
+ with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir:
+ if tmp_code_dir is None:
+ yield None
+ else:
+ yield Code(
+ base_path=self._get_base_path_for_code(),
+ path=tmp_code_dir,
+ ignore_file=ComponentIgnoreFile(tmp_code_dir),
+ )
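The docstring of _get_resolved_additional_include_configs above documents the accepted additional_includes config shapes. Below is a minimal sketch (not part of the diff; all paths, feed and package names are placeholders) of driving AdditionalIncludes with one local include and one Azure DevOps artifact include:

from pathlib import Path

from azure.ai.ml.entities._component._additional_includes import AdditionalIncludes

configs = [
    "your/local/path",                 # local additional include, relative to base_path
    {                                  # Azure DevOps artifact additional include
        "type": "artifact",
        "organization": "devops_organization",
        "project": "devops_project",
        "feed": "artifacts_feed_name",
        "name": "universal_package_name",
        "version": "package_version",
        "scope": "organization",
    },
]

includes = AdditionalIncludes(
    origin_code_value="./src",         # hypothetical component code folder
    base_path=Path("."),
    configs=configs,
)

print(includes.with_includes)          # True, since configs were provided
result = includes.validate()           # shape checks only; artifact content is not downloaded here
print(result.passed)                   # assumes the validation result exposes a `passed` flag
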
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py
new file mode 100644
index 00000000..3e7be727
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py
@@ -0,0 +1,42 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Optional
+
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._schema.component.automl_component import AutoMLComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.entities._component.component import Component
+
+
+class AutoMLComponent(Component):
+ """AutoML component entity, used to define an automl component.
+
+    AutoML Component will only be used "internally" for the scenarios that need it. The AutoML Component
+    schema is not intended for end users, so it won't be exposed to them and it won't have
+    public documentation.
+
+ :param task: Task of the component.
+ :type task: str
+ """
+
+ def __init__(
+ self,
+ *,
+ task: Optional[str] = None,
+ **kwargs: Any,
+ ) -> None:
+ kwargs[COMPONENT_TYPE] = NodeType.AUTOML
+ super(AutoMLComponent, self).__init__(**kwargs)
+ self._task = task
+
+ @property
+ def task(self) -> Optional[str]:
+ """Returns task of the component."""
+ return self._task
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+ return AutoMLComponentSchema(context=context)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py
new file mode 100644
index 00000000..1f838bec
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py
@@ -0,0 +1,297 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from contextlib import contextmanager
+from enum import Enum
+from pathlib import Path
+from typing import Any, Generator, List, Optional, Union
+
+from azure.ai.ml._utils._arm_id_utils import is_ARM_id_for_resource, is_registry_id_for_resource
+from azure.ai.ml._utils._asset_utils import IgnoreFile, get_ignore_file
+from azure.ai.ml._utils.utils import is_private_preview_enabled
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AzureMLResourceType
+from azure.ai.ml.entities._assets import Code
+from azure.ai.ml.entities._validation import MutableValidationResult
+
+
+class ComponentIgnoreFile(IgnoreFile):
+ _COMPONENT_CODE_IGNORES = ["__pycache__"]
+ """Component-specific ignore file used for ignoring files in a component directory.
+
+ :param directory_path: The directory path for the ignore file.
+ :type directory_path: Union[str, Path]
+ :param additional_includes_file_name: Name of the additional includes file in the root directory to be ignored.
+ :type additional_includes_file_name: str
+ :param skip_ignore_file: Whether to skip the ignore file, defaults to False.
+ :type skip_ignore_file: bool
+ :param extra_ignore_list: List of additional ignore files to be considered during file exclusion.
+ :type extra_ignore_list: List[~azure.ai.ml._utils._asset_utils.IgnoreFile]
+ :raises ValueError: If additional include file is not found.
+ :return: The ComponentIgnoreFile object.
+ :rtype: ComponentIgnoreFile
+ """
+
+ def __init__(
+ self,
+ directory_path: Union[str, Path],
+ *,
+ additional_includes_file_name: Optional[str] = None,
+ skip_ignore_file: bool = False,
+ extra_ignore_list: Optional[List[IgnoreFile]] = None,
+ ):
+ self._base_path: Union[str, Path] = Path(directory_path)
+ self._extra_ignore_list: List[IgnoreFile] = extra_ignore_list or []
+ # only the additional include file in root directory is ignored
+ # additional include files in subdirectories are not processed so keep them
+ self._additional_includes_file_name = additional_includes_file_name
+        # note: in this class the parameter is a directory path, rather than a file path
+ file_path = None if skip_ignore_file else get_ignore_file(directory_path).path
+ super(ComponentIgnoreFile, self).__init__(file_path=file_path)
+
+ def exists(self) -> bool:
+ """Check if the ignore file exists.
+
+ :return: True
+ :rtype: bool
+ """
+ return True
+
+ @property
+ def base_path(self) -> Union[str, Path]:
+ """Get the base path of the ignore file.
+
+ :return: The base path.
+ :rtype: Path
+ """
+ # for component ignore file, the base path can be different from file.parent
+ return self._base_path
+
+ def rebase(self, directory_path: Union[str, Path]) -> "ComponentIgnoreFile":
+ """Rebase the ignore file to a new directory.
+
+ :param directory_path: The new directory path.
+ :type directory_path: Union[str, Path]
+ :return: The rebased ComponentIgnoreFile object.
+ :rtype: ComponentIgnoreFile
+ """
+ self._base_path = directory_path
+ return self
+
+ def is_file_excluded(self, file_path: Union[str, Path]) -> bool:
+ """Check if a file should be excluded based on the ignore file rules.
+
+ :param file_path: The file path.
+ :type file_path: Union[str, Path]
+ :return: True if the file should be excluded, False otherwise.
+ :rtype: bool
+ """
+ if self._additional_includes_file_name and self._get_rel_path(file_path) == self._additional_includes_file_name:
+ return True
+ for ignore_file in self._extra_ignore_list:
+ if ignore_file.is_file_excluded(file_path):
+ return True
+ res: bool = super(ComponentIgnoreFile, self).is_file_excluded(file_path)
+ return res
+
+ def merge(self, other_path: Path) -> "ComponentIgnoreFile":
+ """Merge the ignore list from another ComponentIgnoreFile object.
+
+ :param other_path: The path of the other ignore file.
+ :type other_path: Path
+ :return: The merged ComponentIgnoreFile object.
+ :rtype: ComponentIgnoreFile
+ """
+ if other_path.is_file():
+ return self
+ return ComponentIgnoreFile(other_path, extra_ignore_list=self._extra_ignore_list + [self])
+
+ def _get_ignore_list(self) -> List[str]:
+ """Retrieves the list of ignores from ignore file
+
+ Override to add custom ignores.
+
+ :return: The ignore rules
+ :rtype: List[str]
+ """
+ if not super(ComponentIgnoreFile, self).exists():
+ return self._COMPONENT_CODE_IGNORES
+ res: list = super(ComponentIgnoreFile, self)._get_ignore_list() + self._COMPONENT_CODE_IGNORES
+ return res
+
+
+class CodeType(Enum):
+ """Code type."""
+
+ LOCAL = "local"
+ NONE = "none"
+ GIT = "git"
+ ARM_ID = "arm_id"
+ UNKNOWN = "unknown"
+
+
+def _get_code_type(origin_code_value: Optional[str]) -> CodeType:
+ if origin_code_value is None:
+ return CodeType.NONE
+ if not isinstance(origin_code_value, str):
+ # note that:
+ # 1. Code & CodeOperation are not public for now
+ # 2. AnonymousCodeSchema is not within CodeField
+ # 3. Code will be returned as an arm id as an attribute of a component when getting a component from remote
+ # So origin_code_value should never be a Code object, or an exception will be raised
+ # in validation stage.
+ return CodeType.UNKNOWN
+ if is_ARM_id_for_resource(origin_code_value, AzureMLResourceType.CODE) or is_registry_id_for_resource(
+ origin_code_value
+ ):
+ return CodeType.ARM_ID
+ if origin_code_value.startswith("git+"):
+ return CodeType.GIT
+ return CodeType.LOCAL
+
+
+class ComponentCodeMixin:
+ """Mixin class for components with local files as part of the component. Those local files will be uploaded to
+ blob storage and further referenced as a code asset in arm id. In below docstring, we will refer to those local
+ files as "code".
+
+ The major interface of this mixin is self._customized_code_validate and self._build_code.
+ self._customized_code_validate will return a validation result indicating whether the code is valid.
+ self._build_code will return a temp Code object for server-side code asset creation.
+ """
+
+ def _get_base_path_for_code(self) -> Path:
+ """Get base path for additional includes.
+
+ :return: The base path
+ :rtype: Path
+ """
+ if hasattr(self, BASE_PATH_CONTEXT_KEY):
+ return Path(getattr(self, BASE_PATH_CONTEXT_KEY))
+ raise NotImplementedError(
+ "Component must have a base_path attribute to use ComponentCodeMixin. "
+ "Please set base_path in __init__ or override _get_base_path_for_code."
+ )
+
+ @classmethod
+ def _get_code_field_name(cls) -> str:
+ """Get the field name for code.
+
+ Will be used to get origin code value by default and will be used as field name of validation diagnostics.
+
+ :return: Code field name
+ :rtype: str
+ """
+ return "code"
+
+ def _get_origin_code_value(self) -> Union[str, os.PathLike, None]:
+ """Get origin code value.
+ Origin code value is either an absolute path or a relative path to base path if it's a local path.
+ Additional includes are only supported for component types with code attribute. Origin code path will be copied
+ to a temp folder along with additional includes to form a new code content.
+ """
+ return getattr(self, self._get_code_field_name(), None)
+
+ def _fill_back_code_value(self, value: str) -> None:
+ """Fill resolved code value back to the component.
+
+ :param value: resolved code value
+ :type value: str
+ :return: no return
+ :rtype: None
+ """
+ return setattr(self, self._get_code_field_name(), value)
+
+ def _get_origin_code_in_str(self) -> Optional[str]:
+ """Get origin code value in str to simplify following logic."""
+ origin_code_value = self._get_origin_code_value()
+ if origin_code_value is None:
+ return None
+ if isinstance(origin_code_value, Path):
+ return origin_code_value.as_posix()
+ return str(origin_code_value)
+
+ def _append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+ self, base_validation_result: Optional[MutableValidationResult] = None
+ ) -> bool:
+ """Append diagnostics from customized validation logic to the base validation result and check if origin code
+ value is valid for path validation.
+
+ For customized validation logic, this method shouldn't cover the validation logic duplicated with schema
+ validation, like local code existence check.
+ For the check, as "code" includes file dependencies of a component, other fields may depend on those files.
+ However, the origin code value may not be reliable for validation of those fields. For example:
+ 1. origin code value can be a remote git path or an arm id of a code asset.
+ 2. some file operations may be done during build_code, which makes final code content different from what we can
+ get from origin code value.
+ So, we use this function to check if origin code value is reliable for further local path validation.
+
+ :param base_validation_result: base validation result to append diagnostics to.
+ :type base_validation_result: MutableValidationResult
+ :return: whether origin code value is reliable for further local path validation.
+ :rtype: bool
+ """
+        # If private features are enabled and the component has a code value of type str, we need to check
+        # that it is a valid git path. Otherwise, we should throw a ValidationError
+        # saying that the code value is not valid.
+ code_type = _get_code_type(self._get_origin_code_in_str())
+ if code_type == CodeType.GIT and not is_private_preview_enabled():
+ if base_validation_result is not None:
+ base_validation_result.append_error(
+ message="Not a valid code value: git paths are not supported.",
+ yaml_path=self._get_code_field_name(),
+ )
+ return code_type == CodeType.LOCAL
+
+ @contextmanager
+ def _build_code(self) -> Generator:
+ """Create a Code object if necessary based on origin code value and yield it.
+
+ :return: If built code is the same as its origin value, do nothing and yield None.
+ Otherwise, yield a Code object pointing to the code.
+ :rtype: Iterable[Optional[Code]]
+ """
+ origin_code_value = self._get_origin_code_in_str()
+ code_type = _get_code_type(origin_code_value)
+
+ if code_type == CodeType.GIT:
+ # git also need to be resolved into arm id
+ yield Code(path=origin_code_value)
+ elif code_type in [CodeType.LOCAL, CodeType.NONE]:
+ code: Any
+ # false-positive by pylint, hence disable it
+ # (https://github.com/pylint-dev/pylint/blob/main/doc/data/messages
+ # /c/contextmanager-generator-missing-cleanup/details.rst)
+ with self._try_build_local_code() as code: # pylint:disable=contextmanager-generator-missing-cleanup
+ yield code
+ else:
+ # arm id, None and unknown need no extra resolution
+ yield None
+
+ @contextmanager
+ def _try_build_local_code(self) -> Generator:
+ """Extract the logic of _build_code for local code for further override.
+
+ :return: The Code object if could be constructed, None otherwise
+ :rtype: Iterable[Optional[Code]]
+ """
+ origin_code_value = self._get_origin_code_in_str()
+ if origin_code_value is None:
+ yield None
+ else:
+ base_path = self._get_base_path_for_code()
+ absolute_path: Union[str, Path] = (
+ origin_code_value if os.path.isabs(origin_code_value) else base_path / origin_code_value
+ )
+
+ yield Code(
+ base_path=base_path,
+ path=origin_code_value,
+ ignore_file=ComponentIgnoreFile(absolute_path),
+ )
+
+ def _with_local_code(self) -> bool:
+ # TODO: remove this method after we have a better way to do this judge in cache_utils
+ origin_code_value = self._get_origin_code_in_str()
+ code_type = _get_code_type(origin_code_value)
+ return code_type == CodeType.LOCAL
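A quick sketch (not part of the diff; assumes azure-ai-ml is installed and the git URL is a placeholder) of how the private helper _get_code_type above classifies origin code values:

from azure.ai.ml.entities._component.code import CodeType, _get_code_type

assert _get_code_type(None) is CodeType.NONE          # no code configured
assert _get_code_type("./src") is CodeType.LOCAL      # local folder, relative or absolute
assert _get_code_type("git+https://github.com/org/repo") is CodeType.GIT  # git reference (private preview only)
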
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py
new file mode 100644
index 00000000..9bdcd3d1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py
@@ -0,0 +1,300 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from typing import Any, Dict, List, Optional, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.command_component import CommandComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.entities._assets import Environment
+from azure.ai.ml.entities._job.distribution import (
+ DistributionConfiguration,
+ MpiDistribution,
+ PyTorchDistribution,
+ RayDistribution,
+ TensorFlowDistribution,
+)
+from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
+from azure.ai.ml.entities._job.parameterized_command import ParameterizedCommand
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+from ..._restclient.v2022_10_01.models import ComponentVersion
+from ..._schema import PathAwareSchema
+from ..._utils.utils import get_all_data_binding_expressions, parse_args_description_from_docstring
+from .._util import convert_ordered_dict_to_dict, validate_attribute_type
+from .._validation import MutableValidationResult
+from ._additional_includes import AdditionalIncludesMixin
+from .component import Component
+
+# pylint: disable=protected-access
+
+
+class CommandComponent(Component, ParameterizedCommand, AdditionalIncludesMixin):
+ """Command component version, used to define a Command Component or Job.
+
+ :keyword name: The name of the Command job or component.
+ :paramtype name: Optional[str]
+ :keyword version: The version of the Command job or component.
+ :paramtype version: Optional[str]
+ :keyword description: The description of the component. Defaults to None.
+ :paramtype description: Optional[str]
+ :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
+ :paramtype tags: Optional[dict]
+ :keyword display_name: The display name of the component.
+ :paramtype display_name: Optional[str]
+ :keyword command: The command to be executed.
+ :paramtype command: Optional[str]
+ :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url pointing
+ to a remote location.
+ :type code: Optional[str]
+ :keyword environment: The environment that the job will run in.
+ :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+ :keyword distribution: The configuration for distributed jobs. Defaults to None.
+ :paramtype distribution: Optional[Union[~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
+ ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]]
+ :keyword resources: The compute resource configuration for the command.
+ :paramtype resources: Optional[~azure.ai.ml.entities.JobResourceConfiguration]
+ :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
+ :paramtype inputs: Optional[dict[str, Union[
+ ~azure.ai.ml.Input,
+ str,
+ bool,
+ int,
+ float,
+ Enum,
+ ]]]
+ :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
+ :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
+ :keyword instance_count: The number of instances or nodes to be used by the compute target. Defaults to 1.
+ :paramtype instance_count: Optional[int]
+ :keyword is_deterministic: Specifies whether the Command will return the same output given the same input.
+ Defaults to True. When True, if a Command (component) is deterministic and has been run before in the
+ current workspace with the same input and settings, it will reuse results from a previous submitted job
+ when used as a node or step in a pipeline. In that scenario, no compute resources will be used.
+ :paramtype is_deterministic: Optional[bool]
+ :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
+ :paramtype additional_includes: Optional[List[str]]
+ :keyword properties: The job property dictionary. Defaults to None.
+ :paramtype properties: Optional[dict[str, str]]
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if CommandComponent cannot be successfully validated.
+ Details will be provided in the error message.
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_command_configurations.py
+ :start-after: [START command_component_definition]
+ :end-before: [END command_component_definition]
+ :language: python
+ :dedent: 8
+ :caption: Creating a CommandComponent.
+ """
+
+ def __init__(
+ self,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ display_name: Optional[str] = None,
+ command: Optional[str] = None,
+ code: Optional[Union[str, os.PathLike]] = None,
+ environment: Optional[Union[str, Environment]] = None,
+ distribution: Optional[
+ Union[
+ Dict,
+ MpiDistribution,
+ TensorFlowDistribution,
+ PyTorchDistribution,
+ RayDistribution,
+ DistributionConfiguration,
+ ]
+ ] = None,
+ resources: Optional[JobResourceConfiguration] = None,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ instance_count: Optional[int] = None, # promoted property from resources.instance_count
+ is_deterministic: bool = True,
+ additional_includes: Optional[List] = None,
+ properties: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ # validate init params are valid type
+ validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+ kwargs[COMPONENT_TYPE] = NodeType.COMMAND
+
+ # Component backend doesn't support environment_variables yet,
+ # this is to support the case of CommandComponent being the trial of
+ # a SweepJob, where environment_variables is stored as part of trial
+ environment_variables = kwargs.pop("environment_variables", None)
+ super().__init__(
+ name=name,
+ version=version,
+ description=description,
+ tags=tags,
+ display_name=display_name,
+ inputs=inputs,
+ outputs=outputs,
+ is_deterministic=is_deterministic,
+ properties=properties,
+ **kwargs,
+ )
+
+        # No validation on the values passed here because in a pipeline job, the required code & environment
+        # may be absent and filled in later with job defaults.
+ self.command = command
+ self.code = code
+ self.environment_variables = environment_variables
+ self.environment = environment
+ self.resources = resources # type: ignore[assignment]
+ self.distribution = distribution
+
+ # check mutual exclusivity of promoted properties
+ if self.resources is not None and instance_count is not None:
+ msg = "instance_count and resources are mutually exclusive"
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ self.instance_count = instance_count
+ self.additional_includes = additional_includes or []
+
+ def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+ """Dump the component content into a sorted yaml string.
+
+ :return: The ordered dict
+ :rtype: Dict
+ """
+
+ obj: dict = super()._to_ordered_dict_for_yaml_dump()
+        # a dict dumped based on the schema will transform code into an absolute path, while we want to keep its original value
+ if self.code and isinstance(self.code, str):
+ obj["code"] = self.code
+ return obj
+
+ @property
+ def instance_count(self) -> Optional[int]:
+ """The number of instances or nodes to be used by the compute target.
+
+ :return: The number of instances or nodes.
+ :rtype: int
+ """
+ return self.resources.instance_count if self.resources and not isinstance(self.resources, dict) else None
+
+ @instance_count.setter
+ def instance_count(self, value: int) -> None:
+ """Sets the number of instances or nodes to be used by the compute target.
+
+        :param value: The number of instances or nodes to be used by the compute target. Defaults to 1.
+ :type value: int
+ """
+ if not value:
+ return
+ if not self.resources:
+ self.resources = JobResourceConfiguration(instance_count=value)
+ else:
+ if not isinstance(self.resources, dict):
+ self.resources.instance_count = value
+
+ @classmethod
+ def _attr_type_map(cls) -> dict:
+ return {
+ "environment": (str, Environment),
+ "environment_variables": dict,
+ "resources": (dict, JobResourceConfiguration),
+ "code": (str, os.PathLike),
+ }
+
+ def _to_dict(self) -> Dict:
+ return cast(
+ dict, convert_ordered_dict_to_dict({**self._other_parameter, **super(CommandComponent, self)._to_dict()})
+ )
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+ # put it here as distribution is shared by some components, e.g. command
+ distribution = obj.properties.component_spec.pop("distribution", None)
+ init_kwargs: dict = super()._from_rest_object_to_init_params(obj)
+ if distribution:
+ init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution)
+ return init_kwargs
+
+ def _get_environment_id(self) -> Union[str, None]:
+ # Return environment id of environment
+ # handle case when environment is defined inline
+ if isinstance(self.environment, Environment):
+ _id: Optional[str] = self.environment.id
+ return _id
+ return self.environment
+
+ # region SchemaValidatableMixin
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return CommandComponentSchema(context=context)
+
+ def _customized_validate(self) -> MutableValidationResult:
+ validation_result = super(CommandComponent, self)._customized_validate()
+ self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
+ validation_result.merge_with(self._validate_command())
+ validation_result.merge_with(self._validate_early_available_output())
+ return validation_result
+
+ def _validate_command(self) -> MutableValidationResult:
+ validation_result = self._create_empty_validation_result()
+ # command
+ if self.command:
+ invalid_expressions = []
+ for data_binding_expression in get_all_data_binding_expressions(self.command, is_singular=False):
+ if not self._is_valid_data_binding_expression(data_binding_expression):
+ invalid_expressions.append(data_binding_expression)
+
+ if invalid_expressions:
+ validation_result.append_error(
+ yaml_path="command",
+ message="Invalid data binding expression: {}".format(", ".join(invalid_expressions)),
+ )
+ return validation_result
+
+ def _validate_early_available_output(self) -> MutableValidationResult:
+ validation_result = self._create_empty_validation_result()
+ for name, output in self.outputs.items():
+ if output.early_available is True and output._is_primitive_type is not True:
+ msg = (
+ f"Early available output {name!r} requires output is primitive type, "
+ f"got {output._is_primitive_type!r}."
+ )
+ validation_result.append_error(message=msg, yaml_path=f"outputs.{name}")
+ return validation_result
+
+ def _is_valid_data_binding_expression(self, data_binding_expression: str) -> bool:
+ current_obj: Any = self
+ for item in data_binding_expression.split("."):
+ if hasattr(current_obj, item):
+ current_obj = getattr(current_obj, item)
+ else:
+ try:
+ current_obj = current_obj[item]
+ except Exception: # pylint: disable=W0718
+ return False
+ return True
+
+ # endregion
+
+ @classmethod
+ def _parse_args_description_from_docstring(cls, docstring: str) -> Dict:
+ res: dict = parse_args_description_from_docstring(docstring)
+ return res
+
+ def __str__(self) -> str:
+ try:
+ toYaml: str = self._to_yaml()
+ return toYaml
+ except BaseException: # pylint: disable=W0718
+ toStr: str = super(CommandComponent, self).__str__()
+ return toStr
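A minimal sketch (not part of the diff; names, paths and the environment reference are placeholders) of defining a CommandComponent with a promoted instance_count and additional_includes, matching the keyword arguments documented above:

from azure.ai.ml import Input, Output
from azure.ai.ml.entities import CommandComponent

component = CommandComponent(
    name="train_model",
    version="1",
    display_name="Train model",
    command="python train.py --data ${{inputs.training_data}} --out ${{outputs.model_dir}}",
    code="./src",                               # local code folder, merged with additional_includes on build
    environment="azureml:my-training-env:1",    # hypothetical registered environment
    inputs={"training_data": Input(type="uri_folder")},
    outputs={"model_dir": Output(type="uri_folder")},
    instance_count=2,                           # promoted into resources.instance_count
    additional_includes=["../shared/utils"],    # hypothetical shared helper folder
)

print(component.instance_count)  # 2, read back from resources
print(component.type)            # "command"
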
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py
new file mode 100644
index 00000000..c02a3a33
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py
@@ -0,0 +1,641 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import re
+import uuid
+from os import PathLike
+from pathlib import Path
+from typing import IO, TYPE_CHECKING, Any, AnyStr, Callable, Dict, Iterable, Optional, Tuple, Union
+
+from marshmallow import INCLUDE
+
+from ..._restclient.v2024_01_01_preview.models import (
+ ComponentContainer,
+ ComponentContainerProperties,
+ ComponentVersion,
+ ComponentVersionProperties,
+)
+from ..._schema import PathAwareSchema
+from ..._schema.component import ComponentSchema
+from ..._utils.utils import dump_yaml_to_file, hash_dict
+from ...constants._common import (
+ ANONYMOUS_COMPONENT_NAME,
+ BASE_PATH_CONTEXT_KEY,
+ PARAMS_OVERRIDE_KEY,
+ REGISTRY_URI_FORMAT,
+ SOURCE_PATH_CONTEXT_KEY,
+ CommonYamlFields,
+ SchemaUrl,
+)
+from ...constants._component import ComponentSource, IOConstants, NodeType
+from ...entities._assets.asset import Asset
+from ...entities._inputs_outputs import Input, Output
+from ...entities._mixins import LocalizableMixin, TelemetryMixin, YamlTranslatableMixin
+from ...entities._system_data import SystemData
+from ...entities._util import find_type_in_override
+from ...entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin, RemoteValidatableMixin
+from ...exceptions import ErrorCategory, ErrorTarget, ValidationException
+from .._inputs_outputs import GroupInput
+
+if TYPE_CHECKING:
+ from ...entities.builders import BaseNode
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use id/type as argument name
+
+
+COMPONENT_PLACEHOLDER = "COMPONENT_PLACEHOLDER"
+
+
+class Component(
+ Asset,
+ RemoteValidatableMixin,
+ TelemetryMixin,
+ YamlTranslatableMixin,
+ PathAwareSchemaValidatableMixin,
+ LocalizableMixin,
+):
+ """Base class for component version, used to define a component. Can't be instantiated directly.
+
+ :param name: Name of the resource.
+ :type name: str
+ :param version: Version of the resource.
+ :type version: str
+ :param id: Global ID of the resource, Azure Resource Manager ID.
+ :type id: str
+    :param type: Type of the component; currently 'command' is supported.
+ :type type: str
+ :param description: Description of the resource.
+ :type description: str
+ :param tags: Tag dictionary. Tags can be added, removed, and updated.
+ :type tags: dict
+ :param properties: Internal use only.
+ :type properties: dict
+ :param display_name: Display name of the component.
+ :type display_name: str
+ :param is_deterministic: Whether the component is deterministic. Defaults to True.
+ :type is_deterministic: bool
+ :param inputs: Inputs of the component.
+ :type inputs: dict
+ :param outputs: Outputs of the component.
+ :type outputs: dict
+ :param yaml_str: The YAML string of the component.
+ :type yaml_str: str
+ :param _schema: Schema of the component.
+ :type _schema: str
+ :param creation_context: Creation metadata of the component.
+ :type creation_context: ~azure.ai.ml.entities.SystemData
+ :param kwargs: Additional parameters for the component.
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ id: Optional[str] = None,
+ type: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ properties: Optional[Dict] = None,
+ display_name: Optional[str] = None,
+ is_deterministic: bool = True,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ yaml_str: Optional[str] = None,
+ _schema: Optional[str] = None,
+ creation_context: Optional[SystemData] = None,
+ **kwargs: Any,
+ ) -> None:
+ self.latest_version = None
+ self._intellectual_property = kwargs.pop("intellectual_property", None)
+        # Set this before super().__init__() because _auto_increment_version's value may change when Asset initializes version
+ self._auto_increment_version = kwargs.pop("auto_increment", False)
+ # Get source from id first, then kwargs.
+ self._source = (
+ self._resolve_component_source_from_id(id) if id else kwargs.pop("_source", ComponentSource.CLASS)
+ )
+ # use ANONYMOUS_COMPONENT_NAME instead of guid
+ is_anonymous = kwargs.pop("is_anonymous", False)
+ if not name and version is None:
+ name = ANONYMOUS_COMPONENT_NAME
+ version = "1"
+ is_anonymous = True
+
+ super().__init__(
+ name=name,
+ version=version,
+ id=id,
+ description=description,
+ tags=tags,
+ properties=properties,
+ creation_context=creation_context,
+ is_anonymous=is_anonymous,
+ base_path=kwargs.pop(BASE_PATH_CONTEXT_KEY, None),
+ source_path=kwargs.pop(SOURCE_PATH_CONTEXT_KEY, None),
+ )
+ # store kwargs to self._other_parameter instead of pop to super class to allow component have extra
+ # fields not defined in current schema.
+
+ inputs = inputs if inputs else {}
+ outputs = outputs if outputs else {}
+
+ self.name = name
+ self._schema = _schema
+ self._type = type
+ self._display_name = display_name
+ self._is_deterministic = is_deterministic
+ self._inputs = self._build_io(inputs, is_input=True)
+ self._outputs = self._build_io(outputs, is_input=False)
+ # Store original yaml
+ self._yaml_str = yaml_str
+ self._other_parameter = kwargs
+
+ @property
+ def _func(self) -> Callable[..., "BaseNode"]:
+ from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function
+
+ # validate input/output names before creating component function
+ validation_result = self._validate_io_names(self.inputs)
+ validation_result.merge_with(self._validate_io_names(self.outputs))
+ self._try_raise(validation_result)
+
+ res: Callable = _generate_component_function(self)
+ return res
+
+ @property
+ def type(self) -> Optional[str]:
+ """Type of the component, default is 'command'.
+
+ :return: Type of the component.
+ :rtype: str
+ """
+ return self._type
+
+ @property
+ def display_name(self) -> Optional[str]:
+ """Display name of the component.
+
+ :return: Display name of the component.
+ :rtype: str
+ """
+ return self._display_name
+
+ @display_name.setter
+ def display_name(self, custom_display_name: str) -> None:
+ """Set display_name of the component.
+
+ :param custom_display_name: The new display name
+ :type custom_display_name: str
+ """
+ self._display_name = custom_display_name
+
+ @property
+ def is_deterministic(self) -> Optional[bool]:
+ """Whether the component is deterministic.
+
+ :return: Whether the component is deterministic
+ :rtype: bool
+ """
+ return self._is_deterministic
+
+ @property
+ def inputs(self) -> Dict:
+ """Inputs of the component.
+
+ :return: Inputs of the component.
+ :rtype: dict
+ """
+ res: dict = self._inputs
+ return res
+
+ @property
+ def outputs(self) -> Dict:
+ """Outputs of the component.
+
+ :return: Outputs of the component.
+ :rtype: dict
+ """
+ return self._outputs
+
+ @property
+ def version(self) -> Optional[str]:
+ """Version of the component.
+
+ :return: Version of the component.
+ :rtype: str
+ """
+ return self._version
+
+ @version.setter
+ def version(self, value: str) -> None:
+ """Set the version of the component.
+
+ :param value: The version of the component.
+ :type value: str
+ """
+ if value:
+ if not isinstance(value, str):
+ msg = f"Component version must be a string, not type {type(value)}."
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ self._version = value
+ self._auto_increment_version = self.name and not self._version
+
+ def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+ """Dump the component content into a file in yaml format.
+
+ :param dest: The destination to receive this component's content.
+ Must be either a path to a local file, or an already-open file stream.
+ If dest is a file path, a new file will be created,
+ and an exception is raised if the file exists.
+ If dest is an open file, the file will be written to directly,
+ and an exception will be raised if the file is not writable.
+ :type dest: Union[PathLike, str, IO[AnyStr]]
+ """
+ path = kwargs.pop("path", None)
+ yaml_serialized = self._to_dict()
+ dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
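+    # Illustrative usage sketch (not part of the original source); paths are hypothetical:
+    #
+    #     from azure.ai.ml import load_component
+    #     component = load_component(source="./component.yaml")
+    #     component.dump("./component_copy.yaml")  # raises if the destination file already exists
+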
+ @staticmethod
+ def _resolve_component_source_from_id( # pylint: disable=docstring-type-do-not-use-class
+ id: Optional[Union["Component", str]],
+ ) -> Any:
+ """Resolve the component source from id.
+
+ :param id: The component ID
+ :type id: Optional[str]
+ :return: The component source
+ :rtype: Literal[
+ ComponentSource.CLASS,
+ ComponentSource.REMOTE_REGISTRY,
+            ComponentSource.REMOTE_WORKSPACE_COMPONENT
+        ]
+ """
+ if id is None:
+ return ComponentSource.CLASS
+        # Consider the default to be a workspace source, as the
+        # azureml: prefix will be removed for an arm versioned id.
+ return (
+ ComponentSource.REMOTE_REGISTRY
+ if not isinstance(id, Component) and id.startswith(REGISTRY_URI_FORMAT)
+ else ComponentSource.REMOTE_WORKSPACE_COMPONENT
+ )
+
+ @classmethod
+ def _validate_io_names(cls, io_names: Iterable[str], raise_error: bool = False) -> MutableValidationResult:
+ """Validate input/output names, raise exception if invalid.
+
+ :param io_names: The names to validate
+ :type io_names: Iterable[str]
+ :param raise_error: Whether to raise if validation fails. Defaults to False
+ :type raise_error: bool
+ :return: The validation result
+ :rtype: MutableValidationResult
+ """
+ validation_result = cls._create_empty_validation_result()
+ lower2original_kwargs: dict = {}
+
+ for name in io_names:
+ if re.match(IOConstants.VALID_KEY_PATTERN, name) is None:
+ msg = "{!r} is not a valid parameter name, must be composed letters, numbers, and underscores."
+ validation_result.append_error(message=msg.format(name), yaml_path=f"inputs.{name}")
+ # validate name conflict
+ lower_key = name.lower()
+ if lower_key in lower2original_kwargs:
+ msg = "Invalid component input names {!r} and {!r}, which are equal ignore case."
+ validation_result.append_error(
+ message=msg.format(name, lower2original_kwargs[lower_key]), yaml_path=f"inputs.{name}"
+ )
+ else:
+ lower2original_kwargs[lower_key] = name
+ return cls._try_raise(validation_result, raise_error=raise_error)
+
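+    # Illustrative sketch (not part of the original source) of how the validation above behaves:
+    #
+    #     result = Component._validate_io_names(["learning_rate", "Learning_Rate", "bad-name"])
+    #     # "bad-name" fails the letters/numbers/underscores pattern, and "learning_rate" /
+    #     # "Learning_Rate" are reported as conflicting because they are equal when ignoring case.
+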
+ @classmethod
+ def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool) -> Dict:
+ component_io: dict = {}
+ for name, port in io_dict.items():
+ if is_input:
+ component_io[name] = port if isinstance(port, Input) else Input(**port)
+ else:
+ component_io[name] = port if isinstance(port, Output) else Output(**port)
+
+ if is_input:
+ # Restore flattened parameters to group
+ res: dict = GroupInput.restore_flattened_inputs(component_io)
+ return res
+ return component_io
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
+ return ComponentSchema(context=context)
+
+ @classmethod
+ def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
+ return ValidationException(
+ message=message,
+ no_personal_data_message=no_personal_data_message,
+ target=ErrorTarget.COMPONENT,
+ )
+
+ @classmethod
+ def _is_flow(cls, data: Any) -> bool:
+ _schema = data.get(CommonYamlFields.SCHEMA, None)
+
+ if _schema and _schema in [SchemaUrl.PROMPTFLOW_FLOW, SchemaUrl.PROMPTFLOW_RUN]:
+ return True
+ return False
+
+ @classmethod
+ def _load(
+ cls,
+ data: Optional[Dict] = None,
+ yaml_path: Optional[Union[PathLike, str]] = None,
+ params_override: Optional[list] = None,
+ **kwargs: Any,
+ ) -> "Component":
+ data = data or {}
+ params_override = params_override or []
+ base_path = Path(yaml_path).parent if yaml_path else Path("./")
+
+ type_in_override = find_type_in_override(params_override)
+
+ # type_in_override > type_in_yaml > default (command)
+ if type_in_override is None:
+ type_in_override = data.get(CommonYamlFields.TYPE, None)
+ if type_in_override is None and cls._is_flow(data):
+ type_in_override = NodeType.FLOW_PARALLEL
+ if type_in_override is None:
+ type_in_override = NodeType.COMMAND
+ data[CommonYamlFields.TYPE] = type_in_override
+
+ from azure.ai.ml.entities._component.component_factory import component_factory
+
+ create_instance_func, _ = component_factory.get_create_funcs(
+ data,
+ for_load=True,
+ )
+ new_instance: Component = create_instance_func()
+ # specific keys must be popped before loading with schema using kwargs
+ init_kwargs = {
+ "yaml_str": kwargs.pop("yaml_str", None),
+ "_source": kwargs.pop("_source", ComponentSource.YAML_COMPONENT),
+ }
+ init_kwargs.update(
+ new_instance._load_with_schema( # pylint: disable=protected-access
+ data,
+ context={
+ BASE_PATH_CONTEXT_KEY: base_path,
+ SOURCE_PATH_CONTEXT_KEY: yaml_path,
+ PARAMS_OVERRIDE_KEY: params_override,
+ },
+ unknown=INCLUDE,
+ raise_original_exception=True,
+ **kwargs,
+ )
+ )
+ # Set base path separately to avoid doing this in post load, as return types of post load are not unified,
+ # could be object or dict.
+ # base_path in context can be changed in loading, so we use original base_path here.
+ init_kwargs[BASE_PATH_CONTEXT_KEY] = base_path.absolute()
+ if yaml_path:
+ init_kwargs[SOURCE_PATH_CONTEXT_KEY] = Path(yaml_path).absolute().as_posix()
+ # TODO: Bug Item number: 2883415
+ new_instance.__init__( # type: ignore
+ **init_kwargs,
+ )
+ return new_instance
+
+ @classmethod
+ def _from_container_rest_object(cls, component_container_rest_object: ComponentContainer) -> "Component":
+ component_container_details: ComponentContainerProperties = component_container_rest_object.properties
+ component = Component(
+ id=component_container_rest_object.id,
+ name=component_container_rest_object.name,
+ description=component_container_details.description,
+ creation_context=SystemData._from_rest_object(component_container_rest_object.system_data),
+ tags=component_container_details.tags,
+ properties=component_container_details.properties,
+ type=NodeType._CONTAINER,
+            # Set this field to None as it holds a default of True in init.
+ is_deterministic=None, # type: ignore[arg-type]
+ )
+ component.latest_version = component_container_details.latest_version
+ return component
+
+ @classmethod
+ def _from_rest_object(cls, obj: ComponentVersion) -> "Component":
+ # TODO: Remove in PuP with native import job/component type support in MFE/Designer
+ # Convert command component back to import component private preview
+ component_spec = obj.properties.component_spec
+ if component_spec[CommonYamlFields.TYPE] == NodeType.COMMAND and component_spec["command"] == NodeType.IMPORT:
+ component_spec[CommonYamlFields.TYPE] = NodeType.IMPORT
+ component_spec["source"] = component_spec.pop("inputs")
+ component_spec["output"] = component_spec.pop("outputs")["output"]
+
+ # shouldn't block serialization when name is not valid
+ # maybe override serialization method for name field?
+ from azure.ai.ml.entities._component.component_factory import component_factory
+
+ create_instance_func, _ = component_factory.get_create_funcs(obj.properties.component_spec, for_load=True)
+
+ instance: Component = create_instance_func()
+ # TODO: Bug Item number: 2883415
+ instance.__init__(**instance._from_rest_object_to_init_params(obj)) # type: ignore
+ return instance
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # Objects from REST data contain _source; delete it.
+ if "_source" in obj.properties.component_spec:
+ del obj.properties.component_spec["_source"]
+
+ rest_component_version = obj.properties
+ _type = rest_component_version.component_spec[CommonYamlFields.TYPE]
+
+ # inputs/outputs will be parsed by instance._build_io in instance's __init__
+ inputs = rest_component_version.component_spec.pop("inputs", {})
+ # parse String -> string, Integer -> integer, etc
+ for _input in inputs.values():
+ _input["type"] = Input._map_from_rest_type(_input["type"])
+ outputs = rest_component_version.component_spec.pop("outputs", {})
+
+ origin_name = rest_component_version.component_spec[CommonYamlFields.NAME]
+ rest_component_version.component_spec[CommonYamlFields.NAME] = ANONYMOUS_COMPONENT_NAME
+ init_kwargs = cls._load_with_schema(
+ rest_component_version.component_spec, context={BASE_PATH_CONTEXT_KEY: Path.cwd()}, unknown=INCLUDE
+ )
+ init_kwargs.update(
+ {
+ "id": obj.id,
+ "is_anonymous": rest_component_version.is_anonymous,
+ "creation_context": obj.system_data,
+ "inputs": inputs,
+ "outputs": outputs,
+ "name": origin_name,
+ }
+ )
+
+        # remove empty values, because some properties only work for specific components, e.g. distribution for command
+        # note that there is an issue where environment == {} will always be true, so use isinstance here
+ return {k: v for k, v in init_kwargs.items() if v is not None and not (isinstance(v, dict) and not v)}
+
+ def _get_anonymous_hash(self) -> str:
+ """Return the hash of anonymous component.
+
+        Anonymous components (same code and interface) will have the same hash.
+
+ :return: The component hash
+ :rtype: str
+ """
+ # omit version since anonymous component's version is random guid
+ # omit name since name doesn't impact component's uniqueness
+ return self._get_component_hash(keys_to_omit=["name", "id", "version"])
+
+ def _get_component_hash(self, keys_to_omit: Optional[Iterable[str]] = None) -> str:
+ """Return the hash of component.
+
+ :param keys_to_omit: An iterable of keys to omit when computing the component hash
+ :type keys_to_omit: Optional[Iterable[str]]
+ :return: The component hash
+ :rtype: str
+ """
+ component_interface_dict = self._to_dict()
+ res: str = hash_dict(component_interface_dict, keys_to_omit=keys_to_omit)
+ return res
+
+ @classmethod
+ def _get_resource_type(cls) -> str:
+ return "Microsoft.MachineLearningServices/workspaces/components/versions"
+
+ def _get_resource_name_version(self) -> Tuple:
+ version: Optional[str] = None
+ if not self.version and not self._auto_increment_version:
+ version = str(uuid.uuid4())
+ else:
+ version = self.version
+ return self.name or ANONYMOUS_COMPONENT_NAME, version
+
+ def _validate(self, raise_error: Optional[bool] = False) -> MutableValidationResult:
+ origin_name = self.name
+ # skip name validation for anonymous component as ANONYMOUS_COMPONENT_NAME will be used in component creation
+ if self._is_anonymous:
+ self.name = ANONYMOUS_COMPONENT_NAME
+ try:
+ return super()._validate(raise_error)
+ finally:
+ self.name = origin_name
+
+ def _customized_validate(self) -> MutableValidationResult:
+ validation_result = super(Component, self)._customized_validate()
+
+ # validate inputs names
+ validation_result.merge_with(self._validate_io_names(self.inputs, raise_error=False))
+ validation_result.merge_with(self._validate_io_names(self.outputs, raise_error=False))
+
+ return validation_result
+
+ def _get_anonymous_component_name_version(self) -> Tuple:
+ return ANONYMOUS_COMPONENT_NAME, self._get_anonymous_hash()
+
+ def _get_rest_name_version(self) -> Tuple:
+ if self._is_anonymous:
+ return self._get_anonymous_component_name_version()
+ return self.name, self.version
+
+ def _to_rest_object(self) -> ComponentVersion:
+ component = self._to_dict()
+
+ # TODO: Remove in PuP with native import job/component type support in MFE/Designer
+ # Convert import component to command component private preview
+ if component.get(CommonYamlFields.TYPE, None) == NodeType.IMPORT:
+ component[CommonYamlFields.TYPE] = NodeType.COMMAND
+ component["inputs"] = component.pop("source")
+ component["outputs"] = dict({"output": component.pop("output")})
+ # method _to_dict() will remove empty keys
+ if "tags" not in component:
+ component["tags"] = {}
+ component["tags"]["component_type_overwrite"] = NodeType.IMPORT
+ component["command"] = NodeType.IMPORT
+
+ # add source type to component rest object
+ component["_source"] = self._source
+ if self._intellectual_property:
+            # hack while full pass-through support is worked on for IPP fields
+ component.pop("intellectual_property")
+ component["intellectualProperty"] = self._intellectual_property._to_rest_object().serialize()
+ properties = ComponentVersionProperties(
+ component_spec=component,
+ description=self.description,
+ is_anonymous=self._is_anonymous,
+ properties=dict(self.properties) if self.properties else {},
+ tags=self.tags,
+ )
+ result = ComponentVersion(properties=properties)
+ if self._is_anonymous:
+ result.name = ANONYMOUS_COMPONENT_NAME
+ else:
+ result.name = self.name
+ result.properties.properties["client_component_hash"] = self._get_component_hash(keys_to_omit=["version"])
+ return result
+
+ def _to_dict(self) -> Dict:
+ # Replace the name of $schema to schema.
+ component_schema_dict: dict = self._dump_for_validation()
+ component_schema_dict.pop(BASE_PATH_CONTEXT_KEY, None)
+
+ # TODO: handle other_parameters and remove override from subclass
+ return component_schema_dict
+
+ def _localize(self, base_path: str) -> None:
+ """Called on an asset got from service to clean up remote attributes like id, creation_context, etc. and update
+ base_path.
+
+ :param base_path: The base_path
+ :type base_path: str
+ """
+ if not getattr(self, "id", None):
+ raise ValueError("Only remote asset can be localize but got a {} without id.".format(type(self)))
+ self._id = None
+ self._creation_context = None
+ self._base_path = base_path
+
+ def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict:
+        # Note: is_anonymous is not reliable here; create_or_update will log is_anonymous from its parameter.
+ is_anonymous = self.name is None or ANONYMOUS_COMPONENT_NAME in self.name
+ return {"type": self.type, "source": self._source, "is_anonymous": is_anonymous}
+
+ # pylint: disable-next=docstring-missing-param
+ def __call__(self, *args: Any, **kwargs: Any) -> "BaseNode":
+ """Call ComponentVersion as a function and get a Component object.
+
+ :return: The component object
+ :rtype: BaseNode
+ """
+ if args:
+ # raise clear error message for unsupported positional args
+ if self._func._has_parameters: # type: ignore
+ _error = f"got {args} for {self.name}"
+ msg = (
+ f"Component function doesn't support positional arguments, {_error}. " # type: ignore
+ f"Please use keyword arguments like: {self._func._func_calling_example}."
+ )
+ else:
+ msg = (
+ "Component function doesn't has any parameters, "
+ f"please make sure component {self.name} has inputs. "
+ )
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ return self._func(*args, **kwargs) # pylint: disable=not-callable
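+
+    # Illustrative sketch (not part of the original source): a component is called with keyword
+    # arguments only, producing a pipeline node. The YAML path and input name are hypothetical.
+    #
+    #     from azure.ai.ml import Input, load_component
+    #     train = load_component(source="./train.yaml")
+    #     node = train(training_data=Input(type="uri_folder", path="./data"))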
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py
new file mode 100644
index 00000000..012dd260
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py
@@ -0,0 +1,171 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+from typing import Any, Callable, Dict, Optional, Tuple
+
+from marshmallow import Schema
+
+from ..._restclient.v2022_10_01.models import ComponentVersion
+from ..._utils.utils import is_internal_component_data
+from ...constants._common import SOURCE_PATH_CONTEXT_KEY
+from ...constants._component import DataTransferTaskType, NodeType
+from ...entities._component.automl_component import AutoMLComponent
+from ...entities._component.command_component import CommandComponent
+from ...entities._component.component import Component
+from ...entities._component.datatransfer_component import (
+ DataTransferCopyComponent,
+ DataTransferExportComponent,
+ DataTransferImportComponent,
+)
+from ...entities._component.import_component import ImportComponent
+from ...entities._component.parallel_component import ParallelComponent
+from ...entities._component.pipeline_component import PipelineComponent
+from ...entities._component.spark_component import SparkComponent
+from ...entities._util import get_type_from_spec
+from .flow import FlowComponent
+
+
+class _ComponentFactory:
+ """A class to create component instances from yaml dict or rest objects without hard-coded type check."""
+
+ def __init__(self) -> None:
+ self._create_instance_funcs: Dict = {}
+ self._create_schema_funcs: Dict = {}
+
+ self.register_type(
+ _type=NodeType.PARALLEL,
+ create_instance_func=lambda: ParallelComponent.__new__(ParallelComponent),
+ create_schema_func=ParallelComponent._create_schema_for_validation,
+ )
+ self.register_type(
+ _type=NodeType.COMMAND,
+ create_instance_func=lambda: CommandComponent.__new__(CommandComponent),
+ create_schema_func=CommandComponent._create_schema_for_validation,
+ )
+ self.register_type(
+ _type=NodeType.IMPORT,
+ create_instance_func=lambda: ImportComponent.__new__(ImportComponent),
+ create_schema_func=ImportComponent._create_schema_for_validation,
+ )
+ self.register_type(
+ _type=NodeType.PIPELINE,
+ create_instance_func=lambda: PipelineComponent.__new__(PipelineComponent),
+ create_schema_func=PipelineComponent._create_schema_for_validation,
+ )
+ self.register_type(
+ _type=NodeType.AUTOML,
+ create_instance_func=lambda: AutoMLComponent.__new__(AutoMLComponent),
+ create_schema_func=AutoMLComponent._create_schema_for_validation,
+ )
+ self.register_type(
+ _type=NodeType.SPARK,
+ create_instance_func=lambda: SparkComponent.__new__(SparkComponent),
+ create_schema_func=SparkComponent._create_schema_for_validation,
+ )
+ self.register_type(
+ _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.COPY_DATA]),
+ create_instance_func=lambda: DataTransferCopyComponent.__new__(DataTransferCopyComponent),
+ create_schema_func=DataTransferCopyComponent._create_schema_for_validation,
+ )
+
+ self.register_type(
+ _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.IMPORT_DATA]),
+ create_instance_func=lambda: DataTransferImportComponent.__new__(DataTransferImportComponent),
+ create_schema_func=DataTransferImportComponent._create_schema_for_validation,
+ )
+
+ self.register_type(
+ _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.EXPORT_DATA]),
+ create_instance_func=lambda: DataTransferExportComponent.__new__(DataTransferExportComponent),
+ create_schema_func=DataTransferExportComponent._create_schema_for_validation,
+ )
+
+ self.register_type(
+ _type=NodeType.FLOW_PARALLEL,
+ create_instance_func=lambda: FlowComponent.__new__(FlowComponent),
+ create_schema_func=FlowComponent._create_schema_for_validation,
+ )
+
+ def get_create_funcs(
+ self, yaml_spec: dict, for_load: bool = False
+ ) -> Tuple[Callable[..., Component], Callable[[Any], Schema]]:
+ """Get registered functions to create an instance and its corresponding schema for the given type.
+
+ :param yaml_spec: The YAML specification.
+ :type yaml_spec: dict
+ :param for_load: Whether the function is called for loading a component. Defaults to False.
+ :type for_load: bool
+ :return: A tuple containing the create_instance_func and create_schema_func.
+ :rtype: tuple
+ """
+
+ _type = get_type_from_spec(yaml_spec, valid_keys=self._create_instance_funcs)
+ # SparkComponent and InternalSparkComponent share the same type name, but they are different types.
+ if for_load and is_internal_component_data(yaml_spec, raise_if_not_enabled=True) and _type == NodeType.SPARK:
+ from azure.ai.ml._internal._schema.node import NodeType as InternalNodeType
+
+ _type = InternalNodeType.SPARK
+
+ create_instance_func = self._create_instance_funcs[_type]
+ create_schema_func = self._create_schema_funcs[_type]
+ return create_instance_func, create_schema_func
+
+ def register_type(
+ self,
+ _type: str,
+ create_instance_func: Callable[..., Component],
+ create_schema_func: Callable[[Any], Schema],
+ ) -> None:
+ """Register a new component type.
+
+ :param _type: The type name of the component.
+ :type _type: str
+ :param create_instance_func: A function to create an instance of the component.
+ :type create_instance_func: Callable[..., ~azure.ai.ml.entities.Component]
+ :param create_schema_func: A function to create a schema for the component.
+ :type create_schema_func: Callable[[Any], Schema]
+ """
+ self._create_instance_funcs[_type] = create_instance_func
+ self._create_schema_funcs[_type] = create_schema_func
+
+ @classmethod
+ def load_from_dict(cls, *, data: Dict, context: Dict, _type: Optional[str] = None, **kwargs: Any) -> Component:
+ """Load a component from a YAML dict.
+
+ :keyword data: The YAML dict.
+ :paramtype data: dict
+ :keyword context: The context of the YAML dict.
+ :paramtype context: dict
+ :keyword _type: The type name of the component. When None, it will be inferred from the YAML dict.
+ :paramtype _type: str
+ :return: The loaded component.
+ :rtype: ~azure.ai.ml.entities.Component
+ """
+
+ return Component._load(
+ data=data,
+ yaml_path=context.get(SOURCE_PATH_CONTEXT_KEY, None),
+ params_override=[{"type": _type}] if _type is not None else [],
+ **kwargs,
+ )
+
+ @classmethod
+ def load_from_rest(cls, *, obj: ComponentVersion, _type: Optional[str] = None) -> Component:
+ """Load a component from a REST object.
+
+ :keyword obj: The REST object.
+ :paramtype obj: ComponentVersion
+ :keyword _type: The type name of the component. When None, it will be inferred from the REST object.
+ :paramtype _type: str
+ :return: The loaded component.
+ :rtype: ~azure.ai.ml.entities.Component
+ """
+ if _type is not None:
+ obj.properties.component_spec["type"] = _type
+ return Component._from_rest_object(obj)
+
+
+component_factory = _ComponentFactory()
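+
+# Illustrative sketch (not part of the original source): how the factory is typically driven.
+# The spec dict below is hypothetical; get_create_funcs resolves the registered constructor and
+# schema factory from its "type" field.
+#
+#     spec = {"type": "command", "name": "sample"}
+#     create_instance_func, create_schema_func = component_factory.get_create_funcs(spec)
+#     instance = create_instance_func()  # an uninitialized CommandComponent, later filled via schema load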
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py
new file mode 100644
index 00000000..e71712ab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py
@@ -0,0 +1,325 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from pathlib import Path
+from typing import Any, Dict, NoReturn, Optional, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.data_transfer_component import (
+ DataTransferCopyComponentSchema,
+ DataTransferExportComponentSchema,
+ DataTransferImportComponentSchema,
+)
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, COMPONENT_TYPE, AssetTypes
+from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType
+from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
+from azure.ai.ml.entities._inputs_outputs.output import Output
+from azure.ai.ml.entities._validation.core import MutableValidationResult
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from ..._schema import PathAwareSchema
+from .._util import convert_ordered_dict_to_dict, validate_attribute_type
+from .component import Component
+
+
+class DataTransferComponent(Component):
+ """DataTransfer component version, used to define a data transfer component.
+
+ :param task: Task type in the data transfer component. Possible values are "copy_data",
+ "import_data", and "export_data".
+ :type task: str
+ :param inputs: Mapping of input data bindings used in the job.
+ :type inputs: dict
+ :param outputs: Mapping of output data bindings used in the job.
+ :type outputs: dict
+ :param kwargs: Additional parameters for the data transfer component.
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ def __init__(
+ self,
+ *,
+ task: Optional[str] = None,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ # validate init params are valid type
+ validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+ kwargs[COMPONENT_TYPE] = NodeType.DATA_TRANSFER
+ # Set default base path
+ if BASE_PATH_CONTEXT_KEY not in kwargs:
+ kwargs[BASE_PATH_CONTEXT_KEY] = Path(".")
+
+ super().__init__(
+ inputs=inputs,
+ outputs=outputs,
+ **kwargs,
+ )
+ self._task = task
+
+ @classmethod
+ def _attr_type_map(cls) -> dict:
+ return {}
+
+ @property
+ def task(self) -> Optional[str]:
+ """Task type of the component.
+
+ :return: Task type of the component.
+ :rtype: str
+ """
+ return self._task
+
+ def _to_dict(self) -> Dict:
+ return cast(
+ dict,
+ convert_ordered_dict_to_dict({**self._other_parameter, **super(DataTransferComponent, self)._to_dict()}),
+ )
+
+ def __str__(self) -> str:
+ try:
+ _toYaml: str = self._to_yaml()
+ return _toYaml
+ except BaseException: # pylint: disable=W0718
+ _toStr: str = super(DataTransferComponent, self).__str__()
+ return _toStr
+
+ @classmethod
+ def _build_source_sink(cls, io_dict: Union[Dict, Database, FileSystem]) -> Union[Database, FileSystem]:
+ component_io: Union[Database, FileSystem] = Database()
+
+ if isinstance(io_dict, Database):
+ component_io = Database()
+ elif isinstance(io_dict, FileSystem):
+ component_io = FileSystem()
+ else:
+ if isinstance(io_dict, dict):
+ data_type = io_dict.pop("type", None)
+ if data_type == ExternalDataType.DATABASE:
+ component_io = Database()
+ elif data_type == ExternalDataType.FILE_SYSTEM:
+ component_io = FileSystem()
+ else:
+ msg = "Type in source or sink only support {} and {}, currently got {}."
+ raise ValidationException(
+ message=msg.format(
+ ExternalDataType.DATABASE,
+ ExternalDataType.FILE_SYSTEM,
+ data_type,
+ ),
+ no_personal_data_message=msg.format(
+ ExternalDataType.DATABASE,
+ ExternalDataType.FILE_SYSTEM,
+ "data_type",
+ ),
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.INVALID_VALUE,
+ )
+ else:
+ msg = "Source or sink only support dict, Database and FileSystem"
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.INVALID_VALUE,
+ )
+
+ return component_io
+
+
+@experimental
+class DataTransferCopyComponent(DataTransferComponent):
+ """DataTransfer copy component version, used to define a data transfer copy component.
+
+ :param data_copy_mode: Data copy mode in the copy task.
+ Possible values are "merge_with_overwrite" and "fail_if_conflict".
+ :type data_copy_mode: str
+ :param inputs: Mapping of input data bindings used in the job.
+ :type inputs: dict
+ :param outputs: Mapping of output data bindings used in the job.
+ :type outputs: dict
+ :param kwargs: Additional parameters for the data transfer copy component.
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ def __init__(
+ self,
+ *,
+ data_copy_mode: Optional[str] = None,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ kwargs["task"] = DataTransferTaskType.COPY_DATA
+ super().__init__(
+ inputs=inputs,
+ outputs=outputs,
+ **kwargs,
+ )
+
+ self._data_copy_mode = data_copy_mode
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return DataTransferCopyComponentSchema(context=context)
+
+ @property
+ def data_copy_mode(self) -> Optional[str]:
+ """Data copy mode of the component.
+
+ :return: Data copy mode of the component.
+ :rtype: str
+ """
+ return self._data_copy_mode
+
+ def _customized_validate(self) -> MutableValidationResult:
+ validation_result = super(DataTransferCopyComponent, self)._customized_validate()
+ validation_result.merge_with(self._validate_input_output_mapping())
+ return validation_result
+
+ def _validate_input_output_mapping(self) -> MutableValidationResult:
+ validation_result = self._create_empty_validation_result()
+ inputs_count = len(self.inputs)
+ outputs_count = len(self.outputs)
+ if outputs_count != 1:
+ msg = "Only support single output in {}, but there're {} outputs."
+ validation_result.append_error(
+ message=msg.format(DataTransferTaskType.COPY_DATA, outputs_count),
+ yaml_path="outputs",
+ )
+ else:
+ input_type = None
+ output_type = None
+ if inputs_count == 1:
+ for _, input_data in self.inputs.items():
+ input_type = input_data.type
+ for _, output_data in self.outputs.items():
+ output_type = output_data.type
+ if input_type is None or output_type is None or input_type != output_type:
+ msg = "Input type {} doesn't exactly match with output type {} in task {}"
+ validation_result.append_error(
+ message=msg.format(input_type, output_type, DataTransferTaskType.COPY_DATA),
+ yaml_path="outputs",
+ )
+ elif inputs_count > 1:
+ for _, output_data in self.outputs.items():
+ output_type = output_data.type
+ if output_type is None or output_type != AssetTypes.URI_FOLDER:
+ msg = "output type {} need to be {} in task {}"
+ validation_result.append_error(
+ message=msg.format(
+ output_type,
+ AssetTypes.URI_FOLDER,
+ DataTransferTaskType.COPY_DATA,
+ ),
+ yaml_path="outputs",
+ )
+ else:
+ msg = "Inputs must be set in task {}."
+ validation_result.append_error(
+ message=msg.format(DataTransferTaskType.COPY_DATA),
+ yaml_path="inputs",
+ )
+ return validation_result
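+
+    # Illustrative sketch (not part of the original source) of a mapping that satisfies the rules
+    # above: exactly one output, and with a single input the input/output types must match.
+    #
+    #     from azure.ai.ml import Input, Output
+    #     component = DataTransferCopyComponent(
+    #         inputs={"src": Input(type="uri_file")},
+    #         outputs={"dst": Output(type="uri_file")},
+    #     )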
+
+
+@experimental
+class DataTransferImportComponent(DataTransferComponent):
+ """DataTransfer import component version, used to define a data transfer import component.
+
+ :param source: The data source of the file system or database.
+ :type source: dict
+ :param outputs: Mapping of output data bindings used in the job.
+ Default value is an output port with the key "sink" and the type "mltable".
+ :type outputs: dict
+ :param kwargs: Additional parameters for the data transfer import component.
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ def __init__(
+ self,
+ *,
+ source: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ outputs = outputs or {"sink": Output(type=AssetTypes.MLTABLE)}
+ kwargs["task"] = DataTransferTaskType.IMPORT_DATA
+ super().__init__(
+ outputs=outputs,
+ **kwargs,
+ )
+
+ source = source if source else {}
+ self.source = self._build_source_sink(source)
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return DataTransferImportComponentSchema(context=context)
+
+ # pylint: disable-next=docstring-missing-param
+ def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:
+ """Call ComponentVersion as a function and get a Component object."""
+
+ msg = "DataTransfer component is not callable for import task."
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+
+@experimental
+class DataTransferExportComponent(DataTransferComponent):
+ """DataTransfer export component version, used to define a data transfer export component.
+
+ :param sink: The sink of external data and databases.
+ :type sink: Union[Dict, Database, FileSystem]
+ :param inputs: Mapping of input data bindings used in the job.
+ :type inputs: dict
+ :param kwargs: Additional parameters for the data transfer export component.
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ def __init__(
+ self,
+ *,
+ inputs: Optional[Dict] = None,
+ sink: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ kwargs["task"] = DataTransferTaskType.EXPORT_DATA
+ super().__init__(
+ inputs=inputs,
+ **kwargs,
+ )
+
+ sink = sink if sink else {}
+ self.sink = self._build_source_sink(sink)
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return DataTransferExportComponentSchema(context=context)
+
+ # pylint: disable-next=docstring-missing-param
+ def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:
+ """Call ComponentVersion as a function and get a Component object."""
+
+ msg = "DataTransfer component is not callable for export task."
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ )
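+
+# Illustrative sketch (not part of the original source): in _build_source_sink above, the "type"
+# key of a plain dict selects between Database and FileSystem; any other value raises a
+# ValidationException. A hypothetical export component:
+#
+#     from azure.ai.ml import Input
+#     export = DataTransferExportComponent(
+#         inputs={"source": Input(type="uri_file")},
+#         sink={"type": "database"},
+#     )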
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py
new file mode 100644
index 00000000..e4ff06cc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py
@@ -0,0 +1,553 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import contextlib
+import json
+import os
+from collections import defaultdict
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Union
+
+import yaml # type: ignore[import]
+from marshmallow import EXCLUDE, Schema, ValidationError
+
+from azure.ai.ml.constants._common import (
+ BASE_PATH_CONTEXT_KEY,
+ COMPONENT_TYPE,
+ PROMPTFLOW_AZUREML_OVERRIDE_KEY,
+ SOURCE_PATH_CONTEXT_KEY,
+ AssetTypes,
+ SchemaUrl,
+)
+from azure.ai.ml.constants._component import ComponentParameterTypes, NodeType
+
+from ..._restclient.v2022_10_01.models import ComponentVersion
+from ..._schema import PathAwareSchema
+from ..._schema.component.flow import FlowComponentSchema, FlowSchema, RunSchema
+from ...exceptions import ErrorCategory, ErrorTarget, ValidationException
+from .. import Environment
+from .._inputs_outputs import GroupInput, Input, Output
+from ._additional_includes import AdditionalIncludesMixin
+from .component import Component
+
+# avoid circular import error
+if TYPE_CHECKING:
+ from azure.ai.ml.entities._builders.parallel import Parallel
+
+# pylint: disable=protected-access
+
+
+class _FlowPortNames:
+ """Common yaml fields.
+
+ Common yaml fields are used to define the common fields in yaml files. It can be one of the following values: type,
+ name, $schema.
+ """
+
+ DATA = "data"
+ RUN_OUTPUTS = "run_outputs"
+ CONNECTIONS = "connections"
+
+ FLOW_OUTPUTS = "flow_outputs"
+ DEBUG_INFO = "debug_info"
+
+
+class _FlowComponentPortDict(dict):
+ def __init__(self, ports: Dict):
+ self._allow_update_item = True
+ super().__init__()
+ for input_port_name, input_port in ports.items():
+ self[input_port_name] = input_port
+ self._allow_update_item = False
+
+ def __setitem__(self, key: Any, value: Any) -> None:
+ if not self._allow_update_item:
+ raise RuntimeError("Ports of flow component are not editable.")
+ super().__setitem__(key, value)
+
+ def __delitem__(self, key: Any) -> None:
+ if not self._allow_update_item:
+ raise RuntimeError("Ports of flow component are not editable.")
+ super().__delitem__(key)
+
+
+class FlowComponentInputDict(_FlowComponentPortDict):
+ """Input port dictionary for FlowComponent, with fixed input ports."""
+
+ def __init__(self) -> None:
+ super().__init__(
+ {
+ _FlowPortNames.CONNECTIONS: GroupInput(values={}, _group_class=None),
+ _FlowPortNames.DATA: Input(type=AssetTypes.URI_FOLDER, optional=False),
+ _FlowPortNames.FLOW_OUTPUTS: Input(type=AssetTypes.URI_FOLDER, optional=True),
+ }
+ )
+
+ @contextlib.contextmanager
+ def _fit_inputs(self, inputs: Optional[Dict]) -> Generator:
+ """Add dynamic input ports to the input port dictionary.
+ Input ports of a flow component include:
+ 1. data: required major uri_folder input
+        2. run_outputs: optional uri_folder input
+ 3. connections.xxx.xxx: group of string parameters, first layer key can be any node name,
+ but we won't resolve the exact keys in SDK
+ 4. xxx: input_mapping parameters, key can be any node name, but we won't resolve the exact keys in SDK
+
+ #3 will be grouped into connections, we make it a fixed group input port.
+ #4 are dynamic input ports, we will add them temporarily in this context manager and remove them
+ after the context manager is finished.
+
+ :param inputs: The dynamic input to fit.
+ :type inputs: Dict[str, Any]
+ :return: None
+ :rtype: None
+ """
+ dynamic_columns_mapping_keys = []
+ dynamic_connections_inputs = defaultdict(list)
+ from azure.ai.ml.entities._job.pipeline._io import _GroupAttrDict
+ from azure.ai.ml.entities._job.pipeline._io.mixin import flatten_dict
+
+ flattened_inputs = flatten_dict(inputs, _GroupAttrDict, allow_dict_fields=[_FlowPortNames.CONNECTIONS])
+
+ for flattened_input_key in flattened_inputs:
+ if flattened_input_key.startswith(f"{_FlowPortNames.CONNECTIONS}."):
+ if flattened_input_key.count(".") != 2:
+ raise ValidationException(
+ message="flattened connection input prot name must be "
+ "in the format of connections.<node_name>.<port_name>, "
+ "but got %s" % flattened_input_key,
+ no_personal_data_message="flattened connection input prot name must be in the format of "
+ "connections.<node_name>.<port_name>",
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ _, node_name, param_name = flattened_input_key.split(".")
+ dynamic_connections_inputs[node_name].append(param_name)
+ continue
+ if flattened_input_key not in self:
+ dynamic_columns_mapping_keys.append(flattened_input_key)
+
+ self._allow_update_item = True
+ for flattened_input_key in dynamic_columns_mapping_keys:
+ self[flattened_input_key] = Input(type=ComponentParameterTypes.STRING, optional=True)
+ if dynamic_connections_inputs:
+ self[_FlowPortNames.CONNECTIONS] = GroupInput(
+ values={
+ node_name: GroupInput(
+ values={
+ parameter_name: Input(
+ type=ComponentParameterTypes.STRING,
+ )
+ for parameter_name in param_names
+ },
+ _group_class=None,
+ )
+ for node_name, param_names in dynamic_connections_inputs.items()
+ },
+ _group_class=None,
+ )
+ self._allow_update_item = False
+
+ yield
+
+ self._allow_update_item = True
+ for flattened_input_key in dynamic_columns_mapping_keys:
+ del self[flattened_input_key]
+ self[_FlowPortNames.CONNECTIONS] = GroupInput(values={}, _group_class=None)
+ self._allow_update_item = False
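+
+    # Illustrative sketch (not part of the original source): the flattened keys _fit_inputs accepts.
+    # Node, column, and connection names below are hypothetical.
+    #
+    #     inputs (after flattening) might contain:
+    #         "data": <uri_folder input>                      # fixed port
+    #         "text": "${data.text}"                          # column-mapping parameter, added temporarily
+    #         "connections.summarize.connection": "my_conn"   # grouped under the "connections" port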
+
+
+class FlowComponentOutputDict(_FlowComponentPortDict):
+ """Output port dictionary for FlowComponent, with fixed output ports."""
+
+ def __init__(self) -> None:
+ super().__init__(
+ {
+ _FlowPortNames.FLOW_OUTPUTS: Output(type=AssetTypes.URI_FOLDER),
+ _FlowPortNames.DEBUG_INFO: Output(type=AssetTypes.URI_FOLDER),
+ }
+ )
+
+
+class FlowComponent(Component, AdditionalIncludesMixin):
+ """Flow component version, used to define a Flow Component or Job.
+
+ :keyword name: The name of the Flow job or component.
+ :type name: Optional[str]
+ :keyword version: The version of the Flow job or component.
+ :type version: Optional[str]
+ :keyword description: The description of the component. Defaults to None.
+ :type description: Optional[str]
+ :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
+ :type tags: Optional[dict]
+ :keyword display_name: The display name of the component.
+ :type display_name: Optional[str]
+    :keyword flow: The path to the flow directory or flow definition file. Defaults to None, in which case the
+        base path of this component is used as the flow directory.
+ :type flow: Optional[Union[str, Path]]
+    :keyword column_mapping: The column mapping for the flow. Defaults to None.
+    :type column_mapping: Optional[dict[str, str]]
+ :keyword variant: The variant of the flow. Defaults to None.
+ :type variant: Optional[str]
+ :keyword connections: The connections for the flow. Defaults to None.
+ :type connections: Optional[dict[str, dict[str, str]]]
+ :keyword environment_variables: The environment variables for the flow. Defaults to None.
+ :type environment_variables: Optional[dict[str, str]]
+ :keyword environment: The environment for the flow component. Defaults to None.
+    :type environment: Optional[Union[str, Environment]]
+ :keyword is_deterministic: Specifies whether the Flow will return the same output given the same input.
+ Defaults to True. When True, if a Flow (component) is deterministic and has been run before in the
+ current workspace with the same input and settings, it will reuse results from a previous submitted job
+ when used as a node or step in a pipeline. In that scenario, no compute resources will be used.
+ :type is_deterministic: Optional[bool]
+ :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
+ :type additional_includes: Optional[list[str]]
+ :keyword properties: The job property dictionary. Defaults to None.
+ :type properties: Optional[dict[str, str]]
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if FlowComponent cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ def __init__(
+ self,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ display_name: Optional[str] = None,
+ flow: Optional[Union[str, Path]] = None,
+ column_mapping: Optional[Dict[str, str]] = None,
+ variant: Optional[str] = None,
+ connections: Optional[Dict[str, Dict[str, str]]] = None,
+ environment_variables: Optional[Dict[str, str]] = None,
+ environment: Optional[Union[str, Environment]] = None,
+ is_deterministic: bool = True,
+ additional_includes: Optional[List] = None,
+ properties: Optional[Dict] = None,
+ **kwargs: Any,
+ ) -> None:
+ # validate init params are valid type
+ kwargs[COMPONENT_TYPE] = NodeType.FLOW_PARALLEL
+
+ # always use flow directory as base path
+ # Note: we suppose that there is no relative path in run.yaml other than flow.
+ # If there are any, we will need to rebase them so that they have the same base path as attributes in
+ # flow.dag.yaml
+ flow_dir, self._flow = self._get_flow_definition(
+ flow=flow,
+ base_path=kwargs.pop(BASE_PATH_CONTEXT_KEY, Path.cwd()),
+ source_path=kwargs.get(SOURCE_PATH_CONTEXT_KEY, None),
+ )
+ kwargs[BASE_PATH_CONTEXT_KEY] = flow_dir
+
+ super().__init__(
+ name=name or self._normalize_component_name(flow_dir.name),
+ version=version or "1",
+ description=description,
+ tags=tags,
+ display_name=display_name,
+ inputs={},
+ outputs={},
+ is_deterministic=is_deterministic,
+ properties=properties,
+ **kwargs,
+ )
+ self._environment = environment
+ self._column_mapping = column_mapping or {}
+ self._variant = variant
+ self._connections = connections or {}
+
+ self._inputs = FlowComponentInputDict()
+ self._outputs = FlowComponentOutputDict()
+
+ if flow:
+ # file existence has been checked in _get_flow_definition
+ # we don't need to rebase additional_includes as we have updated base_path
+ with open(Path(self.base_path, self._flow), "r", encoding="utf-8") as f:
+ flow_content = yaml.safe_load(f.read())
+ additional_includes = flow_content.get("additional_includes", None)
+ # environment variables in run.yaml have higher priority than those in flow.dag.yaml
+ self._environment_variables = flow_content.get("environment_variables", {})
+ self._environment_variables.update(environment_variables or {})
+ else:
+ self._environment_variables = environment_variables or {}
+
+ self._additional_includes = additional_includes or []
+
+        # unlike other Components, code is a private property in FlowComponent and
+        # will be used to store the ARM id of the created code before constructing the rest object.
+        # We don't use self.flow directly as self.flow can be a path to the flow dag yaml file instead of a directory.
+ self._code_arm_id: Optional[str] = None
+
+ # region valid properties
+ @property
+ def flow(self) -> str:
+ """The path to the flow definition file relative to the flow directory.
+
+ :rtype: str
+ """
+ return self._flow
+
+ @property
+ def environment(self) -> Optional[Union[str, Environment]]:
+ """The environment for the flow component. Defaults to None.
+
+        :rtype: Union[str, Environment]
+ """
+ return self._environment
+
+ @environment.setter
+ def environment(self, value: Union[str, Environment]) -> None:
+ """The environment for the flow component. Defaults to None.
+
+        :param value: The environment for the flow component.
+        :type value: Union[str, Environment]
+ """
+ self._environment = value
+
+ @property
+ def column_mapping(self) -> Dict[str, str]:
+ """The column mapping for the flow. Defaults to None.
+
+ :rtype: Dict[str, str]
+ """
+ return self._column_mapping
+
+ @column_mapping.setter
+ def column_mapping(self, value: Optional[Dict[str, str]]) -> None:
+ """
+ The column mapping for the flow. Defaults to None.
+
+ :param value: The column mapping for the flow.
+ :type value: Optional[Dict[str, str]]
+ """
+ self._column_mapping = value or {}
+
+ @property
+ def variant(self) -> Optional[str]:
+ """The variant of the flow. Defaults to None.
+
+ :rtype: Optional[str]
+ """
+ return self._variant
+
+ @variant.setter
+ def variant(self, value: Optional[str]) -> None:
+ """The variant of the flow. Defaults to None.
+
+ :param value: The variant of the flow.
+ :type value: Optional[str]
+ """
+ self._variant = value
+
+ @property
+ def connections(self) -> Dict[str, Dict[str, str]]:
+ """The connections for the flow. Defaults to None.
+
+ :rtype: Dict[str, Dict[str, str]]
+ """
+ return self._connections
+
+ @connections.setter
+ def connections(self, value: Optional[Dict[str, Dict[str, str]]]) -> None:
+ """
+ The connections for the flow. Defaults to None.
+
+ :param value: The connections for the flow.
+ :type value: Optional[Dict[str, Dict[str, str]]]
+ """
+ self._connections = value or {}
+
+ @property
+ def environment_variables(self) -> Dict[str, str]:
+ """The environment variables for the flow. Defaults to None.
+
+ :rtype: Dict[str, str]
+ """
+ return self._environment_variables
+
+ @environment_variables.setter
+ def environment_variables(self, value: Optional[Dict[str, str]]) -> None:
+ """The environment variables for the flow. Defaults to None.
+
+ :param value: The environment variables for the flow.
+ :type value: Optional[Dict[str, str]]
+ """
+ self._environment_variables = value or {}
+
+ @property
+ def additional_includes(self) -> List:
+ """A list of shared additional files to be included in the component. Defaults to None.
+
+ :rtype: List
+ """
+ return self._additional_includes
+
+ @additional_includes.setter
+ def additional_includes(self, value: Optional[List]) -> None:
+ """A list of shared additional files to be included in the component. Defaults to None.
+ All local additional includes should be relative to the flow directory.
+
+ :param value: A list of shared additional files to be included in the component.
+ :type value: Optional[List]
+ """
+ self._additional_includes = value or []
+
+ # endregion
+
+ @classmethod
+ def _normalize_component_name(cls, value: str) -> str:
+ return value.replace("-", "_")
+
+ # region Component
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+ raise RuntimeError("FlowComponent does not support loading from REST object.")
+
+ def _to_rest_object(self) -> ComponentVersion:
+ rest_obj = super()._to_rest_object()
+ rest_obj.properties.component_spec["code"] = self._code_arm_id
+ rest_obj.properties.component_spec["flow_file_name"] = self._flow
+ return rest_obj
+
+ def _func(self, **kwargs: Any) -> "Parallel": # pylint: disable=invalid-overridden-method
+ from azure.ai.ml.entities._builders.parallel import Parallel
+
+ with self._inputs._fit_inputs(kwargs): # type: ignore[attr-defined]
+ # pylint: disable=not-callable
+ return super()._func(**kwargs) # type: ignore
+
+ @classmethod
+ def _get_flow_definition(
+ cls,
+ base_path: Path,
+ *,
+ flow: Optional[Union[str, os.PathLike]] = None,
+ source_path: Optional[Union[str, os.PathLike]] = None,
+ ) -> Tuple[Path, str]:
+ """
+ Get the path to the flow directory and the file name of the flow dag yaml file.
+ If flow is not specified, we will assume that the source_path is the path to the flow dag yaml file.
+ If flow is specified, it can be either a path to the flow dag yaml file or a path to the flow directory.
+ If flow is a path to the flow directory, we will assume that the flow dag yaml file is named flow.dag.yaml.
+
+ :param base_path: The base path of the flow component.
+ :type base_path: Path
+        :keyword flow: The path to the flow directory or flow definition file. Defaults to None, in which case the
+            base path of this component is used as the flow directory.
+ :type flow: Optional[Union[str, Path]]
+ :keyword source_path: The source path of the flow component, should be path to the flow dag yaml file
+ if specified.
+ :type source_path: Optional[Union[str, os.PathLike]]
+ :return: The path to the flow directory and the file name of the flow dag yaml file.
+ :rtype: Tuple[Path, str]
+ """
+ flow_file_name = "flow.dag.yaml"
+
+ if flow is None and source_path is None:
+ raise cls._create_validation_error(
+ message="Either flow or source_path must be specified.",
+ no_personal_data_message="Either flow or source_path must be specified.",
+ )
+
+ if flow is None:
+ # Flow component must be created with a local yaml file, so no need to check if source_path exists
+ if isinstance(source_path, (os.PathLike, str)):
+ flow_file_name = os.path.basename(source_path)
+ return Path(base_path), flow_file_name
+
+ flow_path = Path(flow)
+ if not flow_path.is_absolute():
+ # if flow_path points to a symlink, we still use the parent of the symlink as origin code
+ flow_path = Path(base_path, flow)
+
+ if flow_path.is_dir() and (flow_path / flow_file_name).is_file():
+ return flow_path, flow_file_name
+
+ if flow_path.is_file():
+ return flow_path.parent, flow_path.name
+
+ raise cls._create_validation_error(
+ message="Flow path must be a directory containing flow.dag.yaml or a file, but got %s" % flow_path,
+ no_personal_data_message="Flow path must be a directory or a file",
+ )
+
+ # endregion
+
+ # region SchemaValidatableMixin
+ @classmethod
+ def _load_with_schema(
+ cls, data: Any, *, context: Optional[Any] = None, raise_original_exception: bool = False, **kwargs: Any
+ ) -> Any:
+        # FlowComponent should be loaded with FlowSchema or RunSchema instead of FlowComponentSchema
+ context = context or {BASE_PATH_CONTEXT_KEY: Path.cwd()}
+ _schema = data.get("$schema", None)
+ if _schema == SchemaUrl.PROMPTFLOW_RUN:
+ schema = RunSchema(context=context)
+ elif _schema == SchemaUrl.PROMPTFLOW_FLOW:
+ schema = FlowSchema(context=context)
+ else:
+ raise cls._create_validation_error(
+ message="$schema must be specified correctly for loading component from flow, but got %s" % _schema,
+ no_personal_data_message="$schema must be specified for loading component from flow",
+ )
+
+        # unlike other components, we ignore unknown fields in flow to keep init_params clean and avoid
+        # depending too much on the details of flow.dag.yaml & run.yaml
+ kwargs["unknown"] = EXCLUDE
+ try:
+ loaded_dict = schema.load(data, **kwargs)
+ except ValidationError as e:
+ if raise_original_exception:
+ raise e
+ msg = "Trying to load data with schema failed. Data:\n%s\nError: %s" % (
+ json.dumps(data, indent=4) if isinstance(data, dict) else data,
+ json.dumps(e.messages, indent=4),
+ )
+ raise cls._create_validation_error(
+ message=msg,
+ no_personal_data_message=str(e),
+ ) from e
+ loaded_dict.update(loaded_dict.pop(PROMPTFLOW_AZUREML_OVERRIDE_KEY, {}))
+ return loaded_dict
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return FlowComponentSchema(context=context)
+
+ # endregion
+
+ # region AdditionalIncludesMixin
+ def _get_origin_code_value(self) -> Union[str, os.PathLike, None]:
+ if self._code_arm_id:
+ return self._code_arm_id
+ res: Union[str, os.PathLike, None] = self.base_path
+ return res
+
+ def _fill_back_code_value(self, value: str) -> None:
+ self._code_arm_id = value
+
+ @contextlib.contextmanager
+ def _try_build_local_code(self) -> Generator:
+ # false-positive by pylint, hence disable it
+ # (https://github.com/pylint-dev/pylint/blob/main/doc/data/messages
+ # /c/contextmanager-generator-missing-cleanup/details.rst)
+ with super()._try_build_local_code() as code: # pylint:disable=contextmanager-generator-missing-cleanup
+ if not code or not code.path:
+ yield code
+ return
+
+ if not (Path(code.path) / ".promptflow" / "flow.tools.json").is_file():
+ raise self._create_validation_error(
+ message="Flow component must be created with a ./promptflow/flow.tools.json, "
+ "please run `pf flow validate` to generate it or skip it in your ignore file.",
+ no_personal_data_message="Flow component must be created with a ./promptflow/flow.tools.json, "
+ "please run `pf flow validate` to generate it or skip it in your ignore file.",
+ )
+ # TODO: should we remove additional includes from flow.dag.yaml? for now we suppose it will be removed
+ # by mldesigner compile if needed
+
+ yield code
+
+ # endregion
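+
+# Illustrative sketch (not part of the original source): constructing a flow component from a
+# local flow directory. The directory and column mapping below are hypothetical.
+#
+#     flow_component = FlowComponent(
+#         name="my_flow",
+#         flow="./my_flow",                        # directory containing flow.dag.yaml
+#         column_mapping={"text": "${data.text}"},
+#     )
+#     # Calling flow_component(data=...) yields a Parallel node via _func/_fit_inputs above.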
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py
new file mode 100644
index 00000000..13464a06
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py
@@ -0,0 +1,96 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from pathlib import Path
+from typing import Any, Dict, Optional, Union
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.import_component import ImportComponentSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+
+from ..._schema import PathAwareSchema
+from ..._utils.utils import parse_args_description_from_docstring
+from .._util import convert_ordered_dict_to_dict
+from .component import Component
+
+
+class ImportComponent(Component):
+ """Import component version, used to define an import component.
+
+ :param name: Name of the component.
+ :type name: str
+ :param version: Version of the component.
+ :type version: str
+ :param description: Description of the component.
+ :type description: str
+ :param tags: Tag dictionary. Tags can be added, removed, and updated.
+ :type tags: dict
+ :param display_name: Display name of the component.
+ :type display_name: str
+ :param source: Input source parameters of the component.
+ :type source: dict
+ :param output: Output of the component.
+ :type output: dict
+    :param is_deterministic: Whether the import component is deterministic. Defaults to True.
+ :type is_deterministic: bool
+ :param kwargs: Additional parameters for the import component.
+ """
+
+ def __init__(
+ self,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ display_name: Optional[str] = None,
+ source: Optional[Dict] = None,
+ output: Optional[Dict] = None,
+ is_deterministic: bool = True,
+ **kwargs: Any,
+ ) -> None:
+ kwargs[COMPONENT_TYPE] = NodeType.IMPORT
+ # Set default base path
+ if BASE_PATH_CONTEXT_KEY not in kwargs:
+ kwargs[BASE_PATH_CONTEXT_KEY] = Path(".")
+
+ super().__init__(
+ name=name,
+ version=version,
+ description=description,
+ tags=tags,
+ display_name=display_name,
+ inputs=source,
+ outputs={"output": output} if output else None,
+ is_deterministic=is_deterministic,
+ **kwargs,
+ )
+
+ self.source = source
+ self.output = output
+
+ def _to_dict(self) -> Dict:
+ # TODO: Bug Item number: 2897665
+ res: Dict = convert_ordered_dict_to_dict( # type: ignore
+ {**self._other_parameter, **super(ImportComponent, self)._to_dict()}
+ )
+ return res
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return ImportComponentSchema(context=context)
+
+ @classmethod
+ def _parse_args_description_from_docstring(cls, docstring: str) -> Dict:
+ res: dict = parse_args_description_from_docstring(docstring)
+ return res
+
+ def __str__(self) -> str:
+ try:
+ toYaml: str = self._to_yaml()
+ return toYaml
+ except BaseException: # pylint: disable=W0718
+ toStr: str = super(ImportComponent, self).__str__()
+ return toStr
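A minimal construction sketch; the input/output specifications below are illustrative assumptions, not a definitive import component definition.

from azure.ai.ml.entities._component.import_component import ImportComponent

ic = ImportComponent(
    name="my_import_component",  # placeholder name
    version="1",
    display_name="Import from SQL",
    # `source` is passed through as the component's inputs dict
    source={
        "type": {"type": "string", "default": "azuresqldb"},
        "connection": {"type": "string"},
        "query": {"type": "string"},
    },
    # `output` becomes the single "output" entry of the component's outputs
    output={"type": "mltable"},
)
print(ic)  # __str__ falls back to the YAML dump when serialization succeeds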
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py
new file mode 100644
index 00000000..3f29b1e1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py
@@ -0,0 +1,305 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import json
+import os
+import re
+from typing import Any, Dict, List, Optional, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion
+from azure.ai.ml._schema.component.parallel_component import ParallelComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
+from azure.ai.ml.entities._job.parallel.parallel_task import ParallelTask
+from azure.ai.ml.entities._job.parallel.parameterized_parallel import ParameterizedParallel
+from azure.ai.ml.entities._job.parallel.retry_settings import RetrySettings
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+from ..._schema import PathAwareSchema
+from .._util import validate_attribute_type
+from .._validation import MutableValidationResult
+from .code import ComponentCodeMixin
+from .component import Component
+
+
+class ParallelComponent(
+ Component, ParameterizedParallel, ComponentCodeMixin
+): # pylint: disable=too-many-instance-attributes
+ """Parallel component version, used to define a parallel component.
+
+ :param name: Name of the component. Defaults to None
+ :type name: str
+ :param version: Version of the component. Defaults to None
+ :type version: str
+ :param description: Description of the component. Defaults to None
+ :type description: str
+ :param tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None
+ :type tags: dict
+ :param display_name: Display name of the component. Defaults to None
+ :type display_name: str
+    :param retry_settings: Retry settings for a failed parallel component run. Defaults to None
+    :type retry_settings: RetrySettings
+ :param logging_level: A string of the logging level name. Defaults to None
+ :type logging_level: str
+    :param max_concurrency_per_instance: The max parallelism that each compute instance has. Defaults to None
+    :type max_concurrency_per_instance: int
+    :param error_threshold: The number of item processing failures that should be ignored. Defaults to None
+    :type error_threshold: int
+    :param mini_batch_error_threshold: The number of mini-batch processing failures that should be ignored.
+        Defaults to None
+    :type mini_batch_error_threshold: int
+ :param task: The parallel task. Defaults to None
+ :type task: ParallelTask
+ :param mini_batch_size: For FileDataset input, this field is the number of files a user script can process
+ in one run() call. For TabularDataset input, this field is the approximate size of data the user script
+ can process in one run() call. Example values are 1024, 1024KB, 10MB, and 1GB.
+ (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
+ through PipelineParameter.
+ :type mini_batch_size: str
+ :param partition_keys: The keys used to partition dataset into mini-batches. Defaults to None
+ If specified, the data with the same key will be partitioned into the same mini-batch.
+ If both partition_keys and mini_batch_size are specified, partition_keys will take effect.
+ The input(s) must be partitioned dataset(s),
+ and the partition_keys must be a subset of the keys of every input dataset for this to work.
+ :type partition_keys: list
+ :param input_data: The input data. Defaults to None
+ :type input_data: str
+ :param resources: Compute Resource configuration for the component. Defaults to None
+ :type resources: Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration]
+ :param inputs: Inputs of the component. Defaults to None
+ :type inputs: dict
+ :param outputs: Outputs of the component. Defaults to None
+ :type outputs: dict
+ :param code: promoted property from task.code
+ :type code: str
+ :param instance_count: promoted property from resources.instance_count. Defaults to None
+ :type instance_count: int
+ :param is_deterministic: Whether the parallel component is deterministic. Defaults to True
+ :type is_deterministic: bool
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if ParallelComponent cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ def __init__( # pylint: disable=too-many-locals
+ self,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict[str, Any]] = None,
+ display_name: Optional[str] = None,
+ retry_settings: Optional[RetrySettings] = None,
+ logging_level: Optional[str] = None,
+ max_concurrency_per_instance: Optional[int] = None,
+ error_threshold: Optional[int] = None,
+ mini_batch_error_threshold: Optional[int] = None,
+ task: Optional[ParallelTask] = None,
+ mini_batch_size: Optional[str] = None,
+ partition_keys: Optional[List] = None,
+ input_data: Optional[str] = None,
+ resources: Optional[JobResourceConfiguration] = None,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ code: Optional[str] = None, # promoted property from task.code
+ instance_count: Optional[int] = None, # promoted property from resources.instance_count
+ is_deterministic: bool = True,
+ **kwargs: Any,
+ ):
+ # validate init params are valid type
+ validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+ kwargs[COMPONENT_TYPE] = NodeType.PARALLEL
+
+ super().__init__(
+ name=name,
+ version=version,
+ description=description,
+ tags=tags,
+ display_name=display_name,
+ inputs=inputs,
+ outputs=outputs,
+ is_deterministic=is_deterministic,
+ **kwargs,
+ )
+
+        # No validation on the values passed here because, in a pipeline job, required code & environment
+        # may be absent and filled in later with job defaults.
+ self.task = task
+ self.mini_batch_size: int = 0
+ self.partition_keys = partition_keys
+ self.input_data = input_data
+ self.retry_settings = retry_settings
+ self.logging_level = logging_level
+ self.max_concurrency_per_instance = max_concurrency_per_instance
+ self.error_threshold = error_threshold
+ self.mini_batch_error_threshold = mini_batch_error_threshold
+ self.resources = resources
+
+ # check mutual exclusivity of promoted properties
+ if self.resources is not None and instance_count is not None:
+ msg = "instance_count and resources are mutually exclusive"
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ self.instance_count = instance_count
+ self.code = code
+
+ if mini_batch_size is not None:
+ # Convert str to int.
+ pattern = re.compile(r"^\d+([kKmMgG][bB])*$")
+ if not pattern.match(mini_batch_size):
+ raise ValueError(r"Parameter mini_batch_size must follow regex rule ^\d+([kKmMgG][bB])*$")
+
+ try:
+ self.mini_batch_size = int(mini_batch_size)
+ except ValueError as e:
+ unit = mini_batch_size[-2:].lower()
+ if unit == "kb":
+ self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024
+ elif unit == "mb":
+ self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024
+ elif unit == "gb":
+ self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 * 1024
+ else:
+ raise ValueError("mini_batch_size unit must be kb, mb or gb") from e
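The size-string conversion above can be read in isolation; a standalone sketch of the same logic follows (a hypothetical helper, not part of the class; it uses `?` instead of the original `*` quantifier, so repeated unit suffixes are rejected).

import re


def parse_mini_batch_size(value: str) -> int:
    # Accept a plain integer string ("1024") or an integer with a kb/mb/gb suffix
    # (case-insensitive), mirroring the conversion in ParallelComponent.__init__.
    if not re.match(r"^\d+([kKmMgG][bB])?$", value):
        raise ValueError(r"mini_batch_size must follow regex rule ^\d+([kKmMgG][bB])?$")
    units = {"kb": 1024, "mb": 1024**2, "gb": 1024**3}
    suffix = value[-2:].lower()
    if suffix in units:
        return int(value[:-2]) * units[suffix]
    return int(value)


assert parse_mini_batch_size("10mb") == 10 * 1024 * 1024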
+
+ @property
+ def instance_count(self) -> Optional[int]:
+ """Return value of promoted property resources.instance_count.
+
+ :return: Value of resources.instance_count.
+ :rtype: Optional[int]
+ """
+ return self.resources.instance_count if self.resources and not isinstance(self.resources, dict) else None
+
+ @instance_count.setter
+ def instance_count(self, value: int) -> None:
+ """Set the value of the promoted property resources.instance_count.
+
+ :param value: The value to set for resources.instance_count.
+ :type value: int
+ """
+ if not value:
+ return
+ if not self.resources:
+ self.resources = JobResourceConfiguration(instance_count=value)
+ else:
+ if not isinstance(self.resources, dict):
+ self.resources.instance_count = value
+
+ @property
+ def code(self) -> Optional[str]:
+ """Return value of promoted property task.code, which is a local or
+ remote path pointing at source code.
+
+ :return: Value of task.code.
+ :rtype: Optional[str]
+ """
+ return self.task.code if self.task else None
+
+ @code.setter
+ def code(self, value: str) -> None:
+ """Set the value of the promoted property task.code.
+
+ :param value: The value to set for task.code.
+ :type value: str
+ """
+ if not value:
+ return
+ if not self.task:
+ self.task = ParallelTask(code=value)
+ else:
+ self.task.code = value
+
+ def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+        """Dump the component content into an ordered dict for yaml dump.
+
+ :return: The ordered dict
+ :rtype: Dict
+ """
+
+ obj: dict = super()._to_ordered_dict_for_yaml_dump()
+        # a dict dumped based on the schema turns code into an absolute path, while we want to keep its original value
+ if self.code and isinstance(self.code, str):
+ obj["task"]["code"] = self.code
+ return obj
+
+ @property
+ def environment(self) -> Optional[str]:
+        """Return value of promoted property task.environment, indicating the
+        environment that the training job will run in.
+
+ :return: Value of task.environment.
+ :rtype: Optional[Environment, str]
+ """
+ if self.task:
+ return cast(Optional[str], self.task.environment)
+ return None
+
+ @environment.setter
+ def environment(self, value: str) -> None:
+ """Set the value of the promoted property task.environment.
+
+ :param value: The value to set for task.environment.
+ :type value: str
+ """
+ if not value:
+ return
+ if not self.task:
+ self.task = ParallelTask(environment=value)
+ else:
+ self.task.environment = value
+
+ def _customized_validate(self) -> MutableValidationResult:
+ validation_result = super()._customized_validate()
+ self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
+ return validation_result
+
+ @classmethod
+ def _attr_type_map(cls) -> dict:
+ return {
+ "retry_settings": (dict, RetrySettings),
+ "task": (dict, ParallelTask),
+ "logging_level": str,
+ "max_concurrency_per_instance": int,
+ "input_data": str,
+ "error_threshold": int,
+ "mini_batch_error_threshold": int,
+ "code": (str, os.PathLike),
+ "resources": (dict, JobResourceConfiguration),
+ }
+
+ def _to_rest_object(self) -> ComponentVersion:
+ rest_object = super()._to_rest_object()
+        # the schema requires a list while the backend accepts a JSON string
+ if self.partition_keys:
+ rest_object.properties.component_spec["partition_keys"] = json.dumps(self.partition_keys)
+ return rest_object
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # the schema requires a list while the backend accepts a JSON string,
+        # so convert it back on the rest object before loading with the schema
+ partition_keys = obj.properties.component_spec.get("partition_keys", None)
+ if partition_keys:
+ obj.properties.component_spec["partition_keys"] = json.loads(partition_keys)
+ res: dict = super()._from_rest_object_to_init_params(obj)
+ return res
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return ParallelComponentSchema(context=context)
+
+ def __str__(self) -> str:
+ try:
+ toYaml: str = self._to_yaml()
+ return toYaml
+ except BaseException: # pylint: disable=W0718
+ toStr: str = super(ParallelComponent, self).__str__()
+ return toStr
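A construction sketch showing the promoted properties in action; the task fields, environment name, and paths below are placeholders.

from azure.ai.ml.entities._component.parallel_component import ParallelComponent
from azure.ai.ml.entities._job.parallel.parallel_task import ParallelTask

pc = ParallelComponent(
    name="my_parallel_component",  # placeholder name
    display_name="Batch scoring",
    inputs={"job_data_path": {"type": "mltable"}},
    outputs={"scored_data": {"type": "uri_folder"}},
    input_data="${{inputs.job_data_path}}",
    mini_batch_size="10mb",  # converted to 10 * 1024 * 1024 as shown above
    task=ParallelTask(
        type="run_function",
        code="./src",  # placeholder local folder
        entry_script="score.py",
        environment="azureml:my-env:1",  # placeholder environment
    ),
    instance_count=2,  # promoted into resources.instance_count
)
assert pc.code == "./src"  # promoted from task.code
assert pc.resources is not None and pc.resources.instance_count == 2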
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py
new file mode 100644
index 00000000..229b714d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py
@@ -0,0 +1,529 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import json
+import logging
+import os
+import re
+import time
+import typing
+from collections import Counter
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from marshmallow import Schema
+
+from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties
+from azure.ai.ml._schema import PathAwareSchema
+from azure.ai.ml._schema.pipeline.pipeline_component import PipelineComponentSchema
+from azure.ai.ml._utils._asset_utils import get_object_hash
+from azure.ai.ml._utils.utils import hash_dict, is_data_binding_expression
+from azure.ai.ml.constants._common import ARM_ID_PREFIX, ASSET_ARM_ID_REGEX_FORMAT, COMPONENT_TYPE
+from azure.ai.ml.constants._component import ComponentSource, NodeType
+from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
+from azure.ai.ml.entities._builders import BaseNode, Command
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode, LoopNode
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._inputs_outputs import GroupInput, Input
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe, try_get_non_arbitrary_attr
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression
+from azure.ai.ml.entities._validation import MutableValidationResult
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineComponent(Component):
+ """Pipeline component, currently used to store components in an azure.ai.ml.dsl.pipeline.
+
+ :param name: Name of the component.
+ :type name: str
+ :param version: Version of the component.
+ :type version: str
+ :param description: Description of the component.
+ :type description: str
+ :param tags: Tag dictionary. Tags can be added, removed, and updated.
+ :type tags: dict
+ :param display_name: Display name of the component.
+ :type display_name: str
+ :param inputs: Component inputs.
+ :type inputs: dict
+ :param outputs: Component outputs.
+ :type outputs: dict
+    :param jobs: Dictionary mapping node names to nodes inside the pipeline definition.
+ :type jobs: Dict[str, ~azure.ai.ml.entities._builders.BaseNode]
+ :param is_deterministic: Whether the pipeline component is deterministic.
+ :type is_deterministic: bool
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if PipelineComponent cannot be successfully validated.
+ Details will be provided in the error message.
+ """
+
+ def __init__(
+ self,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ display_name: Optional[str] = None,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ jobs: Optional[Dict[str, BaseNode]] = None,
+ is_deterministic: Optional[bool] = None,
+ **kwargs: Any,
+ ) -> None:
+ kwargs[COMPONENT_TYPE] = NodeType.PIPELINE
+ super().__init__(
+ name=name,
+ version=version,
+ description=description,
+ tags=tags,
+ display_name=display_name,
+ inputs=inputs,
+ outputs=outputs,
+ is_deterministic=is_deterministic, # type: ignore[arg-type]
+ **kwargs,
+ )
+ self._jobs = self._process_jobs(jobs) if jobs else {}
+ # for telemetry
+ self._job_types, self._job_sources = self._get_job_type_and_source()
+ # Private support: create pipeline component from pipeline job
+ self._source_job_id = kwargs.pop("source_job_id", None)
+ # TODO: set anonymous hash for reuse
+
+ def _process_jobs(self, jobs: Dict[str, BaseNode]) -> Dict[str, BaseNode]:
+ """Process and validate jobs.
+
+ :param jobs: A map of node name to node
+ :type jobs: Dict[str, BaseNode]
+ :return: The processed jobs
+ :rtype: Dict[str, BaseNode]
+ """
+ # Remove swept Command
+ node_names_to_skip = []
+ for node_name, job_instance in jobs.items():
+ if isinstance(job_instance, Command) and job_instance._swept is True:
+ node_names_to_skip.append(node_name)
+
+ for key in node_names_to_skip:
+ del jobs[key]
+
+ # Set path and validate node type.
+ for _, job_instance in jobs.items():
+ if isinstance(job_instance, BaseNode):
+ job_instance._set_base_path(self.base_path)
+
+ if not isinstance(job_instance, (BaseNode, AutoMLJob, ControlFlowNode)):
+                msg = f"Unsupported pipeline job type: {type(job_instance)}"
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.PIPELINE,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ return jobs
+
+ def _customized_validate(self) -> MutableValidationResult:
+ """Validate pipeline component structure.
+
+ :return: The validation result
+ :rtype: MutableValidationResult
+ """
+ validation_result = super(PipelineComponent, self)._customized_validate()
+
+ # Validate inputs
+ for input_name, input_value in self.inputs.items():
+ if input_value.type is None:
+ validation_result.append_error(
+ yaml_path="inputs.{}".format(input_name),
+ message="Parameter type unknown, please add type annotation or specify input default value.",
+ error_code=ValidationErrorCode.PARAMETER_TYPE_UNKNOWN,
+ )
+
+ # Validate all nodes
+ for node_name, node in self.jobs.items():
+ if isinstance(node, BaseNode):
+ # Node inputs will be validated.
+ validation_result.merge_with(node._validate(), "jobs.{}".format(node_name))
+ if isinstance(node.component, Component):
+ # Validate binding if not remote resource.
+ validation_result.merge_with(self._validate_binding_inputs(node))
+ elif isinstance(node, AutoMLJob):
+ pass
+ elif isinstance(node, ControlFlowNode):
+ # Validate control flow node.
+ validation_result.merge_with(node._validate(), "jobs.{}".format(node_name))
+ else:
+ validation_result.append_error(
+ yaml_path="jobs.{}".format(node_name),
+                    message=f"Unsupported pipeline job type: {type(node)}",
+ )
+
+ return validation_result
+
+ def _validate_compute_is_set(self, *, parent_node_name: Optional[str] = None) -> MutableValidationResult:
+ """Validate compute in pipeline component.
+
+ This function will only be called from pipeline_job._validate_compute_is_set
+        when both pipeline_job.compute and pipeline_job.settings.default_compute are None.
+ Rules:
+ - For pipeline node: will call node._component._validate_compute_is_set to validate node compute in sub graph.
+ - For general node:
+ - If _skip_required_compute_missing_validation is True, validation will be skipped.
+ - All the rest of cases without compute will add compute not set error to validation result.
+
+ :keyword parent_node_name: The name of the parent node.
+ :type parent_node_name: Optional[str]
+ :return: The validation result
+ :rtype: MutableValidationResult
+ """
+
+        # Note: do not put this into customized validate, as we would like to call
+        # this from pipeline_job._validate_compute_is_set
+ validation_result = self._create_empty_validation_result()
+ no_compute_nodes = []
+ parent_node_name = parent_node_name if parent_node_name else ""
+ for node_name, node in self.jobs.items():
+ full_node_name = f"{parent_node_name}{node_name}.jobs."
+ if node.type == NodeType.PIPELINE and isinstance(node._component, PipelineComponent):
+ validation_result.merge_with(node._component._validate_compute_is_set(parent_node_name=full_node_name))
+ continue
+ if isinstance(node, BaseNode) and node._skip_required_compute_missing_validation:
+ continue
+ if has_attr_safe(node, "compute") and node.compute is None:
+ no_compute_nodes.append(node_name)
+
+ for node_name in no_compute_nodes:
+ validation_result.append_error(
+ yaml_path=f"jobs.{parent_node_name}{node_name}.compute",
+ message="Compute not set",
+ )
+ return validation_result
+
+ def _get_input_binding_dict(self, node: BaseNode) -> Tuple[dict, dict]:
+ """Return the input binding dict for each node.
+
+ :param node: The node
+ :type node: BaseNode
+ :return: A 2-tuple of (binding_dict, optional_binding_in_expression_dict)
+ :rtype: Tuple[dict, dict]
+ """
+ # pylint: disable=too-many-nested-blocks
+ binding_inputs = node._build_inputs()
+ # Collect binding relation dict {'pipeline_input': ['node_input']}
+ binding_dict: dict = {}
+ optional_binding_in_expression_dict: dict = {}
+ for component_input_name, component_binding_input in binding_inputs.items():
+ if isinstance(component_binding_input, PipelineExpression):
+ for pipeline_input_name in component_binding_input._inputs.keys():
+ if pipeline_input_name not in self.inputs:
+ continue
+ if pipeline_input_name not in binding_dict:
+ binding_dict[pipeline_input_name] = []
+ binding_dict[pipeline_input_name].append(component_input_name)
+ if pipeline_input_name not in optional_binding_in_expression_dict:
+ optional_binding_in_expression_dict[pipeline_input_name] = []
+ optional_binding_in_expression_dict[pipeline_input_name].append(pipeline_input_name)
+ else:
+ if isinstance(component_binding_input, Input):
+ component_binding_input = component_binding_input.path
+ if is_data_binding_expression(component_binding_input, ["parent"]):
+ # data binding may have more than one PipelineInput now
+ for pipeline_input_name in PipelineExpression.parse_pipeline_inputs_from_data_binding(
+ component_binding_input
+ ):
+ if pipeline_input_name not in self.inputs:
+ continue
+ if pipeline_input_name not in binding_dict:
+ binding_dict[pipeline_input_name] = []
+ binding_dict[pipeline_input_name].append(component_input_name)
+ # for data binding expression "${{parent.inputs.pipeline_input}}", it should not be optional
+ if len(component_binding_input.replace("${{parent.inputs." + pipeline_input_name + "}}", "")):
+ if pipeline_input_name not in optional_binding_in_expression_dict:
+ optional_binding_in_expression_dict[pipeline_input_name] = []
+ optional_binding_in_expression_dict[pipeline_input_name].append(pipeline_input_name)
+ return binding_dict, optional_binding_in_expression_dict
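For orientation, the two dictionaries returned above have shapes along these lines (names are illustrative):

# binding_dict: pipeline input name -> node input names bound to it
binding_dict_example = {"pipeline_data": ["training_data", "validation_data"]}
# optional_binding_in_expression_dict: pipeline inputs whose optional/required status is not
# checked because they appear inside a pipeline expression or a larger data-binding expression
optional_binding_example = {"learning_rate": ["learning_rate"]}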
+
+ def _validate_binding_inputs(self, node: BaseNode) -> MutableValidationResult:
+ """Validate pipeline binding inputs and return all used pipeline input names.
+
+        Mark an input as optional if all of its bindings are optional and optional is not set. Raise an error if a
+        pipeline input is optional but linked to required inputs.
+
+ :param node: The node to validate
+ :type node: BaseNode
+ :return: The validation result
+ :rtype: MutableValidationResult
+ """
+ component_definition_inputs = {}
+ # Add flattened group input into definition inputs.
+ # e.g. Add {'group_name.item': PipelineInput} for {'group_name': GroupInput}
+ for name, val in node.component.inputs.items():
+ if isinstance(val, GroupInput):
+ component_definition_inputs.update(val.flatten(group_parameter_name=name))
+ component_definition_inputs[name] = val
+ # Collect binding relation dict {'pipeline_input': ['node_input']}
+ validation_result = self._create_empty_validation_result()
+ binding_dict, optional_binding_in_expression_dict = self._get_input_binding_dict(node)
+
+ # Validate links required and optional
+ for pipeline_input_name, binding_inputs in binding_dict.items():
+ pipeline_input = self.inputs[pipeline_input_name]
+ required_bindings = []
+ for name in binding_inputs:
+                # do not check optional/required for pipeline inputs used in a pipeline expression
+ if name in optional_binding_in_expression_dict.get(pipeline_input_name, []):
+ continue
+ if name in component_definition_inputs and component_definition_inputs[name].optional is not True:
+ required_bindings.append(f"{node.name}.inputs.{name}")
+ if pipeline_input.optional is None and not required_bindings:
+                # Set the input as optional if all bindings are optional and optional is not set.
+ pipeline_input.optional = True
+ pipeline_input._is_inferred_optional = True
+ elif pipeline_input.optional is True and required_bindings:
+ if pipeline_input._is_inferred_optional:
+                    # Change optional=True to None if it was inferred by us
+ pipeline_input.optional = None
+ else:
+                    # Raise an exception if the user set the pipeline input as optional but it is linked to required inputs.
+ validation_result.append_error(
+ yaml_path="inputs.{}".format(pipeline_input._port_name),
+ message=f"Pipeline optional Input binding to required inputs: {required_bindings}",
+ )
+ return validation_result
+
+ def _get_job_type_and_source(self) -> Tuple[Dict[str, int], Dict[str, int]]:
+ """Get job types and sources for telemetry.
+
+ :return: A 2-tuple of
+ * A map of job type to the number of occurrences
+ * A map of job source to the number of occurrences
+ :rtype: Tuple[Dict[str, int], Dict[str, int]]
+ """
+ job_types: list = []
+ job_sources = []
+ for job in self.jobs.values():
+ job_types.append(job.type)
+ if isinstance(job, BaseNode):
+ job_sources.append(job._source)
+ elif isinstance(job, AutoMLJob):
+                # Consider all AutoML jobs to have the builder source for now,
+                # as it's not easy to distinguish their source (yaml/builder).
+ job_sources.append(ComponentSource.BUILDER)
+ else:
+ # Fall back to CLASS
+ job_sources.append(ComponentSource.CLASS)
+ return dict(Counter(job_types)), dict(Counter(job_sources))
+
+ @property
+ def jobs(self) -> Dict[str, BaseNode]:
+ """Return a dictionary from component variable name to component object.
+
+ :return: Dictionary mapping component variable names to component objects.
+ :rtype: Dict[str, ~azure.ai.ml.entities._builders.BaseNode]
+ """
+ return self._jobs
+
+ def _get_anonymous_hash(self) -> str:
+ """Get anonymous hash for pipeline component.
+
+ :return: The anonymous hash of the pipeline component
+ :rtype: str
+ """
+ # ideally we should always use rest object to generate hash as it's the same as
+ # what we send to server-side, but changing the hash function will break reuse of
+ # existing components except for command component (hash result is the same for
+ # command component), so we just use rest object to generate hash for pipeline component,
+ # which doesn't have reuse issue.
+ component_interface_dict = self._to_rest_object().properties.component_spec
+ # Hash local inputs in pipeline component jobs
+ for job_name, job in self.jobs.items():
+ if getattr(job, "inputs", None):
+ for input_name, input_value in job.inputs.items():
+ try:
+ if (
+ getattr(input_value, "_data", None)
+ and isinstance(input_value._data, Input)
+ and input_value.path
+ and os.path.exists(input_value.path)
+ ):
+ start_time = time.time()
+ component_interface_dict["jobs"][job_name]["inputs"][input_name]["content_hash"] = (
+ get_object_hash(input_value.path)
+ )
+ module_logger.debug(
+ "Takes %s seconds to calculate the content hash of local input %s",
+ time.time() - start_time,
+ input_value.path,
+ )
+ except ValidationException:
+ pass
+ hash_value: str = hash_dict(
+ component_interface_dict,
+ keys_to_omit=[
+ # omit name since anonymous component will have same name
+ "name",
+ # omit _source since it doesn't impact component's uniqueness
+ "_source",
+ # omit id since it will be set after component is registered
+ "id",
+ # omit version since it will be set to this hash later
+ "version",
+ ],
+ )
+ return hash_value
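Conceptually, `hash_dict(..., keys_to_omit=...)` computes a stable digest of the component spec with volatile keys removed; a rough standalone sketch of that idea follows (top-level keys only, not the SDK's actual implementation).

import hashlib
import json
from typing import Any, Dict, Iterable


def stable_hash(spec: Dict[str, Any], keys_to_omit: Iterable[str]) -> str:
    # Drop volatile keys, then hash a canonical (sorted-key) JSON dump so the same
    # logical spec always yields the same digest.
    omitted = set(keys_to_omit)
    filtered = {k: v for k, v in spec.items() if k not in omitted}
    payload = json.dumps(filtered, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


print(stable_hash({"name": "anon", "version": "1", "command": "echo hi"}, ["name", "version"]))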
+
+ @classmethod
+ def _load_from_rest_pipeline_job(cls, data: Dict) -> "PipelineComponent":
+ # TODO: refine this?
+        # Set type as None here to avoid schema validation failure
+ definition_inputs = {p: {"type": None} for p in data.get("inputs", {}).keys()}
+ definition_outputs = {p: {"type": None} for p in data.get("outputs", {}).keys()}
+ return PipelineComponent(
+ display_name=data.get("display_name"),
+ description=data.get("description"),
+ inputs=definition_inputs,
+ outputs=definition_outputs,
+ jobs=data.get("jobs"),
+ _source=ComponentSource.REMOTE_WORKSPACE_JOB,
+ )
+
+ @classmethod
+ def _resolve_sub_nodes(cls, rest_jobs: Dict) -> Dict:
+ from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory
+
+ sub_nodes = {}
+ if rest_jobs is None:
+ return sub_nodes
+ for node_name, node in rest_jobs.items():
+ # TODO: Remove this ad-hoc fix after unified arm id format in object
+ component_id = node.get("componentId", "")
+ if isinstance(component_id, str) and re.match(ASSET_ARM_ID_REGEX_FORMAT, component_id):
+ node["componentId"] = component_id[len(ARM_ID_PREFIX) :]
+ if not LoopNode._is_loop_node_dict(node):
+                # skip resolving LoopNodes first since they may reference other nodes
+                # use the node factory instead of BaseNode._from_rest_object here, as AutoMLJob is not a BaseNode
+ sub_nodes[node_name] = pipeline_node_factory.load_from_rest_object(obj=node)
+ for node_name, node in rest_jobs.items():
+ if LoopNode._is_loop_node_dict(node):
+ # resolve LoopNode after all other nodes are resolved
+ sub_nodes[node_name] = pipeline_node_factory.load_from_rest_object(obj=node, pipeline_jobs=sub_nodes)
+ return sub_nodes
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return PipelineComponentSchema(context=context)
+
+ @classmethod
+ def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
+ # jobs validations are done in _customized_validate()
+ return ["jobs"]
+
+ @classmethod
+ def _check_ignored_keys(cls, obj: object) -> List[str]:
+        """Return the keys in obj that will be ignored when it is used as a pipeline component and their values are set.
+
+ :param obj: The object to examine
+ :type obj: object
+ :return: List of keys to ignore
+ :rtype: List[str]
+ """
+ examine_mapping = {
+ "compute": lambda val: val is not None,
+ "settings": lambda val: val is not None and any(v is not None for v in val._to_dict().values()),
+ }
+        # Use `try_get_non_arbitrary_attr` instead of `hasattr` or `getattr` directly to avoid adding new attributes.
+ return [k for k, has_set in examine_mapping.items() if has_set(try_get_non_arbitrary_attr(obj, k))]
+
+ def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict:
+ telemetry_values: dict = super()._get_telemetry_values()
+ telemetry_values.update(
+ {
+ "source": self._source,
+ "node_count": len(self.jobs),
+ "node_type": json.dumps(self._job_types),
+ "node_source": json.dumps(self._job_sources),
+ }
+ )
+ return telemetry_values
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # Pop jobs to avoid them going through schema load
+ jobs = obj.properties.component_spec.pop("jobs", None)
+ init_params_dict: dict = super()._from_rest_object_to_init_params(obj)
+ if jobs:
+ try:
+ init_params_dict["jobs"] = PipelineComponent._resolve_sub_nodes(jobs)
+ except Exception as e: # pylint: disable=W0718
+                # Skip parsing jobs if an error occurs.
+ # TODO: https://msdata.visualstudio.com/Vienna/_workitems/edit/2052262
+ module_logger.debug("Parse pipeline component jobs failed with: %s", e)
+ return init_params_dict
+
+ def _to_dict(self) -> Dict:
+ return {**self._other_parameter, **super()._to_dict()}
+
+ def _build_rest_component_jobs(self) -> Dict[str, dict]:
+ """Build pipeline component jobs to rest.
+
+ :return: A map of job name to rest objects
+ :rtype: Dict[str, dict]
+ """
+ # Build the jobs to dict
+ rest_component_jobs = {}
+ for job_name, job in self.jobs.items():
+ if isinstance(job, (BaseNode, ControlFlowNode)):
+ rest_node_dict = job._to_rest_object()
+ elif isinstance(job, AutoMLJob):
+ rest_node_dict = json.loads(json.dumps(job._to_dict(inside_pipeline=True)))
+ else:
+                msg = f"Unsupported job type in pipeline jobs: {type(job)}"
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.PIPELINE,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ rest_component_jobs[job_name] = rest_node_dict
+ return rest_component_jobs
+
+ def _to_rest_object(self) -> ComponentVersion:
+ """Check ignored keys and return rest object.
+
+ :return: The component version
+ :rtype: ComponentVersion
+ """
+ ignored_keys = self._check_ignored_keys(self)
+ if ignored_keys:
+ module_logger.warning("%s ignored on pipeline component %r.", ignored_keys, self.name)
+ component = self._to_dict()
+ # add source type to component rest object
+ component["_source"] = self._source
+ component["jobs"] = self._build_rest_component_jobs()
+ component["sourceJobId"] = self._source_job_id
+ if self._intellectual_property:
+            # hack while full pass-through support is worked on for IPP fields
+ component.pop("intellectual_property")
+ component["intellectualProperty"] = self._intellectual_property._to_rest_object().serialize()
+ properties = ComponentVersionProperties(
+ component_spec=component,
+ description=self.description,
+ is_anonymous=self._is_anonymous,
+ properties=self.properties,
+ tags=self.tags,
+ )
+ result = ComponentVersion(properties=properties)
+ result.name = self.name
+ return result
+
+ def __str__(self) -> str:
+ try:
+ toYaml: str = self._to_yaml()
+ return toYaml
+ except BaseException: # pylint: disable=W0718
+ toStr: str = super(PipelineComponent, self).__str__()
+ return toStr
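PipelineComponents are usually produced by the dsl.pipeline decorator rather than constructed directly. A hedged end-to-end sketch follows; the command string, environment, compute target, and paths are placeholders.

from azure.ai.ml import Input, command
from azure.ai.ml.dsl import pipeline

# A single command node; the details below are placeholders, not a working training job.
train = command(
    command="python train.py --data ${{inputs.data}}",
    code="./src",
    inputs={"data": Input(type="uri_folder")},
    environment="azureml:my-env:1",
    compute="cpu-cluster",
)


@pipeline(description="toy pipeline")
def toy_pipeline(data):
    train(data=data)


pipeline_job = toy_pipeline(data=Input(path="./data"))
# The job's definition is backed by the PipelineComponent described in this file.
print(type(pipeline_job.component).__name__)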
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py
new file mode 100644
index 00000000..7da65fb6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py
@@ -0,0 +1,211 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from typing import Any, Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from azure.ai.ml._schema.component.spark_component import SparkComponentSchema
+from azure.ai.ml.constants._common import COMPONENT_TYPE
+from azure.ai.ml.constants._component import NodeType
+from azure.ai.ml.constants._job.job import RestSparkConfKey
+from azure.ai.ml.entities._assets import Environment
+from azure.ai.ml.entities._job.parameterized_spark import ParameterizedSpark
+
+from ..._schema import PathAwareSchema
+from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin
+from .._util import convert_ordered_dict_to_dict, validate_attribute_type
+from .._validation import MutableValidationResult
+from ._additional_includes import AdditionalIncludesMixin
+from .component import Component
+
+
+class SparkComponent(
+ Component, ParameterizedSpark, SparkJobEntryMixin, AdditionalIncludesMixin
+): # pylint: disable=too-many-instance-attributes
+ """Spark component version, used to define a Spark Component or Job.
+
+ :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url pointing
+ to a remote location. Defaults to ".", indicating the current directory.
+    :paramtype code: Union[str, os.PathLike]
+ :keyword entry: The file or class entry point.
+ :paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]]
+ :keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps. Defaults to None.
+ :paramtype py_files: Optional[List[str]]
+ :keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None.
+ :paramtype jars: Optional[List[str]]
+ :keyword files: The list of files to be placed in the working directory of each executor. Defaults to None.
+ :paramtype files: Optional[List[str]]
+ :keyword archives: The list of archives to be extracted into the working directory of each executor.
+ Defaults to None.
+ :paramtype archives: Optional[List[str]]
+ :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
+ :paramtype driver_cores: Optional[int]
+ :keyword driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit
+ suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+ :paramtype driver_memory: Optional[str]
+ :keyword executor_cores: The number of cores to use on each executor.
+ :paramtype executor_cores: Optional[int]
+ :keyword executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit
+ suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+ :paramtype executor_memory: Optional[str]
+ :keyword executor_instances: The initial number of executors.
+ :paramtype executor_instances: Optional[int]
+ :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
+ executors registered with this application up and down based on the workload. Defaults to False.
+ :paramtype dynamic_allocation_enabled: Optional[bool]
+ :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
+ enabled.
+ :paramtype dynamic_allocation_min_executors: Optional[int]
+ :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
+ enabled.
+ :paramtype dynamic_allocation_max_executors: Optional[int]
+ :keyword conf: A dictionary with pre-defined Spark configurations key and values. Defaults to None.
+ :paramtype conf: Optional[dict[str, str]]
+ :keyword environment: The Azure ML environment to run the job in.
+ :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+ :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
+ :paramtype inputs: Optional[dict[str, Union[
+ ~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
+ ~azure.ai.ml.Input,
+ str,
+ bool,
+ int,
+ float,
+ Enum,
+ ]]]
+ :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
+ :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
+ :keyword args: The arguments for the job. Defaults to None.
+ :paramtype args: Optional[str]
+ :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
+ :paramtype additional_includes: Optional[List[str]]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+ :start-after: [START spark_component_definition]
+ :end-before: [END spark_component_definition]
+ :language: python
+ :dedent: 8
+ :caption: Creating SparkComponent.
+ """
+
+ def __init__(
+ self,
+ *,
+ code: Optional[Union[str, os.PathLike]] = ".",
+ entry: Optional[Union[Dict[str, str], SparkJobEntry]] = None,
+ py_files: Optional[List[str]] = None,
+ jars: Optional[List[str]] = None,
+ files: Optional[List[str]] = None,
+ archives: Optional[List[str]] = None,
+ driver_cores: Optional[Union[int, str]] = None,
+ driver_memory: Optional[str] = None,
+ executor_cores: Optional[Union[int, str]] = None,
+ executor_memory: Optional[str] = None,
+ executor_instances: Optional[Union[int, str]] = None,
+ dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
+ dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
+ dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
+ conf: Optional[Dict[str, str]] = None,
+ environment: Optional[Union[str, Environment]] = None,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ args: Optional[str] = None,
+ additional_includes: Optional[List] = None,
+ **kwargs: Any,
+ ) -> None:
+ # validate init params are valid type
+ validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+
+ kwargs[COMPONENT_TYPE] = NodeType.SPARK
+
+ super().__init__(
+ inputs=inputs,
+ outputs=outputs,
+ **kwargs,
+ )
+
+ self.code: Optional[Union[str, os.PathLike]] = code
+ self.entry = entry
+ self.py_files = py_files
+ self.jars = jars
+ self.files = files
+ self.archives = archives
+ self.conf = conf
+ self.environment = environment
+ self.args = args
+ self.additional_includes = additional_includes or []
+        # For a pipeline Spark job, we also allow the user to set driver_cores, driver_memory, etc. via conf.
+        # If the root-level fields are not set by the user, we promote the conf settings to the root level to
+        # facilitate subsequent validation. This usually happens when to_component(SparkJob) or the builder
+        # function spark() is used as a node in the pipeline SDK.
+ conf = conf or {}
+ self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)
+ self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None)
+ self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None)
+ self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None)
+ self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None)
+ self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None
+ )
+ self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None
+ )
+ self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None
+ )
+
+ @classmethod
+ def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+ return SparkComponentSchema(context=context)
+
+ @classmethod
+ def _attr_type_map(cls) -> dict:
+ return {
+ "environment": (str, Environment),
+ "code": (str, os.PathLike),
+ }
+
+ def _customized_validate(self) -> MutableValidationResult:
+ validation_result = super()._customized_validate()
+ self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
+ return validation_result
+
+ def _to_dict(self) -> Dict:
+ # TODO: Bug Item number: 2897665
+ res: Dict = convert_ordered_dict_to_dict( # type: ignore
+ {**self._other_parameter, **super(SparkComponent, self)._to_dict()}
+ )
+ return res
+
+ def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+        """Dump the component content into an ordered dict for yaml dump.
+
+ :return: The ordered dict
+ :rtype: Dict
+ """
+
+ obj: dict = super()._to_ordered_dict_for_yaml_dump()
+        # a dict dumped based on the schema turns code into an absolute path, while we want to keep its original value
+ if self.code and isinstance(self.code, str):
+ obj["code"] = self.code
+ return obj
+
+ def _get_environment_id(self) -> Union[str, None]:
+ # Return environment id of environment
+ # handle case when environment is defined inline
+ if isinstance(self.environment, Environment):
+ res: Optional[str] = self.environment.id
+ return res
+ return self.environment
+
+ def __str__(self) -> str:
+ try:
+ toYaml: str = self._to_yaml()
+ return toYaml
+ except BaseException: # pylint: disable=W0718
+ toStr: str = super(SparkComponent, self).__str__()
+ return toStr
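A construction sketch illustrating the conf promotion described above; the entry file, paths, environment, and conf values are placeholders.

from azure.ai.ml.entities import SparkComponent

sc = SparkComponent(
    name="my_spark_component",  # placeholder name
    code="./src",  # placeholder local folder
    entry={"file": "process.py"},  # placeholder entry script
    # Root-level driver/executor fields are left unset, so they are promoted from conf:
    conf={
        "spark.driver.cores": 1,
        "spark.driver.memory": "2g",
        "spark.executor.cores": 2,
        "spark.executor.memory": "2g",
        "spark.executor.instances": 2,
    },
    environment="azureml:my-spark-env:1",  # placeholder environment
    inputs={"input_data": {"type": "uri_folder"}},
    outputs={"output_data": {"type": "uri_folder"}},
    args="--input ${{inputs.input_data}} --output ${{outputs.output_data}}",
)
assert sc.driver_cores == 1 and sc.executor_instances == 2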