diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py | 1289 |
1 file changed, 1289 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py new file mode 100644 index 00000000..f9e43f1d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py @@ -0,0 +1,1289 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access,too-many-lines +import time +import collections +import types +from functools import partial +from inspect import Parameter, signature +from os import PathLike +from pathlib import Path +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast +import hashlib + +from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import ( + AzureMachineLearningWorkspaces as ServiceClient102021Dataplane, +) +from azure.ai.ml._restclient.v2024_01_01_preview import ( + AzureMachineLearningWorkspaces as ServiceClient012024, +) +from azure.ai.ml._restclient.v2024_01_01_preview.models import ( + ComponentVersion, + ListViewType, +) +from azure.ai.ml._scope_dependent_operations import ( + OperationConfig, + OperationsContainer, + OperationScope, + _ScopeDependentOperations, +) +from azure.ai.ml._telemetry import ( + ActivityType, + monitor_with_activity, + monitor_with_telemetry_mixin, +) +from azure.ai.ml._utils._asset_utils import ( + _archive_or_restore, + _create_or_update_autoincrement, + _get_file_hash, + _get_latest, + _get_next_version_from_container, + _resolve_label_to_asset, + get_ignore_file, + get_upload_files_from_folder, + IgnoreFile, + delete_two_catalog_files, + create_catalog_files, +) +from azure.ai.ml._utils._azureml_polling import AzureMLPolling +from azure.ai.ml._utils._endpoint_utils import polling_wait +from azure.ai.ml._utils._logger_utils import OpsLogger +from 
azure.ai.ml._vendor.azure_resources.operations import DeploymentsOperations +from azure.ai.ml.constants._common import ( + DEFAULT_COMPONENT_VERSION, + DEFAULT_LABEL_NAME, + AzureMLResourceType, + DefaultOpenEncoding, + LROConfigurations, +) +from azure.ai.ml.entities import Component, ValidationResult +from azure.ai.ml.exceptions import ( + ComponentException, + ErrorCategory, + ErrorTarget, + ValidationException, +) +from azure.core.exceptions import HttpResponseError, ResourceNotFoundError + +from .._utils._cache_utils import CachedNodeResolver +from .._utils._experimental import experimental +from .._utils.utils import extract_name_and_version, is_data_binding_expression +from ..entities._builders import BaseNode +from ..entities._builders.condition_node import ConditionNode +from ..entities._builders.control_flow_node import LoopNode +from ..entities._component.automl_component import AutoMLComponent +from ..entities._component.code import ComponentCodeMixin +from ..entities._component.pipeline_component import PipelineComponent +from ..entities._job.pipeline._attr_dict import has_attr_safe +from ._code_operations import CodeOperations +from ._environment_operations import EnvironmentOperations +from ._operation_orchestrator import OperationOrchestrator, _AssetResolver +from ._workspace_operations import WorkspaceOperations + +ops_logger = OpsLogger(__name__) +logger, module_logger = ops_logger.package_logger, ops_logger.module_logger + + +class ComponentOperations(_ScopeDependentOperations): + """ComponentOperations. + + You should not instantiate this class directly. Instead, you should + create an MLClient instance that instantiates it for you and + attaches it as an attribute. + + :param operation_scope: The operation scope. + :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope + :param operation_config: The operation configuration. 
    def __init__(
        self,
        operation_scope: OperationScope,
        operation_config: OperationConfig,
        service_client: Union[ServiceClient012024, ServiceClient102021Dataplane],
        all_operations: OperationsContainer,
        preflight_operation: Optional[DeploymentsOperations] = None,
        **kwargs: Dict,
    ) -> None:
        """Initialize ComponentOperations; see the class docstring for parameter details."""
        super(ComponentOperations, self).__init__(operation_scope, operation_config)
        ops_logger.update_filter()
        self._version_operation = service_client.component_versions
        self._preflight_operation = preflight_operation
        self._container_operation = service_client.component_containers
        self._all_operations = all_operations
        # Extra kwargs are stored once and forwarded verbatim to every REST call
        # this class makes (see the **self._init_args usages below).
        self._init_args = kwargs
        # Maps a label to a function which given an asset name,
        # returns the asset associated with the label
        self._managed_label_resolver = {"latest": self._get_latest_version}
        self._orchestrators = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)

        # NOTE(review): populated lazily outside this chunk; appears to identify the
        # client for node-resolution caching — confirm against the cache helpers.
        self._client_key: Optional[str] = None

    @property
    def _code_operations(self) -> CodeOperations:
        """CodeOperations looked up from the shared operations container."""
        res: CodeOperations = self._all_operations.get_operation(  # type: ignore[misc]
            AzureMLResourceType.CODE, lambda x: isinstance(x, CodeOperations)
        )
        return res

    @property
    def _environment_operations(self) -> EnvironmentOperations:
        """EnvironmentOperations looked up from the shared operations container."""
        return cast(
            EnvironmentOperations,
            self._all_operations.get_operation(  # type: ignore[misc]
                AzureMLResourceType.ENVIRONMENT,
                lambda x: isinstance(x, EnvironmentOperations),
            ),
        )

    @property
    def _workspace_operations(self) -> WorkspaceOperations:
        """WorkspaceOperations looked up from the shared operations container."""
        return cast(
            WorkspaceOperations,
            self._all_operations.get_operation(  # type: ignore[misc]
                AzureMLResourceType.WORKSPACE,
                lambda x: isinstance(x, WorkspaceOperations),
            ),
        )

    @property
    def _job_operations(self) -> Any:
        """JobOperations looked up from the shared operations container.

        The import is local — presumably to break an import cycle with
        ``_job_operations``; confirm before hoisting it to module level.
        """
        from ._job_operations import JobOperations

        return self._all_operations.get_operation(  # type: ignore[misc]
            AzureMLResourceType.JOB, lambda x: isinstance(x, JobOperations)
        )
+ """ + + if name: + return cast( + Iterable[Component], + ( + self._version_operation.list( + name=name, + resource_group_name=self._resource_group_name, + registry_name=self._registry_name, + **self._init_args, + cls=lambda objs: [Component._from_rest_object(obj) for obj in objs], + ) + if self._registry_name + else self._version_operation.list( + name=name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + list_view_type=list_view_type, + **self._init_args, + cls=lambda objs: [Component._from_rest_object(obj) for obj in objs], + ) + ), + ) + return cast( + Iterable[Component], + ( + self._container_operation.list( + resource_group_name=self._resource_group_name, + registry_name=self._registry_name, + **self._init_args, + cls=lambda objs: [Component._from_container_rest_object(obj) for obj in objs], + ) + if self._registry_name + else self._container_operation.list( + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + list_view_type=list_view_type, + **self._init_args, + cls=lambda objs: [Component._from_container_rest_object(obj) for obj in objs], + ) + ), + ) + + @monitor_with_telemetry_mixin(ops_logger, "ComponentVersion.Get", ActivityType.INTERNALCALL) + def _get_component_version(self, name: str, version: Optional[str] = DEFAULT_COMPONENT_VERSION) -> ComponentVersion: + """Returns ComponentVersion information about the specified component name and version. + + :param name: Name of the code component. + :type name: str + :param version: Version of the component. + :type version: Optional[str] + :return: The ComponentVersion object of the specified component name and version. 
+ :rtype: ~azure.ai.ml.entities.ComponentVersion + """ + result = ( + self._version_operation.get( + name=name, + version=version, + resource_group_name=self._resource_group_name, + registry_name=self._registry_name, + **self._init_args, + ) + if self._registry_name + else self._version_operation.get( + name=name, + version=version, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + **self._init_args, + ) + ) + return result + + @monitor_with_telemetry_mixin(ops_logger, "Component.Get", ActivityType.PUBLICAPI) + def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Component: + """Returns information about the specified component. + + :param name: Name of the code component. + :type name: str + :param version: Version of the component. + :type version: Optional[str] + :param label: Label of the component, mutually exclusive with version. + :type label: Optional[str] + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully + identified and retrieved. Details will be provided in the error message. + :return: The specified component object. + :rtype: ~azure.ai.ml.entities.Component + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START component_operations_get] + :end-before: [END component_operations_get] + :language: python + :dedent: 8 + :caption: Get component example. 
+ """ + return self._get(name=name, version=version, label=label) + + def _localize_code(self, component: Component, base_dir: Path) -> None: + if not isinstance(component, ComponentCodeMixin): + return + code = component._get_origin_code_value() + if not isinstance(code, str): + return + # registry code will keep the "azureml:" prefix can be used directly + if code.startswith("azureml://registries"): + return + + target_code_value = "./code" + self._code_operations.download( + **extract_name_and_version(code), + download_path=base_dir.joinpath(target_code_value), + ) + + setattr(component, component._get_code_field_name(), target_code_value) + + def _localize_environment(self, component: Component, base_dir: Path) -> None: + from azure.ai.ml.entities import ParallelComponent + + parent: Any = None + if hasattr(component, "environment"): + parent = component + elif isinstance(component, ParallelComponent): + parent = component.task + else: + return + + # environment can be None + if not isinstance(parent.environment, str): + return + # registry environment will keep the "azureml:" prefix can be used directly + if parent.environment.startswith("azureml://registries"): + return + + environment = self._environment_operations.get(**extract_name_and_version(parent.environment)) + environment._localize(base_path=base_dir.absolute().as_posix()) + parent.environment = environment + + @experimental + @monitor_with_telemetry_mixin(ops_logger, "Component.Download", ActivityType.PUBLICAPI) + def download( + self, + name: str, + download_path: Union[PathLike, str] = ".", + *, + version: Optional[str] = None, + ) -> None: + """Download the specified component and its dependencies to local. Local component can be used to create + the component in another workspace or for offline development. + + :param name: Name of the code component. 
+ :type name: str + :param Union[PathLike, str] download_path: Local path as download destination, + defaults to current working directory of the current user. Will be created if not exists. + :type download_path: str + :keyword version: Version of the component. + :paramtype version: Optional[str] + :raises ~OSError: Raised if download_path is pointing to an existing directory that is not empty. + identified and retrieved. Details will be provided in the error message. + :return: The specified component object. + :rtype: ~azure.ai.ml.entities.Component + """ + download_path = Path(download_path) + component = self._get(name=name, version=version) + self._resolve_azureml_id(component) + + output_dir = Path(download_path) + if output_dir.is_dir(): + # an OSError will be raised if the directory is not empty + output_dir.rmdir() + output_dir.mkdir(parents=True) + # download code + self._localize_code(component, output_dir) + + # download environment + self._localize_environment(component, output_dir) + + component._localize(output_dir.absolute().as_posix()) + (output_dir / "component_spec.yaml").write_text(component._to_yaml(), encoding=DefaultOpenEncoding.WRITE) + + def _get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Component: + if version and label: + msg = "Cannot specify both version and label." 
+ raise ValidationException( + message=msg, + target=ErrorTarget.COMPONENT, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + ) + + if not version and not label: + label = DEFAULT_LABEL_NAME + + if label == DEFAULT_LABEL_NAME: + label = None + version = DEFAULT_COMPONENT_VERSION + + if label: + return _resolve_label_to_asset(self, name, label) + + result = self._get_component_version(name, version) + component = Component._from_rest_object(result) + self._resolve_azureml_id(component, jobs_only=True) + return component + + @experimental + @monitor_with_telemetry_mixin(ops_logger, "Component.Validate", ActivityType.PUBLICAPI) + def validate( + self, + component: Union[Component, types.FunctionType], + raise_on_failure: bool = False, + **kwargs: Any, + ) -> ValidationResult: + """validate a specified component. if there are inline defined + entities, e.g. Environment, Code, they won't be created. + + :param component: The component object or a mldesigner component function that generates component object + :type component: Union[Component, types.FunctionType] + :param raise_on_failure: Whether to raise exception on validation error. Defaults to False + :type raise_on_failure: bool + :return: All validation errors + :rtype: ~azure.ai.ml.entities.ValidationResult + """ + return self._validate( + component, + raise_on_failure=raise_on_failure, + # TODO 2330505: change this to True after remote validation is ready + skip_remote_validation=kwargs.pop("skip_remote_validation", True), + ) + + @monitor_with_telemetry_mixin(ops_logger, "Component.Validate", ActivityType.INTERNALCALL) + def _validate( + self, + component: Union[Component, types.FunctionType], + raise_on_failure: bool, + skip_remote_validation: bool, + ) -> ValidationResult: + """Implementation of validate. Add this function to avoid calling validate() directly in create_or_update(), + which will impact telemetry statistics & bring experimental warning in create_or_update(). 
+ + :param component: The component + :type component: Union[Component, types.FunctionType] + :param raise_on_failure: Whether to raise on failure. + :type raise_on_failure: bool + :param skip_remote_validation: Whether to skip remote validation. + :type skip_remote_validation: bool + :return: The validation result + :rtype: ValidationResult + """ + # Update component when the input is a component function + if isinstance(component, types.FunctionType): + component = _refine_component(component) + + # local validation + result = component._validate(raise_error=raise_on_failure) + # remote validation, note that preflight_operation is not available for registry client + if not skip_remote_validation and self._preflight_operation: + workspace = self._workspace_operations.get() + remote_validation_result = self._preflight_operation.begin_validate( + resource_group_name=self._resource_group_name, + deployment_name=self._workspace_name, + parameters=component._build_rest_object_for_remote_validation( + location=workspace.location, + workspace_name=self._workspace_name, + ), + **self._init_args, + ) + result.merge_with( + # pylint: disable=protected-access + component._build_validation_result_from_rest_object(remote_validation_result.result()), + overwrite=True, + ) + # resolve location for diagnostics from remote validation + result.resolve_location_for_diagnostics(component._source_path) # type: ignore + return component._try_raise( # pylint: disable=protected-access + result, + raise_error=raise_on_failure, + ) + + def _update_flow_rest_object(self, rest_component_resource: Any) -> None: + import re + + from azure.ai.ml._utils._arm_id_utils import AMLVersionedArmId + + component_spec = rest_component_resource.properties.component_spec + code, flow_file_name = AMLVersionedArmId(component_spec["code"]), component_spec.pop("flow_file_name") + # TODO: avoid remote request here if met performance issue + created_code = self._code_operations.get(name=code.asset_name, 
version=code.asset_version) + # remove port number and append flow file name to get full uri for flow.dag.yaml + component_spec["flow_definition_uri"] = f"{re.sub(r':[0-9]+/', '/', created_code.path)}/{flow_file_name}" + + def _reset_version_if_no_change(self, component: Component, current_name: str, current_version: str) -> Tuple: + """Reset component version to default version if there's no change in the component. + + :param component: The component object + :type component: Component + :param current_name: The component name + :type current_name: str + :param current_version: The component version + :type current_version: str + :return: The new version and rest component resource + :rtype: Tuple[str, ComponentVersion] + """ + rest_component_resource = component._to_rest_object() + + try: + client_component_hash = rest_component_resource.properties.properties.get("client_component_hash") + remote_component_version = self._get_component_version(name=current_name) # will raise error if not found. + remote_component_hash = remote_component_version.properties.properties.get("client_component_hash") + if client_component_hash == remote_component_hash: + component.version = remote_component_version.properties.component_spec.get( + "version" + ) # only update the default version component instead of creating a new version component + logger.warning( + "The component is not modified compared to the default version " + "and the new version component registration is skipped." 
+ ) + return component.version, component._to_rest_object() + except ResourceNotFoundError as e: + logger.info("Failed to get component version, %s", e) + except Exception as e: # pylint: disable=W0718 + logger.error("Failed to compare client_component_hash, %s", e) + + return current_version, rest_component_resource + + def _create_or_update_component_version( + self, + component: Component, + name: str, + version: Optional[str], + rest_component_resource: Any, + ) -> Any: + try: + if self._registry_name: + start_time = time.time() + path_format_arguments = { + "componentName": component.name, + "resourceGroupName": self._resource_group_name, + "registryName": self._registry_name, + } + poller = self._version_operation.begin_create_or_update( + name=name, + version=version, + resource_group_name=self._operation_scope.resource_group_name, + registry_name=self._registry_name, + body=rest_component_resource, + polling=AzureMLPolling( + LROConfigurations.POLL_INTERVAL, + path_format_arguments=path_format_arguments, + ), + ) + message = f"Creating/updating registry component {component.name} with version {component.version} " + polling_wait(poller=poller, start_time=start_time, message=message, timeout=None) + + else: + # _auto_increment_version can be True for non-registry component creation operation; + # and anonymous component should use hash as version + if not component._is_anonymous and component._auto_increment_version: + return _create_or_update_autoincrement( + name=name, + body=rest_component_resource, + version_operation=self._version_operation, + container_operation=self._container_operation, + resource_group_name=self._operation_scope.resource_group_name, + workspace_name=self._workspace_name, + **self._init_args, + ) + + return self._version_operation.create_or_update( + name=name, + version=version, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + body=rest_component_resource, + **self._init_args, + ) + except 
Exception as e:
            raise e

        return None

    @monitor_with_telemetry_mixin(
        logger,
        "Component.CreateOrUpdate",
        ActivityType.PUBLICAPI,
        extra_keys=["is_anonymous"],
    )
    def create_or_update(
        self,
        component: Component,
        version: Optional[str] = None,
        *,
        skip_validation: bool = False,
        **kwargs: Any,
    ) -> Component:
        """Create or update a specified component. if there're inline defined
        entities, e.g. Environment, Code, they'll be created together with the
        component.

        :param component: The component object or a mldesigner component function that generates component object
        :type component: Union[Component, types.FunctionType]
        :param version: The component version to override.
        :type version: str
        :keyword skip_validation: whether to skip validation before creating/updating the component, defaults to False
        :paramtype skip_validation: bool
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.AssetException: Raised if Component assets
            (e.g. Data, Code, Model, Environment) cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.ComponentException: Raised if Component type is unsupported.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.ModelException: Raised if Component model cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
        :return: The specified component object.
        :rtype: ~azure.ai.ml.entities.Component

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_create_or_update]
                :end-before: [END component_operations_create_or_update]
                :language: python
                :dedent: 8
                :caption: Create component example.
        """
        # Update component when the input is a component function (mldesigner/dsl decorated).
        if isinstance(component, types.FunctionType):
            component = _refine_component(component)
        if version is not None:
            component.version = version
        # In non-registry scenario, if component does not have version, no need to get next version here.
        # As Component property version has setter that updates `_auto_increment_version` in-place, then
        # a component will get a version after its creation, and it will always use this version in its
        # future creation operations, which breaks version auto increment mechanism.
        if self._registry_name and not component.version and component._auto_increment_version:
            component.version = _get_next_version_from_container(
                name=component.name,
                container_operation=self._container_operation,
                resource_group_name=self._operation_scope.resource_group_name,
                workspace_name=self._workspace_name,
                registry_name=self._registry_name,
                **self._init_args,
            )

        # `is_anonymous` is an internal keyword passed by callers (e.g. pipeline resolution);
        # it never downgrades an already-anonymous component.
        if not component._is_anonymous:
            component._is_anonymous = kwargs.pop("is_anonymous", False)

        if not skip_validation:
            self._validate(component, raise_on_failure=True, skip_remote_validation=True)

        # Create all dependent resources
        # Only upload dependencies if component is NOT IPP
        if not component._intellectual_property:
            self._resolve_arm_id_or_upload_dependencies(component)

        name, version = component._get_rest_name_version()
        # `skip_if_no_change` lets callers reuse the existing remote version when the
        # component content hash is unchanged; only applies to named (non-anonymous) components.
        if not component._is_anonymous and kwargs.get("skip_if_no_change"):
            version, rest_component_resource = self._reset_version_if_no_change(
                component,
                current_name=name,
                current_version=str(version),
            )
        else:
            rest_component_resource = component._to_rest_object()

        # TODO: remove this after server side support directly using client created code
        from azure.ai.ml.entities._component.flow import FlowComponent

        if isinstance(component, FlowComponent):
            self._update_flow_rest_object(rest_component_resource)

        result = self._create_or_update_component_version(
            component,
            name,
            version,
            rest_component_resource,
        )

        # A falsy result means the service skipped creation (no change); re-fetch the
        # existing version instead of deserializing a missing payload.
        if not result:
            component = self.get(name=component.name, version=component.version)
        else:
            component = Component._from_rest_object(result)

        self._resolve_azureml_id(
            component=component,
            jobs_only=True,
        )
        return component

    @experimental
    def prepare_for_sign(self, component: Component) -> None:
        # Build catalog files containing a SHA256 hash per code file so the component's
        # code folder can be signed. Existing catalog files are removed first so they do
        # not hash themselves.
        ignore_file = IgnoreFile()

        if isinstance(component, ComponentCodeMixin):
            with component._build_code() as code:
                delete_two_catalog_files(code.path)
                # NOTE(review): when code._ignore_file is NOT None this keeps the empty
                # IgnoreFile() created above rather than using code._ignore_file --
                # looks inverted; confirm intended behavior.
                ignore_file = get_ignore_file(code.path) if code._ignore_file is None else ignore_file
                file_list = get_upload_files_from_folder(code.path, ignore_file=ignore_file)
                json_stub = {}
                json_stub["HashAlgorithm"] = "SHA256"
                json_stub["CatalogItems"] = {}  # type: ignore

                # Sort case-insensitively by file name so catalog output is deterministic.
                for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
                    file_hash = _get_file_hash(file_path, hashlib.sha256()).hexdigest().upper()
                    json_stub["CatalogItems"][file_name] = file_hash  # type: ignore

                json_stub["CatalogItems"] = collections.OrderedDict(  # type: ignore
                    sorted(json_stub["CatalogItems"].items())  # type: ignore
                )
                create_catalog_files(code.path, json_stub)

    @monitor_with_telemetry_mixin(ops_logger, "Component.Archive", ActivityType.PUBLICAPI)
    def archive(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Archive a component.

        :param name: Name of the component.
        :type name: str
        :param version: Version of the component.
        :type version: str
        :param label: Label of the component. (mutually exclusive with version).
        :type label: str

        ..
            admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_archive]
                :end-before: [END component_operations_archive]
                :language: python
                :dedent: 8
                :caption: Archive component example.
        """
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._version_operation,
            container_operation=self._container_operation,
            is_archived=True,
            name=name,
            version=version,
            label=label,
        )

    @monitor_with_telemetry_mixin(ops_logger, "Component.Restore", ActivityType.PUBLICAPI)
    def restore(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Restore an archived component.

        :param name: Name of the component.
        :type name: str
        :param version: Version of the component.
        :type version: str
        :param label: Label of the component. (mutually exclusive with version).
        :type label: str

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_restore]
                :end-before: [END component_operations_restore]
                :language: python
                :dedent: 8
                :caption: Restore component example.
        """
        # Same shared helper as archive(); only the is_archived flag differs.
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._version_operation,
            container_operation=self._container_operation,
            is_archived=False,
            name=name,
            version=version,
            label=label,
        )

    def _get_latest_version(self, component_name: str) -> Component:
        """Returns the latest version of the asset with the given name.

        Latest is defined as the most recently created, not the most
        recently updated.

        :param component_name: The component name
        :type component_name: str
        :return: A latest version of the named Component
        :rtype: Component
        """

        # Registry-scoped lookup passes registry_name and no workspace; workspace-scoped
        # lookup passes workspace_name positionally.
        result = (
            _get_latest(
                component_name,
                self._version_operation,
                self._resource_group_name,
                workspace_name=None,
                registry_name=self._registry_name,
            )
            if self._registry_name
            else _get_latest(
                component_name,
                self._version_operation,
                self._resource_group_name,
                self._workspace_name,
            )
        )
        return Component._from_rest_object(result)

    @classmethod
    def _try_resolve_environment_for_component(
        cls, component: Union[BaseNode, str], _: str, resolver: _AssetResolver
    ) -> None:
        # Resolve the `environment` attribute on a component (and on its `task`, if any)
        # via the given resolver. String components (already ids/bindings) are skipped.
        if isinstance(component, BaseNode):
            component = component._component  # pylint: disable=protected-access

        if isinstance(component, str):
            return
        potential_parents: List[BaseNode] = [component]
        if hasattr(component, "task"):
            potential_parents.append(component.task)
        for parent in potential_parents:
            # for internal component, environment may be a dict or InternalEnvironment object
            # in these two scenarios, we don't need to resolve the environment;
            # Note for not directly importing InternalEnvironment and check with `isinstance`:
            # import from azure.ai.ml._internal will enable internal component feature for all users,
            # therefore, use type().__name__ to avoid import and execute type check
            if not hasattr(parent, "environment"):
                continue
            if isinstance(parent.environment, dict):
                continue
            if type(parent.environment).__name__ == "InternalEnvironment":
                continue
            parent.environment = resolver(parent.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)

    def _resolve_azureml_id(self, component: Component, jobs_only: bool = False) -> None:
        # TODO: remove the parameter `jobs_only`. Some tests are expecting an arm id after resolving for now.
        resolver = self._orchestrators.resolve_azureml_id
        self._resolve_dependencies_for_component(component, resolver, jobs_only=jobs_only)

    def _resolve_arm_id_or_upload_dependencies(self, component: Component) -> None:
        # Resolve component dependencies (code, environment, nested jobs) to ARM ids,
        # uploading local assets along the way via the operation orchestrator.
        resolver = OperationOrchestrator(
            self._all_operations, self._operation_scope, self._operation_config
        ).get_asset_arm_id

        self._resolve_dependencies_for_component(component, resolver)

    def _resolve_dependencies_for_component(
        self,
        component: Component,
        resolver: Callable,
        *,
        jobs_only: bool = False,
    ) -> None:
        # Shared resolution driver for both azureml-id and arm-id resolvers.
        # for now, many tests are expecting long arm id instead of short id for environment and code
        if not jobs_only:
            if isinstance(component, AutoMLComponent):
                # no extra dependency for automl component
                return

            # type check for potential Job type, which is unexpected here.
            if not isinstance(component, Component):
                msg = f"Non supported component type: {type(component)}"
                raise ValidationException(
                    message=msg,
                    target=ErrorTarget.COMPONENT,
                    no_personal_data_message=msg,
                    error_category=ErrorCategory.USER_ERROR,
                )

            # resolve component's code
            _try_resolve_code_for_component(component=component, resolver=resolver)
            # resolve component's environment
            self._try_resolve_environment_for_component(
                component=component,  # type: ignore
                resolver=resolver,
                _="",
            )

        # Pipeline components additionally need their child jobs resolved (no-op otherwise).
        self._resolve_dependencies_for_pipeline_component_jobs(
            component,
            resolver=resolver,
        )

    def _resolve_inputs_for_pipeline_component_jobs(self, jobs: Dict[str, Any], base_path: str) -> None:
        """Resolve inputs for jobs in a pipeline component.

        :param jobs: A dict of nodes in a pipeline component.
        :type jobs: Dict[str, Any]
        :param base_path: The base path used to resolve inputs. Usually it's the base path of the pipeline component.
        :type base_path: str
        """
        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob

        for _, job_instance in jobs.items():
            # resolve inputs for each job's component
            if isinstance(job_instance, BaseNode):
                node: BaseNode = job_instance
                self._job_operations._resolve_job_inputs(
                    # parameter group input need to be flattened first
                    self._job_operations._flatten_group_inputs(node.inputs),
                    base_path,
                )
            elif isinstance(job_instance, AutoMLJob):
                self._job_operations._resolve_automl_job_inputs(job_instance)

    @classmethod
    def _resolve_binding_on_supported_fields_for_node(cls, node: BaseNode) -> None:
        """Resolve all PipelineInput(binding from sdk) on supported fields to string.

        :param node: The node
        :type node: BaseNode
        """
        from azure.ai.ml.entities._job.pipeline._attr_dict import (
            try_get_non_arbitrary_attr,
        )
        from azure.ai.ml.entities._job.pipeline._io import PipelineInput

        # compute binding to pipeline input is supported on node.
        supported_fields = ["compute", "compute_name"]
        for field_name in supported_fields:
            val = try_get_non_arbitrary_attr(node, field_name)
            if isinstance(val, PipelineInput):
                # Put binding string to field
                setattr(node, field_name, val._data_binding())

    @classmethod
    def _try_resolve_node_level_task_for_parallel_node(cls, node: BaseNode, _: str, resolver: _AssetResolver) -> None:
        """Resolve node.task.code for parallel node if it's a reference to node.component.task.code.

        This is a hack operation.

        parallel_node.task.code won't be resolved directly for now, but it will be resolved if
        parallel_node.task is a reference to parallel_node.component.task. Then when filling back
        parallel_node.component.task.code, parallel_node.task.code will be changed as well.

        However, if we enable in-memory/on-disk cache for component resolution, such change
        won't happen, so we resolve node level task code manually here.

        Note that we will always use resolved node.component.code to fill back node.task.code
        given code overwrite on parallel node won't take effect for now. This is to make behaviors
        consistent across os and python versions.

        The ideal solution should be done after PRS team decides how to handle parallel.task.code

        :param node: The node
        :type node: BaseNode
        :param _: The component name
        :type _: str
        :param resolver: The resolver function
        :type resolver: _AssetResolver
        """
        from azure.ai.ml.entities import Parallel, ParallelComponent

        if not isinstance(node, Parallel):
            return
        component = node._component  # pylint: disable=protected-access
        if not isinstance(component, ParallelComponent):
            return
        if not node.task:
            return

        if node.task.code:
            # Resolve the component's code first, then mirror it onto the node task
            # (node-level code overrides are intentionally ignored; see docstring).
            _try_resolve_code_for_component(
                component,
                resolver=resolver,
            )
            node.task.code = component.code
        if node.task.environment:
            # Same mirroring rule: the component-level environment wins over node.task.environment.
            node.task.environment = resolver(component.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)

    @classmethod
    def _set_default_display_name_for_anonymous_component_in_node(cls, node: BaseNode, default_name: str) -> None:
        """Set default display name for anonymous component in a node.
        If node._component is an anonymous component and without display name, set the default display name.

        :param node: The node
        :type node: BaseNode
        :param default_name: The default name to set
        :type default_name: str
        """
        if not isinstance(node, BaseNode):
            return
        component = node._component
        if isinstance(component, PipelineComponent):
            return
        # Set display name as node name
        # TODO: the same anonymous component with different node name will have different anonymous hash
        # as their display name will be different.
        if (
            isinstance(component, Component)
            # check if component is anonymous and not created based on its id.
            # We can't directly check
            # node._component._is_anonymous as it will be set to True on component creation,
            # which is later than this check
            and not component.id
            and not component.display_name
        ):
            component.display_name = default_name

    @classmethod
    def _try_resolve_compute_for_node(cls, node: BaseNode, _: str, resolver: _AssetResolver) -> None:
        """Resolve compute for base node.

        :param node: The node
        :type node: BaseNode
        :param _: The node name
        :type _: str
        :param resolver: The resolver function
        :type resolver: _AssetResolver
        """
        if not isinstance(node, BaseNode):
            return
        # Pipeline-component nodes inherit compute from their children, so only
        # non-pipeline nodes are resolved here.
        if not isinstance(node._component, PipelineComponent):
            # Resolve compute for other type
            # Keep data binding expression as they are
            if not is_data_binding_expression(node.compute):
                # Get compute for each job
                node.compute = resolver(node.compute, azureml_type=AzureMLResourceType.COMPUTE)
            if has_attr_safe(node, "compute_name") and not is_data_binding_expression(node.compute_name):
                node.compute_name = resolver(node.compute_name, azureml_type=AzureMLResourceType.COMPUTE)

    @classmethod
    def _divide_nodes_to_resolve_into_layers(
        cls,
        component: PipelineComponent,
        extra_operations: List[Callable[[BaseNode, str], Any]],
    ) -> List:
        """Traverse the pipeline component and divide nodes to resolve into layers. Note that all leaf nodes will be
        put in the last layer.
        For example, for below pipeline component, assuming that all nodes need to be resolved:
          A
         /|\
        B C D
        | |
        E F
        |
        G
        return value will be:
        [
          [("B", B), ("C", C)],
          [("E", E)],
          [("D", D), ("F", F), ("G", G)],
        ]

        :param component: The pipeline component to resolve.
        :type component: PipelineComponent
        :param extra_operations: Extra operations to apply on nodes during the traversing.
        :type extra_operations: List[Callable[Callable[[BaseNode, str], Any]]]
        :return: A list of layers of nodes to resolve.
        :rtype: List[List[Tuple[str, BaseNode]]]
        """
        # Breadth-first traversal: each iteration of the while loop processes one depth level.
        nodes_to_process = list(component.jobs.items())
        layers: List = []
        leaf_nodes = []

        while nodes_to_process:
            layers.append([])
            new_nodes_to_process = []
            for key, job_instance in nodes_to_process:
                cls._resolve_binding_on_supported_fields_for_node(job_instance)
                # LoopNode wraps a real node; traverse its body instead of the wrapper.
                if isinstance(job_instance, LoopNode):
                    job_instance = job_instance.body

                for extra_operation in extra_operations:
                    extra_operation(job_instance, key)

                if isinstance(job_instance, BaseNode) and isinstance(job_instance._component, PipelineComponent):
                    # candidates for next layer
                    new_nodes_to_process.extend(job_instance.component.jobs.items())
                    # use layers to store pipeline nodes in each layer for now
                    layers[-1].append((key, job_instance))
                else:
                    # note that LoopNode has already been replaced by its body here
                    leaf_nodes.append((key, job_instance))
            nodes_to_process = new_nodes_to_process

        # if there is subgraph, the last item in layers will be empty for now as all leaf nodes are stored in leaf_nodes
        if len(layers) != 0:
            layers.pop()
            layers.append(leaf_nodes)

        return layers

    def _get_workspace_key(self) -> str:
        """Get a stable cache key for the current workspace.

        Prefers the service-side workspace id; falls back to a
        subscription/resource-group/workspace triple if the lookup fails.

        :return: The workspace key
        :rtype: str
        """
        try:
            workspace_rest = self._workspace_operations._operation.get(
                resource_group_name=self._resource_group_name,
                workspace_name=self._workspace_name,
            )
            return str(workspace_rest.workspace_id)
        except HttpResponseError:
            # Best-effort fallback: key is still unique per workspace path, though not
            # stable across delete/recreate of the same-named workspace.
            return "{}/{}/{}".format(self._subscription_id, self._resource_group_name, self._workspace_name)

    def _get_registry_key(self) -> str:
        """Get key for used registry.

        Note that, although registry id is in registry discovery response, it is not in RegistryDiscoveryDto; and we'll
        lose the information after deserialization.
        To avoid changing related rest client, we simply use registry related information from self to construct
        registry key, which means that on-disk cache will be invalid if a registry is deleted and then created
        again with the same name.

        :return: The registry key
        :rtype: str
        """
        return "{}/{}/{}".format(self._subscription_id, self._resource_group_name, self._registry_name)

    def _get_client_key(self) -> str:
        """Get key for used client.
        Key should be able to uniquely identify used registry or workspace.

        :return: The client key
        :rtype: str
        """
        # check cache first
        if self._client_key:
            return self._client_key

        # registry name has a higher priority comparing to workspace name according to current __init__ implementation
        # of MLClient
        if self._registry_name:
            self._client_key = "registry/" + self._get_registry_key()
        elif self._workspace_name:
            self._client_key = "workspace/" + self._get_workspace_key()
        else:
            # This should never happen.
            raise ValueError("Either workspace name or registry name must be provided to use component operations.")
        return self._client_key

    def _resolve_dependencies_for_pipeline_component_jobs(
        self,
        component: Union[Component, str],
        resolver: _AssetResolver,
    ) -> None:
        """Resolve dependencies for pipeline component jobs.
        Will directly return if component is not a pipeline component.

        :param component: The pipeline component to resolve.
        :type component: Union[Component, str]
        :param resolver: The resolver to resolve the dependencies.
        :type resolver: _AssetResolver
        """
        if not isinstance(component, PipelineComponent) or not component.jobs:
            return

        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob

        self._resolve_inputs_for_pipeline_component_jobs(component.jobs, component._base_path)

        # This is a preparation for concurrent resolution. Nodes will be resolved later layer by layer
        # from bottom to top, as hash calculation of a parent node will be impacted by resolution
        # of its child nodes.
        layers = self._divide_nodes_to_resolve_into_layers(
            component,
            extra_operations=[
                # no need to do this as we now keep the original component name for anonymous components
                # self._set_default_display_name_for_anonymous_component_in_node,
                partial(
                    self._try_resolve_node_level_task_for_parallel_node,
                    resolver=resolver,
                ),
                partial(self._try_resolve_environment_for_component, resolver=resolver),
                partial(self._try_resolve_compute_for_node, resolver=resolver),
                # should we resolve code here after we do extra operations concurrently?
            ],
        )

        # cache anonymous component only for now
        # request level in-memory cache can be a better solution for other type of assets as they are
        # relatively simple and of small number of distinct instances
        component_cache = CachedNodeResolver(
            resolver=resolver,
            client_key=self._get_client_key(),
        )

        # Bottom-up (reversed layers): child components must be resolved before their
        # parents so parent hashes reflect resolved children.
        for layer in reversed(layers):
            for _, job_instance in layer:
                if isinstance(job_instance, AutoMLJob):
                    # only compute is resolved here
                    self._job_operations._resolve_arm_id_for_automl_job(job_instance, resolver, inside_pipeline=True)
                elif isinstance(job_instance, BaseNode):
                    component_cache.register_node_for_lazy_resolution(job_instance)
                elif isinstance(job_instance, ConditionNode):
                    # ConditionNode carries no resolvable assets.
                    pass
                else:
                    msg = f"Non supported job type in Pipeline: {type(job_instance)}"
                    raise ComponentException(
                        message=msg,
                        target=ErrorTarget.COMPONENT,
                        no_personal_data_message=msg,
                        error_category=ErrorCategory.USER_ERROR,
                    )

        component_cache.resolve_nodes()


def _refine_component(component_func: Any) -> Component:
    """Return the component of function that is decorated by command
    component decorator.

    :param component_func: Function that is decorated by command component decorator
    :type component_func: types.FunctionType
    :return: Component entity of target function
    :rtype: Component
    """

    def check_parameter_type(f: Any) -> None:
        """Check all parameter is annotated or has a default value with clear type(not None).

        :param f: The component function
        :type f: types.FunctionType
        """
        annotations = getattr(f, "__annotations__", {})
        func_parameters = signature(f).parameters
        defaults_dict = {key: val.default for key, val in func_parameters.items()}
        # *args / **kwargs cannot be mapped to fixed component inputs, so reject them.
        variable_inputs = [
            key for key, val in func_parameters.items() if val.kind in [val.VAR_POSITIONAL, val.VAR_KEYWORD]
        ]
        if variable_inputs:
            msg = "Cannot register the component {} with variable inputs {!r}."
            raise ValidationException(
                message=msg.format(f.__name__, variable_inputs),
                no_personal_data_message=msg.format("[keys]", "[name]"),
                target=ErrorTarget.COMPONENT,
                error_category=ErrorCategory.USER_ERROR,
            )
        # A parameter is typed either via annotation or via a non-empty default value.
        unknown_type_keys = [
            key for key, val in defaults_dict.items() if key not in annotations and val is Parameter.empty
        ]
        if unknown_type_keys:
            msg = "Unknown type of parameter {} in pipeline func {!r}, please add type annotation."
            raise ValidationException(
                message=msg.format(unknown_type_keys, f.__name__),
                no_personal_data_message=msg.format("[keys]", "[name]"),
                target=ErrorTarget.COMPONENT,
                error_category=ErrorCategory.USER_ERROR,
            )

    def check_non_pipeline_inputs(f: Any) -> None:
        """Check whether non_pipeline_inputs exist in pipeline builder.

        :param f: The component function
        :type f: types.FunctionType
        """
        if f._pipeline_builder.non_pipeline_parameter_names:
            msg = "Cannot register pipeline component {!r} with non_pipeline_inputs."
            raise ValidationException(
                message=msg.format(f.__name__),
                no_personal_data_message=msg.format(""),
                target=ErrorTarget.COMPONENT,
                error_category=ErrorCategory.USER_ERROR,
            )

    # mldesigner-decorated functions carry a prebuilt component object; return it directly.
    if hasattr(component_func, "_is_mldesigner_component") and component_func._is_mldesigner_component:
        return component_func.component
    # dsl.pipeline-decorated functions are validated and then built into a PipelineComponent.
    if hasattr(component_func, "_is_dsl_func") and component_func._is_dsl_func:
        check_non_pipeline_inputs(component_func)
        check_parameter_type(component_func)
        if component_func._job_settings:
            module_logger.warning(
                "Job settings %s on pipeline function '%s' are ignored when creating PipelineComponent.",
                component_func._job_settings,
                component_func.__name__,
            )
        # Normally pipeline component are created when dsl.pipeline inputs are provided
        # so pipeline input .result() can resolve to correct value.
        # When pipeline component created without dsl.pipeline inputs, pipeline input .result() won't work.
        return component_func._pipeline_builder.build(user_provided_kwargs={})
    msg = "Function must be a dsl or mldesigner component function: {!r}"
    raise ValidationException(
        message=msg.format(component_func),
        no_personal_data_message=msg.format("component"),
        error_category=ErrorCategory.USER_ERROR,
        target=ErrorTarget.COMPONENT,
    )


def _try_resolve_code_for_component(component: Component, resolver: _AssetResolver) -> None:
    # Resolve the component's code (upload local code / convert to asset id) and write the
    # resolved value back onto the component. No-op for components without code.
    if isinstance(component, ComponentCodeMixin):
        with component._build_code() as code:
            if code is None:
                # Fall back to the originally declared code value when no code was built.
                code = component._get_origin_code_value()
            if code is None:
                return
            component._fill_back_code_value(resolver(code, azureml_type=AzureMLResourceType.CODE))  # type: ignore