Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py  |  891
1 file changed, 891 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py
new file mode 100644
index 00000000..a54b1739
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py
@@ -0,0 +1,891 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access,no-value-for-parameter
+
+import os
+import time
+import uuid
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Any, Dict, Generator, Iterable, List, Optional, Union, cast
+
+from marshmallow.exceptions import ValidationError as SchemaValidationError
+
+from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
+from azure.ai.ml._artifacts._constants import (
+ ASSET_PATH_ERROR,
+ CHANGED_ASSET_PATH_MSG,
+ CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
+)
+from azure.ai.ml._exception_helper import log_and_raise_error
+from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
+ AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
+)
+from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042023_preview
+from azure.ai.ml._restclient.v2023_04_01_preview.models import ListViewType
+from azure.ai.ml._restclient.v2024_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012024_preview
+from azure.ai.ml._restclient.v2024_01_01_preview.models import ComputeInstanceDataMount
+from azure.ai.ml._scope_dependent_operations import (
+ OperationConfig,
+ OperationsContainer,
+ OperationScope,
+ _ScopeDependentOperations,
+)
+from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
+from azure.ai.ml._utils._asset_utils import (
+ _archive_or_restore,
+ _check_or_modify_auto_delete_setting,
+ _create_or_update_autoincrement,
+ _get_latest_version_from_container,
+ _resolve_label_to_asset,
+ _validate_auto_delete_setting_in_data_output,
+ _validate_workspace_managed_datastore,
+)
+from azure.ai.ml._utils._data_utils import (
+ download_mltable_metadata_schema,
+ read_local_mltable_metadata_contents,
+ read_remote_mltable_metadata_contents,
+ validate_mltable_metadata,
+)
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml._utils._http_utils import HttpPipeline
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils._registry_utils import (
+ get_asset_body_for_registry_storage,
+ get_registry_client,
+ get_sas_uri_for_registry_asset,
+)
+from azure.ai.ml._utils.utils import is_url
+from azure.ai.ml.constants._common import (
+ ASSET_ID_FORMAT,
+ MLTABLE_METADATA_SCHEMA_URL_FALLBACK,
+ AssetTypes,
+ AzureMLResourceType,
+)
+from azure.ai.ml.data_transfer import import_data as import_data_func
+from azure.ai.ml.entities import PipelineJob, PipelineJobSettings
+from azure.ai.ml.entities._assets import Data, WorkspaceAssetReference
+from azure.ai.ml.entities._data.mltable_metadata import MLTableMetadata
+from azure.ai.ml.entities._data_import.data_import import DataImport
+from azure.ai.ml.entities._inputs_outputs import Output
+from azure.ai.ml.entities._inputs_outputs.external_data import Database
+from azure.ai.ml.exceptions import (
+ AssetPathException,
+ ErrorCategory,
+ ErrorTarget,
+ MlException,
+ ValidationErrorType,
+ ValidationException,
+)
+from azure.ai.ml.operations._datastore_operations import DatastoreOperations
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+from azure.core.paging import ItemPaged
+
+ops_logger = OpsLogger(__name__)
+module_logger = ops_logger.module_logger
+
+
+class DataOperations(_ScopeDependentOperations):
+ """DataOperations.
+
+ You should not instantiate this class directly. Instead, you should
+ create an MLClient instance that instantiates it for you and
+ attaches it as an attribute.
+
+ :param operation_scope: Scope variables for the operations classes of an MLClient object.
+ :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
+ :param operation_config: Common configuration for operations classes of an MLClient object.
+ :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
+ :param service_client: Service client to allow end users to operate on Azure Machine Learning Workspace
+ resources (ServiceClient042023Preview or ServiceClient102021Dataplane).
+ :type service_client: typing.Union[
+ ~azure.ai.ml._restclient.v2023_04_01_preview._azure_machine_learning_workspaces.AzureMachineLearningWorkspaces,
+ ~azure.ai.ml._restclient.v2021_10_01_dataplanepreview._azure_machine_learning_workspaces.
+ AzureMachineLearningWorkspaces]
+ :param datastore_operations: Represents a client for performing operations on Datastores.
+ :type datastore_operations: ~azure.ai.ml.operations._datastore_operations.DatastoreOperations
+ """
+
+ def __init__(
+ self,
+ operation_scope: OperationScope,
+ operation_config: OperationConfig,
+ service_client: Union[ServiceClient042023_preview, ServiceClient102021Dataplane],
+ service_client_012024_preview: ServiceClient012024_preview,
+ datastore_operations: DatastoreOperations,
+ **kwargs: Any,
+ ):
+ super(DataOperations, self).__init__(operation_scope, operation_config)
+ ops_logger.update_filter()
+ self._operation = service_client.data_versions
+ self._container_operation = service_client.data_containers
+ self._datastore_operation = datastore_operations
+ self._compute_operation = service_client_012024_preview.compute
+ self._service_client = service_client
+ self._init_kwargs = kwargs
+ self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
+ self._all_operations: OperationsContainer = kwargs.pop("all_operations")
+        # Maps a label to a function which, given an asset name,
+        # returns the asset associated with that label.
+ self._managed_label_resolver = {"latest": self._get_latest_version}
+
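+    # A minimal sketch of how the label resolver above is exercised (illustrative, not part
+    # of the original file; it assumes an authenticated MLClient named `ml_client`):
+    #
+    #     data_asset = ml_client.data.get(name="my_data", label="latest")
+    #
+    # "latest" is looked up in _managed_label_resolver and dispatched to _get_latest_version.
+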
+ @monitor_with_activity(ops_logger, "Data.List", ActivityType.PUBLICAPI)
+ def list(
+ self,
+ name: Optional[str] = None,
+ *,
+ list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+ ) -> ItemPaged[Data]:
+ """List the data assets of the workspace.
+
+ :param name: Name of a specific data asset, optional.
+ :type name: Optional[str]
+ :keyword list_view_type: View type for including/excluding (for example) archived data assets.
+ Default: ACTIVE_ONLY.
+        :paramtype list_view_type: Optional[ListViewType]
+        :return: An iterator-like instance of Data objects.
+ :rtype: ~azure.core.paging.ItemPaged[Data]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_list]
+ :end-before: [END data_operations_list]
+ :language: python
+ :dedent: 8
+ :caption: List data assets example.
+ """
+ if name:
+ return (
+ self._operation.list(
+ name=name,
+ registry_name=self._registry_name,
+ cls=lambda objs: [Data._from_rest_object(obj) for obj in objs],
+ list_view_type=list_view_type,
+ **self._scope_kwargs,
+ )
+ if self._registry_name
+ else self._operation.list(
+ name=name,
+ workspace_name=self._workspace_name,
+ cls=lambda objs: [Data._from_rest_object(obj) for obj in objs],
+ list_view_type=list_view_type,
+ **self._scope_kwargs,
+ )
+ )
+ return (
+ self._container_operation.list(
+ registry_name=self._registry_name,
+ cls=lambda objs: [Data._from_container_rest_object(obj) for obj in objs],
+ list_view_type=list_view_type,
+ **self._scope_kwargs,
+ )
+ if self._registry_name
+ else self._container_operation.list(
+ workspace_name=self._workspace_name,
+ cls=lambda objs: [Data._from_container_rest_object(obj) for obj in objs],
+ list_view_type=list_view_type,
+ **self._scope_kwargs,
+ )
+ )
+
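+    # A hedged usage sketch for list() (illustrative; `ml_client` and the asset name are
+    # assumptions, not part of the original file):
+    #
+    #     # all data asset containers in the workspace
+    #     for container in ml_client.data.list():
+    #         print(container.name)
+    #
+    #     # all versions of a single asset
+    #     for version in ml_client.data.list(name="my_data"):
+    #         print(version.version)
+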
+ def _get(self, name: Optional[str], version: Optional[str] = None) -> Data:
+ if version:
+ return (
+ self._operation.get(
+ name=name,
+ version=version,
+ registry_name=self._registry_name,
+ **self._scope_kwargs,
+ **self._init_kwargs,
+ )
+ if self._registry_name
+ else self._operation.get(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ name=name,
+ version=version,
+ **self._init_kwargs,
+ )
+ )
+ return (
+ self._container_operation.get(
+ name=name,
+ registry_name=self._registry_name,
+ **self._scope_kwargs,
+ **self._init_kwargs,
+ )
+ if self._registry_name
+ else self._container_operation.get(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ name=name,
+ **self._init_kwargs,
+ )
+ )
+
+ @monitor_with_activity(ops_logger, "Data.Get", ActivityType.PUBLICAPI)
+ def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Data: # type: ignore
+ """Get the specified data asset.
+
+ :param name: Name of data asset.
+ :type name: str
+ :param version: Version of data asset.
+ :type version: str
+ :param label: Label of the data asset. (mutually exclusive with version)
+ :type label: str
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully
+ identified and retrieved. Details will be provided in the error message.
+ :return: Data asset object.
+ :rtype: ~azure.ai.ml.entities.Data
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_get]
+ :end-before: [END data_operations_get]
+ :language: python
+ :dedent: 8
+ :caption: Get data assets example.
+ """
+ try:
+ if version and label:
+ msg = "Cannot specify both version and label."
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.DATA,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.INVALID_VALUE,
+ )
+
+ if label:
+ return _resolve_label_to_asset(self, name, label)
+
+ if not version:
+ msg = "Must provide either version or label."
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.DATA,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.MISSING_FIELD,
+ )
+ data_version_resource = self._get(name, version)
+ return Data._from_rest_object(data_version_resource)
+ except (ValidationException, SchemaValidationError) as ex:
+ log_and_raise_error(ex)
+
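+    # A hedged usage sketch for get() (illustrative; the asset name and version are
+    # assumptions, not part of the original file):
+    #
+    #     data_v1 = ml_client.data.get(name="my_data", version="1")
+    #     data_latest = ml_client.data.get(name="my_data", label="latest")
+    #
+    # Passing both version and label, or neither, raises a ValidationException as shown above.
+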
+ @monitor_with_activity(ops_logger, "Data.CreateOrUpdate", ActivityType.PUBLICAPI)
+ def create_or_update(self, data: Data) -> Data:
+ """Returns created or updated data asset.
+
+ If not already in storage, asset will be uploaded to the workspace's blob storage.
+
+ :param data: Data asset object.
+ :type data: azure.ai.ml.entities.Data
+ :raises ~azure.ai.ml.exceptions.AssetPathException: Raised when the Data artifact path is
+ already linked to another asset
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully validated.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
+ :return: Data asset object.
+ :rtype: ~azure.ai.ml.entities.Data
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_create_or_update]
+ :end-before: [END data_operations_create_or_update]
+ :language: python
+ :dedent: 8
+ :caption: Create data assets example.
+ """
+ try:
+ name = data.name
+ if not data.version and self._registry_name:
+ msg = "Data asset version is required for registry"
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.MISSING_FIELD,
+ )
+ version = data.version
+
+ sas_uri = None
+ if self._registry_name:
+ # If the data asset is a workspace asset, promote to registry
+ if isinstance(data, WorkspaceAssetReference):
+ try:
+ self._operation.get(
+ name=data.name,
+ version=data.version,
+ resource_group_name=self._resource_group_name,
+ registry_name=self._registry_name,
+ )
+                    except ResourceNotFoundError:
+                        pass
+                    else:
+                        msg = "A data asset with this name and version already exists in the registry"
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ data_res_obj = data._to_rest_object()
+ result = self._service_client.resource_management_asset_reference.begin_import_method(
+ resource_group_name=self._resource_group_name,
+ registry_name=self._registry_name,
+ body=data_res_obj,
+ ).result()
+
+ if not result:
+ data_res_obj = self._get(name=data.name, version=data.version)
+ return Data._from_rest_object(data_res_obj)
+
+ sas_uri = get_sas_uri_for_registry_asset(
+ service_client=self._service_client,
+ name=name,
+ version=version,
+ resource_group=self._resource_group_name,
+ registry=self._registry_name,
+ body=get_asset_body_for_registry_storage(self._registry_name, "data", name, version),
+ )
+
+ referenced_uris = self._validate(data)
+ if referenced_uris:
+ data._referenced_uris = referenced_uris
+
+ data, _ = _check_and_upload_path(
+ artifact=data,
+ asset_operations=self,
+ sas_uri=sas_uri,
+ artifact_type=ErrorTarget.DATA,
+ show_progress=self._show_progress,
+ )
+
+ _check_or_modify_auto_delete_setting(data.auto_delete_setting)
+
+ data_version_resource = data._to_rest_object()
+ auto_increment_version = data._auto_increment_version
+
+ if auto_increment_version:
+ result = _create_or_update_autoincrement(
+ name=data.name,
+ body=data_version_resource,
+ version_operation=self._operation,
+ container_operation=self._container_operation,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._init_kwargs,
+ )
+ else:
+ result = (
+ self._operation.begin_create_or_update(
+ name=name,
+ version=version,
+ registry_name=self._registry_name,
+ body=data_version_resource,
+ **self._scope_kwargs,
+ ).result()
+ if self._registry_name
+ else self._operation.create_or_update(
+ name=name,
+ version=version,
+ workspace_name=self._workspace_name,
+ body=data_version_resource,
+ **self._scope_kwargs,
+ )
+ )
+
+ if not result and self._registry_name:
+ result = self._get(name=name, version=version)
+
+ return Data._from_rest_object(result)
+ except Exception as ex:
+ if isinstance(ex, (ValidationException, SchemaValidationError)):
+ log_and_raise_error(ex)
+ elif isinstance(ex, HttpResponseError):
+ # service side raises an exception if we attempt to update an existing asset's asset path
+ if str(ex) == ASSET_PATH_ERROR:
+ raise AssetPathException(
+ message=CHANGED_ASSET_PATH_MSG,
+                        target=ErrorTarget.DATA,
+ no_personal_data_message=CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ ) from ex
+ raise ex
+
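+    # A hedged usage sketch for create_or_update() (illustrative; the local path, asset name,
+    # and version are assumptions, not part of the original file):
+    #
+    #     from azure.ai.ml.constants import AssetTypes
+    #     from azure.ai.ml.entities import Data
+    #
+    #     my_data = Data(
+    #         path="./local_folder",
+    #         type=AssetTypes.URI_FOLDER,
+    #         name="my_data",
+    #         version="1",
+    #     )
+    #     created = ml_client.data.create_or_update(my_data)
+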
+ @monitor_with_activity(ops_logger, "Data.ImportData", ActivityType.PUBLICAPI)
+ @experimental
+ def import_data(self, data_import: DataImport, **kwargs: Any) -> PipelineJob:
+ """Returns the data import job that is creating the data asset.
+
+ :param data_import: DataImport object.
+ :type data_import: azure.ai.ml.entities.DataImport
+ :return: data import job object.
+ :rtype: ~azure.ai.ml.entities.PipelineJob
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_import_data]
+ :end-before: [END data_operations_import_data]
+ :language: python
+ :dedent: 8
+ :caption: Import data assets example.
+ """
+
+ experiment_name = "data_import_" + str(data_import.name)
+ data_import.type = AssetTypes.MLTABLE if isinstance(data_import.source, Database) else AssetTypes.URI_FOLDER
+
+        # avoid specifying auto_delete_setting in job output for now
+ _validate_auto_delete_setting_in_data_output(data_import.auto_delete_setting)
+
+        # block customer-specified paths on managed datastores
+ data_import.path = _validate_workspace_managed_datastore(data_import.path)
+
+ if "${{name}}" not in str(data_import.path):
+ data_import.path = data_import.path.rstrip("/") + "/${{name}}" # type: ignore
+ import_job = import_data_func(
+ description=data_import.description or experiment_name,
+ display_name=experiment_name,
+ experiment_name=experiment_name,
+ compute="serverless",
+ source=data_import.source,
+ outputs={
+ "sink": Output(
+ type=data_import.type,
+ path=data_import.path, # type: ignore
+ name=data_import.name,
+ version=data_import.version,
+ )
+ },
+ )
+ import_pipeline = PipelineJob(
+ description=data_import.description or experiment_name,
+ tags=data_import.tags,
+ display_name=experiment_name,
+ experiment_name=experiment_name,
+ properties=data_import.properties or {},
+ settings=PipelineJobSettings(force_rerun=True),
+ jobs={experiment_name: import_job},
+ )
+ import_pipeline.properties["azureml.materializationAssetName"] = data_import.name
+ return self._all_operations.all_operations[AzureMLResourceType.JOB].create_or_update(
+ job=import_pipeline, skip_validation=True, **kwargs
+ )
+
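+    # A hedged usage sketch for import_data() (illustrative; the connection, query, and
+    # datastore path are assumptions, not part of the original file):
+    #
+    #     from azure.ai.ml.data_transfer import Database
+    #     from azure.ai.ml.entities import DataImport
+    #
+    #     data_import = DataImport(
+    #         name="my_imported_asset",
+    #         source=Database(connection="azureml:my_sql_connection", query="select * from my_table"),
+    #         path="azureml://datastores/workspaceblobstore/paths/imports",
+    #     )
+    #     pipeline_job = ml_client.data.import_data(data_import=data_import)
+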
+ @monitor_with_activity(ops_logger, "Data.ListMaterializationStatus", ActivityType.PUBLICAPI)
+ def list_materialization_status(
+ self,
+ name: str,
+ *,
+ list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+ **kwargs: Any,
+ ) -> Iterable[PipelineJob]:
+ """List materialization jobs of the asset.
+
+        :param name: Name of the asset being created by the materialization jobs.
+ :type name: str
+ :keyword list_view_type: View type for including/excluding (for example) archived jobs. Default: ACTIVE_ONLY.
+ :paramtype list_view_type: Optional[ListViewType]
+        :return: An iterator-like instance of Job objects.
+ :rtype: ~azure.core.paging.ItemPaged[PipelineJob]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_list_materialization_status]
+ :end-before: [END data_operations_list_materialization_status]
+ :language: python
+ :dedent: 8
+ :caption: List materialization jobs example.
+ """
+
+ return cast(
+ Iterable[PipelineJob],
+ self._all_operations.all_operations[AzureMLResourceType.JOB].list(
+ job_type="Pipeline",
+ asset_name=name,
+ list_view_type=list_view_type,
+ **kwargs,
+ ),
+ )
+
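+    # A hedged usage sketch for list_materialization_status() (illustrative; the asset name
+    # is an assumption, not part of the original file):
+    #
+    #     for job in ml_client.data.list_materialization_status("my_imported_asset"):
+    #         print(job.name, job.status)
+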
+ @monitor_with_activity(ops_logger, "Data.Validate", ActivityType.INTERNALCALL)
+ def _validate(self, data: Data) -> Optional[List[str]]:
+ if not data.path:
+ msg = "Missing data path. Path is required for data."
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ error_type=ValidationErrorType.MISSING_FIELD,
+ target=ErrorTarget.DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ asset_path = str(data.path)
+ asset_type = data.type
+ base_path = data.base_path
+
+ if asset_type == AssetTypes.MLTABLE:
+ if is_url(asset_path):
+ try:
+ metadata_contents = read_remote_mltable_metadata_contents(
+ base_uri=asset_path,
+ datastore_operations=self._datastore_operation,
+ requests_pipeline=self._requests_pipeline,
+ )
+ metadata_yaml_path = None
+ except Exception: # pylint: disable=W0718
+ # skip validation for remote MLTable when the contents cannot be read
+ module_logger.info("Unable to access MLTable metadata at path %s", asset_path)
+ return None
+ else:
+ metadata_contents = read_local_mltable_metadata_contents(path=asset_path)
+ metadata_yaml_path = Path(asset_path, "MLTable")
+ metadata = MLTableMetadata._load(metadata_contents, metadata_yaml_path)
+ mltable_metadata_schema = self._try_get_mltable_metadata_jsonschema(data._mltable_schema_url)
+ if mltable_metadata_schema and not data._skip_validation:
+ validate_mltable_metadata(
+ mltable_metadata_dict=metadata_contents,
+ mltable_schema=mltable_metadata_schema,
+ )
+ return cast(Optional[List[str]], metadata.referenced_uris())
+
+ if is_url(asset_path):
+ # skip validation for remote URI_FILE or URI_FOLDER
+ pass
+ elif os.path.isabs(asset_path):
+ _assert_local_path_matches_asset_type(asset_path, asset_type)
+ else:
+ abs_path = Path(base_path, asset_path).resolve()
+ _assert_local_path_matches_asset_type(str(abs_path), asset_type)
+
+ return None
+
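+    # A sketch of what _validate enforces for local paths (illustrative; the paths below are
+    # assumptions, not part of the original file):
+    #
+    #     Data(path="./folder", type=AssetTypes.URI_FOLDER)    # passes if ./folder is a directory
+    #     Data(path="./folder", type=AssetTypes.URI_FILE)      # raises ValidationException
+    #
+    # Remote URI_FILE/URI_FOLDER paths are accepted without validation, and remote MLTable
+    # metadata that cannot be read is skipped with an informational log.
+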
+ def _try_get_mltable_metadata_jsonschema(self, mltable_schema_url: Optional[str]) -> Optional[Dict]:
+ if mltable_schema_url is None:
+ mltable_schema_url = MLTABLE_METADATA_SCHEMA_URL_FALLBACK
+ try:
+ return cast(Optional[Dict], download_mltable_metadata_schema(mltable_schema_url, self._requests_pipeline))
+ except Exception: # pylint: disable=W0718
+ module_logger.info(
+ 'Failed to download MLTable metadata jsonschema from "%s", skipping validation',
+ mltable_schema_url,
+ )
+ return None
+
+ @monitor_with_activity(ops_logger, "Data.Archive", ActivityType.PUBLICAPI)
+ def archive(
+ self,
+ name: str,
+ version: Optional[str] = None,
+ label: Optional[str] = None,
+ # pylint:disable=unused-argument
+ **kwargs: Any,
+ ) -> None:
+ """Archive a data asset.
+
+ :param name: Name of data asset.
+ :type name: str
+ :param version: Version of data asset.
+ :type version: str
+ :param label: Label of the data asset. (mutually exclusive with version)
+ :type label: str
+ :return: None
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_archive]
+ :end-before: [END data_operations_archive]
+ :language: python
+ :dedent: 8
+ :caption: Archive data asset example.
+ """
+
+ _archive_or_restore(
+ asset_operations=self,
+ version_operation=self._operation,
+ container_operation=self._container_operation,
+ is_archived=True,
+ name=name,
+ version=version,
+ label=label,
+ )
+
+ @monitor_with_activity(ops_logger, "Data.Restore", ActivityType.PUBLICAPI)
+ def restore(
+ self,
+ name: str,
+ version: Optional[str] = None,
+ label: Optional[str] = None,
+ # pylint:disable=unused-argument
+ **kwargs: Any,
+ ) -> None:
+ """Restore an archived data asset.
+
+ :param name: Name of data asset.
+ :type name: str
+ :param version: Version of data asset.
+ :type version: str
+ :param label: Label of the data asset. (mutually exclusive with version)
+ :type label: str
+ :return: None
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_restore]
+ :end-before: [END data_operations_restore]
+ :language: python
+ :dedent: 8
+ :caption: Restore data asset example.
+ """
+
+ _archive_or_restore(
+ asset_operations=self,
+ version_operation=self._operation,
+ container_operation=self._container_operation,
+ is_archived=False,
+ name=name,
+ version=version,
+ label=label,
+ )
+
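+    # A hedged usage sketch for archive() and restore() (illustrative; the asset name and
+    # version are assumptions, not part of the original file):
+    #
+    #     ml_client.data.archive(name="my_data", version="1")   # exclude from default list views
+    #     ml_client.data.restore(name="my_data", version="1")   # bring it back
+    #
+    # Omitting both version and label operates on the asset container rather than one version.
+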
+ def _get_latest_version(self, name: str) -> Data:
+ """Returns the latest version of the asset with the given name. Latest is defined as the most recently created,
+ not the most recently updated.
+
+ :param name: The asset name
+ :type name: str
+ :return: The latest asset
+ :rtype: Data
+ """
+ latest_version = _get_latest_version_from_container(
+ name,
+ self._container_operation,
+ self._resource_group_name,
+ self._workspace_name,
+ self._registry_name,
+ )
+ return self.get(name, version=latest_version)
+
+ @monitor_with_activity(ops_logger, "data.Share", ActivityType.PUBLICAPI)
+ @experimental
+ def share(
+ self,
+ name: str,
+ version: str,
+ *,
+ share_with_name: str,
+ share_with_version: str,
+ registry_name: str,
+ **kwargs: Any,
+ ) -> Data:
+ """Share a data asset from workspace to registry.
+
+ :param name: Name of data asset.
+ :type name: str
+ :param version: Version of data asset.
+ :type version: str
+ :keyword share_with_name: Name of data asset to share with.
+ :paramtype share_with_name: str
+ :keyword share_with_version: Version of data asset to share with.
+ :paramtype share_with_version: str
+ :keyword registry_name: Name of the destination registry.
+ :paramtype registry_name: str
+ :return: Data asset object.
+ :rtype: ~azure.ai.ml.entities.Data
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START data_operations_share]
+ :end-before: [END data_operations_share]
+ :language: python
+ :dedent: 8
+ :caption: Share data asset example.
+ """
+
+ # Get workspace info to get workspace GUID
+ workspace = self._service_client.workspaces.get(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ **kwargs,
+ )
+ workspace_guid = workspace.workspace_id
+ workspace_location = workspace.location
+
+ # Get data asset ID
+ asset_id = ASSET_ID_FORMAT.format(
+ workspace_location,
+ workspace_guid,
+ AzureMLResourceType.DATA,
+ name,
+ version,
+ )
+
+ data_ref = WorkspaceAssetReference(
+ name=share_with_name if share_with_name else name,
+ version=share_with_version if share_with_version else version,
+ asset_id=asset_id,
+ )
+
+ with self._set_registry_client(registry_name):
+ return self.create_or_update(data_ref)
+
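+    # A hedged usage sketch for share() (illustrative; every name and version below is an
+    # assumption, not part of the original file):
+    #
+    #     shared = ml_client.data.share(
+    #         name="my_data",
+    #         version="1",
+    #         share_with_name="my_data-shared",
+    #         share_with_version="1",
+    #         registry_name="my_registry",
+    #     )
+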
+ @monitor_with_activity(ops_logger, "data.Mount", ActivityType.PUBLICAPI)
+ @experimental
+ def mount(
+ self,
+ path: str,
+ *,
+ mount_point: Optional[str] = None,
+ mode: str = "ro_mount",
+ debug: bool = False,
+ persistent: bool = False,
+ **kwargs,
+ ) -> None:
+ """Mount a data asset to a local path, so that you can access data inside it
+ under a local path with any tools of your choice.
+
+ :param path: The data asset path to mount, in the form of `azureml:<name>` or `azureml:<name>:<version>`.
+ :type path: str
+        :keyword mount_point: A local path used as the mount point.
+        :paramtype mount_point: str
+        :keyword mode: Mount mode. Only `ro_mount` (read-only) is supported for data asset mounts.
+        :paramtype mode: str
+        :keyword debug: Whether to enable verbose logging.
+        :paramtype debug: bool
+        :keyword persistent: Whether to persist the mount after reboot. Applies only when running on a Compute
+            Instance, where the 'CI_NAME' environment variable is set.
+        :paramtype persistent: bool
+ :return: None
+ """
+
+ assert mode in ["ro_mount", "rw_mount"], "mode should be either `ro_mount` or `rw_mount`"
+ read_only = mode == "ro_mount"
+ assert read_only, "read-write mount for data asset is not supported yet"
+
+ ci_name = os.environ.get("CI_NAME")
+        assert (
+            not persistent or ci_name is not None
+ ), "persistent mount is only supported on Compute Instance, where the 'CI_NAME' environment variable is set."
+
+ try:
+ from azureml.dataprep import rslex_fuse_subprocess_wrapper
+ except ImportError as exc:
+ raise MlException(
+ "Mount operations requires package azureml-dataprep-rslex installed. "
+ + "You can install it with Azure ML SDK with `pip install azure-ai-ml[mount]`."
+ ) from exc
+
+ uri = rslex_fuse_subprocess_wrapper.build_data_asset_uri(
+ self._operation_scope._subscription_id, self._resource_group_name, self._workspace_name, path
+ )
+ if persistent and ci_name is not None:
+ mount_name = f"unified_mount_{str(uuid.uuid4()).replace('-', '')}"
+ self._compute_operation.update_data_mounts(
+ self._resource_group_name,
+ self._workspace_name,
+ ci_name,
+ [
+ ComputeInstanceDataMount(
+ source=uri,
+ source_type="URI",
+ mount_name=mount_name,
+ mount_action="Mount",
+ mount_path=mount_point or "",
+ )
+ ],
+ api_version="2021-01-01",
+ **kwargs,
+ )
+ print(f"Mount requested [name: {mount_name}]. Waiting for completion ...")
+ while True:
+ compute = self._compute_operation.get(self._resource_group_name, self._workspace_name, ci_name)
+ mounts = compute.properties.properties.data_mounts
+ try:
+ mount = [mount for mount in mounts if mount.mount_name == mount_name][0]
+ if mount.mount_state == "Mounted":
+ print(f"Mounted [name: {mount_name}].")
+ break
+ if mount.mount_state == "MountRequested":
+ pass
+ elif mount.mount_state == "MountFailed":
+ msg = f"Mount failed [name: {mount_name}]: {mount.error}"
+ raise MlException(message=msg, no_personal_data_message=msg)
+ else:
+ msg = f"Got unexpected mount state [name: {mount_name}]: {mount.mount_state}"
+ raise MlException(message=msg, no_personal_data_message=msg)
+ except IndexError:
+ pass
+ time.sleep(5)
+
+ else:
+ rslex_fuse_subprocess_wrapper.start_fuse_mount_subprocess(
+ uri, mount_point, read_only, debug, credential=self._operation._config.credential
+ )
+
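+    # A hedged usage sketch for mount() (illustrative; the asset URI and mount point are
+    # assumptions, not part of the original file). Requires azureml-dataprep-rslex:
+    #
+    #     ml_client.data.mount("azureml:my_data:1", mount_point="/tmp/my_data")
+    #
+    # On a Compute Instance, persistent=True registers the mount so that it survives a reboot.
+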
+ @contextmanager
+ # pylint: disable-next=docstring-missing-return,docstring-missing-rtype
+ def _set_registry_client(self, registry_name: str) -> Generator:
+ """Sets the registry client for the data operations.
+
+ :param registry_name: Name of the registry.
+ :type registry_name: str
+ """
+ rg_ = self._operation_scope._resource_group_name
+ sub_ = self._operation_scope._subscription_id
+ registry_ = self._operation_scope.registry_name
+ client_ = self._service_client
+ data_versions_operation_ = self._operation
+
+ try:
+ _client, _rg, _sub = get_registry_client(self._service_client._config.credential, registry_name)
+ self._operation_scope.registry_name = registry_name
+ self._operation_scope._resource_group_name = _rg
+ self._operation_scope._subscription_id = _sub
+ self._service_client = _client
+ self._operation = _client.data_versions
+ yield
+ finally:
+ self._operation_scope.registry_name = registry_
+ self._operation_scope._resource_group_name = rg_
+ self._operation_scope._subscription_id = sub_
+ self._service_client = client_
+ self._operation = data_versions_operation_
+
+
+def _assert_local_path_matches_asset_type(
+ local_path: str,
+ asset_type: str,
+) -> None:
+ # assert file system type matches asset type
+ if asset_type == AssetTypes.URI_FOLDER and not os.path.isdir(local_path):
+ raise ValidationException(
+ message="File path does not match asset type {}: {}".format(asset_type, local_path),
+ no_personal_data_message="File path does not match asset type {}".format(asset_type),
+ target=ErrorTarget.DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+ )
+ if asset_type == AssetTypes.URI_FILE and not os.path.isfile(local_path):
+ raise ValidationException(
+ message="File path does not match asset type {}: {}".format(asset_type, local_path),
+ no_personal_data_message="File path does not match asset type {}".format(asset_type),
+ target=ErrorTarget.DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+ )
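+
+
+# A minimal sketch of the helper above (illustrative; the paths are assumptions):
+#
+#     _assert_local_path_matches_asset_type("/data/folder", AssetTypes.URI_FOLDER)  # ok if it is a directory
+#     _assert_local_path_matches_asset_type("/data/folder", AssetTypes.URI_FILE)    # raises ValidationException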