diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py | 891 |
1 file changed, 891 insertions, 0 deletions
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,no-value-for-parameter

import os
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Generator, Iterable, List, Optional, Union, cast

from marshmallow.exceptions import ValidationError as SchemaValidationError

from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
from azure.ai.ml._artifacts._constants import (
    ASSET_PATH_ERROR,
    CHANGED_ASSET_PATH_MSG,
    CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
)
from azure.ai.ml._exception_helper import log_and_raise_error
from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
    AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
)
from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042023_preview
from azure.ai.ml._restclient.v2023_04_01_preview.models import ListViewType
from azure.ai.ml._restclient.v2024_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012024_preview
from azure.ai.ml._restclient.v2024_01_01_preview.models import ComputeInstanceDataMount
from azure.ai.ml._scope_dependent_operations import (
    OperationConfig,
    OperationsContainer,
    OperationScope,
    _ScopeDependentOperations,
)
from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
from azure.ai.ml._utils._asset_utils import (
    _archive_or_restore,
    _check_or_modify_auto_delete_setting,
    _create_or_update_autoincrement,
    _get_latest_version_from_container,
    _resolve_label_to_asset,
    _validate_auto_delete_setting_in_data_output,
    _validate_workspace_managed_datastore,
)
from azure.ai.ml._utils._data_utils import (
    download_mltable_metadata_schema,
    read_local_mltable_metadata_contents,
    read_remote_mltable_metadata_contents,
    validate_mltable_metadata,
)
from azure.ai.ml._utils._experimental import experimental
from azure.ai.ml._utils._http_utils import HttpPipeline
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml._utils._registry_utils import (
    get_asset_body_for_registry_storage,
    get_registry_client,
    get_sas_uri_for_registry_asset,
)
from azure.ai.ml._utils.utils import is_url
from azure.ai.ml.constants._common import (
    ASSET_ID_FORMAT,
    MLTABLE_METADATA_SCHEMA_URL_FALLBACK,
    AssetTypes,
    AzureMLResourceType,
)
from azure.ai.ml.data_transfer import import_data as import_data_func
from azure.ai.ml.entities import PipelineJob, PipelineJobSettings
from azure.ai.ml.entities._assets import Data, WorkspaceAssetReference
from azure.ai.ml.entities._data.mltable_metadata import MLTableMetadata
from azure.ai.ml.entities._data_import.data_import import DataImport
from azure.ai.ml.entities._inputs_outputs import Output
from azure.ai.ml.entities._inputs_outputs.external_data import Database
from azure.ai.ml.exceptions import (
    AssetPathException,
    ErrorCategory,
    ErrorTarget,
    MlException,
    ValidationErrorType,
    ValidationException,
)
from azure.ai.ml.operations._datastore_operations import DatastoreOperations
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from azure.core.paging import ItemPaged

ops_logger = OpsLogger(__name__)
module_logger = ops_logger.module_logger


class DataOperations(_ScopeDependentOperations):
    """DataOperations.

    You should not instantiate this class directly. Instead, you should
    create an MLClient instance that instantiates it for you and
    attaches it as an attribute.

    :param operation_scope: Scope variables for the operations classes of an MLClient object.
    :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
    :param operation_config: Common configuration for operations classes of an MLClient object.
    :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
    :param service_client: Service client to allow end users to operate on Azure Machine Learning Workspace
        resources (ServiceClient042023Preview or ServiceClient102021Dataplane).
    :type service_client: typing.Union[
        ~azure.ai.ml._restclient.v2023_04_01_preview._azure_machine_learning_workspaces.AzureMachineLearningWorkspaces,
        ~azure.ai.ml._restclient.v2021_10_01_dataplanepreview._azure_machine_learning_workspaces.
        AzureMachineLearningWorkspaces]
    :param datastore_operations: Represents a client for performing operations on Datastores.
    :type datastore_operations: ~azure.ai.ml.operations._datastore_operations.DatastoreOperations
    """

    def __init__(
        self,
        operation_scope: OperationScope,
        operation_config: OperationConfig,
        service_client: Union[ServiceClient042023_preview, ServiceClient102021Dataplane],
        service_client_012024_preview: ServiceClient012024_preview,
        datastore_operations: DatastoreOperations,
        **kwargs: Any,
    ):
        super(DataOperations, self).__init__(operation_scope, operation_config)
        ops_logger.update_filter()
        self._operation = service_client.data_versions
        self._container_operation = service_client.data_containers
        self._datastore_operation = datastore_operations
        self._compute_operation = service_client_012024_preview.compute
        self._service_client = service_client
        self._init_kwargs = kwargs
        self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
        self._all_operations: OperationsContainer = kwargs.pop("all_operations")
        # Maps a label to a function which given an asset name,
        # returns the asset associated with the label
        self._managed_label_resolver = {"latest": self._get_latest_version}

    @monitor_with_activity(ops_logger, "Data.List", ActivityType.PUBLICAPI)
    def list(
        self,
        name: Optional[str] = None,
        *,
        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
    ) -> ItemPaged[Data]:
        """List the data assets of the workspace.

        :param name: Name of a specific data asset, optional.
        :type name: Optional[str]
        :keyword list_view_type: View type for including/excluding (for example) archived data assets.
            Default: ACTIVE_ONLY.
        :type list_view_type: Optional[ListViewType]
        :return: An iterator like instance of Data objects
        :rtype: ~azure.core.paging.ItemPaged[Data]

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_list]
                :end-before: [END data_operations_list]
                :language: python
                :dedent: 8
                :caption: List data assets example.
        """
        # With a name we list versions of that asset; without, we list containers.
        # Registry-scoped and workspace-scoped clients take different kwargs.
        if name:
            return (
                self._operation.list(
                    name=name,
                    registry_name=self._registry_name,
                    cls=lambda objs: [Data._from_rest_object(obj) for obj in objs],
                    list_view_type=list_view_type,
                    **self._scope_kwargs,
                )
                if self._registry_name
                else self._operation.list(
                    name=name,
                    workspace_name=self._workspace_name,
                    cls=lambda objs: [Data._from_rest_object(obj) for obj in objs],
                    list_view_type=list_view_type,
                    **self._scope_kwargs,
                )
            )
        return (
            self._container_operation.list(
                registry_name=self._registry_name,
                cls=lambda objs: [Data._from_container_rest_object(obj) for obj in objs],
                list_view_type=list_view_type,
                **self._scope_kwargs,
            )
            if self._registry_name
            else self._container_operation.list(
                workspace_name=self._workspace_name,
                cls=lambda objs: [Data._from_container_rest_object(obj) for obj in objs],
                list_view_type=list_view_type,
                **self._scope_kwargs,
            )
        )

    def _get(self, name: Optional[str], version: Optional[str] = None) -> Data:
        # Fetch the raw REST object: a specific version when given, otherwise the
        # asset container. Routes to registry or workspace operations accordingly.
        if version:
            return (
                self._operation.get(
                    name=name,
                    version=version,
                    registry_name=self._registry_name,
                    **self._scope_kwargs,
                    **self._init_kwargs,
                )
                if self._registry_name
                else self._operation.get(
                    resource_group_name=self._resource_group_name,
                    workspace_name=self._workspace_name,
                    name=name,
                    version=version,
                    **self._init_kwargs,
                )
            )
        return (
            self._container_operation.get(
                name=name,
                registry_name=self._registry_name,
                **self._scope_kwargs,
                **self._init_kwargs,
            )
            if self._registry_name
            else self._container_operation.get(
                resource_group_name=self._resource_group_name,
                workspace_name=self._workspace_name,
                name=name,
                **self._init_kwargs,
            )
        )

    @monitor_with_activity(ops_logger, "Data.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Data:  # type: ignore
        """Get the specified data asset.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully
            identified and retrieved. Details will be provided in the error message.
        :return: Data asset object.
        :rtype: ~azure.ai.ml.entities.Data

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_get]
                :end-before: [END data_operations_get]
                :language: python
                :dedent: 8
                :caption: Get data assets example.
        """
        try:
            # Exactly one of version/label must be supplied.
            if version and label:
                msg = "Cannot specify both version and label."
                raise ValidationException(
                    message=msg,
                    target=ErrorTarget.DATA,
                    no_personal_data_message=msg,
                    error_category=ErrorCategory.USER_ERROR,
                    error_type=ValidationErrorType.INVALID_VALUE,
                )

            if label:
                return _resolve_label_to_asset(self, name, label)

            if not version:
                msg = "Must provide either version or label."
                raise ValidationException(
                    message=msg,
                    target=ErrorTarget.DATA,
                    no_personal_data_message=msg,
                    error_category=ErrorCategory.USER_ERROR,
                    error_type=ValidationErrorType.MISSING_FIELD,
                )
            data_version_resource = self._get(name, version)
            return Data._from_rest_object(data_version_resource)
        except (ValidationException, SchemaValidationError) as ex:
            log_and_raise_error(ex)

    @monitor_with_activity(ops_logger, "Data.CreateOrUpdate", ActivityType.PUBLICAPI)
    def create_or_update(self, data: Data) -> Data:
        """Returns created or updated data asset.

        If not already in storage, asset will be uploaded to the workspace's blob storage.

        :param data: Data asset object.
        :type data: azure.ai.ml.entities.Data
        :raises ~azure.ai.ml.exceptions.AssetPathException: Raised when the Data artifact path is
            already linked to another asset
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
        :return: Data asset object.
        :rtype: ~azure.ai.ml.entities.Data

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_create_or_update]
                :end-before: [END data_operations_create_or_update]
                :language: python
                :dedent: 8
                :caption: Create data assets example.
        """
        try:
            name = data.name
            if not data.version and self._registry_name:
                msg = "Data asset version is required for registry"
                raise ValidationException(
                    message=msg,
                    no_personal_data_message=msg,
                    target=ErrorTarget.DATA,
                    error_category=ErrorCategory.USER_ERROR,
                    error_type=ValidationErrorType.MISSING_FIELD,
                )
            version = data.version

            sas_uri = None
            if self._registry_name:
                # If the data asset is a workspace asset, promote to registry
                if isinstance(data, WorkspaceAssetReference):
                    try:
                        self._operation.get(
                            name=data.name,
                            version=data.version,
                            resource_group_name=self._resource_group_name,
                            registry_name=self._registry_name,
                        )
                    except Exception as err:  # pylint: disable=W0718
                        # Not-found means the promotion can proceed; anything else is fatal.
                        if isinstance(err, ResourceNotFoundError):
                            pass
                        else:
                            raise err
                    else:
                        msg = "A data asset with this name and version already exists in registry"
                        raise ValidationException(
                            message=msg,
                            no_personal_data_message=msg,
                            target=ErrorTarget.DATA,
                            error_category=ErrorCategory.USER_ERROR,
                        )
                    data_res_obj = data._to_rest_object()
                    result = self._service_client.resource_management_asset_reference.begin_import_method(
                        resource_group_name=self._resource_group_name,
                        registry_name=self._registry_name,
                        body=data_res_obj,
                    ).result()

                    if not result:
                        data_res_obj = self._get(name=data.name, version=data.version)
                        return Data._from_rest_object(data_res_obj)

                sas_uri = get_sas_uri_for_registry_asset(
                    service_client=self._service_client,
                    name=name,
                    version=version,
                    resource_group=self._resource_group_name,
                    registry=self._registry_name,
                    body=get_asset_body_for_registry_storage(self._registry_name, "data", name, version),
                )

            referenced_uris = self._validate(data)
            if referenced_uris:
                data._referenced_uris = referenced_uris

            data, _ = _check_and_upload_path(
                artifact=data,
                asset_operations=self,
                sas_uri=sas_uri,
                artifact_type=ErrorTarget.DATA,
                show_progress=self._show_progress,
            )

            _check_or_modify_auto_delete_setting(data.auto_delete_setting)

            data_version_resource = data._to_rest_object()
            auto_increment_version = data._auto_increment_version

            if auto_increment_version:
                result = _create_or_update_autoincrement(
                    name=data.name,
                    body=data_version_resource,
                    version_operation=self._operation,
                    container_operation=self._container_operation,
                    resource_group_name=self._operation_scope.resource_group_name,
                    workspace_name=self._workspace_name,
                    **self._init_kwargs,
                )
            else:
                result = (
                    self._operation.begin_create_or_update(
                        name=name,
                        version=version,
                        registry_name=self._registry_name,
                        body=data_version_resource,
                        **self._scope_kwargs,
                    ).result()
                    if self._registry_name
                    else self._operation.create_or_update(
                        name=name,
                        version=version,
                        workspace_name=self._workspace_name,
                        body=data_version_resource,
                        **self._scope_kwargs,
                    )
                )

            if not result and self._registry_name:
                result = self._get(name=name, version=version)

            return Data._from_rest_object(result)
        except Exception as ex:
            if isinstance(ex, (ValidationException, SchemaValidationError)):
                log_and_raise_error(ex)
            elif isinstance(ex, HttpResponseError):
                # service side raises an exception if we attempt to update an existing asset's asset path
                if str(ex) == ASSET_PATH_ERROR:
                    # Fixed: keyword was misspelled "tartget", so the exception's
                    # target was never set to ErrorTarget.DATA.
                    raise AssetPathException(
                        message=CHANGED_ASSET_PATH_MSG,
                        target=ErrorTarget.DATA,
                        no_personal_data_message=CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
                        error_category=ErrorCategory.USER_ERROR,
                    ) from ex
            raise ex

    @monitor_with_activity(ops_logger, "Data.ImportData", ActivityType.PUBLICAPI)
    @experimental
    def import_data(self, data_import: DataImport, **kwargs: Any) -> PipelineJob:
        """Returns the data import job that is creating the data asset.

        :param data_import: DataImport object.
        :type data_import: azure.ai.ml.entities.DataImport
        :return: data import job object.
        :rtype: ~azure.ai.ml.entities.PipelineJob

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_import_data]
                :end-before: [END data_operations_import_data]
                :language: python
                :dedent: 8
                :caption: Import data assets example.
        """

        experiment_name = "data_import_" + str(data_import.name)
        data_import.type = AssetTypes.MLTABLE if isinstance(data_import.source, Database) else AssetTypes.URI_FOLDER

        # avoid specifying auto_delete_setting in job output now
        _validate_auto_delete_setting_in_data_output(data_import.auto_delete_setting)

        # block customer-specified path on managed datastore
        data_import.path = _validate_workspace_managed_datastore(data_import.path)

        if "${{name}}" not in str(data_import.path):
            data_import.path = data_import.path.rstrip("/") + "/${{name}}"  # type: ignore
        import_job = import_data_func(
            description=data_import.description or experiment_name,
            display_name=experiment_name,
            experiment_name=experiment_name,
            compute="serverless",
            source=data_import.source,
            outputs={
                "sink": Output(
                    type=data_import.type,
                    path=data_import.path,  # type: ignore
                    name=data_import.name,
                    version=data_import.version,
                )
            },
        )
        import_pipeline = PipelineJob(
            description=data_import.description or experiment_name,
            tags=data_import.tags,
            display_name=experiment_name,
            experiment_name=experiment_name,
            properties=data_import.properties or {},
            settings=PipelineJobSettings(force_rerun=True),
            jobs={experiment_name: import_job},
        )
        import_pipeline.properties["azureml.materializationAssetName"] = data_import.name
        return self._all_operations.all_operations[AzureMLResourceType.JOB].create_or_update(
            job=import_pipeline, skip_validation=True, **kwargs
        )

    @monitor_with_activity(ops_logger, "Data.ListMaterializationStatus", ActivityType.PUBLICAPI)
    def list_materialization_status(
        self,
        name: str,
        *,
        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
        **kwargs: Any,
    ) -> Iterable[PipelineJob]:
        """List materialization jobs of the asset.

        :param name: name of asset being created by the materialization jobs.
        :type name: str
        :keyword list_view_type: View type for including/excluding (for example) archived jobs. Default: ACTIVE_ONLY.
        :paramtype list_view_type: Optional[ListViewType]
        :return: An iterator like instance of Job objects.
        :rtype: ~azure.core.paging.ItemPaged[PipelineJob]

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_list_materialization_status]
                :end-before: [END data_operations_list_materialization_status]
                :language: python
                :dedent: 8
                :caption: List materialization jobs example.
        """

        return cast(
            Iterable[PipelineJob],
            self._all_operations.all_operations[AzureMLResourceType.JOB].list(
                job_type="Pipeline",
                asset_name=name,
                list_view_type=list_view_type,
                **kwargs,
            ),
        )

    @monitor_with_activity(ops_logger, "Data.Validate", ActivityType.INTERNALCALL)
    def _validate(self, data: Data) -> Optional[List[str]]:
        # Validates the asset's path against its declared type; for MLTable assets
        # also parses the metadata and returns its referenced URIs.
        if not data.path:
            msg = "Missing data path. Path is required for data."
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                error_type=ValidationErrorType.MISSING_FIELD,
                target=ErrorTarget.DATA,
                error_category=ErrorCategory.USER_ERROR,
            )

        asset_path = str(data.path)
        asset_type = data.type
        base_path = data.base_path

        if asset_type == AssetTypes.MLTABLE:
            if is_url(asset_path):
                try:
                    metadata_contents = read_remote_mltable_metadata_contents(
                        base_uri=asset_path,
                        datastore_operations=self._datastore_operation,
                        requests_pipeline=self._requests_pipeline,
                    )
                    metadata_yaml_path = None
                except Exception:  # pylint: disable=W0718
                    # skip validation for remote MLTable when the contents cannot be read
                    module_logger.info("Unable to access MLTable metadata at path %s", asset_path)
                    return None
            else:
                metadata_contents = read_local_mltable_metadata_contents(path=asset_path)
                metadata_yaml_path = Path(asset_path, "MLTable")
            metadata = MLTableMetadata._load(metadata_contents, metadata_yaml_path)
            mltable_metadata_schema = self._try_get_mltable_metadata_jsonschema(data._mltable_schema_url)
            if mltable_metadata_schema and not data._skip_validation:
                validate_mltable_metadata(
                    mltable_metadata_dict=metadata_contents,
                    mltable_schema=mltable_metadata_schema,
                )
            return cast(Optional[List[str]], metadata.referenced_uris())

        if is_url(asset_path):
            # skip validation for remote URI_FILE or URI_FOLDER
            pass
        elif os.path.isabs(asset_path):
            _assert_local_path_matches_asset_type(asset_path, asset_type)
        else:
            abs_path = Path(base_path, asset_path).resolve()
            _assert_local_path_matches_asset_type(str(abs_path), asset_type)

        return None

    def _try_get_mltable_metadata_jsonschema(self, mltable_schema_url: Optional[str]) -> Optional[Dict]:
        # Best-effort download of the MLTable jsonschema; validation is skipped on failure.
        if mltable_schema_url is None:
            mltable_schema_url = MLTABLE_METADATA_SCHEMA_URL_FALLBACK
        try:
            return cast(Optional[Dict], download_mltable_metadata_schema(mltable_schema_url, self._requests_pipeline))
        except Exception:  # pylint: disable=W0718
            module_logger.info(
                'Failed to download MLTable metadata jsonschema from "%s", skipping validation',
                mltable_schema_url,
            )
            return None

    @monitor_with_activity(ops_logger, "Data.Archive", ActivityType.PUBLICAPI)
    def archive(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Archive a data asset.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :return: None

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_archive]
                :end-before: [END data_operations_archive]
                :language: python
                :dedent: 8
                :caption: Archive data asset example.
        """

        _archive_or_restore(
            asset_operations=self,
            version_operation=self._operation,
            container_operation=self._container_operation,
            is_archived=True,
            name=name,
            version=version,
            label=label,
        )

    @monitor_with_activity(ops_logger, "Data.Restore", ActivityType.PUBLICAPI)
    def restore(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Restore an archived data asset.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :return: None

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_restore]
                :end-before: [END data_operations_restore]
                :language: python
                :dedent: 8
                :caption: Restore data asset example.
        """

        _archive_or_restore(
            asset_operations=self,
            version_operation=self._operation,
            container_operation=self._container_operation,
            is_archived=False,
            name=name,
            version=version,
            label=label,
        )

    def _get_latest_version(self, name: str) -> Data:
        """Returns the latest version of the asset with the given name. Latest is defined as the most recently created,
        not the most recently updated.

        :param name: The asset name
        :type name: str
        :return: The latest asset
        :rtype: Data
        """
        latest_version = _get_latest_version_from_container(
            name,
            self._container_operation,
            self._resource_group_name,
            self._workspace_name,
            self._registry_name,
        )
        return self.get(name, version=latest_version)

    @monitor_with_activity(ops_logger, "data.Share", ActivityType.PUBLICAPI)
    @experimental
    def share(
        self,
        name: str,
        version: str,
        *,
        share_with_name: str,
        share_with_version: str,
        registry_name: str,
        **kwargs: Any,
    ) -> Data:
        """Share a data asset from workspace to registry.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :keyword share_with_name: Name of data asset to share with.
        :paramtype share_with_name: str
        :keyword share_with_version: Version of data asset to share with.
        :paramtype share_with_version: str
        :keyword registry_name: Name of the destination registry.
        :paramtype registry_name: str
        :return: Data asset object.
        :rtype: ~azure.ai.ml.entities.Data

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_share]
                :end-before: [END data_operations_share]
                :language: python
                :dedent: 8
                :caption: Share data asset example.
        """

        # Get workspace info to get workspace GUID
        workspace = self._service_client.workspaces.get(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            **kwargs,
        )
        workspace_guid = workspace.workspace_id
        workspace_location = workspace.location

        # Get data asset ID
        asset_id = ASSET_ID_FORMAT.format(
            workspace_location,
            workspace_guid,
            AzureMLResourceType.DATA,
            name,
            version,
        )

        data_ref = WorkspaceAssetReference(
            name=share_with_name if share_with_name else name,
            version=share_with_version if share_with_version else version,
            asset_id=asset_id,
        )

        # Temporarily retarget this operations object at the registry, then reuse
        # the normal create_or_update flow to perform the promotion.
        with self._set_registry_client(registry_name):
            return self.create_or_update(data_ref)

    @monitor_with_activity(ops_logger, "data.Mount", ActivityType.PUBLICAPI)
    @experimental
    def mount(
        self,
        path: str,
        *,
        mount_point: Optional[str] = None,
        mode: str = "ro_mount",
        debug: bool = False,
        persistent: bool = False,
        **kwargs,
    ) -> None:
        """Mount a data asset to a local path, so that you can access data inside it
        under a local path with any tools of your choice.

        :param path: The data asset path to mount, in the form of `azureml:<name>` or `azureml:<name>:<version>`.
        :type path: str
        :keyword mount_point: A local path used as mount point.
        :type mount_point: str
        :keyword mode: Mount mode. Only `ro_mount` (read-only) is supported for data asset mount.
        :type mode: str
        :keyword debug: Whether to enable verbose logging.
        :type debug: bool
        :keyword persistent: Whether to persist the mount after reboot. Applies only when running on Compute Instance,
            where the 'CI_NAME' environment variable is set.
        :type persistent: bool
        :return: None
        """

        assert mode in ["ro_mount", "rw_mount"], "mode should be either `ro_mount` or `rw_mount`"
        read_only = mode == "ro_mount"
        assert read_only, "read-write mount for data asset is not supported yet"

        ci_name = os.environ.get("CI_NAME")
        assert not persistent or (
            persistent and ci_name is not None
        ), "persistent mount is only supported on Compute Instance, where the 'CI_NAME' environment variable is set."

        try:
            from azureml.dataprep import rslex_fuse_subprocess_wrapper
        except ImportError as exc:
            raise MlException(
                "Mount operations requires package azureml-dataprep-rslex installed. "
                + "You can install it with Azure ML SDK with `pip install azure-ai-ml[mount]`."
            ) from exc

        uri = rslex_fuse_subprocess_wrapper.build_data_asset_uri(
            self._operation_scope._subscription_id, self._resource_group_name, self._workspace_name, path
        )
        if persistent and ci_name is not None:
            mount_name = f"unified_mount_{str(uuid.uuid4()).replace('-', '')}"
            self._compute_operation.update_data_mounts(
                self._resource_group_name,
                self._workspace_name,
                ci_name,
                [
                    ComputeInstanceDataMount(
                        source=uri,
                        source_type="URI",
                        mount_name=mount_name,
                        mount_action="Mount",
                        mount_path=mount_point or "",
                    )
                ],
                api_version="2021-01-01",
                **kwargs,
            )
            print(f"Mount requested [name: {mount_name}]. Waiting for completion ...")
            # Poll the compute instance until the requested mount reaches a terminal state.
            while True:
                compute = self._compute_operation.get(self._resource_group_name, self._workspace_name, ci_name)
                mounts = compute.properties.properties.data_mounts
                try:
                    mount = [mount for mount in mounts if mount.mount_name == mount_name][0]
                    if mount.mount_state == "Mounted":
                        print(f"Mounted [name: {mount_name}].")
                        break
                    if mount.mount_state == "MountRequested":
                        pass
                    elif mount.mount_state == "MountFailed":
                        msg = f"Mount failed [name: {mount_name}]: {mount.error}"
                        raise MlException(message=msg, no_personal_data_message=msg)
                    else:
                        msg = f"Got unexpected mount state [name: {mount_name}]: {mount.mount_state}"
                        raise MlException(message=msg, no_personal_data_message=msg)
                except IndexError:
                    # Mount entry not visible yet; keep polling.
                    pass
                time.sleep(5)

        else:
            rslex_fuse_subprocess_wrapper.start_fuse_mount_subprocess(
                uri, mount_point, read_only, debug, credential=self._operation._config.credential
            )

    @contextmanager
    # pylint: disable-next=docstring-missing-return,docstring-missing-rtype
    def _set_registry_client(self, registry_name: str) -> Generator:
        """Sets the registry client for the data operations.

        :param registry_name: Name of the registry.
        :type registry_name: str
        """
        rg_ = self._operation_scope._resource_group_name
        sub_ = self._operation_scope._subscription_id
        registry_ = self._operation_scope.registry_name
        client_ = self._service_client
        data_versions_operation_ = self._operation

        try:
            _client, _rg, _sub = get_registry_client(self._service_client._config.credential, registry_name)
            self._operation_scope.registry_name = registry_name
            self._operation_scope._resource_group_name = _rg
            self._operation_scope._subscription_id = _sub
            self._service_client = _client
            self._operation = _client.data_versions
            yield
        finally:
            # Always restore the workspace-scoped state, even if the body raised.
            self._operation_scope.registry_name = registry_
            self._operation_scope._resource_group_name = rg_
            self._operation_scope._subscription_id = sub_
            self._service_client = client_
            self._operation = data_versions_operation_


def _assert_local_path_matches_asset_type(
    local_path: str,
    asset_type: str,
) -> None:
    # assert file system type matches asset type
    if asset_type == AssetTypes.URI_FOLDER and not os.path.isdir(local_path):
        raise ValidationException(
            message="File path does not match asset type {}: {}".format(asset_type, local_path),
            no_personal_data_message="File path does not match asset type {}".format(asset_type),
            target=ErrorTarget.DATA,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
        )
    if asset_type == AssetTypes.URI_FILE and not os.path.isfile(local_path):
        raise ValidationException(
            message="File path does not match asset type {}: {}".format(asset_type, local_path),
            no_personal_data_message="File path does not match asset type {}".format(asset_type),
            target=ErrorTarget.DATA,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
        )