Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py  1289
1 file changed, 1289 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py
new file mode 100644
index 00000000..f9e43f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_component_operations.py
@@ -0,0 +1,1289 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access,too-many-lines
+import time
+import collections
+import types
+from functools import partial
+from inspect import Parameter, signature
+from os import PathLike
+from pathlib import Path
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast
+import hashlib
+
+from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
+ AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
+)
+from azure.ai.ml._restclient.v2024_01_01_preview import (
+ AzureMachineLearningWorkspaces as ServiceClient012024,
+)
+from azure.ai.ml._restclient.v2024_01_01_preview.models import (
+ ComponentVersion,
+ ListViewType,
+)
+from azure.ai.ml._scope_dependent_operations import (
+ OperationConfig,
+ OperationsContainer,
+ OperationScope,
+ _ScopeDependentOperations,
+)
+from azure.ai.ml._telemetry import (
+ ActivityType,
+ monitor_with_activity,
+ monitor_with_telemetry_mixin,
+)
+from azure.ai.ml._utils._asset_utils import (
+ _archive_or_restore,
+ _create_or_update_autoincrement,
+ _get_file_hash,
+ _get_latest,
+ _get_next_version_from_container,
+ _resolve_label_to_asset,
+ get_ignore_file,
+ get_upload_files_from_folder,
+ IgnoreFile,
+ delete_two_catalog_files,
+ create_catalog_files,
+)
+from azure.ai.ml._utils._azureml_polling import AzureMLPolling
+from azure.ai.ml._utils._endpoint_utils import polling_wait
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._vendor.azure_resources.operations import DeploymentsOperations
+from azure.ai.ml.constants._common import (
+ DEFAULT_COMPONENT_VERSION,
+ DEFAULT_LABEL_NAME,
+ AzureMLResourceType,
+ DefaultOpenEncoding,
+ LROConfigurations,
+)
+from azure.ai.ml.entities import Component, ValidationResult
+from azure.ai.ml.exceptions import (
+ ComponentException,
+ ErrorCategory,
+ ErrorTarget,
+ ValidationException,
+)
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+
+from .._utils._cache_utils import CachedNodeResolver
+from .._utils._experimental import experimental
+from .._utils.utils import extract_name_and_version, is_data_binding_expression
+from ..entities._builders import BaseNode
+from ..entities._builders.condition_node import ConditionNode
+from ..entities._builders.control_flow_node import LoopNode
+from ..entities._component.automl_component import AutoMLComponent
+from ..entities._component.code import ComponentCodeMixin
+from ..entities._component.pipeline_component import PipelineComponent
+from ..entities._job.pipeline._attr_dict import has_attr_safe
+from ._code_operations import CodeOperations
+from ._environment_operations import EnvironmentOperations
+from ._operation_orchestrator import OperationOrchestrator, _AssetResolver
+from ._workspace_operations import WorkspaceOperations
+
+ops_logger = OpsLogger(__name__)
+logger, module_logger = ops_logger.package_logger, ops_logger.module_logger
+
+
+class ComponentOperations(_ScopeDependentOperations):
+ """ComponentOperations.
+
+ You should not instantiate this class directly. Instead, you should
+ create an MLClient instance that instantiates it for you and
+ attaches it as an attribute.
+
+ :param operation_scope: The operation scope.
+ :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
+ :param operation_config: The operation configuration.
+ :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
+ :param service_client: The service client for API operations.
+ :type service_client: Union[
+ ~azure.ai.ml._restclient.v2022_10_01.AzureMachineLearningWorkspaces,
+ ~azure.ai.ml._restclient.v2021_10_01_dataplanepreview.AzureMachineLearningWorkspaces]
+ :param all_operations: The container for all available operations.
+ :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
+ :param preflight_operation: The preflight operation for deployments.
+ :type preflight_operation: Optional[~azure.ai.ml._vendor.azure_resources.operations.DeploymentsOperations]
+ :param kwargs: Additional keyword arguments.
+ :type kwargs: Dict
+ """
+
+ def __init__(
+ self,
+ operation_scope: OperationScope,
+ operation_config: OperationConfig,
+ service_client: Union[ServiceClient012024, ServiceClient102021Dataplane],
+ all_operations: OperationsContainer,
+ preflight_operation: Optional[DeploymentsOperations] = None,
+ **kwargs: Dict,
+ ) -> None:
+ super(ComponentOperations, self).__init__(operation_scope, operation_config)
+ ops_logger.update_filter()
+ self._version_operation = service_client.component_versions
+ self._preflight_operation = preflight_operation
+ self._container_operation = service_client.component_containers
+ self._all_operations = all_operations
+ self._init_args = kwargs
+ # Maps a label to a function which, given an asset name,
+ # returns the asset associated with that label
+ self._managed_label_resolver = {"latest": self._get_latest_version}
+ self._orchestrators = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)
+
+ self._client_key: Optional[str] = None
+
+ @property
+ def _code_operations(self) -> CodeOperations:
+ res: CodeOperations = self._all_operations.get_operation( # type: ignore[misc]
+ AzureMLResourceType.CODE, lambda x: isinstance(x, CodeOperations)
+ )
+ return res
+
+ @property
+ def _environment_operations(self) -> EnvironmentOperations:
+ return cast(
+ EnvironmentOperations,
+ self._all_operations.get_operation( # type: ignore[misc]
+ AzureMLResourceType.ENVIRONMENT,
+ lambda x: isinstance(x, EnvironmentOperations),
+ ),
+ )
+
+ @property
+ def _workspace_operations(self) -> WorkspaceOperations:
+ return cast(
+ WorkspaceOperations,
+ self._all_operations.get_operation( # type: ignore[misc]
+ AzureMLResourceType.WORKSPACE,
+ lambda x: isinstance(x, WorkspaceOperations),
+ ),
+ )
+
+ @property
+ def _job_operations(self) -> Any:
+ from ._job_operations import JobOperations
+
+ return self._all_operations.get_operation( # type: ignore[misc]
+ AzureMLResourceType.JOB, lambda x: isinstance(x, JobOperations)
+ )
+
+ @monitor_with_activity(ops_logger, "Component.List", ActivityType.PUBLICAPI)
+ def list(
+ self,
+ name: Union[str, None] = None,
+ *,
+ list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+ ) -> Iterable[Component]:
+ """List specific component or components of the workspace.
+
+ :param name: Component name, if not set, list all components of the workspace
+ :type name: Optional[str]
+ :keyword list_view_type: View type for including/excluding (for example) archived components.
+ Default: ACTIVE_ONLY.
+ :type list_view_type: Optional[ListViewType]
+ :return: An iterator-like instance of Component objects
+ :rtype: ~azure.core.paging.ItemPaged[Component]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START component_operations_list]
+ :end-before: [END component_operations_list]
+ :language: python
+ :dedent: 8
+ :caption: List component example.
+ """
+
+ if name:
+ return cast(
+ Iterable[Component],
+ (
+ self._version_operation.list(
+ name=name,
+ resource_group_name=self._resource_group_name,
+ registry_name=self._registry_name,
+ **self._init_args,
+ cls=lambda objs: [Component._from_rest_object(obj) for obj in objs],
+ )
+ if self._registry_name
+ else self._version_operation.list(
+ name=name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ list_view_type=list_view_type,
+ **self._init_args,
+ cls=lambda objs: [Component._from_rest_object(obj) for obj in objs],
+ )
+ ),
+ )
+ return cast(
+ Iterable[Component],
+ (
+ self._container_operation.list(
+ resource_group_name=self._resource_group_name,
+ registry_name=self._registry_name,
+ **self._init_args,
+ cls=lambda objs: [Component._from_container_rest_object(obj) for obj in objs],
+ )
+ if self._registry_name
+ else self._container_operation.list(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ list_view_type=list_view_type,
+ **self._init_args,
+ cls=lambda objs: [Component._from_container_rest_object(obj) for obj in objs],
+ )
+ ),
+ )
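+
+ # A minimal usage sketch (illustrative, not part of the SDK), assuming an MLClient has
+ # been configured for the target workspace; "train_model" is a hypothetical component name:
+ #
+ #     from azure.ai.ml import MLClient
+ #     from azure.identity import DefaultAzureCredential
+ #
+ #     ml_client = MLClient.from_config(credential=DefaultAzureCredential())
+ #     for component in ml_client.components.list():  # containers of all active components
+ #         print(component.name)
+ #     for component in ml_client.components.list(name="train_model"):  # all versions of one
+ #         print(component.name, component.version)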
+
+ @monitor_with_telemetry_mixin(ops_logger, "ComponentVersion.Get", ActivityType.INTERNALCALL)
+ def _get_component_version(self, name: str, version: Optional[str] = DEFAULT_COMPONENT_VERSION) -> ComponentVersion:
+ """Returns ComponentVersion information about the specified component name and version.
+
+ :param name: Name of the component.
+ :type name: str
+ :param version: Version of the component.
+ :type version: Optional[str]
+ :return: The ComponentVersion object of the specified component name and version.
+ :rtype: ~azure.ai.ml.entities.ComponentVersion
+ """
+ result = (
+ self._version_operation.get(
+ name=name,
+ version=version,
+ resource_group_name=self._resource_group_name,
+ registry_name=self._registry_name,
+ **self._init_args,
+ )
+ if self._registry_name
+ else self._version_operation.get(
+ name=name,
+ version=version,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._init_args,
+ )
+ )
+ return result
+
+ @monitor_with_telemetry_mixin(ops_logger, "Component.Get", ActivityType.PUBLICAPI)
+ def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Component:
+ """Returns information about the specified component.
+
+ :param name: Name of the component.
+ :type name: str
+ :param version: Version of the component.
+ :type version: Optional[str]
+ :param label: Label of the component, mutually exclusive with version.
+ :type label: Optional[str]
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully
+ identified and retrieved. Details will be provided in the error message.
+ :return: The specified component object.
+ :rtype: ~azure.ai.ml.entities.Component
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START component_operations_get]
+ :end-before: [END component_operations_get]
+ :language: python
+ :dedent: 8
+ :caption: Get component example.
+ """
+ return self._get(name=name, version=version, label=label)
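+
+ # A minimal usage sketch (illustrative), assuming `ml_client` is a configured MLClient
+ # and "train_model" is a hypothetical component name:
+ #
+ #     component = ml_client.components.get(name="train_model", version="1")
+ #     # or, mutually exclusive with version, fetch by label:
+ #     component = ml_client.components.get(name="train_model", label="latest")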
+
+ def _localize_code(self, component: Component, base_dir: Path) -> None:
+ if not isinstance(component, ComponentCodeMixin):
+ return
+ code = component._get_origin_code_value()
+ if not isinstance(code, str):
+ return
+ # registry code keeps the "azureml:" prefix and can be used directly
+ if code.startswith("azureml://registries"):
+ return
+
+ target_code_value = "./code"
+ self._code_operations.download(
+ **extract_name_and_version(code),
+ download_path=base_dir.joinpath(target_code_value),
+ )
+
+ setattr(component, component._get_code_field_name(), target_code_value)
+
+ def _localize_environment(self, component: Component, base_dir: Path) -> None:
+ from azure.ai.ml.entities import ParallelComponent
+
+ parent: Any = None
+ if hasattr(component, "environment"):
+ parent = component
+ elif isinstance(component, ParallelComponent):
+ parent = component.task
+ else:
+ return
+
+ # environment can be None
+ if not isinstance(parent.environment, str):
+ return
+ # a registry environment keeps the "azureml:" prefix and can be used directly
+ if parent.environment.startswith("azureml://registries"):
+ return
+
+ environment = self._environment_operations.get(**extract_name_and_version(parent.environment))
+ environment._localize(base_path=base_dir.absolute().as_posix())
+ parent.environment = environment
+
+ @experimental
+ @monitor_with_telemetry_mixin(ops_logger, "Component.Download", ActivityType.PUBLICAPI)
+ def download(
+ self,
+ name: str,
+ download_path: Union[PathLike, str] = ".",
+ *,
+ version: Optional[str] = None,
+ ) -> None:
+ """Download the specified component and its dependencies to local. Local component can be used to create
+ the component in another workspace or for offline development.
+
+ :param name: Name of the code component.
+ :type name: str
+ :param Union[PathLike, str] download_path: Local path as download destination,
+ defaults to current working directory of the current user. Will be created if not exists.
+ :type download_path: str
+ :keyword version: Version of the component.
+ :paramtype version: Optional[str]
+ :raises ~OSError: Raised if download_path is pointing to an existing directory that is not empty.
+ identified and retrieved. Details will be provided in the error message.
+ :return: The specified component object.
+ :rtype: ~azure.ai.ml.entities.Component
+ """
+ download_path = Path(download_path)
+ component = self._get(name=name, version=version)
+ self._resolve_azureml_id(component)
+
+ output_dir = Path(download_path)
+ if output_dir.is_dir():
+ # an OSError will be raised if the directory is not empty
+ output_dir.rmdir()
+ output_dir.mkdir(parents=True)
+ # download code
+ self._localize_code(component, output_dir)
+
+ # download environment
+ self._localize_environment(component, output_dir)
+
+ component._localize(output_dir.absolute().as_posix())
+ (output_dir / "component_spec.yaml").write_text(component._to_yaml(), encoding=DefaultOpenEncoding.WRITE)
+
+ def _get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Component:
+ if version and label:
+ msg = "Cannot specify both version and label."
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ if not version and not label:
+ label = DEFAULT_LABEL_NAME
+
+ if label == DEFAULT_LABEL_NAME:
+ label = None
+ version = DEFAULT_COMPONENT_VERSION
+
+ if label:
+ return _resolve_label_to_asset(self, name, label)
+
+ result = self._get_component_version(name, version)
+ component = Component._from_rest_object(result)
+ self._resolve_azureml_id(component, jobs_only=True)
+ return component
+
+ @experimental
+ @monitor_with_telemetry_mixin(ops_logger, "Component.Validate", ActivityType.PUBLICAPI)
+ def validate(
+ self,
+ component: Union[Component, types.FunctionType],
+ raise_on_failure: bool = False,
+ **kwargs: Any,
+ ) -> ValidationResult:
+ """validate a specified component. if there are inline defined
+ entities, e.g. Environment, Code, they won't be created.
+
+ :param component: The component object or a mldesigner component function that generates component object
+ :type component: Union[Component, types.FunctionType]
+ :param raise_on_failure: Whether to raise exception on validation error. Defaults to False
+ :type raise_on_failure: bool
+ :return: All validation errors
+ :rtype: ~azure.ai.ml.entities.ValidationResult
+ """
+ return self._validate(
+ component,
+ raise_on_failure=raise_on_failure,
+ # TODO 2330505: change this to True after remote validation is ready
+ skip_remote_validation=kwargs.pop("skip_remote_validation", True),
+ )
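+
+ # A minimal usage sketch (illustrative), assuming `ml_client` is a configured MLClient
+ # and `component` is a loaded Component entity:
+ #
+ #     result = ml_client.components.validate(component)
+ #     if not result.passed:
+ #         print(result.error_messages)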
+
+ @monitor_with_telemetry_mixin(ops_logger, "Component.Validate", ActivityType.INTERNALCALL)
+ def _validate(
+ self,
+ component: Union[Component, types.FunctionType],
+ raise_on_failure: bool,
+ skip_remote_validation: bool,
+ ) -> ValidationResult:
+ """Implementation of validate. Add this function to avoid calling validate() directly in create_or_update(),
+ which will impact telemetry statistics & bring experimental warning in create_or_update().
+
+ :param component: The component
+ :type component: Union[Component, types.FunctionType]
+ :param raise_on_failure: Whether to raise on failure.
+ :type raise_on_failure: bool
+ :param skip_remote_validation: Whether to skip remote validation.
+ :type skip_remote_validation: bool
+ :return: The validation result
+ :rtype: ValidationResult
+ """
+ # Update component when the input is a component function
+ if isinstance(component, types.FunctionType):
+ component = _refine_component(component)
+
+ # local validation
+ result = component._validate(raise_error=raise_on_failure)
+ # remote validation, note that preflight_operation is not available for registry client
+ if not skip_remote_validation and self._preflight_operation:
+ workspace = self._workspace_operations.get()
+ remote_validation_result = self._preflight_operation.begin_validate(
+ resource_group_name=self._resource_group_name,
+ deployment_name=self._workspace_name,
+ parameters=component._build_rest_object_for_remote_validation(
+ location=workspace.location,
+ workspace_name=self._workspace_name,
+ ),
+ **self._init_args,
+ )
+ result.merge_with(
+ # pylint: disable=protected-access
+ component._build_validation_result_from_rest_object(remote_validation_result.result()),
+ overwrite=True,
+ )
+ # resolve location for diagnostics from remote validation
+ result.resolve_location_for_diagnostics(component._source_path) # type: ignore
+ return component._try_raise( # pylint: disable=protected-access
+ result,
+ raise_error=raise_on_failure,
+ )
+
+ def _update_flow_rest_object(self, rest_component_resource: Any) -> None:
+ import re
+
+ from azure.ai.ml._utils._arm_id_utils import AMLVersionedArmId
+
+ component_spec = rest_component_resource.properties.component_spec
+ code, flow_file_name = AMLVersionedArmId(component_spec["code"]), component_spec.pop("flow_file_name")
+ # TODO: avoid the remote request here if it becomes a performance issue
+ created_code = self._code_operations.get(name=code.asset_name, version=code.asset_version)
+ # remove port number and append flow file name to get full uri for flow.dag.yaml
+ component_spec["flow_definition_uri"] = f"{re.sub(r':[0-9]+/', '/', created_code.path)}/{flow_file_name}"
+
+ def _reset_version_if_no_change(self, component: Component, current_name: str, current_version: str) -> Tuple:
+ """Reset component version to default version if there's no change in the component.
+
+ :param component: The component object
+ :type component: Component
+ :param current_name: The component name
+ :type current_name: str
+ :param current_version: The component version
+ :type current_version: str
+ :return: The new version and rest component resource
+ :rtype: Tuple[str, ComponentVersion]
+ """
+ rest_component_resource = component._to_rest_object()
+
+ try:
+ client_component_hash = rest_component_resource.properties.properties.get("client_component_hash")
+ remote_component_version = self._get_component_version(name=current_name) # will raise error if not found.
+ remote_component_hash = remote_component_version.properties.properties.get("client_component_hash")
+ if client_component_hash == remote_component_hash:
+ component.version = remote_component_version.properties.component_spec.get(
+ "version"
+ ) # only update the default version component instead of creating a new version component
+ logger.warning(
+ "The component is not modified compared to the default version "
+ "and the new version component registration is skipped."
+ )
+ return component.version, component._to_rest_object()
+ except ResourceNotFoundError as e:
+ logger.info("Failed to get component version, %s", e)
+ except Exception as e: # pylint: disable=W0718
+ logger.error("Failed to compare client_component_hash, %s", e)
+
+ return current_version, rest_component_resource
+
+ def _create_or_update_component_version(
+ self,
+ component: Component,
+ name: str,
+ version: Optional[str],
+ rest_component_resource: Any,
+ ) -> Any:
+ try:
+ if self._registry_name:
+ start_time = time.time()
+ path_format_arguments = {
+ "componentName": component.name,
+ "resourceGroupName": self._resource_group_name,
+ "registryName": self._registry_name,
+ }
+ poller = self._version_operation.begin_create_or_update(
+ name=name,
+ version=version,
+ resource_group_name=self._operation_scope.resource_group_name,
+ registry_name=self._registry_name,
+ body=rest_component_resource,
+ polling=AzureMLPolling(
+ LROConfigurations.POLL_INTERVAL,
+ path_format_arguments=path_format_arguments,
+ ),
+ )
+ message = f"Creating/updating registry component {component.name} with version {component.version} "
+ polling_wait(poller=poller, start_time=start_time, message=message, timeout=None)
+
+ else:
+ # _auto_increment_version can be True for non-registry component creation operations,
+ # and anonymous components should use their hash as the version
+ if not component._is_anonymous and component._auto_increment_version:
+ return _create_or_update_autoincrement(
+ name=name,
+ body=rest_component_resource,
+ version_operation=self._version_operation,
+ container_operation=self._container_operation,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._init_args,
+ )
+
+ return self._version_operation.create_or_update(
+ name=name,
+ version=version,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ body=rest_component_resource,
+ **self._init_args,
+ )
+ except Exception:
+ raise
+
+ return None
+
+ @monitor_with_telemetry_mixin(
+ logger,
+ "Component.CreateOrUpdate",
+ ActivityType.PUBLICAPI,
+ extra_keys=["is_anonymous"],
+ )
+ def create_or_update(
+ self,
+ component: Component,
+ version: Optional[str] = None,
+ *,
+ skip_validation: bool = False,
+ **kwargs: Any,
+ ) -> Component:
+ """Create or update a specified component. if there're inline defined
+ entities, e.g. Environment, Code, they'll be created together with the
+ component.
+
+ :param component: The component object or a mldesigner component function that generates component object
+ :type component: Union[Component, types.FunctionType]
+ :param version: The component version to override.
+ :type version: str
+ :keyword skip_validation: whether to skip validation before creating/updating the component, defaults to False
+ :paramtype skip_validation: bool
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Component cannot be successfully validated.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.AssetException: Raised if Component assets
+ (e.g. Data, Code, Model, Environment) cannot be successfully validated.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.ComponentException: Raised if Component type is unsupported.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.ModelException: Raised if Component model cannot be successfully validated.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
+ :return: The specified component object.
+ :rtype: ~azure.ai.ml.entities.Component
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START component_operations_create_or_update]
+ :end-before: [END component_operations_create_or_update]
+ :language: python
+ :dedent: 8
+ :caption: Create component example.
+ """
+ # Update component when the input is a component function
+ if isinstance(component, types.FunctionType):
+ component = _refine_component(component)
+ if version is not None:
+ component.version = version
+ # In the non-registry scenario, if the component does not have a version, there is no need to get the
+ # next version here. Because the Component.version property setter updates `_auto_increment_version`
+ # in-place, the component would otherwise get a version on creation and always reuse it in future
+ # creation operations, which breaks the version auto-increment mechanism.
+ if self._registry_name and not component.version and component._auto_increment_version:
+ component.version = _get_next_version_from_container(
+ name=component.name,
+ container_operation=self._container_operation,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ registry_name=self._registry_name,
+ **self._init_args,
+ )
+
+ if not component._is_anonymous:
+ component._is_anonymous = kwargs.pop("is_anonymous", False)
+
+ if not skip_validation:
+ self._validate(component, raise_on_failure=True, skip_remote_validation=True)
+
+ # Create all dependent resources
+ # Only upload dependencies if component is NOT IPP
+ if not component._intellectual_property:
+ self._resolve_arm_id_or_upload_dependencies(component)
+
+ name, version = component._get_rest_name_version()
+ if not component._is_anonymous and kwargs.get("skip_if_no_change"):
+ version, rest_component_resource = self._reset_version_if_no_change(
+ component,
+ current_name=name,
+ current_version=str(version),
+ )
+ else:
+ rest_component_resource = component._to_rest_object()
+
+ # TODO: remove this after the server side supports directly using client-created code
+ from azure.ai.ml.entities._component.flow import FlowComponent
+
+ if isinstance(component, FlowComponent):
+ self._update_flow_rest_object(rest_component_resource)
+
+ result = self._create_or_update_component_version(
+ component,
+ name,
+ version,
+ rest_component_resource,
+ )
+
+ if not result:
+ component = self.get(name=component.name, version=component.version)
+ else:
+ component = Component._from_rest_object(result)
+
+ self._resolve_azureml_id(
+ component=component,
+ jobs_only=True,
+ )
+ return component
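+
+ # A minimal usage sketch (illustrative), assuming `ml_client` is a configured MLClient
+ # and ./train.yml is a hypothetical component specification file:
+ #
+ #     from azure.ai.ml import load_component
+ #
+ #     component = load_component(source="./train.yml")
+ #     registered = ml_client.components.create_or_update(component, version="2")
+ #     print(registered.name, registered.version)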
+
+ @experimental
+ def prepare_for_sign(self, component: Component) -> None:
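+ """Generate catalog files containing SHA256 hashes of the component code files, as a
+ preparation step for component signing.
+
+ :param component: The component whose code folder will be catalogued.
+ :type component: Component
+ """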
+ ignore_file = IgnoreFile()
+
+ if isinstance(component, ComponentCodeMixin):
+ with component._build_code() as code:
+ delete_two_catalog_files(code.path)
+ ignore_file = get_ignore_file(code.path) if code._ignore_file is None else ignore_file
+ file_list = get_upload_files_from_folder(code.path, ignore_file=ignore_file)
+ json_stub = {}
+ json_stub["HashAlgorithm"] = "SHA256"
+ json_stub["CatalogItems"] = {} # type: ignore
+
+ for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
+ file_hash = _get_file_hash(file_path, hashlib.sha256()).hexdigest().upper()
+ json_stub["CatalogItems"][file_name] = file_hash # type: ignore
+
+ json_stub["CatalogItems"] = collections.OrderedDict( # type: ignore
+ sorted(json_stub["CatalogItems"].items()) # type: ignore
+ )
+ create_catalog_files(code.path, json_stub)
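+
+ # The generated catalog stub has the following shape (an illustrative sketch;
+ # the file names and hashes below are hypothetical):
+ #
+ #     {
+ #         "HashAlgorithm": "SHA256",
+ #         "CatalogItems": {
+ #             "run.py": "9F86D081884C7D65...",
+ #             "utils/helpers.py": "60303AE22B998861..."
+ #         }
+ #     }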
+
+ @monitor_with_telemetry_mixin(ops_logger, "Component.Archive", ActivityType.PUBLICAPI)
+ def archive(
+ self,
+ name: str,
+ version: Optional[str] = None,
+ label: Optional[str] = None,
+ # pylint:disable=unused-argument
+ **kwargs: Any,
+ ) -> None:
+ """Archive a component.
+
+ :param name: Name of the component.
+ :type name: str
+ :param version: Version of the component.
+ :type version: str
+ :param label: Label of the component. (mutually exclusive with version).
+ :type label: str
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START component_operations_archive]
+ :end-before: [END component_operations_archive]
+ :language: python
+ :dedent: 8
+ :caption: Archive component example.
+ """
+ _archive_or_restore(
+ asset_operations=self,
+ version_operation=self._version_operation,
+ container_operation=self._container_operation,
+ is_archived=True,
+ name=name,
+ version=version,
+ label=label,
+ )
+
+ @monitor_with_telemetry_mixin(ops_logger, "Component.Restore", ActivityType.PUBLICAPI)
+ def restore(
+ self,
+ name: str,
+ version: Optional[str] = None,
+ label: Optional[str] = None,
+ # pylint:disable=unused-argument
+ **kwargs: Any,
+ ) -> None:
+ """Restore an archived component.
+
+ :param name: Name of the component.
+ :type name: str
+ :param version: Version of the component.
+ :type version: str
+ :param label: Label of the component. (mutually exclusive with version).
+ :type label: str
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START component_operations_restore]
+ :end-before: [END component_operations_restore]
+ :language: python
+ :dedent: 8
+ :caption: Restore component example.
+ """
+ _archive_or_restore(
+ asset_operations=self,
+ version_operation=self._version_operation,
+ container_operation=self._container_operation,
+ is_archived=False,
+ name=name,
+ version=version,
+ label=label,
+ )
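+
+ # A minimal usage sketch (illustrative), assuming `ml_client` is a configured MLClient
+ # and "train_model" is a hypothetical component name:
+ #
+ #     ml_client.components.archive(name="train_model", version="1")
+ #     ml_client.components.restore(name="train_model", version="1")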
+
+ def _get_latest_version(self, component_name: str) -> Component:
+ """Returns the latest version of the asset with the given name.
+
+ Latest is defined as the most recently created, not the most
+ recently updated.
+
+ :param component_name: The component name
+ :type component_name: str
+ :return: A latest version of the named Component
+ :rtype: Component
+ """
+
+ result = (
+ _get_latest(
+ component_name,
+ self._version_operation,
+ self._resource_group_name,
+ workspace_name=None,
+ registry_name=self._registry_name,
+ )
+ if self._registry_name
+ else _get_latest(
+ component_name,
+ self._version_operation,
+ self._resource_group_name,
+ self._workspace_name,
+ )
+ )
+ return Component._from_rest_object(result)
+
+ @classmethod
+ def _try_resolve_environment_for_component(
+ cls, component: Union[BaseNode, str], _: str, resolver: _AssetResolver
+ ) -> None:
+ if isinstance(component, BaseNode):
+ component = component._component # pylint: disable=protected-access
+
+ if isinstance(component, str):
+ return
+ potential_parents: List[BaseNode] = [component]
+ if hasattr(component, "task"):
+ potential_parents.append(component.task)
+ for parent in potential_parents:
+ # for internal components, environment may be a dict or an InternalEnvironment object;
+ # in these two scenarios, we don't need to resolve the environment.
+ # Note on why we don't import InternalEnvironment directly and check with `isinstance`:
+ # importing from azure.ai.ml._internal would enable the internal component feature for all users,
+ # so we use type().__name__ to perform the type check without the import
+ if not hasattr(parent, "environment"):
+ continue
+ if isinstance(parent.environment, dict):
+ continue
+ if type(parent.environment).__name__ == "InternalEnvironment":
+ continue
+ parent.environment = resolver(parent.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+
+ def _resolve_azureml_id(self, component: Component, jobs_only: bool = False) -> None:
+ # TODO: remove the parameter `jobs_only`. Some tests are expecting an arm id after resolving for now.
+ resolver = self._orchestrators.resolve_azureml_id
+ self._resolve_dependencies_for_component(component, resolver, jobs_only=jobs_only)
+
+ def _resolve_arm_id_or_upload_dependencies(self, component: Component) -> None:
+ resolver = OperationOrchestrator(
+ self._all_operations, self._operation_scope, self._operation_config
+ ).get_asset_arm_id
+
+ self._resolve_dependencies_for_component(component, resolver)
+
+ def _resolve_dependencies_for_component(
+ self,
+ component: Component,
+ resolver: Callable,
+ *,
+ jobs_only: bool = False,
+ ) -> None:
+ # for now, many tests are expecting long arm id instead of short id for environment and code
+ if not jobs_only:
+ if isinstance(component, AutoMLComponent):
+ # no extra dependency for automl component
+ return
+
+ # type check for potential Job type, which is unexpected here.
+ if not isinstance(component, Component):
+ msg = f"Non supported component type: {type(component)}"
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ # resolve component's code
+ _try_resolve_code_for_component(component=component, resolver=resolver)
+ # resolve component's environment
+ self._try_resolve_environment_for_component(
+ component=component, # type: ignore
+ resolver=resolver,
+ _="",
+ )
+
+ self._resolve_dependencies_for_pipeline_component_jobs(
+ component,
+ resolver=resolver,
+ )
+
+ def _resolve_inputs_for_pipeline_component_jobs(self, jobs: Dict[str, Any], base_path: str) -> None:
+ """Resolve inputs for jobs in a pipeline component.
+
+ :param jobs: A dict of nodes in a pipeline component.
+ :type jobs: Dict[str, Any]
+ :param base_path: The base path used to resolve inputs. Usually it's the base path of the pipeline component.
+ :type base_path: str
+ """
+ from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+
+ for _, job_instance in jobs.items():
+ # resolve inputs for each job's component
+ if isinstance(job_instance, BaseNode):
+ node: BaseNode = job_instance
+ self._job_operations._resolve_job_inputs(
+ # parameter group input need to be flattened first
+ self._job_operations._flatten_group_inputs(node.inputs),
+ base_path,
+ )
+ elif isinstance(job_instance, AutoMLJob):
+ self._job_operations._resolve_automl_job_inputs(job_instance)
+
+ @classmethod
+ def _resolve_binding_on_supported_fields_for_node(cls, node: BaseNode) -> None:
+ """Resolve all PipelineInput(binding from sdk) on supported fields to string.
+
+ :param node: The node
+ :type node: BaseNode
+ """
+ from azure.ai.ml.entities._job.pipeline._attr_dict import (
+ try_get_non_arbitrary_attr,
+ )
+ from azure.ai.ml.entities._job.pipeline._io import PipelineInput
+
+ # compute binding to pipeline input is supported on node.
+ supported_fields = ["compute", "compute_name"]
+ for field_name in supported_fields:
+ val = try_get_non_arbitrary_attr(node, field_name)
+ if isinstance(val, PipelineInput):
+ # Put binding string to field
+ setattr(node, field_name, val._data_binding())
+
+ @classmethod
+ def _try_resolve_node_level_task_for_parallel_node(cls, node: BaseNode, _: str, resolver: _AssetResolver) -> None:
+ """Resolve node.task.code for parallel node if it's a reference to node.component.task.code.
+
+ This is a hack operation.
+
+ parallel_node.task.code won't be resolved directly for now, but it will be resolved if
+ parallel_node.task is a reference to parallel_node.component.task. Then when filling back
+ parallel_node.component.task.code, parallel_node.task.code will be changed as well.
+
+ However, if we enable in-memory/on-disk cache for component resolution, such change
+ won't happen, so we resolve node level task code manually here.
+
+ Note that we will always use the resolved node.component.code to fill back node.task.code,
+ given that code overwrite on a parallel node won't take effect for now. This keeps behavior
+ consistent across OS and Python versions.
+
+ The ideal solution should come after the PRS team decides how to handle parallel.task.code.
+
+ :param node: The node
+ :type node: BaseNode
+ :param _: The component name
+ :type _: str
+ :param resolver: The resolver function
+ :type resolver: _AssetResolver
+ """
+ from azure.ai.ml.entities import Parallel, ParallelComponent
+
+ if not isinstance(node, Parallel):
+ return
+ component = node._component # pylint: disable=protected-access
+ if not isinstance(component, ParallelComponent):
+ return
+ if not node.task:
+ return
+
+ if node.task.code:
+ _try_resolve_code_for_component(
+ component,
+ resolver=resolver,
+ )
+ node.task.code = component.code
+ if node.task.environment:
+ node.task.environment = resolver(component.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+
+ @classmethod
+ def _set_default_display_name_for_anonymous_component_in_node(cls, node: BaseNode, default_name: str) -> None:
+ """Set default display name for anonymous component in a node.
+ If node._component is an anonymous component and without display name, set the default display name.
+
+ :param node: The node
+ :type node: BaseNode
+ :param default_name: The default name to set
+ :type default_name: str
+ """
+ if not isinstance(node, BaseNode):
+ return
+ component = node._component
+ if isinstance(component, PipelineComponent):
+ return
+ # Set display name as node name
+ # TODO: the same anonymous component with different node name will have different anonymous hash
+ # as their display name will be different.
+ if (
+ isinstance(component, Component)
+ # check if component is anonymous and not created based on its id. We can't directly check
+ # node._component._is_anonymous as it will be set to True on component creation,
+ # which is later than this check
+ and not component.id
+ and not component.display_name
+ ):
+ component.display_name = default_name
+
+ @classmethod
+ def _try_resolve_compute_for_node(cls, node: BaseNode, _: str, resolver: _AssetResolver) -> None:
+ """Resolve compute for base node.
+
+ :param node: The node
+ :type node: BaseNode
+ :param _: The node name
+ :type _: str
+ :param resolver: The resolver function
+ :type resolver: _AssetResolver
+ """
+ if not isinstance(node, BaseNode):
+ return
+ if not isinstance(node._component, PipelineComponent):
+ # Resolve compute for other node types
+ # Keep data binding expressions as they are
+ if not is_data_binding_expression(node.compute):
+ # Get compute for each job
+ node.compute = resolver(node.compute, azureml_type=AzureMLResourceType.COMPUTE)
+ if has_attr_safe(node, "compute_name") and not is_data_binding_expression(node.compute_name):
+ node.compute_name = resolver(node.compute_name, azureml_type=AzureMLResourceType.COMPUTE)
+
+ @classmethod
+ def _divide_nodes_to_resolve_into_layers(
+ cls,
+ component: PipelineComponent,
+ extra_operations: List[Callable[[BaseNode, str], Any]],
+ ) -> List:
+ """Traverse the pipeline component and divide nodes to resolve into layers. Note that all leaf nodes will be
+ put in the last layer.
+ For example, for the pipeline component below, assuming that all nodes need to be resolved:
+
+       A
+      /|\
+     B C D
+     | |
+     E F
+     |
+     G
+
+ the return value will be:
+
+     [
+         [("B", B), ("C", C)],
+         [("E", E)],
+         [("D", D), ("F", F), ("G", G)],
+     ]
+
+ :param component: The pipeline component to resolve.
+ :type component: PipelineComponent
+ :param extra_operations: Extra operations to apply on nodes during the traversing.
+ :type extra_operations: List[Callable[[BaseNode, str], Any]]
+ :return: A list of layers of nodes to resolve.
+ :rtype: List[List[Tuple[str, BaseNode]]]
+ """
+ nodes_to_process = list(component.jobs.items())
+ layers: List = []
+ leaf_nodes = []
+
+ while nodes_to_process:
+ layers.append([])
+ new_nodes_to_process = []
+ for key, job_instance in nodes_to_process:
+ cls._resolve_binding_on_supported_fields_for_node(job_instance)
+ if isinstance(job_instance, LoopNode):
+ job_instance = job_instance.body
+
+ for extra_operation in extra_operations:
+ extra_operation(job_instance, key)
+
+ if isinstance(job_instance, BaseNode) and isinstance(job_instance._component, PipelineComponent):
+ # candidates for next layer
+ new_nodes_to_process.extend(job_instance.component.jobs.items())
+ # use layers to store pipeline nodes in each layer for now
+ layers[-1].append((key, job_instance))
+ else:
+ # note that LoopNode has already been replaced by its body here
+ leaf_nodes.append((key, job_instance))
+ nodes_to_process = new_nodes_to_process
+
+ # if there is a subgraph, the last item in layers will be empty for now, as all leaf nodes are stored in leaf_nodes
+ if len(layers) != 0:
+ layers.pop()
+ layers.append(leaf_nodes)
+
+ return layers
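+
+ # A toy sketch of the layering above (illustrative, using plain dicts instead of real
+ # pipeline nodes): pipeline nodes are expanded breadth-first while leaf nodes are
+ # collected separately, so the leaves always form the last layer.
+ #
+ #     graph = {"B": ["E"], "C": ["F"], "E": ["G"]}  # hypothetical subgraph structure
+ #     todo, layers, leaves = ["B", "C", "D"], [], []
+ #     while todo:
+ #         layers.append([n for n in todo if n in graph])
+ #         leaves += [n for n in todo if n not in graph]
+ #         todo = [c for n in todo if n in graph for c in graph[n]]
+ #     layers[-1:] = [leaves]  # replace the trailing empty layer with the leaves
+ #     assert layers == [["B", "C"], ["E"], ["D", "F", "G"]]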
+
+ def _get_workspace_key(self) -> str:
+ try:
+ workspace_rest = self._workspace_operations._operation.get(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ )
+ return str(workspace_rest.workspace_id)
+ except HttpResponseError:
+ return "{}/{}/{}".format(self._subscription_id, self._resource_group_name, self._workspace_name)
+
+ def _get_registry_key(self) -> str:
+ """Get key for used registry.
+
+ Note that, although registry id is in registry discovery response, it is not in RegistryDiscoveryDto; and we'll
+ lose the information after deserialization.
+ To avoid changing related rest client, we simply use registry related information from self to construct
+ registry key, which means that on-disk cache will be invalid if a registry is deleted and then created
+ again with the same name.
+
+ :return: The registry key
+ :rtype: str
+ """
+ return "{}/{}/{}".format(self._subscription_id, self._resource_group_name, self._registry_name)
+
+ def _get_client_key(self) -> str:
+ """Get key for used client.
+ Key should be able to uniquely identify used registry or workspace.
+
+ :return: The client key
+ :rtype: str
+ """
+ # check cache first
+ if self._client_key:
+ return self._client_key
+
+ # registry name has a higher priority compared to workspace name, according to the current __init__
+ # implementation of MLClient
+ if self._registry_name:
+ self._client_key = "registry/" + self._get_registry_key()
+ elif self._workspace_name:
+ self._client_key = "workspace/" + self._get_workspace_key()
+ else:
+ # This should never happen.
+ raise ValueError("Either workspace name or registry name must be provided to use component operations.")
+ return self._client_key
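+
+ # Resulting key formats (illustrative): "registry/<subscription>/<resource_group>/<registry_name>"
+ # for registry-backed clients, and "workspace/<workspace_id>" (or a
+ # subscription/resource-group/workspace fallback) for workspace-backed clients.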
+
+ def _resolve_dependencies_for_pipeline_component_jobs(
+ self,
+ component: Union[Component, str],
+ resolver: _AssetResolver,
+ ) -> None:
+ """Resolve dependencies for pipeline component jobs.
+ Returns immediately if the component is not a pipeline component.
+
+ :param component: The pipeline component to resolve.
+ :type component: Union[Component, str]
+ :param resolver: The resolver to resolve the dependencies.
+ :type resolver: _AssetResolver
+ """
+ if not isinstance(component, PipelineComponent) or not component.jobs:
+ return
+
+ from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+
+ self._resolve_inputs_for_pipeline_component_jobs(component.jobs, component._base_path)
+
+ # This is a preparation for concurrent resolution. Nodes will be resolved later layer by layer
+ # from bottom to top, as hash calculation of a parent node will be impacted by resolution
+ # of its child nodes.
+ layers = self._divide_nodes_to_resolve_into_layers(
+ component,
+ extra_operations=[
+ # no need to do this as we now keep the original component name for anonymous components
+ # self._set_default_display_name_for_anonymous_component_in_node,
+ partial(
+ self._try_resolve_node_level_task_for_parallel_node,
+ resolver=resolver,
+ ),
+ partial(self._try_resolve_environment_for_component, resolver=resolver),
+ partial(self._try_resolve_compute_for_node, resolver=resolver),
+ # should we resolve code here after we do extra operations concurrently?
+ ],
+ )
+
+ # cache anonymous components only for now;
+ # a request-level in-memory cache may be a better solution for other types of assets, as they are
+ # relatively simple and have a small number of distinct instances
+ component_cache = CachedNodeResolver(
+ resolver=resolver,
+ client_key=self._get_client_key(),
+ )
+
+ for layer in reversed(layers):
+ for _, job_instance in layer:
+ if isinstance(job_instance, AutoMLJob):
+ # only compute is resolved here
+ self._job_operations._resolve_arm_id_for_automl_job(job_instance, resolver, inside_pipeline=True)
+ elif isinstance(job_instance, BaseNode):
+ component_cache.register_node_for_lazy_resolution(job_instance)
+ elif isinstance(job_instance, ConditionNode):
+ pass
+ else:
+ msg = f"Non supported job type in Pipeline: {type(job_instance)}"
+ raise ComponentException(
+ message=msg,
+ target=ErrorTarget.COMPONENT,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ component_cache.resolve_nodes()
+
+
+def _refine_component(component_func: Any) -> Component:
+ """Return the component of function that is decorated by command
+ component decorator.
+
+ :param component_func: Function that is decorated by command component decorator
+ :type component_func: types.FunctionType
+ :return: Component entity of target function
+ :rtype: Component
+ """
+
+ def check_parameter_type(f: Any) -> None:
+ """Check all parameter is annotated or has a default value with clear type(not None).
+
+ :param f: The component function
+ :type f: types.FunctionType
+ """
+ annotations = getattr(f, "__annotations__", {})
+ func_parameters = signature(f).parameters
+ defaults_dict = {key: val.default for key, val in func_parameters.items()}
+ variable_inputs = [
+ key for key, val in func_parameters.items() if val.kind in [val.VAR_POSITIONAL, val.VAR_KEYWORD]
+ ]
+ if variable_inputs:
+ msg = "Cannot register the component {} with variable inputs {!r}."
+ raise ValidationException(
+ message=msg.format(f.__name__, variable_inputs),
+ no_personal_data_message=msg.format("[keys]", "[name]"),
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ unknown_type_keys = [
+ key for key, val in defaults_dict.items() if key not in annotations and val is Parameter.empty
+ ]
+ if unknown_type_keys:
+ msg = "Unknown type of parameter {} in pipeline func {!r}, please add type annotation."
+ raise ValidationException(
+ message=msg.format(unknown_type_keys, f.__name__),
+ no_personal_data_message=msg.format("[keys]", "[name]"),
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ def check_non_pipeline_inputs(f: Any) -> None:
+ """Check whether non_pipeline_inputs exist in pipeline builder.
+
+ :param f: The component function
+ :type f: types.FunctionType
+ """
+ if f._pipeline_builder.non_pipeline_parameter_names:
+ msg = "Cannot register pipeline component {!r} with non_pipeline_inputs."
+ raise ValidationException(
+ message=msg.format(f.__name__),
+ no_personal_data_message=msg.format(""),
+ target=ErrorTarget.COMPONENT,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ if hasattr(component_func, "_is_mldesigner_component") and component_func._is_mldesigner_component:
+ return component_func.component
+ if hasattr(component_func, "_is_dsl_func") and component_func._is_dsl_func:
+ check_non_pipeline_inputs(component_func)
+ check_parameter_type(component_func)
+ if component_func._job_settings:
+ module_logger.warning(
+ "Job settings %s on pipeline function '%s' are ignored when creating PipelineComponent.",
+ component_func._job_settings,
+ component_func.__name__,
+ )
+ # Normally a pipeline component is created when dsl.pipeline inputs are provided,
+ # so pipeline input .result() can resolve to the correct value.
+ # When a pipeline component is created without dsl.pipeline inputs, pipeline input .result() won't work.
+ return component_func._pipeline_builder.build(user_provided_kwargs={})
+ msg = "Function must be a dsl or mldesigner component function: {!r}"
+ raise ValidationException(
+ message=msg.format(component_func),
+ no_personal_data_message=msg.format("component"),
+ error_category=ErrorCategory.USER_ERROR,
+ target=ErrorTarget.COMPONENT,
+ )
+
+
+def _try_resolve_code_for_component(component: Component, resolver: _AssetResolver) -> None:
+ if isinstance(component, ComponentCodeMixin):
+ with component._build_code() as code:
+ if code is None:
+ code = component._get_origin_code_value()
+ if code is None:
+ return
+ component._fill_back_code_value(resolver(code, azureml_type=AzureMLResourceType.CODE)) # type: ignore