aboutsummaryrefslogtreecommitdiff
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,too-many-lines
import time
import collections
import types
from functools import partial
from inspect import Parameter, signature
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast
import hashlib

from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
    AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
)
from azure.ai.ml._restclient.v2024_01_01_preview import (
    AzureMachineLearningWorkspaces as ServiceClient012024,
)
from azure.ai.ml._restclient.v2024_01_01_preview.models import (
    ComponentVersion,
    ListViewType,
)
from azure.ai.ml._scope_dependent_operations import (
    OperationConfig,
    OperationsContainer,
    OperationScope,
    _ScopeDependentOperations,
)
from azure.ai.ml._telemetry import (
    ActivityType,
    monitor_with_activity,
    monitor_with_telemetry_mixin,
)
from azure.ai.ml._utils._asset_utils import (
    _archive_or_restore,
    _create_or_update_autoincrement,
    _get_file_hash,
    _get_latest,
    _get_next_version_from_container,
    _resolve_label_to_asset,
    get_ignore_file,
    get_upload_files_from_folder,
    IgnoreFile,
    delete_two_catalog_files,
    create_catalog_files,
)
from azure.ai.ml._utils._azureml_polling import AzureMLPolling
from azure.ai.ml._utils._endpoint_utils import polling_wait
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml._vendor.azure_resources.operations import DeploymentsOperations
from azure.ai.ml.constants._common import (
    DEFAULT_COMPONENT_VERSION,
    DEFAULT_LABEL_NAME,
    AzureMLResourceType,
    DefaultOpenEncoding,
    LROConfigurations,
)
from azure.ai.ml.entities import Component, ValidationResult
from azure.ai.ml.exceptions import (
    ComponentException,
    ErrorCategory,
    ErrorTarget,
    ValidationException,
)
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError

from .._utils._cache_utils import CachedNodeResolver
from .._utils._experimental import experimental
from .._utils.utils import extract_name_and_version, is_data_binding_expression
from ..entities._builders import BaseNode
from ..entities._builders.condition_node import ConditionNode
from ..entities._builders.control_flow_node import LoopNode
from ..entities._component.automl_component import AutoMLComponent
from ..entities._component.code import ComponentCodeMixin
from ..entities._component.pipeline_component import PipelineComponent
from ..entities._job.pipeline._attr_dict import has_attr_safe
from ._code_operations import CodeOperations
from ._environment_operations import EnvironmentOperations
from ._operation_orchestrator import OperationOrchestrator, _AssetResolver
from ._workspace_operations import WorkspaceOperations

ops_logger = OpsLogger(__name__)
# Package-level and module-level loggers derived from the shared OpsLogger instance.
logger = ops_logger.package_logger
module_logger = ops_logger.module_logger


class ComponentOperations(_ScopeDependentOperations):
    """ComponentOperations.

    You should not instantiate this class directly. Instead, you should
    create an MLClient instance that instantiates it for you and
    attaches it as an attribute.

    :param operation_scope: The operation scope.
    :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
    :param operation_config: The operation configuration.
    :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
    :param service_client: The service client for API operations.
    :type service_client: Union[
        ~azure.ai.ml._restclient.v2022_10_01.AzureMachineLearningWorkspaces,
        ~azure.ai.ml._restclient.v2021_10_01_dataplanepreview.AzureMachineLearningWorkspaces]
    :param all_operations: The container for all available operations.
    :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
    :param preflight_operation: The preflight operation for deployments.
    :type preflight_operation: Optional[~azure.ai.ml._vendor.azure_resources.operations.DeploymentsOperations]
    :param kwargs: Additional keyword arguments.
    :type kwargs: Dict
    """

    def __init__(
        self,
        operation_scope: OperationScope,
        operation_config: OperationConfig,
        service_client: Union[ServiceClient012024, ServiceClient102021Dataplane],
        all_operations: OperationsContainer,
        preflight_operation: Optional[DeploymentsOperations] = None,
        **kwargs: Any,
    ) -> None:
        super(ComponentOperations, self).__init__(operation_scope, operation_config)
        ops_logger.update_filter()
        self._version_operation = service_client.component_versions
        # None for registry-scoped clients; remote validation in _validate() is skipped when it is missing.
        self._preflight_operation = preflight_operation
        self._container_operation = service_client.component_containers
        self._all_operations = all_operations
        # Extra keyword arguments are forwarded (via **self._init_args) to the REST client calls of this class.
        self._init_args: Dict[str, Any] = kwargs
        # Maps a label to a function which given an asset name,
        # returns the asset associated with the label
        self._managed_label_resolver = {"latest": self._get_latest_version}
        self._orchestrators = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)

        self._client_key: Optional[str] = None

    @property
    def _code_operations(self) -> CodeOperations:
        """The CodeOperations instance registered in the shared operations container.

        :return: The code operations.
        :rtype: CodeOperations
        """
        return cast(
            CodeOperations,
            self._all_operations.get_operation(  # type: ignore[misc]
                AzureMLResourceType.CODE,
                lambda ops: isinstance(ops, CodeOperations),
            ),
        )

    @property
    def _environment_operations(self) -> EnvironmentOperations:
        """The EnvironmentOperations instance registered in the shared operations container.

        :return: The environment operations.
        :rtype: EnvironmentOperations
        """
        env_ops: EnvironmentOperations = self._all_operations.get_operation(  # type: ignore[misc]
            AzureMLResourceType.ENVIRONMENT, lambda candidate: isinstance(candidate, EnvironmentOperations)
        )
        return env_ops

    @property
    def _workspace_operations(self) -> WorkspaceOperations:
        """The WorkspaceOperations instance registered in the shared operations container.

        :return: The workspace operations.
        :rtype: WorkspaceOperations
        """
        ws_ops: WorkspaceOperations = self._all_operations.get_operation(  # type: ignore[misc]
            AzureMLResourceType.WORKSPACE, lambda candidate: isinstance(candidate, WorkspaceOperations)
        )
        return ws_ops

    @property
    def _job_operations(self) -> Any:
        """The JobOperations instance registered in the shared operations container.

        :return: The job operations.
        :rtype: JobOperations
        """
        # Imported lazily inside the property (presumably to avoid an import cycle between operation modules).
        from ._job_operations import JobOperations

        def _is_job_operations(candidate: Any) -> bool:
            return isinstance(candidate, JobOperations)

        return self._all_operations.get_operation(AzureMLResourceType.JOB, _is_job_operations)  # type: ignore[misc]

    @monitor_with_activity(ops_logger, "Component.List", ActivityType.PUBLICAPI)
    def list(
        self,
        name: Union[str, None] = None,
        *,
        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
    ) -> Iterable[Component]:
        """List specific component or components of the workspace.

        :param name: Component name, if not set, list all components of the workspace
        :type name: Optional[str]
        :keyword list_view_type: View type for including/excluding (for example) archived components.
            Default: ACTIVE_ONLY. Only applied for workspace-scoped clients.
        :type list_view_type: Optional[ListViewType]
        :return: An iterator like instance of component objects
        :rtype: ~azure.core.paging.ItemPaged[Component]

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_list]
                :end-before: [END component_operations_list]
                :language: python
                :dedent: 8
                :caption: List component example.
        """
        # Registry listing is addressed by registry name and takes no view-type filter;
        # workspace listing is addressed by workspace name and honors list_view_type.
        scope_kwargs: Dict[str, Any] = {"resource_group_name": self._resource_group_name}
        if self._registry_name:
            scope_kwargs["registry_name"] = self._registry_name
        else:
            scope_kwargs["workspace_name"] = self._workspace_name
            scope_kwargs["list_view_type"] = list_view_type

        if not name:
            containers = self._container_operation.list(
                **scope_kwargs,
                **self._init_args,
                cls=lambda objs: [Component._from_container_rest_object(obj) for obj in objs],
            )
            return cast(Iterable[Component], containers)

        versions = self._version_operation.list(
            name=name,
            **scope_kwargs,
            **self._init_args,
            cls=lambda objs: [Component._from_rest_object(obj) for obj in objs],
        )
        return cast(Iterable[Component], versions)

    @monitor_with_telemetry_mixin(ops_logger, "ComponentVersion.Get", ActivityType.INTERNALCALL)
    def _get_component_version(self, name: str, version: Optional[str] = DEFAULT_COMPONENT_VERSION) -> ComponentVersion:
        """Fetch the raw ComponentVersion REST object for a component name and version.

        The request is scoped to the registry when this client targets one, otherwise to the workspace.

        :param name: Name of the code component.
        :type name: str
        :param version: Version of the component.
        :type version: Optional[str]
        :return: The ComponentVersion object of the specified component name and version.
        :rtype: ~azure.ai.ml.entities.ComponentVersion
        """
        scope_kwargs: Dict[str, Any] = (
            {"registry_name": self._registry_name}
            if self._registry_name
            else {"workspace_name": self._workspace_name}
        )
        return self._version_operation.get(
            name=name,
            version=version,
            resource_group_name=self._resource_group_name,
            **scope_kwargs,
            **self._init_args,
        )

    @monitor_with_telemetry_mixin(ops_logger, "Component.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Component:
        """Retrieve a component by name, optionally selecting a version or a label.

        When neither version nor label is given, the default component version is returned.

        :param name: Name of the code component.
        :type name: str
        :param version: Version of the component.
        :type version: Optional[str]
        :param label: Label of the component, mutually exclusive with version.
        :type label: Optional[str]
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully
            identified and retrieved (for example when both version and label are given).
            Details will be provided in the error message.
        :return: The specified component object.
        :rtype: ~azure.ai.ml.entities.Component

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_get]
                :end-before: [END component_operations_get]
                :language: python
                :dedent: 8
                :caption: Get component example.
        """
        component = self._get(name=name, version=version, label=label)
        return component

    def _localize_code(self, component: Component, base_dir: Path) -> None:
        """Download the component's referenced code asset into ``base_dir/code`` and repoint the component to it.

        Components without code support, non-string code values and registry code references
        (``azureml://registries...``) are left untouched.

        :param component: The component whose code reference should be localized.
        :type component: Component
        :param base_dir: Directory the component is being downloaded into.
        :type base_dir: Path
        """
        if not isinstance(component, ComponentCodeMixin):
            return
        origin_code = component._get_origin_code_value()
        # registry code keeps its "azureml://registries" reference, which can be used directly
        if not isinstance(origin_code, str) or origin_code.startswith("azureml://registries"):
            return

        local_code_path = "./code"
        self._code_operations.download(
            **extract_name_and_version(origin_code),
            download_path=base_dir.joinpath(local_code_path),
        )
        setattr(component, component._get_code_field_name(), local_code_path)

    def _localize_environment(self, component: Component, base_dir: Path) -> None:
        """Replace a string environment reference with a fetched, localized Environment object.

        The environment is read from the component itself when it has an ``environment`` attribute, or from
        ``component.task`` for parallel components. ``None``/non-string environments and registry
        references (``azureml://registries...``) are left untouched.

        :param component: The component whose environment reference should be localized.
        :type component: Component
        :param base_dir: Directory the component is being downloaded into.
        :type base_dir: Path
        """
        from azure.ai.ml.entities import ParallelComponent

        owner: Any
        if hasattr(component, "environment"):
            owner = component
        elif isinstance(component, ParallelComponent):
            owner = component.task
        else:
            return

        env_ref = owner.environment
        # environment can be None; registry environments keep their "azureml:" reference and are used directly
        if not isinstance(env_ref, str) or env_ref.startswith("azureml://registries"):
            return

        local_environment = self._environment_operations.get(**extract_name_and_version(env_ref))
        local_environment._localize(base_path=base_dir.absolute().as_posix())
        owner.environment = local_environment

    @experimental
    @monitor_with_telemetry_mixin(ops_logger, "Component.Download", ActivityType.PUBLICAPI)
    def download(
        self,
        name: str,
        download_path: Union[PathLike, str] = ".",
        *,
        version: Optional[str] = None,
    ) -> None:
        """Download the specified component and its dependencies to local. Local component can be used to create
        the component in another workspace or for offline development.

        The component spec is written to ``component_spec.yaml`` in ``download_path``; referenced code is
        downloaded into a ``code`` sub-folder and referenced environments are localized alongside it.

        :param name: Name of the code component.
        :type name: str
        :param download_path: Local path as download destination, defaults to the current working directory of
            the current user. Will be created if it does not exist.
        :type download_path: Union[PathLike, str]
        :keyword version: Version of the component. Defaults to the default component version.
        :paramtype version: Optional[str]
        :raises ~OSError: Raised if download_path points to an existing directory that is not empty,
            or to an existing path that is not a directory.
        :return: None
        :rtype: None
        """
        output_dir = Path(download_path)
        component = self._get(name=name, version=version)
        self._resolve_azureml_id(component)

        if output_dir.is_dir():
            # Path.rmdir() only removes empty directories, so a non-empty target raises OSError here.
            output_dir.rmdir()
        output_dir.mkdir(parents=True)

        # download code
        self._localize_code(component, output_dir)
        # download environment
        self._localize_environment(component, output_dir)

        component._localize(output_dir.absolute().as_posix())
        (output_dir / "component_spec.yaml").write_text(component._to_yaml(), encoding=DefaultOpenEncoding.WRITE)

    def _get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Component:
        """Resolve the version/label selectors and fetch the matching component.

        Without any selector, or with the default label, the default component version is fetched.
        Any other label is resolved through ``_resolve_label_to_asset``.

        :param name: Name of the component.
        :type name: str
        :param version: Version of the component.
        :type version: Optional[str]
        :param label: Label of the component, mutually exclusive with version.
        :type label: Optional[str]
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if both version and label are given.
        :return: The resolved component.
        :rtype: Component
        """
        if version and label:
            msg = "Cannot specify both version and label."
            raise ValidationException(
                message=msg,
                target=ErrorTarget.COMPONENT,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )

        wants_default_version = label == DEFAULT_LABEL_NAME or not (version or label)
        if wants_default_version:
            version = DEFAULT_COMPONENT_VERSION
        elif label:
            return _resolve_label_to_asset(self, name, label)

        component = Component._from_rest_object(self._get_component_version(name, version))
        self._resolve_azureml_id(component, jobs_only=True)
        return component

    @experimental
    @monitor_with_telemetry_mixin(ops_logger, "Component.Validate", ActivityType.PUBLICAPI)
    def validate(
        self,
        component: Union[Component, types.FunctionType],
        raise_on_failure: bool = False,
        **kwargs: Any,
    ) -> ValidationResult:
        """Validate a specified component. Inline-defined entities such as Environment or Code are not created.

        :param component: The component object or a mldesigner component function that generates component object
        :type component: Union[Component, types.FunctionType]
        :param raise_on_failure: Whether to raise exception on validation error. Defaults to False
        :type raise_on_failure: bool
        :return: All validation errors
        :rtype: ~azure.ai.ml.entities.ValidationResult
        """
        # TODO 2330505: remote validation is skipped unless skip_remote_validation=False is passed;
        # revisit this default once remote validation is ready.
        skip_remote = kwargs.pop("skip_remote_validation", True)
        return self._validate(
            component,
            raise_on_failure=raise_on_failure,
            skip_remote_validation=skip_remote,
        )

    @monitor_with_telemetry_mixin(ops_logger, "Component.Validate", ActivityType.INTERNALCALL)
    def _validate(
        self,
        component: Union[Component, types.FunctionType],
        raise_on_failure: bool,
        skip_remote_validation: bool,
    ) -> ValidationResult:
        """Shared implementation behind validate() and create_or_update().

        Kept separate so create_or_update() does not call the public, experimental validate(), which would
        affect telemetry statistics and emit an experimental warning.

        :param component: The component, or a component function that is refined into one first.
        :type component: Union[Component, types.FunctionType]
        :param raise_on_failure: Whether to raise on failure.
        :type raise_on_failure: bool
        :param skip_remote_validation: Whether to skip remote validation.
        :type skip_remote_validation: bool
        :return: The validation result
        :rtype: ValidationResult
        """
        target = _refine_component(component) if isinstance(component, types.FunctionType) else component

        # local validation
        validation_result = target._validate(raise_error=raise_on_failure)

        # remote validation; the preflight operation is not available for registry clients
        if not skip_remote_validation and self._preflight_operation:
            workspace = self._workspace_operations.get()
            validation_body = target._build_rest_object_for_remote_validation(
                location=workspace.location,
                workspace_name=self._workspace_name,
            )
            poller = self._preflight_operation.begin_validate(
                resource_group_name=self._resource_group_name,
                deployment_name=self._workspace_name,
                parameters=validation_body,
                **self._init_args,
            )
            # pylint: disable=protected-access
            remote_result = target._build_validation_result_from_rest_object(poller.result())
            validation_result.merge_with(remote_result, overwrite=True)

        # map diagnostics (including remote ones) back to locations in the source file
        validation_result.resolve_location_for_diagnostics(target._source_path)  # type: ignore
        return target._try_raise(  # pylint: disable=protected-access
            validation_result,
            raise_error=raise_on_failure,
        )

    def _update_flow_rest_object(self, rest_component_resource: Any) -> None:
        """Replace ``flow_file_name`` in a flow component spec with a full ``flow_definition_uri``.

        The spec's ``code`` ARM id is used to look up the created code asset; its path (with ``:<digits>/``
        port segments removed) joined with the flow file name becomes the flow definition URI.

        :param rest_component_resource: The REST component object whose spec is modified in place.
        :type rest_component_resource: Any
        """
        import re

        from azure.ai.ml._utils._arm_id_utils import AMLVersionedArmId

        spec = rest_component_resource.properties.component_spec
        code_arm_id = AMLVersionedArmId(spec["code"])
        flow_file_name = spec.pop("flow_file_name")
        # TODO: avoid remote request here if met performance issue
        created_code = self._code_operations.get(name=code_arm_id.asset_name, version=code_arm_id.asset_version)
        # remove port number and append flow file name to get full uri for flow.dag.yaml
        code_path_without_port = re.sub(r":[0-9]+/", "/", created_code.path)
        spec["flow_definition_uri"] = f"{code_path_without_port}/{flow_file_name}"

    def _reset_version_if_no_change(self, component: Component, current_name: str, current_version: str) -> Tuple:
        """Reset component version to the remote default version if there's no change in the component.

        The ``client_component_hash`` property of the local REST object is compared with that of the remote
        default version. Only when a local hash exists and equals the remote one is the remote version reused;
        otherwise (including any lookup failure) the inputs are returned unchanged so a new version is registered.

        :param component: The component object
        :type component: Component
        :param current_name: The component name
        :type current_name: str
        :param current_version: The component version
        :type current_version: str
        :return: The new version and rest component resource
        :rtype: Tuple[str, ComponentVersion]
        """
        rest_component_resource = component._to_rest_object()

        try:
            client_component_hash = rest_component_resource.properties.properties.get("client_component_hash")
            if not client_component_hash:
                # Without a local hash there is nothing to compare: a missing remote hash would otherwise
                # compare equal and registration would be skipped for a possibly modified component.
                return current_version, rest_component_resource
            remote_component_version = self._get_component_version(name=current_name)  # will raise error if not found.
            remote_component_hash = remote_component_version.properties.properties.get("client_component_hash")
            if client_component_hash == remote_component_hash:
                # only update the default version component instead of creating a new version component
                component.version = remote_component_version.properties.component_spec.get("version")
                logger.warning(
                    "The component is not modified compared to the default version "
                    "and the new version component registration is skipped."
                )
                return component.version, component._to_rest_object()
        except ResourceNotFoundError as e:
            logger.info("Failed to get component version, %s", e)
        except Exception as e:  # pylint: disable=W0718
            # Best effort: any other failure falls back to registering the component as usual.
            logger.error("Failed to compare client_component_hash, %s", e)

        return current_version, rest_component_resource

    def _create_or_update_component_version(
        self,
        component: Component,
        name: str,
        version: Optional[str],
        rest_component_resource: Any,
    ) -> Any:
        """Send the create/update request for a component version.

        Registry-scoped clients start a long-running operation, wait for it and return None (create_or_update
        then re-fetches the component). Workspace-scoped clients either go through auto-increment versioning
        (non-anonymous components with ``_auto_increment_version``) or create the given version directly,
        and return the REST response.

        :param component: The component being created.
        :type component: Component
        :param name: The REST component name.
        :type name: str
        :param version: The REST component version.
        :type version: Optional[str]
        :param rest_component_resource: The REST body to send.
        :type rest_component_resource: Any
        :return: The REST component version for workspace scope, otherwise None.
        :rtype: Any
        """
        if self._registry_name:
            start_time = time.time()
            path_format_arguments = {
                "componentName": component.name,
                "resourceGroupName": self._resource_group_name,
                "registryName": self._registry_name,
            }
            poller = self._version_operation.begin_create_or_update(
                name=name,
                version=version,
                resource_group_name=self._operation_scope.resource_group_name,
                registry_name=self._registry_name,
                body=rest_component_resource,
                polling=AzureMLPolling(
                    LROConfigurations.POLL_INTERVAL,
                    path_format_arguments=path_format_arguments,
                ),
            )
            message = f"Creating/updating registry component {component.name} with version {component.version} "
            polling_wait(poller=poller, start_time=start_time, message=message, timeout=None)
            return None

        # _auto_increment_version can be True for non-registry component creation operation;
        # and anonymous component should use hash as version
        if not component._is_anonymous and component._auto_increment_version:
            return _create_or_update_autoincrement(
                name=name,
                body=rest_component_resource,
                version_operation=self._version_operation,
                container_operation=self._container_operation,
                resource_group_name=self._operation_scope.resource_group_name,
                workspace_name=self._workspace_name,
                **self._init_args,
            )

        return self._version_operation.create_or_update(
            name=name,
            version=version,
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            body=rest_component_resource,
            **self._init_args,
        )

    # NOTE(review): this decorator receives the package ``logger`` while the other public methods pass
    # ``ops_logger``; left unchanged here — confirm whether the difference is intentional.
    @monitor_with_telemetry_mixin(
        logger,
        "Component.CreateOrUpdate",
        ActivityType.PUBLICAPI,
        extra_keys=["is_anonymous"],
    )
    def create_or_update(
        self,
        component: Union[Component, types.FunctionType],
        version: Optional[str] = None,
        *,
        skip_validation: bool = False,
        **kwargs: Any,
    ) -> Component:
        """Create or update a specified component. If there are inline defined
        entities, e.g. Environment, Code, they'll be created together with the
        component.

        :param component: The component object or a mldesigner component function that generates component object
        :type component: Union[Component, types.FunctionType]
        :param version: The component version to override.
        :type version: Optional[str]
        :keyword skip_validation: whether to skip validation before creating/updating the component, defaults to False
        :paramtype skip_validation: bool
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.AssetException: Raised if Component assets
            (e.g. Data, Code, Model, Environment) cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.ComponentException: Raised if Component type is unsupported.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.ModelException: Raised if Component model cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
        :return: The specified component object.
        :rtype: ~azure.ai.ml.entities.Component

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_create_or_update]
                :end-before: [END component_operations_create_or_update]
                :language: python
                :dedent: 8
                :caption: Create component example.
        """
        # Update component when the input is a component function
        if isinstance(component, types.FunctionType):
            component = _refine_component(component)
        if version is not None:
            component.version = version
        # In non-registry scenario, if component does not have version, no need to get next version here.
        # As Component property version has setter that updates `_auto_increment_version` in-place, then
        # a component will get a version after its creation, and it will always use this version in its
        # future creation operations, which breaks version auto increment mechanism.
        if self._registry_name and not component.version and component._auto_increment_version:
            component.version = _get_next_version_from_container(
                name=component.name,
                container_operation=self._container_operation,
                resource_group_name=self._operation_scope.resource_group_name,
                workspace_name=self._workspace_name,
                registry_name=self._registry_name,
                **self._init_args,
            )

        # Undocumented keyword: is_anonymous (only consulted when the component is not already anonymous).
        if not component._is_anonymous:
            component._is_anonymous = kwargs.pop("is_anonymous", False)

        if not skip_validation:
            self._validate(component, raise_on_failure=True, skip_remote_validation=True)

        # Create all dependent resources
        # Only upload dependencies if component is NOT IPP
        if not component._intellectual_property:
            self._resolve_arm_id_or_upload_dependencies(component)

        name, version = component._get_rest_name_version()
        # Undocumented keyword: skip_if_no_change reuses the remote default version when hashes match.
        if not component._is_anonymous and kwargs.get("skip_if_no_change"):
            version, rest_component_resource = self._reset_version_if_no_change(
                component,
                current_name=name,
                current_version=str(version),
            )
        else:
            rest_component_resource = component._to_rest_object()

        # TODO: remove this after server side support directly using client created code
        from azure.ai.ml.entities._component.flow import FlowComponent

        if isinstance(component, FlowComponent):
            self._update_flow_rest_object(rest_component_resource)

        result = self._create_or_update_component_version(
            component,
            name,
            version,
            rest_component_resource,
        )

        # No REST result is returned for registry creation, so re-fetch the component in that case.
        if not result:
            component = self.get(name=component.name, version=component.version)
        else:
            component = Component._from_rest_object(result)

        self._resolve_azureml_id(
            component=component,
            jobs_only=True,
        )
        return component

    @experimental
    def prepare_for_sign(self, component: Component) -> None:
        """Regenerate the signing catalog files for a component's code folder.

        Existing catalog files are deleted, then every non-ignored file in the code folder is hashed with
        SHA256 (upper-case hex) and the resulting catalog is written with ``create_catalog_files``.
        Components that are not ``ComponentCodeMixin`` instances are left untouched.

        :param component: The component whose code folder should be cataloged.
        :type component: ~azure.ai.ml.entities.Component
        :return: None
        :rtype: None
        """
        if not isinstance(component, ComponentCodeMixin):
            return

        with component._build_code() as code:
            delete_two_catalog_files(code.path)
            # Honor an ignore file already attached to the code; otherwise discover one in the code folder.
            # (Previously an attached ignore file was replaced by an empty IgnoreFile(), ignoring nothing.)
            ignore_file = code._ignore_file if code._ignore_file is not None else get_ignore_file(code.path)
            file_list = get_upload_files_from_folder(code.path, ignore_file=ignore_file)

            catalog_items: Dict[str, str] = {}
            for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
                catalog_items[file_name] = _get_file_hash(file_path, hashlib.sha256()).hexdigest().upper()

            json_stub: Dict[str, Any] = {
                "HashAlgorithm": "SHA256",
                # Catalog entries are emitted sorted by file name.
                "CatalogItems": collections.OrderedDict(sorted(catalog_items.items())),
            }
            create_catalog_files(code.path, json_stub)

    @monitor_with_telemetry_mixin(ops_logger, "Component.Archive", ActivityType.PUBLICAPI)
    def archive(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Archive a component, identified by name plus an optional version or label.

        :param name: Name of the component.
        :type name: str
        :param version: Version of the component.
        :type version: Optional[str]
        :param label: Label of the component. (mutually exclusive with version).
        :type label: Optional[str]

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_archive]
                :end-before: [END component_operations_archive]
                :language: python
                :dedent: 8
                :caption: Archive component example.
        """
        _archive_or_restore(
            asset_operations=self,
            name=name,
            version=version,
            label=label,
            is_archived=True,
            version_operation=self._version_operation,
            container_operation=self._container_operation,
        )

    @monitor_with_telemetry_mixin(ops_logger, "Component.Restore", ActivityType.PUBLICAPI)
    def restore(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Restore an archived component.

        The target is picked by ``name`` plus either ``version`` or ``label``; the two selectors are
        mutually exclusive.

        :param name: Name of the component.
        :type name: str
        :param version: Version of the component.
        :type version: str
        :param label: Label of the component. (mutually exclusive with version).
        :type label: str

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START component_operations_restore]
                :end-before: [END component_operations_restore]
                :language: python
                :dedent: 8
                :caption: Restore component example.
        """
        # Restoring is the ``is_archived=False`` direction of the shared archive/restore helper.
        _archive_or_restore(
            name=name,
            version=version,
            label=label,
            is_archived=False,
            asset_operations=self,
            version_operation=self._version_operation,
            container_operation=self._container_operation,
        )

    def _get_latest_version(self, component_name: str) -> Component:
        """Fetch the latest version of a named component.

        "Latest" means the most recently created version, not the most recently updated one.
        A registry scope is queried when ``self._registry_name`` is set; otherwise the workspace
        scope is used.

        :param component_name: The component name
        :type component_name: str
        :return: A latest version of the named Component
        :rtype: Component
        """
        if self._registry_name:
            latest_rest = _get_latest(
                component_name,
                self._version_operation,
                self._resource_group_name,
                workspace_name=None,
                registry_name=self._registry_name,
            )
        else:
            latest_rest = _get_latest(
                component_name,
                self._version_operation,
                self._resource_group_name,
                self._workspace_name,
            )
        return Component._from_rest_object(latest_rest)

    @classmethod
    def _try_resolve_environment_for_component(
        cls, component: Union[BaseNode, str], _: str, resolver: _AssetResolver
    ) -> None:
        """Resolve the environment of a component (and of its ``task``, when present) in place.

        A node is unwrapped to its component first; string components are skipped.

        :param component: A node, a component, or a string reference to a component.
        :type component: Union[BaseNode, str]
        :param _: Unused; present so the method matches the node-operation call signature.
        :type _: str
        :param resolver: The resolver function
        :type resolver: _AssetResolver
        """
        target = (
            component._component  # pylint: disable=protected-access
            if isinstance(component, BaseNode)
            else component
        )
        if isinstance(target, str):
            return

        candidates: List[BaseNode] = [target]
        if hasattr(target, "task"):
            candidates.append(target.task)

        for candidate in candidates:
            if not hasattr(candidate, "environment"):
                continue
            env = candidate.environment
            # Internal components may hold the environment as a dict or an InternalEnvironment; both
            # are left as-is. The type is matched by name instead of isinstance because importing
            # azure.ai.ml._internal would enable the internal component feature for all users.
            if isinstance(env, dict) or type(env).__name__ == "InternalEnvironment":
                continue
            candidate.environment = resolver(env, azureml_type=AzureMLResourceType.ENVIRONMENT)

    def _resolve_azureml_id(self, component: Component, jobs_only: bool = False) -> None:
        """Resolve the dependencies of ``component`` with the orchestrators' AzureML-id resolver.

        :param component: The component whose dependencies are resolved in place.
        :type component: Component
        :param jobs_only: Only resolve the jobs of a pipeline component.
        :type jobs_only: bool
        """
        # TODO: remove the parameter `jobs_only`. Some tests are expecting an arm id after resolving for now.
        self._resolve_dependencies_for_component(
            component,
            self._orchestrators.resolve_azureml_id,
            jobs_only=jobs_only,
        )

    def _resolve_arm_id_or_upload_dependencies(self, component: Component) -> None:
        """Resolve the dependencies of ``component`` with a fresh orchestrator's ARM-id resolver.

        :param component: The component whose dependencies are resolved in place.
        :type component: Component
        """
        orchestrator = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)
        self._resolve_dependencies_for_component(component, orchestrator.get_asset_arm_id)

    def _resolve_dependencies_for_component(
        self,
        component: Component,
        resolver: Callable,
        *,
        jobs_only: bool = False,
    ) -> None:
        """Resolve a component's code, environment and (for pipelines) child jobs with ``resolver``.

        :param component: The component to resolve in place.
        :type component: Component
        :param resolver: The resolver applied to each dependency.
        :type resolver: Callable
        :keyword jobs_only: When True, skip the component's own code/environment and only resolve
            the jobs of a pipeline component.
        :paramtype jobs_only: bool
        :raises ValidationException: If ``component`` is not a Component and ``jobs_only`` is False.
        """
        # for now, many tests are expecting long arm id instead of short id for environment and code
        if jobs_only:
            self._resolve_dependencies_for_pipeline_component_jobs(component, resolver=resolver)
            return

        # AutoML components carry no extra dependency to resolve.
        if isinstance(component, AutoMLComponent):
            return

        # Guard against e.g. a Job being passed in where a Component is expected.
        if not isinstance(component, Component):
            msg = f"Non supported component type: {type(component)}"
            raise ValidationException(
                message=msg,
                target=ErrorTarget.COMPONENT,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )

        _try_resolve_code_for_component(component=component, resolver=resolver)
        self._try_resolve_environment_for_component(component=component, resolver=resolver, _="")  # type: ignore
        self._resolve_dependencies_for_pipeline_component_jobs(component, resolver=resolver)

    def _resolve_inputs_for_pipeline_component_jobs(self, jobs: Dict[str, Any], base_path: str) -> None:
        """Resolve the inputs of every node in a pipeline component.

        BaseNode inputs are flattened (parameter groups) and resolved against ``base_path``;
        AutoML jobs go through the AutoML-specific input resolution. Other job types are skipped.

        :param jobs: A dict of nodes in a pipeline component.
        :type jobs: Dict[str, Any]
        :param base_path: The base path used to resolve inputs. Usually it's the base path of the pipeline component.
        :type base_path: str
        """
        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob

        for job_instance in jobs.values():
            if isinstance(job_instance, BaseNode):
                # parameter group inputs have to be flattened before resolution
                flattened_inputs = self._job_operations._flatten_group_inputs(job_instance.inputs)
                self._job_operations._resolve_job_inputs(flattened_inputs, base_path)
            elif isinstance(job_instance, AutoMLJob):
                self._job_operations._resolve_automl_job_inputs(job_instance)

    @classmethod
    def _resolve_binding_on_supported_fields_for_node(cls, node: BaseNode) -> None:
        """Replace PipelineInput bindings on ``compute``/``compute_name`` with their binding string.

        :param node: The node
        :type node: BaseNode
        """
        from azure.ai.ml.entities._job.pipeline._attr_dict import (
            try_get_non_arbitrary_attr,
        )
        from azure.ai.ml.entities._job.pipeline._io import PipelineInput

        # Binding a node's compute to a pipeline input is supported; other fields are not touched.
        for field_name in ("compute", "compute_name"):
            bound_value = try_get_non_arbitrary_attr(node, field_name)
            if not isinstance(bound_value, PipelineInput):
                continue
            setattr(node, field_name, bound_value._data_binding())

    @classmethod
    def _try_resolve_node_level_task_for_parallel_node(cls, node: BaseNode, _: str, resolver: _AssetResolver) -> None:
        """Resolve ``node.task`` code/environment of a parallel node from its component.

        This is a workaround. ``parallel_node.task.code`` is not resolved directly; it used to be
        resolved implicitly when ``parallel_node.task`` referenced ``parallel_node.component.task``,
        so filling back the component's code also updated the node's task. With in-memory/on-disk
        caching of component resolution that implicit update no longer happens, so the node-level
        task is resolved here explicitly.

        The resolved ``node.component.code`` is always used to fill back ``node.task.code``, since a
        code override on a parallel node does not take effect for now; this keeps behavior
        consistent across OSes and Python versions.

        The proper fix depends on how the PRS team decides to handle ``parallel.task.code``.

        :param node: The node
        :type node: BaseNode
        :param _: The component name
        :type _: str
        :param resolver: The resolver function
        :type resolver: _AssetResolver
        """
        from azure.ai.ml.entities import Parallel, ParallelComponent

        if not isinstance(node, Parallel):
            return
        parallel_component = node._component  # pylint: disable=protected-access
        if not isinstance(parallel_component, ParallelComponent) or not node.task:
            return

        if node.task.code:
            # Resolve code at component level, then copy the resolved value onto the node's task.
            _try_resolve_code_for_component(parallel_component, resolver=resolver)
            node.task.code = parallel_component.code
        if node.task.environment:
            node.task.environment = resolver(
                parallel_component.environment,
                azureml_type=AzureMLResourceType.ENVIRONMENT,
            )

    @classmethod
    def _set_default_display_name_for_anonymous_component_in_node(cls, node: BaseNode, default_name: str) -> None:
        """Give an anonymous, unnamed non-pipeline component in ``node`` a default display name.

        :param node: The node
        :type node: BaseNode
        :param default_name: The default name to set
        :type default_name: str
        """
        if not isinstance(node, BaseNode):
            return
        component = node._component
        # TODO: the same anonymous component with different node name will have different anonymous hash
        # as their display name will be different.
        # "Anonymous" is detected via a missing id: node._component._is_anonymous cannot be used here
        # because it is only set to True on component creation, which happens after this check.
        needs_default = (
            not isinstance(component, PipelineComponent)
            and isinstance(component, Component)
            and not component.id
            and not component.display_name
        )
        if needs_default:
            component.display_name = default_name

    @classmethod
    def _try_resolve_compute_for_node(cls, node: BaseNode, _: str, resolver: _AssetResolver) -> None:
        """Resolve ``compute`` (and ``compute_name``, if present) of a non-pipeline node.

        Values that are data-binding expressions are kept unchanged.

        :param node: The node
        :type node: BaseNode
        :param _: The node name
        :type _: str
        :param resolver: The resolver function
        :type resolver: _AssetResolver
        """
        if not isinstance(node, BaseNode) or isinstance(node._component, PipelineComponent):
            return

        if not is_data_binding_expression(node.compute):
            node.compute = resolver(node.compute, azureml_type=AzureMLResourceType.COMPUTE)

        if has_attr_safe(node, "compute_name") and not is_data_binding_expression(node.compute_name):
            node.compute_name = resolver(node.compute_name, azureml_type=AzureMLResourceType.COMPUTE)

    @classmethod
    def _divide_nodes_to_resolve_into_layers(
        cls,
        component: PipelineComponent,
        extra_operations: List[Callable[[BaseNode, str], Any]],
    ) -> List:
        r"""Traverse the pipeline component and divide nodes to resolve into layers.

        Each layer holds the pipeline (sub-graph) nodes found at one depth; all leaf nodes are put in
        the last layer. Loop nodes are replaced by their body. Every visited node first gets its
        supported-field bindings resolved, then each extra operation is called as
        ``operation(node, node_key)``.

        For example, for below pipeline component, assuming that all nodes need to be resolved:
          A
         /|\
        B C D
        | |
        E F
        |
        G
        return value will be:
        [
          [("B", B), ("C", C)],
          [("E", E)],
          [("D", D), ("F", F), ("G", G)],
        ]

        :param component: The pipeline component to resolve.
        :type component: PipelineComponent
        :param extra_operations: Extra operations to apply on nodes during the traversing.
        :type extra_operations: List[Callable[[BaseNode, str], Any]]
        :return: A list of layers of nodes to resolve.
        :rtype: List[List[Tuple[str, BaseNode]]]
        """
        nodes_to_process = list(component.jobs.items())
        layers: List = []
        leaf_nodes = []

        while nodes_to_process:
            layers.append([])
            new_nodes_to_process = []
            for key, job_instance in nodes_to_process:
                cls._resolve_binding_on_supported_fields_for_node(job_instance)
                if isinstance(job_instance, LoopNode):
                    job_instance = job_instance.body

                for extra_operation in extra_operations:
                    extra_operation(job_instance, key)

                if isinstance(job_instance, BaseNode) and isinstance(job_instance._component, PipelineComponent):
                    # candidates for next layer
                    new_nodes_to_process.extend(job_instance.component.jobs.items())
                    # use layers to store pipeline nodes in each layer for now
                    layers[-1].append((key, job_instance))
                else:
                    # note that LoopNode has already been replaced by its body here
                    leaf_nodes.append((key, job_instance))
            nodes_to_process = new_nodes_to_process

        if layers:
            if layers[-1]:
                # The loop stopped although this layer holds pipeline nodes, so none of them has
                # any job. Keep them (they still need resolving) and add the leaves after them.
                layers.append(leaf_nodes)
            else:
                # Usual case: the last layer is empty because every leaf went into leaf_nodes.
                layers[-1] = leaf_nodes

        return layers

    def _get_workspace_key(self) -> str:
        try:
            workspace_rest = self._workspace_operations._operation.get(
                resource_group_name=self._resource_group_name,
                workspace_name=self._workspace_name,
            )
            return str(workspace_rest.workspace_id)
        except HttpResponseError:
            return "{}/{}/{}".format(self._subscription_id, self._resource_group_name, self._workspace_name)

    def _get_registry_key(self) -> str:
        """Get key for used registry.

        Note that, although registry id is in registry discovery response, it is not in RegistryDiscoveryDto; and we'll
        lose the information after deserialization.
        To avoid changing related rest client, we simply use registry related information from self to construct
        registry key, which means that on-disk cache will be invalid if a registry is deleted and then created
        again with the same name.

        :return: The registry key
        :rtype: str
        """
        return "{}/{}/{}".format(self._subscription_id, self._resource_group_name, self._registry_name)

    def _get_client_key(self) -> str:
        """Get key for used client.
        Key should be able to uniquely identify used registry or workspace.

        :return: The client key
        :rtype: str
        """
        # check cache first
        if self._client_key:
            return self._client_key

        # registry name has a higher priority comparing to workspace name according to current __init__ implementation
        # of MLClient
        if self._registry_name:
            self._client_key = "registry/" + self._get_registry_key()
        elif self._workspace_name:
            self._client_key = "workspace/" + self._get_workspace_key()
        else:
            # This should never happen.
            raise ValueError("Either workspace name or registry name must be provided to use component operations.")
        return self._client_key

    def _resolve_dependencies_for_pipeline_component_jobs(
        self,
        component: Union[Component, str],
        resolver: _AssetResolver,
    ) -> None:
        """Resolve the dependencies of all jobs inside a pipeline component.

        Returns immediately unless ``component`` is a PipelineComponent with jobs. Inputs are
        resolved first. Nodes are then split into layers and resolved bottom-up, because a parent
        node's hash depends on how its child nodes were resolved.

        :param component: The pipeline component to resolve.
        :type component: Union[Component, str]
        :param resolver: The resolver to resolve the dependencies.
        :type resolver: _AssetResolver
        :raises ComponentException: If a job in the pipeline has an unsupported type.
        """
        if not isinstance(component, PipelineComponent) or not component.jobs:
            return

        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob

        self._resolve_inputs_for_pipeline_component_jobs(component.jobs, component._base_path)

        # Applied to every node while layering. Defaulting anonymous display names is intentionally
        # skipped, since the original component name is now kept for anonymous components.
        # Open question: resolve code here too once these operations run concurrently?
        node_operations = [
            partial(self._try_resolve_node_level_task_for_parallel_node, resolver=resolver),
            partial(self._try_resolve_environment_for_component, resolver=resolver),
            partial(self._try_resolve_compute_for_node, resolver=resolver),
        ]
        layers = self._divide_nodes_to_resolve_into_layers(component, extra_operations=node_operations)

        # Only anonymous components are cached for now; other asset types are simpler and have few
        # distinct instances, so a request-level in-memory cache would suit them better.
        node_resolver = CachedNodeResolver(resolver=resolver, client_key=self._get_client_key())

        for layer in reversed(layers):
            for _, job_instance in layer:
                if isinstance(job_instance, AutoMLJob):
                    # only compute is resolved for AutoML jobs here
                    self._job_operations._resolve_arm_id_for_automl_job(job_instance, resolver, inside_pipeline=True)
                elif isinstance(job_instance, BaseNode):
                    node_resolver.register_node_for_lazy_resolution(job_instance)
                elif not isinstance(job_instance, ConditionNode):
                    msg = f"Non supported job type in Pipeline: {type(job_instance)}"
                    raise ComponentException(
                        message=msg,
                        target=ErrorTarget.COMPONENT,
                        no_personal_data_message=msg,
                        error_category=ErrorCategory.USER_ERROR,
                    )
            # Finish this layer before moving up to its parents.
            node_resolver.resolve_nodes()


def _refine_component(component_func: Any) -> Component:
    """Return the component entity behind a function decorated by a component decorator.

    Two kinds of decorated functions are accepted:

    * mldesigner component functions (``_is_mldesigner_component``): their ``component`` attribute
      is returned as-is.
    * dsl pipeline functions (``_is_dsl_func``): after validation, the function's pipeline builder
      is built without user-provided inputs. Job settings on the function are ignored, with a warning.

    :param component_func: Function that is decorated by command component decorator
    :type component_func: types.FunctionType
    :return: Component entity of target function
    :rtype: Component
    :raises ValidationException: If the function is neither kind, or if a dsl function declares
        non-pipeline inputs, variable (``*args``/``**kwargs``) parameters, or parameters that have
        neither a type annotation nor a default value.
    """

    def check_parameter_type(f: Any) -> None:
        """Check that every parameter is non-variadic and has a type annotation or a default value.

        :param f: The component function
        :type f: types.FunctionType
        :raises ValidationException: On variable parameters or parameters of unknown type.
        """
        annotations = getattr(f, "__annotations__", {})
        func_parameters = signature(f).parameters
        defaults_dict = {key: val.default for key, val in func_parameters.items()}
        variable_inputs = [
            key for key, val in func_parameters.items() if val.kind in [val.VAR_POSITIONAL, val.VAR_KEYWORD]
        ]
        if variable_inputs:
            msg = "Cannot register the component {} with variable inputs {!r}."
            raise ValidationException(
                message=msg.format(f.__name__, variable_inputs),
                # placeholders follow the message order: component name first, then the keys
                no_personal_data_message=msg.format("[name]", "[keys]"),
                target=ErrorTarget.COMPONENT,
                error_category=ErrorCategory.USER_ERROR,
            )
        # A parameter with no annotation and no default (Parameter.empty) has an unknown type.
        unknown_type_keys = [
            key for key, val in defaults_dict.items() if key not in annotations and val is Parameter.empty
        ]
        if unknown_type_keys:
            msg = "Unknown type of parameter {} in pipeline func {!r}, please add type annotation."
            raise ValidationException(
                message=msg.format(unknown_type_keys, f.__name__),
                no_personal_data_message=msg.format("[keys]", "[name]"),
                target=ErrorTarget.COMPONENT,
                error_category=ErrorCategory.USER_ERROR,
            )

    def check_non_pipeline_inputs(f: Any) -> None:
        """Reject pipeline functions whose builder declares non_pipeline_inputs.

        :param f: The component function
        :type f: types.FunctionType
        :raises ValidationException: If non-pipeline parameter names are present.
        """
        if f._pipeline_builder.non_pipeline_parameter_names:
            msg = "Cannot register pipeline component {!r} with non_pipeline_inputs."
            raise ValidationException(
                message=msg.format(f.__name__),
                no_personal_data_message=msg.format(""),
                target=ErrorTarget.COMPONENT,
                error_category=ErrorCategory.USER_ERROR,
            )

    if getattr(component_func, "_is_mldesigner_component", False):
        return component_func.component
    if getattr(component_func, "_is_dsl_func", False):
        check_non_pipeline_inputs(component_func)
        check_parameter_type(component_func)
        if component_func._job_settings:
            module_logger.warning(
                "Job settings %s on pipeline function '%s' are ignored when creating PipelineComponent.",
                component_func._job_settings,
                component_func.__name__,
            )
        # Pipeline components are normally created with dsl.pipeline inputs provided, so that
        # pipeline input .result() resolves to the correct value. Without them (as here),
        # pipeline input .result() won't work.
        return component_func._pipeline_builder.build(user_provided_kwargs={})
    msg = "Function must be a dsl or mldesigner component function: {!r}"
    raise ValidationException(
        message=msg.format(component_func),
        no_personal_data_message=msg.format("component"),
        error_category=ErrorCategory.USER_ERROR,
        target=ErrorTarget.COMPONENT,
    )


def _try_resolve_code_for_component(component: Component, resolver: _AssetResolver) -> None:
    """Resolve the code of a code-bearing component and fill the result back into it.

    The built code is preferred. When building yields None, the component's original code value
    is used instead; if that is None too, nothing is resolved. Components that are not
    ``ComponentCodeMixin`` instances are ignored.

    :param component: The component whose code should be resolved.
    :type component: Component
    :param resolver: The resolver function
    :type resolver: _AssetResolver
    """
    if not isinstance(component, ComponentCodeMixin):
        return
    with component._build_code() as built_code:
        code_value = built_code if built_code is not None else component._get_origin_code_value()
        if code_value is not None:
            resolved = resolver(code_value, azureml_type=AzureMLResourceType.CODE)
            component._fill_back_code_value(resolved)  # type: ignore