Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component')
13 files changed, 4016 insertions, 0 deletions
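The modules in this diff define the `Component` entity hierarchy (`Component`, `CommandComponent`, `AutoMLComponent`) plus the code-packaging helpers (`ComponentCodeMixin`, `ComponentIgnoreFile`, `AdditionalIncludes`) that merge a component's `code` folder with its `additional_includes` before upload. As orientation, here is a minimal, hypothetical sketch of the public surface these files back; the component name, local paths, and environment reference are placeholders, not values taken from the diff:

    # Illustrative only: exercises CommandComponent + AdditionalIncludesMixin
    # added in this diff. All names/paths below are placeholders.
    from azure.ai.ml import Input, Output
    from azure.ai.ml.entities import CommandComponent

    train_component = CommandComponent(
        name="train_model",
        version="1",
        display_name="Train model",
        description="Example command component with additional includes.",
        command="python train.py --data ${{inputs.training_data}} --out ${{outputs.model_dir}}",
        code="./src",  # local folder, resolved relative to the component's base path
        environment="azureml:my-training-env:1",  # placeholder environment reference
        inputs={"training_data": Input(type="uri_folder")},
        outputs={"model_dir": Output(type="uri_folder")},
        # Local additional includes are merged with `code` into a temporary folder
        # before registration (AdditionalIncludes.merge_local_code_and_additional_includes).
        # A ".zip" entry is built by compressing a sibling folder of the same stem
        # when no such zip file exists (see _is_folder_to_compress).
        additional_includes=["../shared/utils", "../shared/configs.zip"],
    )

    print(train_component.display_name)

Entries in `additional_includes` may also be Azure DevOps artifact configs (`type: artifact` with `feed`, `name`, and `version`), which `AdditionalIncludes` downloads and resolves to local paths as described in `_get_resolved_additional_include_configs` below.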
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py new file mode 100644 index 00000000..fdf8caba --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/__init__.py @@ -0,0 +1,5 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py new file mode 100644 index 00000000..85f609ca --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/_additional_includes.py @@ -0,0 +1,541 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import json +import os +import shutil +import tempfile +import zipfile +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from multiprocessing import cpu_count +from pathlib import Path +from typing import Any, Dict, Generator, List, Optional, Tuple, Union + +from azure.ai.ml.constants._common import AzureDevopsArtifactsType +from azure.ai.ml.entities._validation import MutableValidationResult, ValidationResultBuilder + +from ..._utils._artifact_utils import ArtifactCache +from ..._utils._asset_utils import IgnoreFile, get_upload_files_from_folder +from ..._utils.utils import is_concurrent_component_registration_enabled, is_private_preview_enabled +from ...entities._util import _general_copy +from .._assets import Code +from .code import ComponentCodeMixin, ComponentIgnoreFile + +PLACEHOLDER_FILE_NAME = "_placeholder_spec.yaml" + + +class AdditionalIncludes: + """Initialize the AdditionalIncludes object. + + :param origin_code_value: The origin code value. + :type origin_code_value: Optional[str] + :param base_path: The base path for origin code path and additional include configs. + :type base_path: Path + :param configs: The additional include configs. + :type configs: List[Union[str, dict]] + """ + + def __init__( + self, + *, + origin_code_value: Optional[str], + base_path: Path, + configs: Optional[List[Union[str, dict]]] = None, + ) -> None: + self._base_path = base_path + self._origin_code_value = origin_code_value + self._origin_configs = configs + + @property + def origin_configs(self) -> List: + """The origin additional include configs. + Artifact additional include configs haven't been resolved in this property. + + :return: The origin additional include configs. + :rtype: List[Union[str, dict]] + """ + return self._origin_configs or [] + + @property + def resolved_code_path(self) -> Union[None, Path]: + """The resolved origin code path based on base path, if code path is not specified, return None. + We shouldn't change this property name given it's referenced in mldesigner. + + :return: The resolved origin code path. 
+ :rtype: Union[None, Path] + """ + if self._origin_code_value is None: + return None + if os.path.isabs(self._origin_code_value): + return Path(self._origin_code_value) + return (self.base_path / self._origin_code_value).resolve() + + @property + def base_path(self) -> Path: + """Base path for origin code path and additional include configs. + + :return: The base path. + :rtype: Path + """ + return self._base_path + + @property + def with_includes(self) -> bool: + """Whether the additional include configs have been provided. + + :return: True if additional include configs have been provided, False otherwise. + :rtype: bool + """ + return len(self.origin_configs) != 0 + + @classmethod + def _get_artifacts_by_config(cls, artifact_config: Dict[str, str]) -> Union[str, os.PathLike]: + # config key existence has been validated in _validate_additional_include_config + res: Union[str, os.PathLike] = ArtifactCache().get( + organization=artifact_config.get("organization", None), + project=artifact_config.get("project", None), + feed=artifact_config["feed"], + name=artifact_config["name"], + version=artifact_config["version"], + scope=artifact_config.get("scope", "organization"), + resolve=True, + ) + return res + + def _validate_additional_include_config( + self, additional_include_config: Union[Dict, str] + ) -> MutableValidationResult: + validation_result = ValidationResultBuilder.success() + if ( + isinstance(additional_include_config, dict) + and additional_include_config.get("type") == AzureDevopsArtifactsType.ARTIFACT + ): + # for artifact additional include, we validate the required fields in config but won't validate the + # artifact content to avoid downloading it in validation stage + # note that runtime error will be thrown when loading the artifact + for item in ["feed", "name", "version"]: + if item not in additional_include_config: + # TODO: add yaml path after we support list index in yaml path + validation_result.append_error( + "{} are required for artifacts config but got {}.".format( + item, json.dumps(additional_include_config) + ) + ) + elif isinstance(additional_include_config, str): + validation_result.merge_with(self._validate_local_additional_include_config(additional_include_config)) + else: + validation_result.append_error( + message=f"Unexpected format in additional_includes, {additional_include_config}" + ) + return validation_result + + @classmethod + def _resolve_artifact_additional_include_config( + cls, artifact_additional_include_config: Dict[str, str] + ) -> List[Tuple[str, str]]: + """Resolve an artifact additional include config into a list of (local_path, config_info) tuples. 
+ + Configured artifact will be downloaded to local path first; the config_info will be in below format: + %name%:%version% in %feed% + + :param artifact_additional_include_config: Additional include config for an artifact + :type artifact_additional_include_config: Dict[str, str] + :return: A list of 2-tuples of local_path and config_info + :rtype: List[Tuple[str, str]] + """ + result = [] + # Note that we don't validate the artifact config here, since it has already been validated in + # _validate_additional_include_config + artifact_path = cls._get_artifacts_by_config(artifact_additional_include_config) + for item in os.listdir(artifact_path): + config_info = ( + f"{artifact_additional_include_config['name']}:{artifact_additional_include_config['version']} in " + f"{artifact_additional_include_config['feed']}" + ) + result.append((os.path.join(artifact_path, item), config_info)) + return result + + def _resolve_artifact_additional_include_configs( + self, artifact_additional_includes_configs: List[Dict[str, str]] + ) -> List: + additional_include_info_tuples = [] + # Unlike component registration, artifact downloading is a pure download progress; so we can use + # more threads to speed up the downloading process. + # We use 5 threads per CPU core plus 5 extra threads, and the max number of threads is 64. + num_threads = min(64, (int(cpu_count()) * 5) + 5) + if ( + len(artifact_additional_includes_configs) > 1 + and is_concurrent_component_registration_enabled() + and is_private_preview_enabled() + ): + with ThreadPoolExecutor(max_workers=num_threads) as executor: + all_artifact_pairs_itr = executor.map( + self._resolve_artifact_additional_include_config, artifact_additional_includes_configs + ) + + for artifact_pairs in all_artifact_pairs_itr: + additional_include_info_tuples.extend(artifact_pairs) + else: + all_artifact_pairs_list = list( + map(self._resolve_artifact_additional_include_config, artifact_additional_includes_configs) + ) + + for artifact_pairs in all_artifact_pairs_list: + additional_include_info_tuples.extend(artifact_pairs) + + return additional_include_info_tuples + + @staticmethod + def _copy(src: Path, dst: Path, *, ignore_file: Optional[Any] = None) -> None: + if ignore_file and ignore_file.is_file_excluded(src): + return + if not src.exists(): + raise ValueError(f"Path {src} does not exist.") + if src.is_file(): + _general_copy(src, dst) + if src.is_dir(): + # TODO: should we cover empty folder? + # use os.walk to replace shutil.copytree, which may raise FileExistsError + # for same folder, the expected behavior is merging + # ignore will be also applied during this process + for name in src.glob("*"): + if ignore_file is not None: + AdditionalIncludes._copy(name, dst / name.name, ignore_file=ignore_file.merge(name)) + + @staticmethod + def _is_folder_to_compress(path: Path) -> bool: + """Check if the additional include needs to compress corresponding folder as a zip. + + For example, given additional include /mnt/c/hello.zip + 1) if a file named /mnt/c/hello.zip already exists, return False (simply copy) + 2) if a folder named /mnt/c/hello exists, return True (compress as a zip and copy) + + :param path: Given path in additional include. + :type path: Path + :return: If the path need to be compressed as a zip file. 
+ :rtype: bool + """ + if path.suffix != ".zip": + return False + # if zip file exists, simply copy as other additional includes + if path.exists(): + return False + # remove .zip suffix and check whether the folder exists + stem_path = path.parent / path.stem + return stem_path.is_dir() + + def _resolve_folder_to_compress(self, include: str, dst_path: Path, ignore_file: IgnoreFile) -> None: + """resolve the zip additional include, need to compress corresponding folder. + + :param include: The path, relative to :attr:`AdditionalIncludes.base_path`, to zip + :type include: str + :param dst_path: The path to write the zipfile to + :type dst_path: Path + :param ignore_file: The ignore file to use to filter files + :type ignore_file: IgnoreFile + """ + zip_additional_include = (self.base_path / include).resolve() + folder_to_zip = zip_additional_include.parent / zip_additional_include.stem + zip_file = dst_path / zip_additional_include.name + with zipfile.ZipFile(zip_file, "w") as zf: + zf.write(folder_to_zip, os.path.relpath(folder_to_zip, folder_to_zip.parent)) # write root in zip + paths = [path for path, _ in get_upload_files_from_folder(folder_to_zip, ignore_file=ignore_file)] + # sort the paths to make sure the zip file (namelist) is deterministic + for path in sorted(paths): + zf.write(path, os.path.relpath(path, folder_to_zip.parent)) + + def _get_resolved_additional_include_configs(self) -> List[str]: + """ + Resolve additional include configs to a list of local_paths and return it. + + Addition includes is a list of include files, including local paths and Azure Devops Artifacts. + Yaml format of additional_includes looks like below: + additional_includes: + - your/local/path + - type: artifact + organization: devops_organization + project: devops_project + feed: artifacts_feed_name + name: universal_package_name + version: package_version + scope: scope_type + The artifacts package will be downloaded from devops to the local in this function and transferred to + the local paths of downloaded artifacts; + The local paths will be returned directly. + If there are conflicts among artifacts, runtime error will be raised. Note that we won't check the + conflicts between artifacts and local paths and conflicts among local paths. Reasons are: + 1. There can be ignore_file in local paths, which makes it hard to check the conflict and may lead to breaking + changes; + 2. Conflicts among artifacts are more likely to happen, since user may refer to 2 artifacts of the same name + but with different version & feed. + 3. According to current design, folders in local paths will be merged; while artifact conflicts can be + identified by folder name conflicts and are not allowed. 
+ + :return additional_includes: Path list of additional_includes + :rtype additional_includes: List[str] + """ + additional_include_configs_in_local_path = [] + + artifact_additional_include_configs = [] + for additional_include_config in self.origin_configs: + if isinstance(additional_include_config, str): + # add local additional include configs directly + additional_include_configs_in_local_path.append(additional_include_config) + else: + # artifact additional include config will be downloaded and resolved to a local path later + # note that there is no more validation for artifact additional include config here, since it has + # already been validated in _validate_additional_include_config + artifact_additional_include_configs.append(additional_include_config) + + artifact_additional_include_info_tuples = self._resolve_artifact_additional_include_configs( + artifact_additional_include_configs + ) + additional_include_configs_in_local_path.extend( + local_path for local_path, _ in artifact_additional_include_info_tuples + ) + + # check file conflicts among artifact package + # given this is not in validate stage, we will raise error if there are conflict files + conflict_files: dict = defaultdict(set) + for local_path, config_info in artifact_additional_include_info_tuples: + file_name = Path(local_path).name + conflict_files[file_name].add(config_info) + + conflict_files = {k: v for k, v in conflict_files.items() if len(v) > 1} + if conflict_files: + raise RuntimeError(f"There are conflict files in additional include: {conflict_files}") + + return additional_include_configs_in_local_path + + def _validate_local_additional_include_config( + self, local_path: str, config_info: Optional[str] = None + ) -> MutableValidationResult: + """Validate local additional include config. + + Note that we will check the file conflicts between each local additional includes and origin code, but + won't check the file conflicts among local additional includes fo now. + + :param local_path: The local path + :type local_path: str + :param config_info: The config info + :type config_info: Optional[str] + :return: The validation result. + :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult + """ + validation_result = ValidationResultBuilder.success() + include_path = self.base_path / local_path + # if additional include has not supported characters, resolve will fail and raise OSError + try: + src_path = include_path.resolve() + except OSError: + # no need to include potential yaml file name in error message as it will be covered by + # validation message construction. + error_msg = ( + f"Failed to resolve additional include " f"{config_info or local_path} " f"based on {self.base_path}." + ) + validation_result.append_error(message=error_msg) + return validation_result + + if not src_path.exists() and not self._is_folder_to_compress(src_path): + error_msg = f"Unable to find additional include {config_info or local_path}" + validation_result.append_error(message=error_msg) + return validation_result + + if len(src_path.parents) == 0: + error_msg = "Root directory is not supported for additional includes." 
+ validation_result.append_error(message=error_msg) + return validation_result + + dst_path = Path(self.resolved_code_path) / src_path.name if self.resolved_code_path else None + if dst_path: + if dst_path.is_symlink(): + # if destination path is symbolic link, check if it points to the same file/folder as source path + if dst_path.resolve() != src_path.resolve(): + error_msg = f"A symbolic link already exists for additional include {config_info or local_path}." + validation_result.append_error(message=error_msg) + return validation_result + elif dst_path.exists(): + error_msg = f"A file already exists for additional include {config_info or local_path}." + validation_result.append_error(message=error_msg) + return validation_result + + def validate(self) -> MutableValidationResult: + """Validate the AdditionalIncludes object. + + :return: The validation result. + :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult + """ + validation_result = ValidationResultBuilder.success() + for additional_include_config in self.origin_configs: + validation_result.merge_with(self._validate_additional_include_config(additional_include_config)) + return validation_result + + def _copy_origin_code(self, target_path: Path) -> ComponentIgnoreFile: + """Copy origin code to target path. + + :param target_path: The destination to copy to + :type target_path: Path + :return: The component ignore file for the origin path + :rtype: ComponentIgnoreFile + """ + # code can be either file or folder, as additional includes exists, need to copy to temporary folder + if self.resolved_code_path is None: + # if additional include configs exist but no origin code path, return a dummy ignore file + return ComponentIgnoreFile( + self.base_path, + ) + + if Path(self.resolved_code_path).is_file(): + # use a dummy ignore file to save base path + root_ignore_file = ComponentIgnoreFile( + Path(self.resolved_code_path).parent, + skip_ignore_file=True, + ) + self._copy( + Path(self.resolved_code_path), + target_path / Path(self.resolved_code_path).name, + ignore_file=root_ignore_file, + ) + else: + # current implementation of ignore file is based on absolute path, so it cannot be shared + root_ignore_file = ComponentIgnoreFile(self.resolved_code_path) + self._copy(self.resolved_code_path, target_path, ignore_file=root_ignore_file) + return root_ignore_file + + @contextmanager + def merge_local_code_and_additional_includes(self) -> Generator: + """Merge code and potential additional includes into a temporary folder and return the absolute path of it. + + If no additional includes are specified, just return the absolute path of the original code path. + If no original code path is specified, return None. + + :return: The absolute path of the merged code and additional includes. + :rtype: Path + """ + if not self.with_includes: + if self.resolved_code_path is None: + yield None + else: + yield self.resolved_code_path.absolute() + return + + # for now, upload path of a code asset will include the folder name of the code path (name of folder or + # parent name of file). For example, if code path is /mnt/c/code-a, upload path will be xxx/code-a + # which means that the upload path will change every time as we will merge additional includes into a temp + # folder. To avoid this, we will copy the code path to a child folder with a fixed name under the temp folder, + # then the child folder will be used in upload path. + # This issue shouldn't impact users as there is a separate asset existence check before uploading. 
+ # We still make this change as: + # 1. We will always need to record for twice as upload path will be changed for first time uploading + # 2. This will improve the stability of the code asset existence check - AssetNotChanged check in + # BlobStorageClient will be a backup check + tmp_folder_path = Path(tempfile.mkdtemp(), "code_with_additional_includes") + tmp_folder_path.mkdir(parents=True, exist_ok=True) + + root_ignore_file = self._copy_origin_code(tmp_folder_path) + + # resolve additional includes + base_path = self.base_path + # additional includes from artifact will be downloaded to a temp local path on calling + # self.includes, so no need to add specific logic for artifact + + # TODO: skip ignored files defined in code when copying additional includes + # copy additional includes disregarding ignore files as current ignore file implementation + # is based on absolute path, which is not suitable for additional includes + for additional_include_local_path in self._get_resolved_additional_include_configs(): + src_path = Path(additional_include_local_path) + if not src_path.is_absolute(): + src_path = (base_path / additional_include_local_path).resolve() + dst_path = (tmp_folder_path / src_path.name).resolve() + + root_ignore_file.rebase(src_path.parent) + if self._is_folder_to_compress(src_path): + self._resolve_folder_to_compress( + additional_include_local_path, + Path(tmp_folder_path), + # actual src path is without .zip suffix + ignore_file=root_ignore_file.merge(src_path.parent / src_path.stem), + ) + # early continue as the folder is compressed as a zip file + continue + + # no need to check if src_path exists as it is already validated + if src_path.is_file(): + self._copy(src_path, dst_path, ignore_file=root_ignore_file) + elif src_path.is_dir(): + self._copy( + src_path, + dst_path, + # root ignore file on parent + ignore file on src_path + ignore_file=root_ignore_file.merge(src_path), + ) + else: + raise ValueError(f"Unable to find additional include {additional_include_local_path}.") + try: + yield tmp_folder_path.absolute() + + finally: + # clean up tmp folder as it can be very disk space consuming + shutil.rmtree(tmp_folder_path, ignore_errors=True) + + +class AdditionalIncludesMixin(ComponentCodeMixin): + @classmethod + def _get_additional_includes_field_name(cls) -> str: + """Get the field name for additional includes. 
+ + :return: The field name + :rtype: str + """ + return "additional_includes" + + def _get_all_additional_includes_configs(self) -> List: + return getattr(self, self._get_additional_includes_field_name(), []) + + def _append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation( + self, base_validation_result: Optional[MutableValidationResult] = None + ) -> bool: + is_reliable: bool = super()._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation( + base_validation_result + ) + additional_includes_obj = self._generate_additional_includes_obj() + + if base_validation_result is not None: + base_validation_result.merge_with( + additional_includes_obj.validate(), field_name=self._get_additional_includes_field_name() + ) + # if additional includes is specified, origin code will be merged with additional includes into a temp folder + # before registered as a code asset, so origin code value is not reliable for local path validation + if additional_includes_obj.with_includes: + return False + return is_reliable + + def _generate_additional_includes_obj(self) -> AdditionalIncludes: + return AdditionalIncludes( + base_path=self._get_base_path_for_code(), + configs=self._get_all_additional_includes_configs(), + origin_code_value=self._get_origin_code_in_str(), + ) + + @contextmanager + def _try_build_local_code(self) -> Generator: + """Build final code when origin code is a local code. + + Will merge code path with additional includes into a temp folder if additional includes is specified. + + :return: The built Code object + :rtype: Iterable[Optional[Code]] + """ + # will try to merge code and additional includes even if code is None + tmp_code_dir: Any + with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir: + if tmp_code_dir is None: + yield None + else: + yield Code( + base_path=self._get_base_path_for_code(), + path=tmp_code_dir, + ignore_file=ComponentIgnoreFile(tmp_code_dir), + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py new file mode 100644 index 00000000..3e7be727 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/automl_component.py @@ -0,0 +1,42 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import Any, Optional + +from azure.ai.ml._schema import PathAwareSchema +from azure.ai.ml._schema.component.automl_component import AutoMLComponentSchema +from azure.ai.ml.constants._common import COMPONENT_TYPE +from azure.ai.ml.constants._component import NodeType +from azure.ai.ml.entities._component.component import Component + + +class AutoMLComponent(Component): + """AutoML component entity, used to define an automl component. + + AutoML Component will only be used "internally" for the mentioned scenarios that need it. AutoML Component schema is + not intended to be used by the end users and therefore it won't be provided to the end users and it won't have + public documentation for the users. + + :param task: Task of the component. 
+ :type task: str + """ + + def __init__( + self, + *, + task: Optional[str] = None, + **kwargs: Any, + ) -> None: + kwargs[COMPONENT_TYPE] = NodeType.AUTOML + super(AutoMLComponent, self).__init__(**kwargs) + self._task = task + + @property + def task(self) -> Optional[str]: + """Returns task of the component.""" + return self._task + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema: + return AutoMLComponentSchema(context=context) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py new file mode 100644 index 00000000..1f838bec --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/code.py @@ -0,0 +1,297 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import os +from contextlib import contextmanager +from enum import Enum +from pathlib import Path +from typing import Any, Generator, List, Optional, Union + +from azure.ai.ml._utils._arm_id_utils import is_ARM_id_for_resource, is_registry_id_for_resource +from azure.ai.ml._utils._asset_utils import IgnoreFile, get_ignore_file +from azure.ai.ml._utils.utils import is_private_preview_enabled +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AzureMLResourceType +from azure.ai.ml.entities._assets import Code +from azure.ai.ml.entities._validation import MutableValidationResult + + +class ComponentIgnoreFile(IgnoreFile): + _COMPONENT_CODE_IGNORES = ["__pycache__"] + """Component-specific ignore file used for ignoring files in a component directory. + + :param directory_path: The directory path for the ignore file. + :type directory_path: Union[str, Path] + :param additional_includes_file_name: Name of the additional includes file in the root directory to be ignored. + :type additional_includes_file_name: str + :param skip_ignore_file: Whether to skip the ignore file, defaults to False. + :type skip_ignore_file: bool + :param extra_ignore_list: List of additional ignore files to be considered during file exclusion. + :type extra_ignore_list: List[~azure.ai.ml._utils._asset_utils.IgnoreFile] + :raises ValueError: If additional include file is not found. + :return: The ComponentIgnoreFile object. + :rtype: ComponentIgnoreFile + """ + + def __init__( + self, + directory_path: Union[str, Path], + *, + additional_includes_file_name: Optional[str] = None, + skip_ignore_file: bool = False, + extra_ignore_list: Optional[List[IgnoreFile]] = None, + ): + self._base_path: Union[str, Path] = Path(directory_path) + self._extra_ignore_list: List[IgnoreFile] = extra_ignore_list or [] + # only the additional include file in root directory is ignored + # additional include files in subdirectories are not processed so keep them + self._additional_includes_file_name = additional_includes_file_name + # note: the parameter changes to directory path in this class, rather than file path + file_path = None if skip_ignore_file else get_ignore_file(directory_path).path + super(ComponentIgnoreFile, self).__init__(file_path=file_path) + + def exists(self) -> bool: + """Check if the ignore file exists. + + :return: True + :rtype: bool + """ + return True + + @property + def base_path(self) -> Union[str, Path]: + """Get the base path of the ignore file. + + :return: The base path. 
+ :rtype: Path + """ + # for component ignore file, the base path can be different from file.parent + return self._base_path + + def rebase(self, directory_path: Union[str, Path]) -> "ComponentIgnoreFile": + """Rebase the ignore file to a new directory. + + :param directory_path: The new directory path. + :type directory_path: Union[str, Path] + :return: The rebased ComponentIgnoreFile object. + :rtype: ComponentIgnoreFile + """ + self._base_path = directory_path + return self + + def is_file_excluded(self, file_path: Union[str, Path]) -> bool: + """Check if a file should be excluded based on the ignore file rules. + + :param file_path: The file path. + :type file_path: Union[str, Path] + :return: True if the file should be excluded, False otherwise. + :rtype: bool + """ + if self._additional_includes_file_name and self._get_rel_path(file_path) == self._additional_includes_file_name: + return True + for ignore_file in self._extra_ignore_list: + if ignore_file.is_file_excluded(file_path): + return True + res: bool = super(ComponentIgnoreFile, self).is_file_excluded(file_path) + return res + + def merge(self, other_path: Path) -> "ComponentIgnoreFile": + """Merge the ignore list from another ComponentIgnoreFile object. + + :param other_path: The path of the other ignore file. + :type other_path: Path + :return: The merged ComponentIgnoreFile object. + :rtype: ComponentIgnoreFile + """ + if other_path.is_file(): + return self + return ComponentIgnoreFile(other_path, extra_ignore_list=self._extra_ignore_list + [self]) + + def _get_ignore_list(self) -> List[str]: + """Retrieves the list of ignores from ignore file + + Override to add custom ignores. + + :return: The ignore rules + :rtype: List[str] + """ + if not super(ComponentIgnoreFile, self).exists(): + return self._COMPONENT_CODE_IGNORES + res: list = super(ComponentIgnoreFile, self)._get_ignore_list() + self._COMPONENT_CODE_IGNORES + return res + + +class CodeType(Enum): + """Code type.""" + + LOCAL = "local" + NONE = "none" + GIT = "git" + ARM_ID = "arm_id" + UNKNOWN = "unknown" + + +def _get_code_type(origin_code_value: Optional[str]) -> CodeType: + if origin_code_value is None: + return CodeType.NONE + if not isinstance(origin_code_value, str): + # note that: + # 1. Code & CodeOperation are not public for now + # 2. AnonymousCodeSchema is not within CodeField + # 3. Code will be returned as an arm id as an attribute of a component when getting a component from remote + # So origin_code_value should never be a Code object, or an exception will be raised + # in validation stage. + return CodeType.UNKNOWN + if is_ARM_id_for_resource(origin_code_value, AzureMLResourceType.CODE) or is_registry_id_for_resource( + origin_code_value + ): + return CodeType.ARM_ID + if origin_code_value.startswith("git+"): + return CodeType.GIT + return CodeType.LOCAL + + +class ComponentCodeMixin: + """Mixin class for components with local files as part of the component. Those local files will be uploaded to + blob storage and further referenced as a code asset in arm id. In below docstring, we will refer to those local + files as "code". + + The major interface of this mixin is self._customized_code_validate and self._build_code. + self._customized_code_validate will return a validation result indicating whether the code is valid. + self._build_code will return a temp Code object for server-side code asset creation. + """ + + def _get_base_path_for_code(self) -> Path: + """Get base path for additional includes. 
+ + :return: The base path + :rtype: Path + """ + if hasattr(self, BASE_PATH_CONTEXT_KEY): + return Path(getattr(self, BASE_PATH_CONTEXT_KEY)) + raise NotImplementedError( + "Component must have a base_path attribute to use ComponentCodeMixin. " + "Please set base_path in __init__ or override _get_base_path_for_code." + ) + + @classmethod + def _get_code_field_name(cls) -> str: + """Get the field name for code. + + Will be used to get origin code value by default and will be used as field name of validation diagnostics. + + :return: Code field name + :rtype: str + """ + return "code" + + def _get_origin_code_value(self) -> Union[str, os.PathLike, None]: + """Get origin code value. + Origin code value is either an absolute path or a relative path to base path if it's a local path. + Additional includes are only supported for component types with code attribute. Origin code path will be copied + to a temp folder along with additional includes to form a new code content. + """ + return getattr(self, self._get_code_field_name(), None) + + def _fill_back_code_value(self, value: str) -> None: + """Fill resolved code value back to the component. + + :param value: resolved code value + :type value: str + :return: no return + :rtype: None + """ + return setattr(self, self._get_code_field_name(), value) + + def _get_origin_code_in_str(self) -> Optional[str]: + """Get origin code value in str to simplify following logic.""" + origin_code_value = self._get_origin_code_value() + if origin_code_value is None: + return None + if isinstance(origin_code_value, Path): + return origin_code_value.as_posix() + return str(origin_code_value) + + def _append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation( + self, base_validation_result: Optional[MutableValidationResult] = None + ) -> bool: + """Append diagnostics from customized validation logic to the base validation result and check if origin code + value is valid for path validation. + + For customized validation logic, this method shouldn't cover the validation logic duplicated with schema + validation, like local code existence check. + For the check, as "code" includes file dependencies of a component, other fields may depend on those files. + However, the origin code value may not be reliable for validation of those fields. For example: + 1. origin code value can be a remote git path or an arm id of a code asset. + 2. some file operations may be done during build_code, which makes final code content different from what we can + get from origin code value. + So, we use this function to check if origin code value is reliable for further local path validation. + + :param base_validation_result: base validation result to append diagnostics to. + :type base_validation_result: MutableValidationResult + :return: whether origin code value is reliable for further local path validation. + :rtype: bool + """ + # If private features are enable and component has code value of type str we need to check + # that it is a valid git path case. 
Otherwise, we should throw a ValidationError + # saying that the code value is not valid + code_type = _get_code_type(self._get_origin_code_in_str()) + if code_type == CodeType.GIT and not is_private_preview_enabled(): + if base_validation_result is not None: + base_validation_result.append_error( + message="Not a valid code value: git paths are not supported.", + yaml_path=self._get_code_field_name(), + ) + return code_type == CodeType.LOCAL + + @contextmanager + def _build_code(self) -> Generator: + """Create a Code object if necessary based on origin code value and yield it. + + :return: If built code is the same as its origin value, do nothing and yield None. + Otherwise, yield a Code object pointing to the code. + :rtype: Iterable[Optional[Code]] + """ + origin_code_value = self._get_origin_code_in_str() + code_type = _get_code_type(origin_code_value) + + if code_type == CodeType.GIT: + # git also need to be resolved into arm id + yield Code(path=origin_code_value) + elif code_type in [CodeType.LOCAL, CodeType.NONE]: + code: Any + # false-positive by pylint, hence disable it + # (https://github.com/pylint-dev/pylint/blob/main/doc/data/messages + # /c/contextmanager-generator-missing-cleanup/details.rst) + with self._try_build_local_code() as code: # pylint:disable=contextmanager-generator-missing-cleanup + yield code + else: + # arm id, None and unknown need no extra resolution + yield None + + @contextmanager + def _try_build_local_code(self) -> Generator: + """Extract the logic of _build_code for local code for further override. + + :return: The Code object if could be constructed, None otherwise + :rtype: Iterable[Optional[Code]] + """ + origin_code_value = self._get_origin_code_in_str() + if origin_code_value is None: + yield None + else: + base_path = self._get_base_path_for_code() + absolute_path: Union[str, Path] = ( + origin_code_value if os.path.isabs(origin_code_value) else base_path / origin_code_value + ) + + yield Code( + base_path=base_path, + path=origin_code_value, + ignore_file=ComponentIgnoreFile(absolute_path), + ) + + def _with_local_code(self) -> bool: + # TODO: remove this method after we have a better way to do this judge in cache_utils + origin_code_value = self._get_origin_code_in_str() + code_type = _get_code_type(origin_code_value) + return code_type == CodeType.LOCAL diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py new file mode 100644 index 00000000..9bdcd3d1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/command_component.py @@ -0,0 +1,300 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +import os +from typing import Any, Dict, List, Optional, Union, cast + +from marshmallow import Schema + +from azure.ai.ml._schema.component.command_component import CommandComponentSchema +from azure.ai.ml.constants._common import COMPONENT_TYPE +from azure.ai.ml.constants._component import NodeType +from azure.ai.ml.entities._assets import Environment +from azure.ai.ml.entities._job.distribution import ( + DistributionConfiguration, + MpiDistribution, + PyTorchDistribution, + RayDistribution, + TensorFlowDistribution, +) +from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration +from azure.ai.ml.entities._job.parameterized_command import ParameterizedCommand +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException + +from ..._restclient.v2022_10_01.models import ComponentVersion +from ..._schema import PathAwareSchema +from ..._utils.utils import get_all_data_binding_expressions, parse_args_description_from_docstring +from .._util import convert_ordered_dict_to_dict, validate_attribute_type +from .._validation import MutableValidationResult +from ._additional_includes import AdditionalIncludesMixin +from .component import Component + +# pylint: disable=protected-access + + +class CommandComponent(Component, ParameterizedCommand, AdditionalIncludesMixin): + """Command component version, used to define a Command Component or Job. + + :keyword name: The name of the Command job or component. + :paramtype name: Optional[str] + :keyword version: The version of the Command job or component. + :paramtype version: Optional[str] + :keyword description: The description of the component. Defaults to None. + :paramtype description: Optional[str] + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None. + :paramtype tags: Optional[dict] + :keyword display_name: The display name of the component. + :paramtype display_name: Optional[str] + :keyword command: The command to be executed. + :paramtype command: Optional[str] + :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url pointing + to a remote location. + :type code: Optional[str] + :keyword environment: The environment that the job will run in. + :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]] + :keyword distribution: The configuration for distributed jobs. Defaults to None. + :paramtype distribution: Optional[Union[~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution, + ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]] + :keyword resources: The compute resource configuration for the command. + :paramtype resources: Optional[~azure.ai.ml.entities.JobResourceConfiguration] + :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None. + :paramtype inputs: Optional[dict[str, Union[ + ~azure.ai.ml.Input, + str, + bool, + int, + float, + Enum, + ]]] + :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None. + :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]] + :keyword instance_count: The number of instances or nodes to be used by the compute target. Defaults to 1. + :paramtype instance_count: Optional[int] + :keyword is_deterministic: Specifies whether the Command will return the same output given the same input. + Defaults to True. 
When True, if a Command (component) is deterministic and has been run before in the + current workspace with the same input and settings, it will reuse results from a previous submitted job + when used as a node or step in a pipeline. In that scenario, no compute resources will be used. + :paramtype is_deterministic: Optional[bool] + :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None. + :paramtype additional_includes: Optional[List[str]] + :keyword properties: The job property dictionary. Defaults to None. + :paramtype properties: Optional[dict[str, str]] + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if CommandComponent cannot be successfully validated. + Details will be provided in the error message. + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_command_configurations.py + :start-after: [START command_component_definition] + :end-before: [END command_component_definition] + :language: python + :dedent: 8 + :caption: Creating a CommandComponent. + """ + + def __init__( + self, + *, + name: Optional[str] = None, + version: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + command: Optional[str] = None, + code: Optional[Union[str, os.PathLike]] = None, + environment: Optional[Union[str, Environment]] = None, + distribution: Optional[ + Union[ + Dict, + MpiDistribution, + TensorFlowDistribution, + PyTorchDistribution, + RayDistribution, + DistributionConfiguration, + ] + ] = None, + resources: Optional[JobResourceConfiguration] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + instance_count: Optional[int] = None, # promoted property from resources.instance_count + is_deterministic: bool = True, + additional_includes: Optional[List] = None, + properties: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + kwargs[COMPONENT_TYPE] = NodeType.COMMAND + + # Component backend doesn't support environment_variables yet, + # this is to support the case of CommandComponent being the trial of + # a SweepJob, where environment_variables is stored as part of trial + environment_variables = kwargs.pop("environment_variables", None) + super().__init__( + name=name, + version=version, + description=description, + tags=tags, + display_name=display_name, + inputs=inputs, + outputs=outputs, + is_deterministic=is_deterministic, + properties=properties, + **kwargs, + ) + + # No validation on value passed here because in pipeline job, required code&environment maybe absent + # and fill in later with job defaults. + self.command = command + self.code = code + self.environment_variables = environment_variables + self.environment = environment + self.resources = resources # type: ignore[assignment] + self.distribution = distribution + + # check mutual exclusivity of promoted properties + if self.resources is not None and instance_count is not None: + msg = "instance_count and resources are mutually exclusive" + raise ValidationException( + message=msg, + target=ErrorTarget.COMPONENT, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + ) + self.instance_count = instance_count + self.additional_includes = additional_includes or [] + + def _to_ordered_dict_for_yaml_dump(self) -> Dict: + """Dump the component content into a sorted yaml string. 
+ + :return: The ordered dict + :rtype: Dict + """ + + obj: dict = super()._to_ordered_dict_for_yaml_dump() + # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value + if self.code and isinstance(self.code, str): + obj["code"] = self.code + return obj + + @property + def instance_count(self) -> Optional[int]: + """The number of instances or nodes to be used by the compute target. + + :return: The number of instances or nodes. + :rtype: int + """ + return self.resources.instance_count if self.resources and not isinstance(self.resources, dict) else None + + @instance_count.setter + def instance_count(self, value: int) -> None: + """Sets the number of instances or nodes to be used by the compute target. + + :param value: The number of instances of nodes to be used by the compute target. Defaults to 1. + :type value: int + """ + if not value: + return + if not self.resources: + self.resources = JobResourceConfiguration(instance_count=value) + else: + if not isinstance(self.resources, dict): + self.resources.instance_count = value + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "environment": (str, Environment), + "environment_variables": dict, + "resources": (dict, JobResourceConfiguration), + "code": (str, os.PathLike), + } + + def _to_dict(self) -> Dict: + return cast( + dict, convert_ordered_dict_to_dict({**self._other_parameter, **super(CommandComponent, self)._to_dict()}) + ) + + @classmethod + def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict: + # put it here as distribution is shared by some components, e.g. command + distribution = obj.properties.component_spec.pop("distribution", None) + init_kwargs: dict = super()._from_rest_object_to_init_params(obj) + if distribution: + init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution) + return init_kwargs + + def _get_environment_id(self) -> Union[str, None]: + # Return environment id of environment + # handle case when environment is defined inline + if isinstance(self.environment, Environment): + _id: Optional[str] = self.environment.id + return _id + return self.environment + + # region SchemaValidatableMixin + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return CommandComponentSchema(context=context) + + def _customized_validate(self) -> MutableValidationResult: + validation_result = super(CommandComponent, self)._customized_validate() + self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result) + validation_result.merge_with(self._validate_command()) + validation_result.merge_with(self._validate_early_available_output()) + return validation_result + + def _validate_command(self) -> MutableValidationResult: + validation_result = self._create_empty_validation_result() + # command + if self.command: + invalid_expressions = [] + for data_binding_expression in get_all_data_binding_expressions(self.command, is_singular=False): + if not self._is_valid_data_binding_expression(data_binding_expression): + invalid_expressions.append(data_binding_expression) + + if invalid_expressions: + validation_result.append_error( + yaml_path="command", + message="Invalid data binding expression: {}".format(", ".join(invalid_expressions)), + ) + return validation_result + + def _validate_early_available_output(self) -> MutableValidationResult: + validation_result = self._create_empty_validation_result() + for name, output in 
self.outputs.items(): + if output.early_available is True and output._is_primitive_type is not True: + msg = ( + f"Early available output {name!r} requires output is primitive type, " + f"got {output._is_primitive_type!r}." + ) + validation_result.append_error(message=msg, yaml_path=f"outputs.{name}") + return validation_result + + def _is_valid_data_binding_expression(self, data_binding_expression: str) -> bool: + current_obj: Any = self + for item in data_binding_expression.split("."): + if hasattr(current_obj, item): + current_obj = getattr(current_obj, item) + else: + try: + current_obj = current_obj[item] + except Exception: # pylint: disable=W0718 + return False + return True + + # endregion + + @classmethod + def _parse_args_description_from_docstring(cls, docstring: str) -> Dict: + res: dict = parse_args_description_from_docstring(docstring) + return res + + def __str__(self) -> str: + try: + toYaml: str = self._to_yaml() + return toYaml + except BaseException: # pylint: disable=W0718 + toStr: str = super(CommandComponent, self).__str__() + return toStr diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py new file mode 100644 index 00000000..c02a3a33 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component.py @@ -0,0 +1,641 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import re +import uuid +from os import PathLike +from pathlib import Path +from typing import IO, TYPE_CHECKING, Any, AnyStr, Callable, Dict, Iterable, Optional, Tuple, Union + +from marshmallow import INCLUDE + +from ..._restclient.v2024_01_01_preview.models import ( + ComponentContainer, + ComponentContainerProperties, + ComponentVersion, + ComponentVersionProperties, +) +from ..._schema import PathAwareSchema +from ..._schema.component import ComponentSchema +from ..._utils.utils import dump_yaml_to_file, hash_dict +from ...constants._common import ( + ANONYMOUS_COMPONENT_NAME, + BASE_PATH_CONTEXT_KEY, + PARAMS_OVERRIDE_KEY, + REGISTRY_URI_FORMAT, + SOURCE_PATH_CONTEXT_KEY, + CommonYamlFields, + SchemaUrl, +) +from ...constants._component import ComponentSource, IOConstants, NodeType +from ...entities._assets.asset import Asset +from ...entities._inputs_outputs import Input, Output +from ...entities._mixins import LocalizableMixin, TelemetryMixin, YamlTranslatableMixin +from ...entities._system_data import SystemData +from ...entities._util import find_type_in_override +from ...entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin, RemoteValidatableMixin +from ...exceptions import ErrorCategory, ErrorTarget, ValidationException +from .._inputs_outputs import GroupInput + +if TYPE_CHECKING: + from ...entities.builders import BaseNode +# pylint: disable=protected-access, redefined-builtin +# disable redefined-builtin to use id/type as argument name + + +COMPONENT_PLACEHOLDER = "COMPONENT_PLACEHOLDER" + + +class Component( + Asset, + RemoteValidatableMixin, + TelemetryMixin, + YamlTranslatableMixin, + PathAwareSchemaValidatableMixin, + LocalizableMixin, +): + """Base class for component version, used to define a component. Can't be instantiated directly. + + :param name: Name of the resource. + :type name: str + :param version: Version of the resource. 
+ :type version: str + :param id: Global ID of the resource, Azure Resource Manager ID. + :type id: str + :param type: Type of the command, supported is 'command'. + :type type: str + :param description: Description of the resource. + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: dict + :param properties: Internal use only. + :type properties: dict + :param display_name: Display name of the component. + :type display_name: str + :param is_deterministic: Whether the component is deterministic. Defaults to True. + :type is_deterministic: bool + :param inputs: Inputs of the component. + :type inputs: dict + :param outputs: Outputs of the component. + :type outputs: dict + :param yaml_str: The YAML string of the component. + :type yaml_str: str + :param _schema: Schema of the component. + :type _schema: str + :param creation_context: Creation metadata of the component. + :type creation_context: ~azure.ai.ml.entities.SystemData + :param kwargs: Additional parameters for the component. + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully validated. + Details will be provided in the error message. + """ + + # pylint: disable=too-many-instance-attributes + def __init__( + self, + *, + name: Optional[str] = None, + version: Optional[str] = None, + id: Optional[str] = None, + type: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + properties: Optional[Dict] = None, + display_name: Optional[str] = None, + is_deterministic: bool = True, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + yaml_str: Optional[str] = None, + _schema: Optional[str] = None, + creation_context: Optional[SystemData] = None, + **kwargs: Any, + ) -> None: + self.latest_version = None + self._intellectual_property = kwargs.pop("intellectual_property", None) + # Setting this before super init because when asset init version, _auto_increment_version's value may change + self._auto_increment_version = kwargs.pop("auto_increment", False) + # Get source from id first, then kwargs. + self._source = ( + self._resolve_component_source_from_id(id) if id else kwargs.pop("_source", ComponentSource.CLASS) + ) + # use ANONYMOUS_COMPONENT_NAME instead of guid + is_anonymous = kwargs.pop("is_anonymous", False) + if not name and version is None: + name = ANONYMOUS_COMPONENT_NAME + version = "1" + is_anonymous = True + + super().__init__( + name=name, + version=version, + id=id, + description=description, + tags=tags, + properties=properties, + creation_context=creation_context, + is_anonymous=is_anonymous, + base_path=kwargs.pop(BASE_PATH_CONTEXT_KEY, None), + source_path=kwargs.pop(SOURCE_PATH_CONTEXT_KEY, None), + ) + # store kwargs to self._other_parameter instead of pop to super class to allow component have extra + # fields not defined in current schema. 
+ + inputs = inputs if inputs else {} + outputs = outputs if outputs else {} + + self.name = name + self._schema = _schema + self._type = type + self._display_name = display_name + self._is_deterministic = is_deterministic + self._inputs = self._build_io(inputs, is_input=True) + self._outputs = self._build_io(outputs, is_input=False) + # Store original yaml + self._yaml_str = yaml_str + self._other_parameter = kwargs + + @property + def _func(self) -> Callable[..., "BaseNode"]: + from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function + + # validate input/output names before creating component function + validation_result = self._validate_io_names(self.inputs) + validation_result.merge_with(self._validate_io_names(self.outputs)) + self._try_raise(validation_result) + + res: Callable = _generate_component_function(self) + return res + + @property + def type(self) -> Optional[str]: + """Type of the component, default is 'command'. + + :return: Type of the component. + :rtype: str + """ + return self._type + + @property + def display_name(self) -> Optional[str]: + """Display name of the component. + + :return: Display name of the component. + :rtype: str + """ + return self._display_name + + @display_name.setter + def display_name(self, custom_display_name: str) -> None: + """Set display_name of the component. + + :param custom_display_name: The new display name + :type custom_display_name: str + """ + self._display_name = custom_display_name + + @property + def is_deterministic(self) -> Optional[bool]: + """Whether the component is deterministic. + + :return: Whether the component is deterministic + :rtype: bool + """ + return self._is_deterministic + + @property + def inputs(self) -> Dict: + """Inputs of the component. + + :return: Inputs of the component. + :rtype: dict + """ + res: dict = self._inputs + return res + + @property + def outputs(self) -> Dict: + """Outputs of the component. + + :return: Outputs of the component. + :rtype: dict + """ + return self._outputs + + @property + def version(self) -> Optional[str]: + """Version of the component. + + :return: Version of the component. + :rtype: str + """ + return self._version + + @version.setter + def version(self, value: str) -> None: + """Set the version of the component. + + :param value: The version of the component. + :type value: str + """ + if value: + if not isinstance(value, str): + msg = f"Component version must be a string, not type {type(value)}." + raise ValidationException( + message=msg, + target=ErrorTarget.COMPONENT, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + ) + self._version = value + self._auto_increment_version = self.name and not self._version + + def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None: + """Dump the component content into a file in yaml format. + + :param dest: The destination to receive this component's content. + Must be either a path to a local file, or an already-open file stream. + If dest is a file path, a new file will be created, + and an exception is raised if the file exists. + If dest is an open file, the file will be written to directly, + and an exception will be raised if the file is not writable. 
+ :type dest: Union[PathLike, str, IO[AnyStr]]
+ """
+ path = kwargs.pop("path", None)
+ yaml_serialized = self._to_dict()
+ dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
+ @staticmethod
+ def _resolve_component_source_from_id( # pylint: disable=docstring-type-do-not-use-class
+ id: Optional[Union["Component", str]],
+ ) -> Any:
+ """Resolve the component source from id.
+
+ :param id: The component ID
+ :type id: Optional[str]
+ :return: The component source
+ :rtype: Literal[
+ ComponentSource.CLASS,
+ ComponentSource.REMOTE_REGISTRY,
+ ComponentSource.REMOTE_WORKSPACE_COMPONENT
+ ]
+ """
+ if id is None:
+ return ComponentSource.CLASS
+ # Consider the default to be workspace source, as
+ # the azureml: prefix will be removed for an arm versioned id.
+ return (
+ ComponentSource.REMOTE_REGISTRY
+ if not isinstance(id, Component) and id.startswith(REGISTRY_URI_FORMAT)
+ else ComponentSource.REMOTE_WORKSPACE_COMPONENT
+ )
+
+ @classmethod
+ def _validate_io_names(cls, io_names: Iterable[str], raise_error: bool = False) -> MutableValidationResult:
+ """Validate input/output names, raising an exception if invalid and raise_error is True.
+
+ :param io_names: The names to validate
+ :type io_names: Iterable[str]
+ :param raise_error: Whether to raise if validation fails. Defaults to False
+ :type raise_error: bool
+ :return: The validation result
+ :rtype: MutableValidationResult
+ """
+ validation_result = cls._create_empty_validation_result()
+ lower2original_kwargs: dict = {}
+
+ for name in io_names:
+ if re.match(IOConstants.VALID_KEY_PATTERN, name) is None:
+ msg = "{!r} is not a valid parameter name, must be composed of letters, numbers, and underscores."
+ validation_result.append_error(message=msg.format(name), yaml_path=f"inputs.{name}")
+ # validate name conflict
+ lower_key = name.lower()
+ if lower_key in lower2original_kwargs:
+ msg = "Invalid component input names {!r} and {!r}, which are equal when ignoring case."
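A minimal sketch of the dump round-trip described in the docstring above, assuming a local component YAML file exists at the hypothetical path shown:

# Sketch: load a component from YAML and serialize it back out with dump().
# "component.yml" and "exported_component.yml" are assumed local paths.
from azure.ai.ml import load_component

component = load_component(source="component.yml")
# dump() raises if the destination file already exists, so write to a fresh path.
component.dump("exported_component.yml")
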
+ validation_result.append_error( + message=msg.format(name, lower2original_kwargs[lower_key]), yaml_path=f"inputs.{name}" + ) + else: + lower2original_kwargs[lower_key] = name + return cls._try_raise(validation_result, raise_error=raise_error) + + @classmethod + def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool) -> Dict: + component_io: dict = {} + for name, port in io_dict.items(): + if is_input: + component_io[name] = port if isinstance(port, Input) else Input(**port) + else: + component_io[name] = port if isinstance(port, Output) else Output(**port) + + if is_input: + # Restore flattened parameters to group + res: dict = GroupInput.restore_flattened_inputs(component_io) + return res + return component_io + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema: + return ComponentSchema(context=context) + + @classmethod + def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException: + return ValidationException( + message=message, + no_personal_data_message=no_personal_data_message, + target=ErrorTarget.COMPONENT, + ) + + @classmethod + def _is_flow(cls, data: Any) -> bool: + _schema = data.get(CommonYamlFields.SCHEMA, None) + + if _schema and _schema in [SchemaUrl.PROMPTFLOW_FLOW, SchemaUrl.PROMPTFLOW_RUN]: + return True + return False + + @classmethod + def _load( + cls, + data: Optional[Dict] = None, + yaml_path: Optional[Union[PathLike, str]] = None, + params_override: Optional[list] = None, + **kwargs: Any, + ) -> "Component": + data = data or {} + params_override = params_override or [] + base_path = Path(yaml_path).parent if yaml_path else Path("./") + + type_in_override = find_type_in_override(params_override) + + # type_in_override > type_in_yaml > default (command) + if type_in_override is None: + type_in_override = data.get(CommonYamlFields.TYPE, None) + if type_in_override is None and cls._is_flow(data): + type_in_override = NodeType.FLOW_PARALLEL + if type_in_override is None: + type_in_override = NodeType.COMMAND + data[CommonYamlFields.TYPE] = type_in_override + + from azure.ai.ml.entities._component.component_factory import component_factory + + create_instance_func, _ = component_factory.get_create_funcs( + data, + for_load=True, + ) + new_instance: Component = create_instance_func() + # specific keys must be popped before loading with schema using kwargs + init_kwargs = { + "yaml_str": kwargs.pop("yaml_str", None), + "_source": kwargs.pop("_source", ComponentSource.YAML_COMPONENT), + } + init_kwargs.update( + new_instance._load_with_schema( # pylint: disable=protected-access + data, + context={ + BASE_PATH_CONTEXT_KEY: base_path, + SOURCE_PATH_CONTEXT_KEY: yaml_path, + PARAMS_OVERRIDE_KEY: params_override, + }, + unknown=INCLUDE, + raise_original_exception=True, + **kwargs, + ) + ) + # Set base path separately to avoid doing this in post load, as return types of post load are not unified, + # could be object or dict. + # base_path in context can be changed in loading, so we use original base_path here. 
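The type-resolution precedence applied in _load (params_override first, then the YAML type field, then flow detection, then the command default) can be restated as a small standalone sketch; this mirrors the logic above and is not SDK API:

# Simplified restatement of the precedence implemented in Component._load.
from typing import Optional


def resolve_component_type(data: dict, type_in_override: Optional[str]) -> str:
    if type_in_override is not None:
        return type_in_override          # 1. explicit override wins
    if "type" in data:
        return data["type"]              # 2. then the type declared in the YAML
    schema = data.get("$schema", "")
    if "promptflow" in schema.lower():   # rough stand-in for the SchemaUrl.PROMPTFLOW_* check
        return "flow_parallel"           # stand-in for NodeType.FLOW_PARALLEL
    return "command"                     # 3. default


assert resolve_component_type({"type": "spark"}, None) == "spark"
assert resolve_component_type({}, "pipeline") == "pipeline"
assert resolve_component_type({}, None) == "command"
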
+ init_kwargs[BASE_PATH_CONTEXT_KEY] = base_path.absolute() + if yaml_path: + init_kwargs[SOURCE_PATH_CONTEXT_KEY] = Path(yaml_path).absolute().as_posix() + # TODO: Bug Item number: 2883415 + new_instance.__init__( # type: ignore + **init_kwargs, + ) + return new_instance + + @classmethod + def _from_container_rest_object(cls, component_container_rest_object: ComponentContainer) -> "Component": + component_container_details: ComponentContainerProperties = component_container_rest_object.properties + component = Component( + id=component_container_rest_object.id, + name=component_container_rest_object.name, + description=component_container_details.description, + creation_context=SystemData._from_rest_object(component_container_rest_object.system_data), + tags=component_container_details.tags, + properties=component_container_details.properties, + type=NodeType._CONTAINER, + # Set this field to None as it hold a default True in init. + is_deterministic=None, # type: ignore[arg-type] + ) + component.latest_version = component_container_details.latest_version + return component + + @classmethod + def _from_rest_object(cls, obj: ComponentVersion) -> "Component": + # TODO: Remove in PuP with native import job/component type support in MFE/Designer + # Convert command component back to import component private preview + component_spec = obj.properties.component_spec + if component_spec[CommonYamlFields.TYPE] == NodeType.COMMAND and component_spec["command"] == NodeType.IMPORT: + component_spec[CommonYamlFields.TYPE] = NodeType.IMPORT + component_spec["source"] = component_spec.pop("inputs") + component_spec["output"] = component_spec.pop("outputs")["output"] + + # shouldn't block serialization when name is not valid + # maybe override serialization method for name field? + from azure.ai.ml.entities._component.component_factory import component_factory + + create_instance_func, _ = component_factory.get_create_funcs(obj.properties.component_spec, for_load=True) + + instance: Component = create_instance_func() + # TODO: Bug Item number: 2883415 + instance.__init__(**instance._from_rest_object_to_init_params(obj)) # type: ignore + return instance + + @classmethod + def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict: + # Object got from rest data contain _source, we delete it. 
+ if "_source" in obj.properties.component_spec: + del obj.properties.component_spec["_source"] + + rest_component_version = obj.properties + _type = rest_component_version.component_spec[CommonYamlFields.TYPE] + + # inputs/outputs will be parsed by instance._build_io in instance's __init__ + inputs = rest_component_version.component_spec.pop("inputs", {}) + # parse String -> string, Integer -> integer, etc + for _input in inputs.values(): + _input["type"] = Input._map_from_rest_type(_input["type"]) + outputs = rest_component_version.component_spec.pop("outputs", {}) + + origin_name = rest_component_version.component_spec[CommonYamlFields.NAME] + rest_component_version.component_spec[CommonYamlFields.NAME] = ANONYMOUS_COMPONENT_NAME + init_kwargs = cls._load_with_schema( + rest_component_version.component_spec, context={BASE_PATH_CONTEXT_KEY: Path.cwd()}, unknown=INCLUDE + ) + init_kwargs.update( + { + "id": obj.id, + "is_anonymous": rest_component_version.is_anonymous, + "creation_context": obj.system_data, + "inputs": inputs, + "outputs": outputs, + "name": origin_name, + } + ) + + # remove empty values, because some property only works for specific component, eg: distribution for command + # note that there is an issue that environment == {} will always be true, so use isinstance here + return {k: v for k, v in init_kwargs.items() if v is not None and not (isinstance(v, dict) and not v)} + + def _get_anonymous_hash(self) -> str: + """Return the hash of anonymous component. + + Anonymous Components (same code and interface) will have same hash. + + :return: The component hash + :rtype: str + """ + # omit version since anonymous component's version is random guid + # omit name since name doesn't impact component's uniqueness + return self._get_component_hash(keys_to_omit=["name", "id", "version"]) + + def _get_component_hash(self, keys_to_omit: Optional[Iterable[str]] = None) -> str: + """Return the hash of component. 
+ + :param keys_to_omit: An iterable of keys to omit when computing the component hash + :type keys_to_omit: Optional[Iterable[str]] + :return: The component hash + :rtype: str + """ + component_interface_dict = self._to_dict() + res: str = hash_dict(component_interface_dict, keys_to_omit=keys_to_omit) + return res + + @classmethod + def _get_resource_type(cls) -> str: + return "Microsoft.MachineLearningServices/workspaces/components/versions" + + def _get_resource_name_version(self) -> Tuple: + version: Optional[str] = None + if not self.version and not self._auto_increment_version: + version = str(uuid.uuid4()) + else: + version = self.version + return self.name or ANONYMOUS_COMPONENT_NAME, version + + def _validate(self, raise_error: Optional[bool] = False) -> MutableValidationResult: + origin_name = self.name + # skip name validation for anonymous component as ANONYMOUS_COMPONENT_NAME will be used in component creation + if self._is_anonymous: + self.name = ANONYMOUS_COMPONENT_NAME + try: + return super()._validate(raise_error) + finally: + self.name = origin_name + + def _customized_validate(self) -> MutableValidationResult: + validation_result = super(Component, self)._customized_validate() + + # validate inputs names + validation_result.merge_with(self._validate_io_names(self.inputs, raise_error=False)) + validation_result.merge_with(self._validate_io_names(self.outputs, raise_error=False)) + + return validation_result + + def _get_anonymous_component_name_version(self) -> Tuple: + return ANONYMOUS_COMPONENT_NAME, self._get_anonymous_hash() + + def _get_rest_name_version(self) -> Tuple: + if self._is_anonymous: + return self._get_anonymous_component_name_version() + return self.name, self.version + + def _to_rest_object(self) -> ComponentVersion: + component = self._to_dict() + + # TODO: Remove in PuP with native import job/component type support in MFE/Designer + # Convert import component to command component private preview + if component.get(CommonYamlFields.TYPE, None) == NodeType.IMPORT: + component[CommonYamlFields.TYPE] = NodeType.COMMAND + component["inputs"] = component.pop("source") + component["outputs"] = dict({"output": component.pop("output")}) + # method _to_dict() will remove empty keys + if "tags" not in component: + component["tags"] = {} + component["tags"]["component_type_overwrite"] = NodeType.IMPORT + component["command"] = NodeType.IMPORT + + # add source type to component rest object + component["_source"] = self._source + if self._intellectual_property: + # hack while full pass through supported is worked on for IPP fields + component.pop("intellectual_property") + component["intellectualProperty"] = self._intellectual_property._to_rest_object().serialize() + properties = ComponentVersionProperties( + component_spec=component, + description=self.description, + is_anonymous=self._is_anonymous, + properties=dict(self.properties) if self.properties else {}, + tags=self.tags, + ) + result = ComponentVersion(properties=properties) + if self._is_anonymous: + result.name = ANONYMOUS_COMPONENT_NAME + else: + result.name = self.name + result.properties.properties["client_component_hash"] = self._get_component_hash(keys_to_omit=["version"]) + return result + + def _to_dict(self) -> Dict: + # Replace the name of $schema to schema. 
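The anonymous-hash behaviour above (an identical interface yields an identical hash, regardless of name or version) can be illustrated with a simplified stand-in for hash_dict; the real helper lives in the SDK's utilities and may differ in detail:

# Simplified stand-in for hash_dict(component_dict, keys_to_omit=...): canonical JSON + SHA-256.
import hashlib
import json
from typing import Iterable, Optional


def hash_component_dict(component_dict: dict, keys_to_omit: Optional[Iterable[str]] = None) -> str:
    omit = set(keys_to_omit or [])
    stable = {k: v for k, v in component_dict.items() if k not in omit}
    return hashlib.sha256(json.dumps(stable, sort_keys=True).encode("utf-8")).hexdigest()


spec_a = {"name": "a", "version": "guid-1", "inputs": {"x": {"type": "string"}}, "command": "echo hi"}
spec_b = {"name": "b", "version": "guid-2", "inputs": {"x": {"type": "string"}}, "command": "echo hi"}

# Omitting name/id/version, as _get_anonymous_hash does, makes the two specs hash identically.
assert hash_component_dict(spec_a, ["name", "id", "version"]) == hash_component_dict(spec_b, ["name", "id", "version"])
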
+ component_schema_dict: dict = self._dump_for_validation()
+ component_schema_dict.pop(BASE_PATH_CONTEXT_KEY, None)
+
+ # TODO: handle other_parameters and remove override from subclass
+ return component_schema_dict
+
+ def _localize(self, base_path: str) -> None:
+ """Called on an asset retrieved from the service to clean up remote attributes like id, creation_context,
+ etc. and update base_path.
+
+ :param base_path: The base_path
+ :type base_path: str
+ """
+ if not getattr(self, "id", None):
+ raise ValueError("Only a remote asset can be localized, but got a {} without id.".format(type(self)))
+ self._id = None
+ self._creation_context = None
+ self._base_path = base_path
+
+ def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict:
+ # Note: is_anonymous is not reliable here; create_or_update will log is_anonymous from its parameter.
+ is_anonymous = self.name is None or ANONYMOUS_COMPONENT_NAME in self.name
+ return {"type": self.type, "source": self._source, "is_anonymous": is_anonymous}
+
+ # pylint: disable-next=docstring-missing-param
+ def __call__(self, *args: Any, **kwargs: Any) -> "BaseNode":
+ """Call the component as a function and get a node object.
+
+ :return: The node object
+ :rtype: BaseNode
+ """
+ if args:
+ # raise a clear error message for unsupported positional args
+ if self._func._has_parameters: # type: ignore
+ _error = f"got {args} for {self.name}"
+ msg = (
+ f"Component function doesn't support positional arguments, {_error}. " # type: ignore
+ f"Please use keyword arguments like: {self._func._func_calling_example}."
+ )
+ else:
+ msg = (
+ "Component function doesn't have any parameters, "
+ f"please make sure component {self.name} has inputs. "
+ )
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ return self._func(*args, **kwargs) # pylint: disable=not-callable diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py new file mode 100644 index 00000000..012dd260 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/component_factory.py @@ -0,0 +1,171 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- + +# pylint: disable=protected-access + +from typing import Any, Callable, Dict, Optional, Tuple + +from marshmallow import Schema + +from ..._restclient.v2022_10_01.models import ComponentVersion +from ..._utils.utils import is_internal_component_data +from ...constants._common import SOURCE_PATH_CONTEXT_KEY +from ...constants._component import DataTransferTaskType, NodeType +from ...entities._component.automl_component import AutoMLComponent +from ...entities._component.command_component import CommandComponent +from ...entities._component.component import Component +from ...entities._component.datatransfer_component import ( + DataTransferCopyComponent, + DataTransferExportComponent, + DataTransferImportComponent, +) +from ...entities._component.import_component import ImportComponent +from ...entities._component.parallel_component import ParallelComponent +from ...entities._component.pipeline_component import PipelineComponent +from ...entities._component.spark_component import SparkComponent +from ...entities._util import get_type_from_spec +from .flow import FlowComponent + + +class _ComponentFactory: + """A class to create component instances from yaml dict or rest objects without hard-coded type check.""" + + def __init__(self) -> None: + self._create_instance_funcs: Dict = {} + self._create_schema_funcs: Dict = {} + + self.register_type( + _type=NodeType.PARALLEL, + create_instance_func=lambda: ParallelComponent.__new__(ParallelComponent), + create_schema_func=ParallelComponent._create_schema_for_validation, + ) + self.register_type( + _type=NodeType.COMMAND, + create_instance_func=lambda: CommandComponent.__new__(CommandComponent), + create_schema_func=CommandComponent._create_schema_for_validation, + ) + self.register_type( + _type=NodeType.IMPORT, + create_instance_func=lambda: ImportComponent.__new__(ImportComponent), + create_schema_func=ImportComponent._create_schema_for_validation, + ) + self.register_type( + _type=NodeType.PIPELINE, + create_instance_func=lambda: PipelineComponent.__new__(PipelineComponent), + create_schema_func=PipelineComponent._create_schema_for_validation, + ) + self.register_type( + _type=NodeType.AUTOML, + create_instance_func=lambda: AutoMLComponent.__new__(AutoMLComponent), + create_schema_func=AutoMLComponent._create_schema_for_validation, + ) + self.register_type( + _type=NodeType.SPARK, + create_instance_func=lambda: SparkComponent.__new__(SparkComponent), + create_schema_func=SparkComponent._create_schema_for_validation, + ) + self.register_type( + _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.COPY_DATA]), + create_instance_func=lambda: DataTransferCopyComponent.__new__(DataTransferCopyComponent), + create_schema_func=DataTransferCopyComponent._create_schema_for_validation, + ) + + self.register_type( + _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.IMPORT_DATA]), + create_instance_func=lambda: DataTransferImportComponent.__new__(DataTransferImportComponent), + create_schema_func=DataTransferImportComponent._create_schema_for_validation, + ) + + self.register_type( + _type="_".join([NodeType.DATA_TRANSFER, DataTransferTaskType.EXPORT_DATA]), + create_instance_func=lambda: DataTransferExportComponent.__new__(DataTransferExportComponent), + create_schema_func=DataTransferExportComponent._create_schema_for_validation, + ) + + self.register_type( + _type=NodeType.FLOW_PARALLEL, + create_instance_func=lambda: FlowComponent.__new__(FlowComponent), + 
create_schema_func=FlowComponent._create_schema_for_validation, + ) + + def get_create_funcs( + self, yaml_spec: dict, for_load: bool = False + ) -> Tuple[Callable[..., Component], Callable[[Any], Schema]]: + """Get registered functions to create an instance and its corresponding schema for the given type. + + :param yaml_spec: The YAML specification. + :type yaml_spec: dict + :param for_load: Whether the function is called for loading a component. Defaults to False. + :type for_load: bool + :return: A tuple containing the create_instance_func and create_schema_func. + :rtype: tuple + """ + + _type = get_type_from_spec(yaml_spec, valid_keys=self._create_instance_funcs) + # SparkComponent and InternalSparkComponent share the same type name, but they are different types. + if for_load and is_internal_component_data(yaml_spec, raise_if_not_enabled=True) and _type == NodeType.SPARK: + from azure.ai.ml._internal._schema.node import NodeType as InternalNodeType + + _type = InternalNodeType.SPARK + + create_instance_func = self._create_instance_funcs[_type] + create_schema_func = self._create_schema_funcs[_type] + return create_instance_func, create_schema_func + + def register_type( + self, + _type: str, + create_instance_func: Callable[..., Component], + create_schema_func: Callable[[Any], Schema], + ) -> None: + """Register a new component type. + + :param _type: The type name of the component. + :type _type: str + :param create_instance_func: A function to create an instance of the component. + :type create_instance_func: Callable[..., ~azure.ai.ml.entities.Component] + :param create_schema_func: A function to create a schema for the component. + :type create_schema_func: Callable[[Any], Schema] + """ + self._create_instance_funcs[_type] = create_instance_func + self._create_schema_funcs[_type] = create_schema_func + + @classmethod + def load_from_dict(cls, *, data: Dict, context: Dict, _type: Optional[str] = None, **kwargs: Any) -> Component: + """Load a component from a YAML dict. + + :keyword data: The YAML dict. + :paramtype data: dict + :keyword context: The context of the YAML dict. + :paramtype context: dict + :keyword _type: The type name of the component. When None, it will be inferred from the YAML dict. + :paramtype _type: str + :return: The loaded component. + :rtype: ~azure.ai.ml.entities.Component + """ + + return Component._load( + data=data, + yaml_path=context.get(SOURCE_PATH_CONTEXT_KEY, None), + params_override=[{"type": _type}] if _type is not None else [], + **kwargs, + ) + + @classmethod + def load_from_rest(cls, *, obj: ComponentVersion, _type: Optional[str] = None) -> Component: + """Load a component from a REST object. + + :keyword obj: The REST object. + :paramtype obj: ComponentVersion + :keyword _type: The type name of the component. When None, it will be inferred from the REST object. + :paramtype _type: str + :return: The loaded component. 
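A hedged sketch of how the factory lookup above behaves for an already-registered type; the spec dict is illustrative and the import path is the private module defined in this file:

# Sketch: resolve the registered create/schema callables for a minimal command spec.
from azure.ai.ml.entities._component.component_factory import component_factory

spec = {"type": "command"}  # illustrative minimal YAML spec
create_instance, create_schema = component_factory.get_create_funcs(spec)

instance = create_instance()      # a bare CommandComponent made via __new__, not yet initialised
print(type(instance).__name__)    # "CommandComponent"
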
+ :rtype: ~azure.ai.ml.entities.Component + """ + if _type is not None: + obj.properties.component_spec["type"] = _type + return Component._from_rest_object(obj) + + +component_factory = _ComponentFactory() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py new file mode 100644 index 00000000..e71712ab --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/datatransfer_component.py @@ -0,0 +1,325 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from pathlib import Path +from typing import Any, Dict, NoReturn, Optional, Union, cast + +from marshmallow import Schema + +from azure.ai.ml._schema.component.data_transfer_component import ( + DataTransferCopyComponentSchema, + DataTransferExportComponentSchema, + DataTransferImportComponentSchema, +) +from azure.ai.ml._utils._experimental import experimental +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, COMPONENT_TYPE, AssetTypes +from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType +from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem +from azure.ai.ml.entities._inputs_outputs.output import Output +from azure.ai.ml.entities._validation.core import MutableValidationResult +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException + +from ..._schema import PathAwareSchema +from .._util import convert_ordered_dict_to_dict, validate_attribute_type +from .component import Component + + +class DataTransferComponent(Component): + """DataTransfer component version, used to define a data transfer component. + + :param task: Task type in the data transfer component. Possible values are "copy_data", + "import_data", and "export_data". + :type task: str + :param inputs: Mapping of input data bindings used in the job. + :type inputs: dict + :param outputs: Mapping of output data bindings used in the job. + :type outputs: dict + :param kwargs: Additional parameters for the data transfer component. + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated. + Details will be provided in the error message. + """ + + def __init__( + self, + *, + task: Optional[str] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + kwargs[COMPONENT_TYPE] = NodeType.DATA_TRANSFER + # Set default base path + if BASE_PATH_CONTEXT_KEY not in kwargs: + kwargs[BASE_PATH_CONTEXT_KEY] = Path(".") + + super().__init__( + inputs=inputs, + outputs=outputs, + **kwargs, + ) + self._task = task + + @classmethod + def _attr_type_map(cls) -> dict: + return {} + + @property + def task(self) -> Optional[str]: + """Task type of the component. + + :return: Task type of the component. 
+ :rtype: str + """ + return self._task + + def _to_dict(self) -> Dict: + return cast( + dict, + convert_ordered_dict_to_dict({**self._other_parameter, **super(DataTransferComponent, self)._to_dict()}), + ) + + def __str__(self) -> str: + try: + _toYaml: str = self._to_yaml() + return _toYaml + except BaseException: # pylint: disable=W0718 + _toStr: str = super(DataTransferComponent, self).__str__() + return _toStr + + @classmethod + def _build_source_sink(cls, io_dict: Union[Dict, Database, FileSystem]) -> Union[Database, FileSystem]: + component_io: Union[Database, FileSystem] = Database() + + if isinstance(io_dict, Database): + component_io = Database() + elif isinstance(io_dict, FileSystem): + component_io = FileSystem() + else: + if isinstance(io_dict, dict): + data_type = io_dict.pop("type", None) + if data_type == ExternalDataType.DATABASE: + component_io = Database() + elif data_type == ExternalDataType.FILE_SYSTEM: + component_io = FileSystem() + else: + msg = "Type in source or sink only support {} and {}, currently got {}." + raise ValidationException( + message=msg.format( + ExternalDataType.DATABASE, + ExternalDataType.FILE_SYSTEM, + data_type, + ), + no_personal_data_message=msg.format( + ExternalDataType.DATABASE, + ExternalDataType.FILE_SYSTEM, + "data_type", + ), + target=ErrorTarget.COMPONENT, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + else: + msg = "Source or sink only support dict, Database and FileSystem" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.COMPONENT, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + return component_io + + +@experimental +class DataTransferCopyComponent(DataTransferComponent): + """DataTransfer copy component version, used to define a data transfer copy component. + + :param data_copy_mode: Data copy mode in the copy task. + Possible values are "merge_with_overwrite" and "fail_if_conflict". + :type data_copy_mode: str + :param inputs: Mapping of input data bindings used in the job. + :type inputs: dict + :param outputs: Mapping of output data bindings used in the job. + :type outputs: dict + :param kwargs: Additional parameters for the data transfer copy component. + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated. + Details will be provided in the error message. + """ + + def __init__( + self, + *, + data_copy_mode: Optional[str] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + kwargs["task"] = DataTransferTaskType.COPY_DATA + super().__init__( + inputs=inputs, + outputs=outputs, + **kwargs, + ) + + self._data_copy_mode = data_copy_mode + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return DataTransferCopyComponentSchema(context=context) + + @property + def data_copy_mode(self) -> Optional[str]: + """Data copy mode of the component. + + :return: Data copy mode of the component. 
+ :rtype: str + """ + return self._data_copy_mode + + def _customized_validate(self) -> MutableValidationResult: + validation_result = super(DataTransferCopyComponent, self)._customized_validate() + validation_result.merge_with(self._validate_input_output_mapping()) + return validation_result + + def _validate_input_output_mapping(self) -> MutableValidationResult: + validation_result = self._create_empty_validation_result() + inputs_count = len(self.inputs) + outputs_count = len(self.outputs) + if outputs_count != 1: + msg = "Only support single output in {}, but there're {} outputs." + validation_result.append_error( + message=msg.format(DataTransferTaskType.COPY_DATA, outputs_count), + yaml_path="outputs", + ) + else: + input_type = None + output_type = None + if inputs_count == 1: + for _, input_data in self.inputs.items(): + input_type = input_data.type + for _, output_data in self.outputs.items(): + output_type = output_data.type + if input_type is None or output_type is None or input_type != output_type: + msg = "Input type {} doesn't exactly match with output type {} in task {}" + validation_result.append_error( + message=msg.format(input_type, output_type, DataTransferTaskType.COPY_DATA), + yaml_path="outputs", + ) + elif inputs_count > 1: + for _, output_data in self.outputs.items(): + output_type = output_data.type + if output_type is None or output_type != AssetTypes.URI_FOLDER: + msg = "output type {} need to be {} in task {}" + validation_result.append_error( + message=msg.format( + output_type, + AssetTypes.URI_FOLDER, + DataTransferTaskType.COPY_DATA, + ), + yaml_path="outputs", + ) + else: + msg = "Inputs must be set in task {}." + validation_result.append_error( + message=msg.format(DataTransferTaskType.COPY_DATA), + yaml_path="inputs", + ) + return validation_result + + +@experimental +class DataTransferImportComponent(DataTransferComponent): + """DataTransfer import component version, used to define a data transfer import component. + + :param source: The data source of the file system or database. + :type source: dict + :param outputs: Mapping of output data bindings used in the job. + Default value is an output port with the key "sink" and the type "mltable". + :type outputs: dict + :param kwargs: Additional parameters for the data transfer import component. + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated. + Details will be provided in the error message. + """ + + def __init__( + self, + *, + source: Optional[Dict] = None, + outputs: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + outputs = outputs or {"sink": Output(type=AssetTypes.MLTABLE)} + kwargs["task"] = DataTransferTaskType.IMPORT_DATA + super().__init__( + outputs=outputs, + **kwargs, + ) + + source = source if source else {} + self.source = self._build_source_sink(source) + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return DataTransferImportComponentSchema(context=context) + + # pylint: disable-next=docstring-missing-param + def __call__(self, *args: Any, **kwargs: Any) -> NoReturn: + """Call ComponentVersion as a function and get a Component object.""" + + msg = "DataTransfer component is not callable for import task." 
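A hedged example of a copy component that satisfies the mapping rules above (a single output, and with a single input the input and output types must match); all names are illustrative and the class is experimental:

# Illustrative sketch only: one uri_folder input copied to one uri_folder output.
from azure.ai.ml import Input, Output
from azure.ai.ml.entities._component.datatransfer_component import DataTransferCopyComponent

copy_component = DataTransferCopyComponent(
    name="copy_folder",  # hypothetical name
    data_copy_mode="merge_with_overwrite",
    inputs={"source_folder": Input(type="uri_folder")},
    outputs={"destination_folder": Output(type="uri_folder")},
)
print(copy_component.task)            # DataTransferTaskType.COPY_DATA
print(copy_component.data_copy_mode)  # "merge_with_overwrite"
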
+ raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.COMPONENT, + error_category=ErrorCategory.USER_ERROR, + ) + + +@experimental +class DataTransferExportComponent(DataTransferComponent): + """DataTransfer export component version, used to define a data transfer export component. + + :param sink: The sink of external data and databases. + :type sink: Union[Dict, Database, FileSystem] + :param inputs: Mapping of input data bindings used in the job. + :type inputs: dict + :param kwargs: Additional parameters for the data transfer export component. + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated. + Details will be provided in the error message. + """ + + def __init__( + self, + *, + inputs: Optional[Dict] = None, + sink: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + kwargs["task"] = DataTransferTaskType.EXPORT_DATA + super().__init__( + inputs=inputs, + **kwargs, + ) + + sink = sink if sink else {} + self.sink = self._build_source_sink(sink) + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return DataTransferExportComponentSchema(context=context) + + # pylint: disable-next=docstring-missing-param + def __call__(self, *args: Any, **kwargs: Any) -> NoReturn: + """Call ComponentVersion as a function and get a Component object.""" + + msg = "DataTransfer component is not callable for export task." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.COMPONENT, + error_category=ErrorCategory.USER_ERROR, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py new file mode 100644 index 00000000..e4ff06cc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/flow.py @@ -0,0 +1,553 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import contextlib +import json +import os +from collections import defaultdict +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Union + +import yaml # type: ignore[import] +from marshmallow import EXCLUDE, Schema, ValidationError + +from azure.ai.ml.constants._common import ( + BASE_PATH_CONTEXT_KEY, + COMPONENT_TYPE, + PROMPTFLOW_AZUREML_OVERRIDE_KEY, + SOURCE_PATH_CONTEXT_KEY, + AssetTypes, + SchemaUrl, +) +from azure.ai.ml.constants._component import ComponentParameterTypes, NodeType + +from ..._restclient.v2022_10_01.models import ComponentVersion +from ..._schema import PathAwareSchema +from ..._schema.component.flow import FlowComponentSchema, FlowSchema, RunSchema +from ...exceptions import ErrorCategory, ErrorTarget, ValidationException +from .. import Environment +from .._inputs_outputs import GroupInput, Input, Output +from ._additional_includes import AdditionalIncludesMixin +from .component import Component + +# avoid circular import error +if TYPE_CHECKING: + from azure.ai.ml.entities._builders.parallel import Parallel + +# pylint: disable=protected-access + + +class _FlowPortNames: + """Common yaml fields. + + Common yaml fields are used to define the common fields in yaml files. It can be one of the following values: type, + name, $schema. 
+ """ + + DATA = "data" + RUN_OUTPUTS = "run_outputs" + CONNECTIONS = "connections" + + FLOW_OUTPUTS = "flow_outputs" + DEBUG_INFO = "debug_info" + + +class _FlowComponentPortDict(dict): + def __init__(self, ports: Dict): + self._allow_update_item = True + super().__init__() + for input_port_name, input_port in ports.items(): + self[input_port_name] = input_port + self._allow_update_item = False + + def __setitem__(self, key: Any, value: Any) -> None: + if not self._allow_update_item: + raise RuntimeError("Ports of flow component are not editable.") + super().__setitem__(key, value) + + def __delitem__(self, key: Any) -> None: + if not self._allow_update_item: + raise RuntimeError("Ports of flow component are not editable.") + super().__delitem__(key) + + +class FlowComponentInputDict(_FlowComponentPortDict): + """Input port dictionary for FlowComponent, with fixed input ports.""" + + def __init__(self) -> None: + super().__init__( + { + _FlowPortNames.CONNECTIONS: GroupInput(values={}, _group_class=None), + _FlowPortNames.DATA: Input(type=AssetTypes.URI_FOLDER, optional=False), + _FlowPortNames.FLOW_OUTPUTS: Input(type=AssetTypes.URI_FOLDER, optional=True), + } + ) + + @contextlib.contextmanager + def _fit_inputs(self, inputs: Optional[Dict]) -> Generator: + """Add dynamic input ports to the input port dictionary. + Input ports of a flow component include: + 1. data: required major uri_folder input + 2. run_output: optional uri_folder input + 3. connections.xxx.xxx: group of string parameters, first layer key can be any node name, + but we won't resolve the exact keys in SDK + 4. xxx: input_mapping parameters, key can be any node name, but we won't resolve the exact keys in SDK + + #3 will be grouped into connections, we make it a fixed group input port. + #4 are dynamic input ports, we will add them temporarily in this context manager and remove them + after the context manager is finished. + + :param inputs: The dynamic input to fit. 
+ :type inputs: Dict[str, Any] + :return: None + :rtype: None + """ + dynamic_columns_mapping_keys = [] + dynamic_connections_inputs = defaultdict(list) + from azure.ai.ml.entities._job.pipeline._io import _GroupAttrDict + from azure.ai.ml.entities._job.pipeline._io.mixin import flatten_dict + + flattened_inputs = flatten_dict(inputs, _GroupAttrDict, allow_dict_fields=[_FlowPortNames.CONNECTIONS]) + + for flattened_input_key in flattened_inputs: + if flattened_input_key.startswith(f"{_FlowPortNames.CONNECTIONS}."): + if flattened_input_key.count(".") != 2: + raise ValidationException( + message="flattened connection input prot name must be " + "in the format of connections.<node_name>.<port_name>, " + "but got %s" % flattened_input_key, + no_personal_data_message="flattened connection input prot name must be in the format of " + "connections.<node_name>.<port_name>", + target=ErrorTarget.COMPONENT, + error_category=ErrorCategory.USER_ERROR, + ) + _, node_name, param_name = flattened_input_key.split(".") + dynamic_connections_inputs[node_name].append(param_name) + continue + if flattened_input_key not in self: + dynamic_columns_mapping_keys.append(flattened_input_key) + + self._allow_update_item = True + for flattened_input_key in dynamic_columns_mapping_keys: + self[flattened_input_key] = Input(type=ComponentParameterTypes.STRING, optional=True) + if dynamic_connections_inputs: + self[_FlowPortNames.CONNECTIONS] = GroupInput( + values={ + node_name: GroupInput( + values={ + parameter_name: Input( + type=ComponentParameterTypes.STRING, + ) + for parameter_name in param_names + }, + _group_class=None, + ) + for node_name, param_names in dynamic_connections_inputs.items() + }, + _group_class=None, + ) + self._allow_update_item = False + + yield + + self._allow_update_item = True + for flattened_input_key in dynamic_columns_mapping_keys: + del self[flattened_input_key] + self[_FlowPortNames.CONNECTIONS] = GroupInput(values={}, _group_class=None) + self._allow_update_item = False + + +class FlowComponentOutputDict(_FlowComponentPortDict): + """Output port dictionary for FlowComponent, with fixed output ports.""" + + def __init__(self) -> None: + super().__init__( + { + _FlowPortNames.FLOW_OUTPUTS: Output(type=AssetTypes.URI_FOLDER), + _FlowPortNames.DEBUG_INFO: Output(type=AssetTypes.URI_FOLDER), + } + ) + + +class FlowComponent(Component, AdditionalIncludesMixin): + """Flow component version, used to define a Flow Component or Job. + + :keyword name: The name of the Flow job or component. + :type name: Optional[str] + :keyword version: The version of the Flow job or component. + :type version: Optional[str] + :keyword description: The description of the component. Defaults to None. + :type description: Optional[str] + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None. + :type tags: Optional[dict] + :keyword display_name: The display name of the component. + :type display_name: Optional[str] + :keyword flow: The path to the flow directory or flow definition file. Defaults to None and base path of this + component will be used as flow directory. + :type flow: Optional[Union[str, Path]] + :keyword column_mappings: The column mapping for the flow. Defaults to None. + :type column_mapping: Optional[dict[str, str]] + :keyword variant: The variant of the flow. Defaults to None. + :type variant: Optional[str] + :keyword connections: The connections for the flow. Defaults to None. 
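The flattened connection-key rule enforced in _fit_inputs (keys must look like connections.<node_name>.<port_name>) can be restated as a tiny standalone check that mirrors the validation above rather than calling SDK internals:

# Simplified restatement of the connections-key validation in FlowComponentInputDict._fit_inputs.
def split_connection_key(flattened_key: str) -> tuple:
    if not flattened_key.startswith("connections.") or flattened_key.count(".") != 2:
        raise ValueError(
            "flattened connection input port name must be in the format of "
            f"connections.<node_name>.<port_name>, but got {flattened_key}"
        )
    _, node_name, port_name = flattened_key.split(".")
    return node_name, port_name


assert split_connection_key("connections.summarize_node.connection") == ("summarize_node", "connection")
# A key with three dots, e.g. "connections.summarize_node.llm.connection", would be rejected.
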
+ :type connections: Optional[dict[str, dict[str, str]]] + :keyword environment_variables: The environment variables for the flow. Defaults to None. + :type environment_variables: Optional[dict[str, str]] + :keyword environment: The environment for the flow component. Defaults to None. + :type environment: Optional[Union[str, Environment]) + :keyword is_deterministic: Specifies whether the Flow will return the same output given the same input. + Defaults to True. When True, if a Flow (component) is deterministic and has been run before in the + current workspace with the same input and settings, it will reuse results from a previous submitted job + when used as a node or step in a pipeline. In that scenario, no compute resources will be used. + :type is_deterministic: Optional[bool] + :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None. + :type additional_includes: Optional[list[str]] + :keyword properties: The job property dictionary. Defaults to None. + :type properties: Optional[dict[str, str]] + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if FlowComponent cannot be successfully validated. + Details will be provided in the error message. + """ + + def __init__( + self, + *, + name: Optional[str] = None, + version: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + flow: Optional[Union[str, Path]] = None, + column_mapping: Optional[Dict[str, str]] = None, + variant: Optional[str] = None, + connections: Optional[Dict[str, Dict[str, str]]] = None, + environment_variables: Optional[Dict[str, str]] = None, + environment: Optional[Union[str, Environment]] = None, + is_deterministic: bool = True, + additional_includes: Optional[List] = None, + properties: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + kwargs[COMPONENT_TYPE] = NodeType.FLOW_PARALLEL + + # always use flow directory as base path + # Note: we suppose that there is no relative path in run.yaml other than flow. 
+ # If there are any, we will need to rebase them so that they have the same base path as attributes in + # flow.dag.yaml + flow_dir, self._flow = self._get_flow_definition( + flow=flow, + base_path=kwargs.pop(BASE_PATH_CONTEXT_KEY, Path.cwd()), + source_path=kwargs.get(SOURCE_PATH_CONTEXT_KEY, None), + ) + kwargs[BASE_PATH_CONTEXT_KEY] = flow_dir + + super().__init__( + name=name or self._normalize_component_name(flow_dir.name), + version=version or "1", + description=description, + tags=tags, + display_name=display_name, + inputs={}, + outputs={}, + is_deterministic=is_deterministic, + properties=properties, + **kwargs, + ) + self._environment = environment + self._column_mapping = column_mapping or {} + self._variant = variant + self._connections = connections or {} + + self._inputs = FlowComponentInputDict() + self._outputs = FlowComponentOutputDict() + + if flow: + # file existence has been checked in _get_flow_definition + # we don't need to rebase additional_includes as we have updated base_path + with open(Path(self.base_path, self._flow), "r", encoding="utf-8") as f: + flow_content = yaml.safe_load(f.read()) + additional_includes = flow_content.get("additional_includes", None) + # environment variables in run.yaml have higher priority than those in flow.dag.yaml + self._environment_variables = flow_content.get("environment_variables", {}) + self._environment_variables.update(environment_variables or {}) + else: + self._environment_variables = environment_variables or {} + + self._additional_includes = additional_includes or [] + + # unlike other Component, code is a private property in FlowComponent and + # will be used to store the arm id of the created code before constructing rest object + # we haven't used self.flow directly as self.flow can be a path to the flow dag yaml file instead of a directory + self._code_arm_id: Optional[str] = None + + # region valid properties + @property + def flow(self) -> str: + """The path to the flow definition file relative to the flow directory. + + :rtype: str + """ + return self._flow + + @property + def environment(self) -> Optional[Union[str, Environment]]: + """The environment for the flow component. Defaults to None. + + :rtype: Union[str, Environment]) + """ + return self._environment + + @environment.setter + def environment(self, value: Union[str, Environment]) -> None: + """The environment for the flow component. Defaults to None. + + :param value: The column mapping for the flow. + :type value: Union[str, Environment]) + """ + self._environment = value + + @property + def column_mapping(self) -> Dict[str, str]: + """The column mapping for the flow. Defaults to None. + + :rtype: Dict[str, str] + """ + return self._column_mapping + + @column_mapping.setter + def column_mapping(self, value: Optional[Dict[str, str]]) -> None: + """ + The column mapping for the flow. Defaults to None. + + :param value: The column mapping for the flow. + :type value: Optional[Dict[str, str]] + """ + self._column_mapping = value or {} + + @property + def variant(self) -> Optional[str]: + """The variant of the flow. Defaults to None. + + :rtype: Optional[str] + """ + return self._variant + + @variant.setter + def variant(self, value: Optional[str]) -> None: + """The variant of the flow. Defaults to None. + + :param value: The variant of the flow. + :type value: Optional[str] + """ + self._variant = value + + @property + def connections(self) -> Dict[str, Dict[str, str]]: + """The connections for the flow. Defaults to None. 
+ + :rtype: Dict[str, Dict[str, str]] + """ + return self._connections + + @connections.setter + def connections(self, value: Optional[Dict[str, Dict[str, str]]]) -> None: + """ + The connections for the flow. Defaults to None. + + :param value: The connections for the flow. + :type value: Optional[Dict[str, Dict[str, str]]] + """ + self._connections = value or {} + + @property + def environment_variables(self) -> Dict[str, str]: + """The environment variables for the flow. Defaults to None. + + :rtype: Dict[str, str] + """ + return self._environment_variables + + @environment_variables.setter + def environment_variables(self, value: Optional[Dict[str, str]]) -> None: + """The environment variables for the flow. Defaults to None. + + :param value: The environment variables for the flow. + :type value: Optional[Dict[str, str]] + """ + self._environment_variables = value or {} + + @property + def additional_includes(self) -> List: + """A list of shared additional files to be included in the component. Defaults to None. + + :rtype: List + """ + return self._additional_includes + + @additional_includes.setter + def additional_includes(self, value: Optional[List]) -> None: + """A list of shared additional files to be included in the component. Defaults to None. + All local additional includes should be relative to the flow directory. + + :param value: A list of shared additional files to be included in the component. + :type value: Optional[List] + """ + self._additional_includes = value or [] + + # endregion + + @classmethod + def _normalize_component_name(cls, value: str) -> str: + return value.replace("-", "_") + + # region Component + @classmethod + def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict: + raise RuntimeError("FlowComponent does not support loading from REST object.") + + def _to_rest_object(self) -> ComponentVersion: + rest_obj = super()._to_rest_object() + rest_obj.properties.component_spec["code"] = self._code_arm_id + rest_obj.properties.component_spec["flow_file_name"] = self._flow + return rest_obj + + def _func(self, **kwargs: Any) -> "Parallel": # pylint: disable=invalid-overridden-method + from azure.ai.ml.entities._builders.parallel import Parallel + + with self._inputs._fit_inputs(kwargs): # type: ignore[attr-defined] + # pylint: disable=not-callable + return super()._func(**kwargs) # type: ignore + + @classmethod + def _get_flow_definition( + cls, + base_path: Path, + *, + flow: Optional[Union[str, os.PathLike]] = None, + source_path: Optional[Union[str, os.PathLike]] = None, + ) -> Tuple[Path, str]: + """ + Get the path to the flow directory and the file name of the flow dag yaml file. + If flow is not specified, we will assume that the source_path is the path to the flow dag yaml file. + If flow is specified, it can be either a path to the flow dag yaml file or a path to the flow directory. + If flow is a path to the flow directory, we will assume that the flow dag yaml file is named flow.dag.yaml. + + :param base_path: The base path of the flow component. + :type base_path: Path + :keyword flow: The path to the flow directory or flow definition file. Defaults to None and base path of this + component will be used as flow directory. + :type flow: Optional[Union[str, Path]] + :keyword source_path: The source path of the flow component, should be path to the flow dag yaml file + if specified. + :type source_path: Optional[Union[str, os.PathLike]] + :return: The path to the flow directory and the file name of the flow dag yaml file. 
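The resolution rules documented for _get_flow_definition can be paraphrased in a short standalone sketch; it mirrors the documented behaviour, while the real method wraps failures in validation errors:

# Simplified restatement of the flow/source_path resolution described above.
from pathlib import Path
from typing import Optional, Tuple, Union


def resolve_flow(base_path: Path, flow: Optional[Union[str, Path]], source_path: Optional[str]) -> Tuple[Path, str]:
    default_name = "flow.dag.yaml"
    if flow is None:
        if source_path is None:
            raise ValueError("Either flow or source_path must be specified.")
        return Path(base_path), Path(source_path).name    # source_path points at the dag yaml itself
    flow_path = Path(flow)
    if not flow_path.is_absolute():
        flow_path = Path(base_path, flow)
    if flow_path.is_dir() and (flow_path / default_name).is_file():
        return flow_path, default_name                    # flow is a directory containing flow.dag.yaml
    if flow_path.is_file():
        return flow_path.parent, flow_path.name           # flow is the dag yaml file itself
    raise ValueError(f"Flow path must be a directory containing {default_name} or a file, but got {flow_path}")
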
+ :rtype: Tuple[Path, str] + """ + flow_file_name = "flow.dag.yaml" + + if flow is None and source_path is None: + raise cls._create_validation_error( + message="Either flow or source_path must be specified.", + no_personal_data_message="Either flow or source_path must be specified.", + ) + + if flow is None: + # Flow component must be created with a local yaml file, so no need to check if source_path exists + if isinstance(source_path, (os.PathLike, str)): + flow_file_name = os.path.basename(source_path) + return Path(base_path), flow_file_name + + flow_path = Path(flow) + if not flow_path.is_absolute(): + # if flow_path points to a symlink, we still use the parent of the symlink as origin code + flow_path = Path(base_path, flow) + + if flow_path.is_dir() and (flow_path / flow_file_name).is_file(): + return flow_path, flow_file_name + + if flow_path.is_file(): + return flow_path.parent, flow_path.name + + raise cls._create_validation_error( + message="Flow path must be a directory containing flow.dag.yaml or a file, but got %s" % flow_path, + no_personal_data_message="Flow path must be a directory or a file", + ) + + # endregion + + # region SchemaValidatableMixin + @classmethod + def _load_with_schema( + cls, data: Any, *, context: Optional[Any] = None, raise_original_exception: bool = False, **kwargs: Any + ) -> Any: + # FlowComponent should be loaded with FlowSchema or FlowRunSchema instead of FlowComponentSchema + context = context or {BASE_PATH_CONTEXT_KEY: Path.cwd()} + _schema = data.get("$schema", None) + if _schema == SchemaUrl.PROMPTFLOW_RUN: + schema = RunSchema(context=context) + elif _schema == SchemaUrl.PROMPTFLOW_FLOW: + schema = FlowSchema(context=context) + else: + raise cls._create_validation_error( + message="$schema must be specified correctly for loading component from flow, but got %s" % _schema, + no_personal_data_message="$schema must be specified for loading component from flow", + ) + + # unlike other component, we should ignore unknown fields in flow to keep init_params clean and avoid + # too much understanding of flow.dag.yaml & run.yaml + kwargs["unknown"] = EXCLUDE + try: + loaded_dict = schema.load(data, **kwargs) + except ValidationError as e: + if raise_original_exception: + raise e + msg = "Trying to load data with schema failed. 
Data:\n%s\nError: %s" % ( + json.dumps(data, indent=4) if isinstance(data, dict) else data, + json.dumps(e.messages, indent=4), + ) + raise cls._create_validation_error( + message=msg, + no_personal_data_message=str(e), + ) from e + loaded_dict.update(loaded_dict.pop(PROMPTFLOW_AZUREML_OVERRIDE_KEY, {})) + return loaded_dict + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return FlowComponentSchema(context=context) + + # endregion + + # region AdditionalIncludesMixin + def _get_origin_code_value(self) -> Union[str, os.PathLike, None]: + if self._code_arm_id: + return self._code_arm_id + res: Union[str, os.PathLike, None] = self.base_path + return res + + def _fill_back_code_value(self, value: str) -> None: + self._code_arm_id = value + + @contextlib.contextmanager + def _try_build_local_code(self) -> Generator: + # false-positive by pylint, hence disable it + # (https://github.com/pylint-dev/pylint/blob/main/doc/data/messages + # /c/contextmanager-generator-missing-cleanup/details.rst) + with super()._try_build_local_code() as code: # pylint:disable=contextmanager-generator-missing-cleanup + if not code or not code.path: + yield code + return + + if not (Path(code.path) / ".promptflow" / "flow.tools.json").is_file(): + raise self._create_validation_error( + message="Flow component must be created with a ./promptflow/flow.tools.json, " + "please run `pf flow validate` to generate it or skip it in your ignore file.", + no_personal_data_message="Flow component must be created with a ./promptflow/flow.tools.json, " + "please run `pf flow validate` to generate it or skip it in your ignore file.", + ) + # TODO: should we remove additional includes from flow.dag.yaml? for now we suppose it will be removed + # by mldesigner compile if needed + + yield code + + # endregion diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py new file mode 100644 index 00000000..13464a06 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/import_component.py @@ -0,0 +1,96 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from pathlib import Path +from typing import Any, Dict, Optional, Union + +from marshmallow import Schema + +from azure.ai.ml._schema.component.import_component import ImportComponentSchema +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, COMPONENT_TYPE +from azure.ai.ml.constants._component import NodeType + +from ..._schema import PathAwareSchema +from ..._utils.utils import parse_args_description_from_docstring +from .._util import convert_ordered_dict_to_dict +from .component import Component + + +class ImportComponent(Component): + """Import component version, used to define an import component. + + :param name: Name of the component. + :type name: str + :param version: Version of the component. + :type version: str + :param description: Description of the component. + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: dict + :param display_name: Display name of the component. + :type display_name: str + :param source: Input source parameters of the component. + :type source: dict + :param output: Output of the component. 
+ :type output: dict + :param is_deterministic: Whether the command component is deterministic. Defaults to True. + :type is_deterministic: bool + :param kwargs: Additional parameters for the import component. + """ + + def __init__( + self, + *, + name: Optional[str] = None, + version: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + source: Optional[Dict] = None, + output: Optional[Dict] = None, + is_deterministic: bool = True, + **kwargs: Any, + ) -> None: + kwargs[COMPONENT_TYPE] = NodeType.IMPORT + # Set default base path + if BASE_PATH_CONTEXT_KEY not in kwargs: + kwargs[BASE_PATH_CONTEXT_KEY] = Path(".") + + super().__init__( + name=name, + version=version, + description=description, + tags=tags, + display_name=display_name, + inputs=source, + outputs={"output": output} if output else None, + is_deterministic=is_deterministic, + **kwargs, + ) + + self.source = source + self.output = output + + def _to_dict(self) -> Dict: + # TODO: Bug Item number: 2897665 + res: Dict = convert_ordered_dict_to_dict( # type: ignore + {**self._other_parameter, **super(ImportComponent, self)._to_dict()} + ) + return res + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return ImportComponentSchema(context=context) + + @classmethod + def _parse_args_description_from_docstring(cls, docstring: str) -> Dict: + res: dict = parse_args_description_from_docstring(docstring) + return res + + def __str__(self) -> str: + try: + toYaml: str = self._to_yaml() + return toYaml + except BaseException: # pylint: disable=W0718 + toStr: str = super(ImportComponent, self).__str__() + return toStr diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py new file mode 100644 index 00000000..3f29b1e1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/parallel_component.py @@ -0,0 +1,305 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import json +import os +import re +from typing import Any, Dict, List, Optional, Union, cast + +from marshmallow import Schema + +from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion +from azure.ai.ml._schema.component.parallel_component import ParallelComponentSchema +from azure.ai.ml.constants._common import COMPONENT_TYPE +from azure.ai.ml.constants._component import NodeType +from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration +from azure.ai.ml.entities._job.parallel.parallel_task import ParallelTask +from azure.ai.ml.entities._job.parallel.parameterized_parallel import ParameterizedParallel +from azure.ai.ml.entities._job.parallel.retry_settings import RetrySettings +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException + +from ..._schema import PathAwareSchema +from .._util import validate_attribute_type +from .._validation import MutableValidationResult +from .code import ComponentCodeMixin +from .component import Component + + +class ParallelComponent( + Component, ParameterizedParallel, ComponentCodeMixin +): # pylint: disable=too-many-instance-attributes + """Parallel component version, used to define a parallel component. 
+
+    :param name: Name of the component. Defaults to None.
+    :type name: str
+    :param version: Version of the component. Defaults to None.
+    :type version: str
+    :param description: Description of the component. Defaults to None.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
+    :type tags: dict
+    :param display_name: Display name of the component. Defaults to None.
+    :type display_name: str
+    :param retry_settings: Retry settings for failed parallel component runs. Defaults to None.
+    :type retry_settings: RetrySettings
+    :param logging_level: A string of the logging level name. Defaults to None.
+    :type logging_level: str
+    :param max_concurrency_per_instance: The maximum parallelism that each compute instance has. Defaults to None.
+    :type max_concurrency_per_instance: int
+    :param error_threshold: The number of item processing failures that should be ignored. Defaults to None.
+    :type error_threshold: int
+    :param mini_batch_error_threshold: The number of mini-batch processing failures that should be ignored.
+        Defaults to None.
+    :type mini_batch_error_threshold: int
+    :param task: The parallel task. Defaults to None.
+    :type task: ParallelTask
+    :param mini_batch_size: For FileDataset input, this field is the number of files a user script can process
+        in one run() call. For TabularDataset input, this field is the approximate size of data the user script
+        can process in one run() call. Example values are 1024, 1024KB, 10MB, and 1GB.
+        (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value can be set
+        through PipelineParameter.
+    :type mini_batch_size: str
+    :param partition_keys: The keys used to partition the dataset into mini-batches. Defaults to None.
+        If specified, data with the same key will be partitioned into the same mini-batch.
+        If both partition_keys and mini_batch_size are specified, partition_keys takes effect.
+        The input(s) must be partitioned dataset(s), and the partition_keys must be a subset of the keys of
+        every input dataset for this to work.
+    :type partition_keys: list
+    :param input_data: The input data. Defaults to None.
+    :type input_data: str
+    :param resources: Compute resource configuration for the component. Defaults to None.
+    :type resources: Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration]
+    :param inputs: Inputs of the component. Defaults to None.
+    :type inputs: dict
+    :param outputs: Outputs of the component. Defaults to None.
+    :type outputs: dict
+    :param code: Promoted property from task.code.
+    :type code: str
+    :param instance_count: Promoted property from resources.instance_count. Defaults to None.
+    :type instance_count: int
+    :param is_deterministic: Whether the parallel component is deterministic. Defaults to True.
+    :type is_deterministic: bool
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if ParallelComponent cannot be successfully validated.
+        Details will be provided in the error message.
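+
+    .. admonition:: Example (illustrative sketch)
+
+        A minimal construction sketch; the paths, environment name, entry script, and
+        input/output types below are assumptions for illustration, not values taken from
+        this module.
+
+        .. code-block:: python
+
+            from azure.ai.ml.entities._component.parallel_component import ParallelComponent
+            from azure.ai.ml.entities._job.parallel.parallel_task import ParallelTask
+
+            batch_score = ParallelComponent(
+                name="batch_score",
+                version="1",
+                inputs={"job_data_path": {"type": "mltable"}},
+                outputs={"scored_data": {"type": "uri_folder"}},
+                input_data="${{inputs.job_data_path}}",
+                mini_batch_size="10mb",  # parsed to bytes in __init__ (kb/mb/gb suffixes supported)
+                instance_count=2,        # promoted to resources.instance_count
+                code="./src",            # promoted to task.code
+                task=ParallelTask(
+                    type="run_function",
+                    entry_script="score.py",
+                    environment="azureml:my-environment:1",
+                ),
+            )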
+ """ + + def __init__( # pylint: disable=too-many-locals + self, + *, + name: Optional[str] = None, + version: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict[str, Any]] = None, + display_name: Optional[str] = None, + retry_settings: Optional[RetrySettings] = None, + logging_level: Optional[str] = None, + max_concurrency_per_instance: Optional[int] = None, + error_threshold: Optional[int] = None, + mini_batch_error_threshold: Optional[int] = None, + task: Optional[ParallelTask] = None, + mini_batch_size: Optional[str] = None, + partition_keys: Optional[List] = None, + input_data: Optional[str] = None, + resources: Optional[JobResourceConfiguration] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + code: Optional[str] = None, # promoted property from task.code + instance_count: Optional[int] = None, # promoted property from resources.instance_count + is_deterministic: bool = True, + **kwargs: Any, + ): + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + kwargs[COMPONENT_TYPE] = NodeType.PARALLEL + + super().__init__( + name=name, + version=version, + description=description, + tags=tags, + display_name=display_name, + inputs=inputs, + outputs=outputs, + is_deterministic=is_deterministic, + **kwargs, + ) + + # No validation on value passed here because in pipeline job, required code&environment maybe absent + # and fill in later with job defaults. + self.task = task + self.mini_batch_size: int = 0 + self.partition_keys = partition_keys + self.input_data = input_data + self.retry_settings = retry_settings + self.logging_level = logging_level + self.max_concurrency_per_instance = max_concurrency_per_instance + self.error_threshold = error_threshold + self.mini_batch_error_threshold = mini_batch_error_threshold + self.resources = resources + + # check mutual exclusivity of promoted properties + if self.resources is not None and instance_count is not None: + msg = "instance_count and resources are mutually exclusive" + raise ValidationException( + message=msg, + target=ErrorTarget.COMPONENT, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + ) + self.instance_count = instance_count + self.code = code + + if mini_batch_size is not None: + # Convert str to int. + pattern = re.compile(r"^\d+([kKmMgG][bB])*$") + if not pattern.match(mini_batch_size): + raise ValueError(r"Parameter mini_batch_size must follow regex rule ^\d+([kKmMgG][bB])*$") + + try: + self.mini_batch_size = int(mini_batch_size) + except ValueError as e: + unit = mini_batch_size[-2:].lower() + if unit == "kb": + self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 + elif unit == "mb": + self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 + elif unit == "gb": + self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 * 1024 + else: + raise ValueError("mini_batch_size unit must be kb, mb or gb") from e + + @property + def instance_count(self) -> Optional[int]: + """Return value of promoted property resources.instance_count. + + :return: Value of resources.instance_count. + :rtype: Optional[int] + """ + return self.resources.instance_count if self.resources and not isinstance(self.resources, dict) else None + + @instance_count.setter + def instance_count(self, value: int) -> None: + """Set the value of the promoted property resources.instance_count. + + :param value: The value to set for resources.instance_count. 
+ :type value: int + """ + if not value: + return + if not self.resources: + self.resources = JobResourceConfiguration(instance_count=value) + else: + if not isinstance(self.resources, dict): + self.resources.instance_count = value + + @property + def code(self) -> Optional[str]: + """Return value of promoted property task.code, which is a local or + remote path pointing at source code. + + :return: Value of task.code. + :rtype: Optional[str] + """ + return self.task.code if self.task else None + + @code.setter + def code(self, value: str) -> None: + """Set the value of the promoted property task.code. + + :param value: The value to set for task.code. + :type value: str + """ + if not value: + return + if not self.task: + self.task = ParallelTask(code=value) + else: + self.task.code = value + + def _to_ordered_dict_for_yaml_dump(self) -> Dict: + """Dump the component content into a sorted yaml string. + + :return: The ordered dict + :rtype: Dict + """ + + obj: dict = super()._to_ordered_dict_for_yaml_dump() + # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value + if self.code and isinstance(self.code, str): + obj["task"]["code"] = self.code + return obj + + @property + def environment(self) -> Optional[str]: + """Return value of promoted property task.environment, indicate the + environment that training job will run in. + + :return: Value of task.environment. + :rtype: Optional[Environment, str] + """ + if self.task: + return cast(Optional[str], self.task.environment) + return None + + @environment.setter + def environment(self, value: str) -> None: + """Set the value of the promoted property task.environment. + + :param value: The value to set for task.environment. + :type value: str + """ + if not value: + return + if not self.task: + self.task = ParallelTask(environment=value) + else: + self.task.environment = value + + def _customized_validate(self) -> MutableValidationResult: + validation_result = super()._customized_validate() + self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result) + return validation_result + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "retry_settings": (dict, RetrySettings), + "task": (dict, ParallelTask), + "logging_level": str, + "max_concurrency_per_instance": int, + "input_data": str, + "error_threshold": int, + "mini_batch_error_threshold": int, + "code": (str, os.PathLike), + "resources": (dict, JobResourceConfiguration), + } + + def _to_rest_object(self) -> ComponentVersion: + rest_object = super()._to_rest_object() + # schema required list while backend accept json string + if self.partition_keys: + rest_object.properties.component_spec["partition_keys"] = json.dumps(self.partition_keys) + return rest_object + + @classmethod + def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict: + # schema required list while backend accept json string + # update rest obj as it will be + partition_keys = obj.properties.component_spec.get("partition_keys", None) + if partition_keys: + obj.properties.component_spec["partition_keys"] = json.loads(partition_keys) + res: dict = super()._from_rest_object_to_init_params(obj) + return res + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return ParallelComponentSchema(context=context) + + def __str__(self) -> str: + try: + toYaml: str = self._to_yaml() + return toYaml + except BaseException: # pylint: disable=W0718 + toStr: str 
= super(ParallelComponent, self).__str__() + return toStr diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py new file mode 100644 index 00000000..229b714d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/pipeline_component.py @@ -0,0 +1,529 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import json +import logging +import os +import re +import time +import typing +from collections import Counter +from typing import Any, Dict, List, Optional, Tuple, Union + +from marshmallow import Schema + +from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties +from azure.ai.ml._schema import PathAwareSchema +from azure.ai.ml._schema.pipeline.pipeline_component import PipelineComponentSchema +from azure.ai.ml._utils._asset_utils import get_object_hash +from azure.ai.ml._utils.utils import hash_dict, is_data_binding_expression +from azure.ai.ml.constants._common import ARM_ID_PREFIX, ASSET_ARM_ID_REGEX_FORMAT, COMPONENT_TYPE +from azure.ai.ml.constants._component import ComponentSource, NodeType +from azure.ai.ml.constants._job.pipeline import ValidationErrorCode +from azure.ai.ml.entities._builders import BaseNode, Command +from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode, LoopNode +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._inputs_outputs import GroupInput, Input +from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob +from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe, try_get_non_arbitrary_attr +from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression +from azure.ai.ml.entities._validation import MutableValidationResult +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException + +module_logger = logging.getLogger(__name__) + + +class PipelineComponent(Component): + """Pipeline component, currently used to store components in an azure.ai.ml.dsl.pipeline. + + :param name: Name of the component. + :type name: str + :param version: Version of the component. + :type version: str + :param description: Description of the component. + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: dict + :param display_name: Display name of the component. + :type display_name: str + :param inputs: Component inputs. + :type inputs: dict + :param outputs: Component outputs. + :type outputs: dict + :param jobs: Id to components dict inside the pipeline definition. + :type jobs: Dict[str, ~azure.ai.ml.entities._builders.BaseNode] + :param is_deterministic: Whether the pipeline component is deterministic. + :type is_deterministic: bool + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if PipelineComponent cannot be successfully validated. + Details will be provided in the error message. 
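+
+    .. admonition:: Example (illustrative sketch)
+
+        Pipeline components are normally produced by the ``@dsl.pipeline`` decorator or
+        loaded from YAML rather than constructed directly; the file name below is an
+        assumption for illustration.
+
+        .. code-block:: python
+
+            from azure.ai.ml import load_component
+
+            pipeline_component = load_component(source="./pipeline_component.yml")
+            print(list(pipeline_component.jobs))  # node names defined in the pipeline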
+ """ + + def __init__( + self, + *, + name: Optional[str] = None, + version: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + jobs: Optional[Dict[str, BaseNode]] = None, + is_deterministic: Optional[bool] = None, + **kwargs: Any, + ) -> None: + kwargs[COMPONENT_TYPE] = NodeType.PIPELINE + super().__init__( + name=name, + version=version, + description=description, + tags=tags, + display_name=display_name, + inputs=inputs, + outputs=outputs, + is_deterministic=is_deterministic, # type: ignore[arg-type] + **kwargs, + ) + self._jobs = self._process_jobs(jobs) if jobs else {} + # for telemetry + self._job_types, self._job_sources = self._get_job_type_and_source() + # Private support: create pipeline component from pipeline job + self._source_job_id = kwargs.pop("source_job_id", None) + # TODO: set anonymous hash for reuse + + def _process_jobs(self, jobs: Dict[str, BaseNode]) -> Dict[str, BaseNode]: + """Process and validate jobs. + + :param jobs: A map of node name to node + :type jobs: Dict[str, BaseNode] + :return: The processed jobs + :rtype: Dict[str, BaseNode] + """ + # Remove swept Command + node_names_to_skip = [] + for node_name, job_instance in jobs.items(): + if isinstance(job_instance, Command) and job_instance._swept is True: + node_names_to_skip.append(node_name) + + for key in node_names_to_skip: + del jobs[key] + + # Set path and validate node type. + for _, job_instance in jobs.items(): + if isinstance(job_instance, BaseNode): + job_instance._set_base_path(self.base_path) + + if not isinstance(job_instance, (BaseNode, AutoMLJob, ControlFlowNode)): + msg = f"Not supported pipeline job type: {type(job_instance)}" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + return jobs + + def _customized_validate(self) -> MutableValidationResult: + """Validate pipeline component structure. + + :return: The validation result + :rtype: MutableValidationResult + """ + validation_result = super(PipelineComponent, self)._customized_validate() + + # Validate inputs + for input_name, input_value in self.inputs.items(): + if input_value.type is None: + validation_result.append_error( + yaml_path="inputs.{}".format(input_name), + message="Parameter type unknown, please add type annotation or specify input default value.", + error_code=ValidationErrorCode.PARAMETER_TYPE_UNKNOWN, + ) + + # Validate all nodes + for node_name, node in self.jobs.items(): + if isinstance(node, BaseNode): + # Node inputs will be validated. + validation_result.merge_with(node._validate(), "jobs.{}".format(node_name)) + if isinstance(node.component, Component): + # Validate binding if not remote resource. + validation_result.merge_with(self._validate_binding_inputs(node)) + elif isinstance(node, AutoMLJob): + pass + elif isinstance(node, ControlFlowNode): + # Validate control flow node. + validation_result.merge_with(node._validate(), "jobs.{}".format(node_name)) + else: + validation_result.append_error( + yaml_path="jobs.{}".format(node_name), + message=f"Not supported pipeline job type: {type(node)}", + ) + + return validation_result + + def _validate_compute_is_set(self, *, parent_node_name: Optional[str] = None) -> MutableValidationResult: + """Validate compute in pipeline component. 
+ + This function will only be called from pipeline_job._validate_compute_is_set + when both of the pipeline_job.compute and pipeline_job.settings.default_compute is None. + Rules: + - For pipeline node: will call node._component._validate_compute_is_set to validate node compute in sub graph. + - For general node: + - If _skip_required_compute_missing_validation is True, validation will be skipped. + - All the rest of cases without compute will add compute not set error to validation result. + + :keyword parent_node_name: The name of the parent node. + :type parent_node_name: Optional[str] + :return: The validation result + :rtype: MutableValidationResult + """ + + # Note: do not put this into customized validate, as we would like call + # this from pipeline_job._validate_compute_is_set + validation_result = self._create_empty_validation_result() + no_compute_nodes = [] + parent_node_name = parent_node_name if parent_node_name else "" + for node_name, node in self.jobs.items(): + full_node_name = f"{parent_node_name}{node_name}.jobs." + if node.type == NodeType.PIPELINE and isinstance(node._component, PipelineComponent): + validation_result.merge_with(node._component._validate_compute_is_set(parent_node_name=full_node_name)) + continue + if isinstance(node, BaseNode) and node._skip_required_compute_missing_validation: + continue + if has_attr_safe(node, "compute") and node.compute is None: + no_compute_nodes.append(node_name) + + for node_name in no_compute_nodes: + validation_result.append_error( + yaml_path=f"jobs.{parent_node_name}{node_name}.compute", + message="Compute not set", + ) + return validation_result + + def _get_input_binding_dict(self, node: BaseNode) -> Tuple[dict, dict]: + """Return the input binding dict for each node. + + :param node: The node + :type node: BaseNode + :return: A 2-tuple of (binding_dict, optional_binding_in_expression_dict) + :rtype: Tuple[dict, dict] + """ + # pylint: disable=too-many-nested-blocks + binding_inputs = node._build_inputs() + # Collect binding relation dict {'pipeline_input': ['node_input']} + binding_dict: dict = {} + optional_binding_in_expression_dict: dict = {} + for component_input_name, component_binding_input in binding_inputs.items(): + if isinstance(component_binding_input, PipelineExpression): + for pipeline_input_name in component_binding_input._inputs.keys(): + if pipeline_input_name not in self.inputs: + continue + if pipeline_input_name not in binding_dict: + binding_dict[pipeline_input_name] = [] + binding_dict[pipeline_input_name].append(component_input_name) + if pipeline_input_name not in optional_binding_in_expression_dict: + optional_binding_in_expression_dict[pipeline_input_name] = [] + optional_binding_in_expression_dict[pipeline_input_name].append(pipeline_input_name) + else: + if isinstance(component_binding_input, Input): + component_binding_input = component_binding_input.path + if is_data_binding_expression(component_binding_input, ["parent"]): + # data binding may have more than one PipelineInput now + for pipeline_input_name in PipelineExpression.parse_pipeline_inputs_from_data_binding( + component_binding_input + ): + if pipeline_input_name not in self.inputs: + continue + if pipeline_input_name not in binding_dict: + binding_dict[pipeline_input_name] = [] + binding_dict[pipeline_input_name].append(component_input_name) + # for data binding expression "${{parent.inputs.pipeline_input}}", it should not be optional + if len(component_binding_input.replace("${{parent.inputs." 
+ pipeline_input_name + "}}", "")): + if pipeline_input_name not in optional_binding_in_expression_dict: + optional_binding_in_expression_dict[pipeline_input_name] = [] + optional_binding_in_expression_dict[pipeline_input_name].append(pipeline_input_name) + return binding_dict, optional_binding_in_expression_dict + + def _validate_binding_inputs(self, node: BaseNode) -> MutableValidationResult: + """Validate pipeline binding inputs and return all used pipeline input names. + + Mark input as optional if all binding is optional and optional not set. Raise error if pipeline input is + optional but link to required inputs. + + :param node: The node to validate + :type node: BaseNode + :return: The validation result + :rtype: MutableValidationResult + """ + component_definition_inputs = {} + # Add flattened group input into definition inputs. + # e.g. Add {'group_name.item': PipelineInput} for {'group_name': GroupInput} + for name, val in node.component.inputs.items(): + if isinstance(val, GroupInput): + component_definition_inputs.update(val.flatten(group_parameter_name=name)) + component_definition_inputs[name] = val + # Collect binding relation dict {'pipeline_input': ['node_input']} + validation_result = self._create_empty_validation_result() + binding_dict, optional_binding_in_expression_dict = self._get_input_binding_dict(node) + + # Validate links required and optional + for pipeline_input_name, binding_inputs in binding_dict.items(): + pipeline_input = self.inputs[pipeline_input_name] + required_bindings = [] + for name in binding_inputs: + # not check optional/required for pipeline input used in pipeline expression + if name in optional_binding_in_expression_dict.get(pipeline_input_name, []): + continue + if name in component_definition_inputs and component_definition_inputs[name].optional is not True: + required_bindings.append(f"{node.name}.inputs.{name}") + if pipeline_input.optional is None and not required_bindings: + # Set input as optional if all binding is optional and optional not set. + pipeline_input.optional = True + pipeline_input._is_inferred_optional = True + elif pipeline_input.optional is True and required_bindings: + if pipeline_input._is_inferred_optional: + # Change optional=True to None if is inferred by us + pipeline_input.optional = None + else: + # Raise exception if pipeline input is optional set by user but link to required inputs. + validation_result.append_error( + yaml_path="inputs.{}".format(pipeline_input._port_name), + message=f"Pipeline optional Input binding to required inputs: {required_bindings}", + ) + return validation_result + + def _get_job_type_and_source(self) -> Tuple[Dict[str, int], Dict[str, int]]: + """Get job types and sources for telemetry. + + :return: A 2-tuple of + * A map of job type to the number of occurrences + * A map of job source to the number of occurrences + :rtype: Tuple[Dict[str, int], Dict[str, int]] + """ + job_types: list = [] + job_sources = [] + for job in self.jobs.values(): + job_types.append(job.type) + if isinstance(job, BaseNode): + job_sources.append(job._source) + elif isinstance(job, AutoMLJob): + # Consider all automl_job has builder type for now, + # as it's not easy to distinguish their source(yaml/builder). 
+ job_sources.append(ComponentSource.BUILDER) + else: + # Fall back to CLASS + job_sources.append(ComponentSource.CLASS) + return dict(Counter(job_types)), dict(Counter(job_sources)) + + @property + def jobs(self) -> Dict[str, BaseNode]: + """Return a dictionary from component variable name to component object. + + :return: Dictionary mapping component variable names to component objects. + :rtype: Dict[str, ~azure.ai.ml.entities._builders.BaseNode] + """ + return self._jobs + + def _get_anonymous_hash(self) -> str: + """Get anonymous hash for pipeline component. + + :return: The anonymous hash of the pipeline component + :rtype: str + """ + # ideally we should always use rest object to generate hash as it's the same as + # what we send to server-side, but changing the hash function will break reuse of + # existing components except for command component (hash result is the same for + # command component), so we just use rest object to generate hash for pipeline component, + # which doesn't have reuse issue. + component_interface_dict = self._to_rest_object().properties.component_spec + # Hash local inputs in pipeline component jobs + for job_name, job in self.jobs.items(): + if getattr(job, "inputs", None): + for input_name, input_value in job.inputs.items(): + try: + if ( + getattr(input_value, "_data", None) + and isinstance(input_value._data, Input) + and input_value.path + and os.path.exists(input_value.path) + ): + start_time = time.time() + component_interface_dict["jobs"][job_name]["inputs"][input_name]["content_hash"] = ( + get_object_hash(input_value.path) + ) + module_logger.debug( + "Takes %s seconds to calculate the content hash of local input %s", + time.time() - start_time, + input_value.path, + ) + except ValidationException: + pass + hash_value: str = hash_dict( + component_interface_dict, + keys_to_omit=[ + # omit name since anonymous component will have same name + "name", + # omit _source since it doesn't impact component's uniqueness + "_source", + # omit id since it will be set after component is registered + "id", + # omit version since it will be set to this hash later + "version", + ], + ) + return hash_value + + @classmethod + def _load_from_rest_pipeline_job(cls, data: Dict) -> "PipelineComponent": + # TODO: refine this? 
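+        # Builds an in-memory PipelineComponent from the REST representation of a pipeline
+        # job: only display metadata, input/output names, and the jobs dict are carried
+        # over, and the source is marked as REMOTE_WORKSPACE_JOB.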
+ # Set type as None here to avoid schema validation failed + definition_inputs = {p: {"type": None} for p in data.get("inputs", {}).keys()} + definition_outputs = {p: {"type": None} for p in data.get("outputs", {}).keys()} + return PipelineComponent( + display_name=data.get("display_name"), + description=data.get("description"), + inputs=definition_inputs, + outputs=definition_outputs, + jobs=data.get("jobs"), + _source=ComponentSource.REMOTE_WORKSPACE_JOB, + ) + + @classmethod + def _resolve_sub_nodes(cls, rest_jobs: Dict) -> Dict: + from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory + + sub_nodes = {} + if rest_jobs is None: + return sub_nodes + for node_name, node in rest_jobs.items(): + # TODO: Remove this ad-hoc fix after unified arm id format in object + component_id = node.get("componentId", "") + if isinstance(component_id, str) and re.match(ASSET_ARM_ID_REGEX_FORMAT, component_id): + node["componentId"] = component_id[len(ARM_ID_PREFIX) :] + if not LoopNode._is_loop_node_dict(node): + # skip resolve LoopNode first since it may reference other nodes + # use node factory instead of BaseNode._from_rest_object here as AutoMLJob is not a BaseNode + sub_nodes[node_name] = pipeline_node_factory.load_from_rest_object(obj=node) + for node_name, node in rest_jobs.items(): + if LoopNode._is_loop_node_dict(node): + # resolve LoopNode after all other nodes are resolved + sub_nodes[node_name] = pipeline_node_factory.load_from_rest_object(obj=node, pipeline_jobs=sub_nodes) + return sub_nodes + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return PipelineComponentSchema(context=context) + + @classmethod + def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]: + # jobs validations are done in _customized_validate() + return ["jobs"] + + @classmethod + def _check_ignored_keys(cls, obj: object) -> List[str]: + """Return ignored keys in obj as a pipeline component when its value be set. + + :param obj: The object to examine + :type obj: object + :return: List of keys to ignore + :rtype: List[str] + """ + examine_mapping = { + "compute": lambda val: val is not None, + "settings": lambda val: val is not None and any(v is not None for v in val._to_dict().values()), + } + # Avoid new attr added by use `try_get_non...` instead of `hasattr` or `getattr` directly. + return [k for k, has_set in examine_mapping.items() if has_set(try_get_non_arbitrary_attr(obj, k))] + + def _get_telemetry_values(self, *args: Any, **kwargs: Any) -> Dict: + telemetry_values: dict = super()._get_telemetry_values() + telemetry_values.update( + { + "source": self._source, + "node_count": len(self.jobs), + "node_type": json.dumps(self._job_types), + "node_source": json.dumps(self._job_sources), + } + ) + return telemetry_values + + @classmethod + def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict: + # Pop jobs to avoid it goes with schema load + jobs = obj.properties.component_spec.pop("jobs", None) + init_params_dict: dict = super()._from_rest_object_to_init_params(obj) + if jobs: + try: + init_params_dict["jobs"] = PipelineComponent._resolve_sub_nodes(jobs) + except Exception as e: # pylint: disable=W0718 + # Skip parse jobs if error exists. 
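+                # Parsing failures here are intentionally non-fatal: the error is only
+                # logged at debug level and the component is returned without resolved
+                # sub-nodes.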
+ # TODO: https://msdata.visualstudio.com/Vienna/_workitems/edit/2052262 + module_logger.debug("Parse pipeline component jobs failed with: %s", e) + return init_params_dict + + def _to_dict(self) -> Dict: + return {**self._other_parameter, **super()._to_dict()} + + def _build_rest_component_jobs(self) -> Dict[str, dict]: + """Build pipeline component jobs to rest. + + :return: A map of job name to rest objects + :rtype: Dict[str, dict] + """ + # Build the jobs to dict + rest_component_jobs = {} + for job_name, job in self.jobs.items(): + if isinstance(job, (BaseNode, ControlFlowNode)): + rest_node_dict = job._to_rest_object() + elif isinstance(job, AutoMLJob): + rest_node_dict = json.loads(json.dumps(job._to_dict(inside_pipeline=True))) + else: + msg = f"Non supported job type in Pipeline jobs: {type(job)}" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + ) + rest_component_jobs[job_name] = rest_node_dict + return rest_component_jobs + + def _to_rest_object(self) -> ComponentVersion: + """Check ignored keys and return rest object. + + :return: The component version + :rtype: ComponentVersion + """ + ignored_keys = self._check_ignored_keys(self) + if ignored_keys: + module_logger.warning("%s ignored on pipeline component %r.", ignored_keys, self.name) + component = self._to_dict() + # add source type to component rest object + component["_source"] = self._source + component["jobs"] = self._build_rest_component_jobs() + component["sourceJobId"] = self._source_job_id + if self._intellectual_property: + # hack while full pass through supported is worked on for IPP fields + component.pop("intellectual_property") + component["intellectualProperty"] = self._intellectual_property._to_rest_object().serialize() + properties = ComponentVersionProperties( + component_spec=component, + description=self.description, + is_anonymous=self._is_anonymous, + properties=self.properties, + tags=self.tags, + ) + result = ComponentVersion(properties=properties) + result.name = self.name + return result + + def __str__(self) -> str: + try: + toYaml: str = self._to_yaml() + return toYaml + except BaseException: # pylint: disable=W0718 + toStr: str = super(PipelineComponent, self).__str__() + return toStr diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py new file mode 100644 index 00000000..7da65fb6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py @@ -0,0 +1,211 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +import os +from typing import Any, Dict, List, Optional, Union + +from marshmallow import Schema + +from azure.ai.ml._schema.component.spark_component import SparkComponentSchema +from azure.ai.ml.constants._common import COMPONENT_TYPE +from azure.ai.ml.constants._component import NodeType +from azure.ai.ml.constants._job.job import RestSparkConfKey +from azure.ai.ml.entities._assets import Environment +from azure.ai.ml.entities._job.parameterized_spark import ParameterizedSpark + +from ..._schema import PathAwareSchema +from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin +from .._util import convert_ordered_dict_to_dict, validate_attribute_type +from .._validation import MutableValidationResult +from ._additional_includes import AdditionalIncludesMixin +from .component import Component + + +class SparkComponent( + Component, ParameterizedSpark, SparkJobEntryMixin, AdditionalIncludesMixin +): # pylint: disable=too-many-instance-attributes + """Spark component version, used to define a Spark Component or Job. + + :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url pointing + to a remote location. Defaults to ".", indicating the current directory. + :type code: Union[str, os.PathLike] + :keyword entry: The file or class entry point. + :paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]] + :keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps. Defaults to None. + :paramtype py_files: Optional[List[str]] + :keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None. + :paramtype jars: Optional[List[str]] + :keyword files: The list of files to be placed in the working directory of each executor. Defaults to None. + :paramtype files: Optional[List[str]] + :keyword archives: The list of archives to be extracted into the working directory of each executor. + Defaults to None. + :paramtype archives: Optional[List[str]] + :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode. + :paramtype driver_cores: Optional[int] + :keyword driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit + suffix ("k", "m", "g" or "t") (e.g. "512m", "2g"). + :paramtype driver_memory: Optional[str] + :keyword executor_cores: The number of cores to use on each executor. + :paramtype executor_cores: Optional[int] + :keyword executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit + suffix ("k", "m", "g" or "t") (e.g. "512m", "2g"). + :paramtype executor_memory: Optional[str] + :keyword executor_instances: The initial number of executors. + :paramtype executor_instances: Optional[int] + :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of + executors registered with this application up and down based on the workload. Defaults to False. + :paramtype dynamic_allocation_enabled: Optional[bool] + :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is + enabled. + :paramtype dynamic_allocation_min_executors: Optional[int] + :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is + enabled. 
+ :paramtype dynamic_allocation_max_executors: Optional[int] + :keyword conf: A dictionary with pre-defined Spark configurations key and values. Defaults to None. + :paramtype conf: Optional[dict[str, str]] + :keyword environment: The Azure ML environment to run the job in. + :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]] + :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None. + :paramtype inputs: Optional[dict[str, Union[ + ~azure.ai.ml.entities._job.pipeline._io.NodeOutput, + ~azure.ai.ml.Input, + str, + bool, + int, + float, + Enum, + ]]] + :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None. + :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]] + :keyword args: The arguments for the job. Defaults to None. + :paramtype args: Optional[str] + :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None. + :paramtype additional_includes: Optional[List[str]] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_spark_configurations.py + :start-after: [START spark_component_definition] + :end-before: [END spark_component_definition] + :language: python + :dedent: 8 + :caption: Creating SparkComponent. + """ + + def __init__( + self, + *, + code: Optional[Union[str, os.PathLike]] = ".", + entry: Optional[Union[Dict[str, str], SparkJobEntry]] = None, + py_files: Optional[List[str]] = None, + jars: Optional[List[str]] = None, + files: Optional[List[str]] = None, + archives: Optional[List[str]] = None, + driver_cores: Optional[Union[int, str]] = None, + driver_memory: Optional[str] = None, + executor_cores: Optional[Union[int, str]] = None, + executor_memory: Optional[str] = None, + executor_instances: Optional[Union[int, str]] = None, + dynamic_allocation_enabled: Optional[Union[bool, str]] = None, + dynamic_allocation_min_executors: Optional[Union[int, str]] = None, + dynamic_allocation_max_executors: Optional[Union[int, str]] = None, + conf: Optional[Dict[str, str]] = None, + environment: Optional[Union[str, Environment]] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + args: Optional[str] = None, + additional_includes: Optional[List] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + kwargs[COMPONENT_TYPE] = NodeType.SPARK + + super().__init__( + inputs=inputs, + outputs=outputs, + **kwargs, + ) + + self.code: Optional[Union[str, os.PathLike]] = code + self.entry = entry + self.py_files = py_files + self.jars = jars + self.files = files + self.archives = archives + self.conf = conf + self.environment = environment + self.args = args + self.additional_includes = additional_includes or [] + # For pipeline spark job, we also allow user to set driver_cores, driver_memory and so on by setting conf. + # If root level fields are not set by user, we promote conf setting to root level to facilitate subsequent + # verification. 
This usually happens when we use to_component(SparkJob) or builder function spark() as a node + # in pipeline sdk + conf = conf or {} + self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None) + self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None) + self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None) + self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None) + self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None) + self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get( + RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None + ) + self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get( + RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None + ) + self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get( + RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None + ) + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + return SparkComponentSchema(context=context) + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "environment": (str, Environment), + "code": (str, os.PathLike), + } + + def _customized_validate(self) -> MutableValidationResult: + validation_result = super()._customized_validate() + self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result) + return validation_result + + def _to_dict(self) -> Dict: + # TODO: Bug Item number: 2897665 + res: Dict = convert_ordered_dict_to_dict( # type: ignore + {**self._other_parameter, **super(SparkComponent, self)._to_dict()} + ) + return res + + def _to_ordered_dict_for_yaml_dump(self) -> Dict: + """Dump the component content into a sorted yaml string. + + :return: The ordered dict + :rtype: Dict + """ + + obj: dict = super()._to_ordered_dict_for_yaml_dump() + # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value + if self.code and isinstance(self.code, str): + obj["code"] = self.code + return obj + + def _get_environment_id(self) -> Union[str, None]: + # Return environment id of environment + # handle case when environment is defined inline + if isinstance(self.environment, Environment): + res: Optional[str] = self.environment.id + return res + return self.environment + + def __str__(self) -> str: + try: + toYaml: str = self._to_yaml() + return toYaml + except BaseException: # pylint: disable=W0718 + toStr: str = super(SparkComponent, self).__str__() + return toStr |
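
For reference, a minimal SparkComponent construction sketch under stated assumptions: the entry file, code path, environment name, and input names below are illustrative only and are not taken from this diff.

    from azure.ai.ml.entities import SparkComponent

    spark_component = SparkComponent(
        code="./src",               # source directory containing the entry file
        entry={"file": "main.py"},  # file entry point
        driver_cores=1,
        driver_memory="2g",
        executor_cores=2,
        executor_memory="2g",
        executor_instances=2,
        environment="azureml:my-spark-env:1",
        inputs={"input_data": {"type": "uri_folder"}},
        args="--input ${{inputs.input_data}}",
    )

Root-level resource fields left unset would instead be promoted from the matching keys in `conf`, per the comment in `__init__` above.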