Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py  | 891
1 file changed, 891 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py
new file mode 100644
index 00000000..a54b1739
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_data_operations.py
@@ -0,0 +1,891 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access,no-value-for-parameter
+
+import os
+import time
+import uuid
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Any, Dict, Generator, Iterable, List, Optional, Union, cast
+
+from marshmallow.exceptions import ValidationError as SchemaValidationError
+
+from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
+from azure.ai.ml._artifacts._constants import (
+    ASSET_PATH_ERROR,
+    CHANGED_ASSET_PATH_MSG,
+    CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
+)
+from azure.ai.ml._exception_helper import log_and_raise_error
+from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
+    AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
+)
+from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042023_preview
+from azure.ai.ml._restclient.v2023_04_01_preview.models import ListViewType
+from azure.ai.ml._restclient.v2024_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012024_preview
+from azure.ai.ml._restclient.v2024_01_01_preview.models import ComputeInstanceDataMount
+from azure.ai.ml._scope_dependent_operations import (
+    OperationConfig,
+    OperationsContainer,
+    OperationScope,
+    _ScopeDependentOperations,
+)
+from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
+from azure.ai.ml._utils._asset_utils import (
+    _archive_or_restore,
+    _check_or_modify_auto_delete_setting,
+    _create_or_update_autoincrement,
+    _get_latest_version_from_container,
+    _resolve_label_to_asset,
+    _validate_auto_delete_setting_in_data_output,
+    _validate_workspace_managed_datastore,
+)
+from azure.ai.ml._utils._data_utils import (
+    download_mltable_metadata_schema,
+    read_local_mltable_metadata_contents,
+    read_remote_mltable_metadata_contents,
+    validate_mltable_metadata,
+)
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml._utils._http_utils import HttpPipeline
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils._registry_utils import (
+    get_asset_body_for_registry_storage,
+    get_registry_client,
+    get_sas_uri_for_registry_asset,
+)
+from azure.ai.ml._utils.utils import is_url
+from azure.ai.ml.constants._common import (
+    ASSET_ID_FORMAT,
+    MLTABLE_METADATA_SCHEMA_URL_FALLBACK,
+    AssetTypes,
+    AzureMLResourceType,
+)
+from azure.ai.ml.data_transfer import import_data as import_data_func
+from azure.ai.ml.entities import PipelineJob, PipelineJobSettings
+from azure.ai.ml.entities._assets import Data, WorkspaceAssetReference
+from azure.ai.ml.entities._data.mltable_metadata import MLTableMetadata
+from azure.ai.ml.entities._data_import.data_import import DataImport
+from azure.ai.ml.entities._inputs_outputs import Output
+from azure.ai.ml.entities._inputs_outputs.external_data import Database
+from azure.ai.ml.exceptions import (
+    AssetPathException,
+    ErrorCategory,
+    ErrorTarget,
+    MlException,
+    ValidationErrorType,
+    ValidationException,
+)
+from azure.ai.ml.operations._datastore_operations import DatastoreOperations
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+from azure.core.paging import ItemPaged
+
+ops_logger = OpsLogger(__name__)
+module_logger = ops_logger.module_logger
+
+
+class DataOperations(_ScopeDependentOperations):
+    """DataOperations.
+
+    You should not instantiate this class directly. Instead, you should
+    create an MLClient instance that instantiates it for you and
+    attaches it as an attribute.
+
+    :param operation_scope: Scope variables for the operations classes of an MLClient object.
+    :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
+    :param operation_config: Common configuration for operations classes of an MLClient object.
+    :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
+    :param service_client: Service client to allow end users to operate on Azure Machine Learning Workspace
+        resources (ServiceClient042023Preview or ServiceClient102021Dataplane).
+    :type service_client: typing.Union[
+        ~azure.ai.ml._restclient.v2023_04_01_preview._azure_machine_learning_workspaces.AzureMachineLearningWorkspaces,
+        ~azure.ai.ml._restclient.v2021_10_01_dataplanepreview._azure_machine_learning_workspaces.
+        AzureMachineLearningWorkspaces]
+    :param datastore_operations: Represents a client for performing operations on Datastores.
+    :type datastore_operations: ~azure.ai.ml.operations._datastore_operations.DatastoreOperations
+    """
+
+    def __init__(
+        self,
+        operation_scope: OperationScope,
+        operation_config: OperationConfig,
+        service_client: Union[ServiceClient042023_preview, ServiceClient102021Dataplane],
+        service_client_012024_preview: ServiceClient012024_preview,
+        datastore_operations: DatastoreOperations,
+        **kwargs: Any,
+    ):
+        super(DataOperations, self).__init__(operation_scope, operation_config)
+        ops_logger.update_filter()
+        self._operation = service_client.data_versions
+        self._container_operation = service_client.data_containers
+        self._datastore_operation = datastore_operations
+        self._compute_operation = service_client_012024_preview.compute
+        self._service_client = service_client
+        self._init_kwargs = kwargs
+        self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
+        self._all_operations: OperationsContainer = kwargs.pop("all_operations")
+        # Maps a label to a function which, given an asset name,
+        # returns the asset associated with that label
+        self._managed_label_resolver = {"latest": self._get_latest_version}
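+
+    # Editor's usage sketch (not part of the SDK source): this class is reached
+    # through MLClient rather than constructed directly. The subscription,
+    # resource group, and workspace names below are illustrative placeholders.
+    #
+    #     from azure.identity import DefaultAzureCredential
+    #     from azure.ai.ml import MLClient
+    #
+    #     ml_client = MLClient(
+    #         DefaultAzureCredential(), "<subscription-id>", "<resource-group>", "<workspace>"
+    #     )
+    #     data_ops = ml_client.data  # DataOperations instance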
+
+    @monitor_with_activity(ops_logger, "Data.List", ActivityType.PUBLICAPI)
+    def list(
+        self,
+        name: Optional[str] = None,
+        *,
+        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+    ) -> ItemPaged[Data]:
+        """List the data assets of the workspace.
+
+        :param name: Name of a specific data asset, optional.
+        :type name: Optional[str]
+        :keyword list_view_type: View type for including/excluding (for example) archived data assets.
+            Default: ACTIVE_ONLY.
+        :paramtype list_view_type: Optional[ListViewType]
+        :return: An iterator-like instance of Data objects.
+        :rtype: ~azure.core.paging.ItemPaged[Data]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_list]
+                :end-before: [END data_operations_list]
+                :language: python
+                :dedent: 8
+                :caption: List data assets example.
+        """
+        if name:
+            return (
+                self._operation.list(
+                    name=name,
+                    registry_name=self._registry_name,
+                    cls=lambda objs: [Data._from_rest_object(obj) for obj in objs],
+                    list_view_type=list_view_type,
+                    **self._scope_kwargs,
+                )
+                if self._registry_name
+                else self._operation.list(
+                    name=name,
+                    workspace_name=self._workspace_name,
+                    cls=lambda objs: [Data._from_rest_object(obj) for obj in objs],
+                    list_view_type=list_view_type,
+                    **self._scope_kwargs,
+                )
+            )
+        return (
+            self._container_operation.list(
+                registry_name=self._registry_name,
+                cls=lambda objs: [Data._from_container_rest_object(obj) for obj in objs],
+                list_view_type=list_view_type,
+                **self._scope_kwargs,
+            )
+            if self._registry_name
+            else self._container_operation.list(
+                workspace_name=self._workspace_name,
+                cls=lambda objs: [Data._from_container_rest_object(obj) for obj in objs],
+                list_view_type=list_view_type,
+                **self._scope_kwargs,
+            )
+        )
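+
+    # Editor's usage sketch (assumes an MLClient named `ml_client`, as above):
+    #
+    #     # All versions of one asset:
+    #     for d in ml_client.data.list(name="my-data"):
+    #         print(d.name, d.version)
+    #     # All data asset containers in the workspace:
+    #     for d in ml_client.data.list():
+    #         print(d.name)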
+
+    def _get(self, name: Optional[str], version: Optional[str] = None) -> Data:
+        if version:
+            return (
+                self._operation.get(
+                    name=name,
+                    version=version,
+                    registry_name=self._registry_name,
+                    **self._scope_kwargs,
+                    **self._init_kwargs,
+                )
+                if self._registry_name
+                else self._operation.get(
+                    resource_group_name=self._resource_group_name,
+                    workspace_name=self._workspace_name,
+                    name=name,
+                    version=version,
+                    **self._init_kwargs,
+                )
+            )
+        return (
+            self._container_operation.get(
+                name=name,
+                registry_name=self._registry_name,
+                **self._scope_kwargs,
+                **self._init_kwargs,
+            )
+            if self._registry_name
+            else self._container_operation.get(
+                resource_group_name=self._resource_group_name,
+                workspace_name=self._workspace_name,
+                name=name,
+                **self._init_kwargs,
+            )
+        )
+
+    @monitor_with_activity(ops_logger, "Data.Get", ActivityType.PUBLICAPI)
+    def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Data:  # type: ignore
+        """Get the specified data asset.
+
+        :param name: Name of data asset.
+        :type name: str
+        :param version: Version of data asset.
+        :type version: str
+        :param label: Label of the data asset. (mutually exclusive with version)
+        :type label: str
+        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully
+            identified and retrieved. Details will be provided in the error message.
+        :return: Data asset object.
+        :rtype: ~azure.ai.ml.entities.Data
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_get]
+                :end-before: [END data_operations_get]
+                :language: python
+                :dedent: 8
+                :caption: Get data assets example.
+        """
+        try:
+            if version and label:
+                msg = "Cannot specify both version and label."
+                raise ValidationException(
+                    message=msg,
+                    target=ErrorTarget.DATA,
+                    no_personal_data_message=msg,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
+
+            if label:
+                return _resolve_label_to_asset(self, name, label)
+
+            if not version:
+                msg = "Must provide either version or label."
+                raise ValidationException(
+                    message=msg,
+                    target=ErrorTarget.DATA,
+                    no_personal_data_message=msg,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.MISSING_FIELD,
+                )
+            data_version_resource = self._get(name, version)
+            return Data._from_rest_object(data_version_resource)
+        except (ValidationException, SchemaValidationError) as ex:
+            log_and_raise_error(ex)
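+
+    # Editor's usage sketch (asset name and version are illustrative):
+    #
+    #     data_asset = ml_client.data.get(name="my-data", version="1")
+    #     latest = ml_client.data.get(name="my-data", label="latest")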
+
+    @monitor_with_activity(ops_logger, "Data.CreateOrUpdate", ActivityType.PUBLICAPI)
+    def create_or_update(self, data: Data) -> Data:
+        """Returns created or updated data asset.
+
+        If not already in storage, asset will be uploaded to the workspace's blob storage.
+
+        :param data: Data asset object.
+        :type data: azure.ai.ml.entities.Data
+        :raises ~azure.ai.ml.exceptions.AssetPathException: Raised when the Data artifact path is
+            already linked to another asset
+        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully validated.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if the local path provided points to an empty directory.
+        :return: Data asset object.
+        :rtype: ~azure.ai.ml.entities.Data
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_create_or_update]
+                :end-before: [END data_operations_create_or_update]
+                :language: python
+                :dedent: 8
+                :caption: Create data assets example.
+        """
+        try:
+            name = data.name
+            if not data.version and self._registry_name:
+                msg = "Data asset version is required for registry"
+                raise ValidationException(
+                    message=msg,
+                    no_personal_data_message=msg,
+                    target=ErrorTarget.DATA,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.MISSING_FIELD,
+                )
+            version = data.version
+
+            sas_uri = None
+            if self._registry_name:
+                # If the data asset is a workspace asset, promote to registry
+                if isinstance(data, WorkspaceAssetReference):
+                    try:
+                        self._operation.get(
+                            name=data.name,
+                            version=data.version,
+                            resource_group_name=self._resource_group_name,
+                            registry_name=self._registry_name,
+                        )
+                    except Exception as err:  # pylint: disable=W0718
+                        if isinstance(err, ResourceNotFoundError):
+                            pass
+                        else:
+                            raise err
+                    else:
+                        msg = "An data asset with this name and version already exists in registry"
+                        raise ValidationException(
+                            message=msg,
+                            no_personal_data_message=msg,
+                            target=ErrorTarget.DATA,
+                            error_category=ErrorCategory.USER_ERROR,
+                        )
+                    data_res_obj = data._to_rest_object()
+                    result = self._service_client.resource_management_asset_reference.begin_import_method(
+                        resource_group_name=self._resource_group_name,
+                        registry_name=self._registry_name,
+                        body=data_res_obj,
+                    ).result()
+
+                    if not result:
+                        data_res_obj = self._get(name=data.name, version=data.version)
+                        return Data._from_rest_object(data_res_obj)
+
+                sas_uri = get_sas_uri_for_registry_asset(
+                    service_client=self._service_client,
+                    name=name,
+                    version=version,
+                    resource_group=self._resource_group_name,
+                    registry=self._registry_name,
+                    body=get_asset_body_for_registry_storage(self._registry_name, "data", name, version),
+                )
+
+            referenced_uris = self._validate(data)
+            if referenced_uris:
+                data._referenced_uris = referenced_uris
+
+            data, _ = _check_and_upload_path(
+                artifact=data,
+                asset_operations=self,
+                sas_uri=sas_uri,
+                artifact_type=ErrorTarget.DATA,
+                show_progress=self._show_progress,
+            )
+
+            _check_or_modify_auto_delete_setting(data.auto_delete_setting)
+
+            data_version_resource = data._to_rest_object()
+            auto_increment_version = data._auto_increment_version
+
+            if auto_increment_version:
+                result = _create_or_update_autoincrement(
+                    name=data.name,
+                    body=data_version_resource,
+                    version_operation=self._operation,
+                    container_operation=self._container_operation,
+                    resource_group_name=self._operation_scope.resource_group_name,
+                    workspace_name=self._workspace_name,
+                    **self._init_kwargs,
+                )
+            else:
+                result = (
+                    self._operation.begin_create_or_update(
+                        name=name,
+                        version=version,
+                        registry_name=self._registry_name,
+                        body=data_version_resource,
+                        **self._scope_kwargs,
+                    ).result()
+                    if self._registry_name
+                    else self._operation.create_or_update(
+                        name=name,
+                        version=version,
+                        workspace_name=self._workspace_name,
+                        body=data_version_resource,
+                        **self._scope_kwargs,
+                    )
+                )
+
+            if not result and self._registry_name:
+                result = self._get(name=name, version=version)
+
+            return Data._from_rest_object(result)
+        except Exception as ex:
+            if isinstance(ex, (ValidationException, SchemaValidationError)):
+                log_and_raise_error(ex)
+            elif isinstance(ex, HttpResponseError):
+                # service side raises an exception if we attempt to update an existing asset's asset path
+                if str(ex) == ASSET_PATH_ERROR:
+                    raise AssetPathException(
+                        message=CHANGED_ASSET_PATH_MSG,
+                        target=ErrorTarget.DATA,
+                        no_personal_data_message=CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
+                        error_category=ErrorCategory.USER_ERROR,
+                    ) from ex
+            raise ex
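+
+    # Editor's usage sketch (local path and names are illustrative):
+    #
+    #     from azure.ai.ml.constants import AssetTypes
+    #     from azure.ai.ml.entities import Data
+    #
+    #     my_data = Data(
+    #         name="my-data",
+    #         version="1",
+    #         type=AssetTypes.URI_FOLDER,
+    #         path="./sample_data",
+    #     )
+    #     created = ml_client.data.create_or_update(my_data)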
+
+    @monitor_with_activity(ops_logger, "Data.ImportData", ActivityType.PUBLICAPI)
+    @experimental
+    def import_data(self, data_import: DataImport, **kwargs: Any) -> PipelineJob:
+        """Returns the data import job that is creating the data asset.
+
+        :param data_import: DataImport object.
+        :type data_import: azure.ai.ml.entities.DataImport
+        :return: data import job object.
+        :rtype: ~azure.ai.ml.entities.PipelineJob
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_import_data]
+                :end-before: [END data_operations_import_data]
+                :language: python
+                :dedent: 8
+                :caption: Import data assets example.
+        """
+
+        experiment_name = "data_import_" + str(data_import.name)
+        data_import.type = AssetTypes.MLTABLE if isinstance(data_import.source, Database) else AssetTypes.URI_FOLDER
+
+        # avoid specifying auto_delete_setting in job output for now
+        _validate_auto_delete_setting_in_data_output(data_import.auto_delete_setting)
+
+        # block customer-specified paths on managed datastores
+        data_import.path = _validate_workspace_managed_datastore(data_import.path)
+
+        if "${{name}}" not in str(data_import.path):
+            data_import.path = data_import.path.rstrip("/") + "/${{name}}"  # type: ignore
+        import_job = import_data_func(
+            description=data_import.description or experiment_name,
+            display_name=experiment_name,
+            experiment_name=experiment_name,
+            compute="serverless",
+            source=data_import.source,
+            outputs={
+                "sink": Output(
+                    type=data_import.type,
+                    path=data_import.path,  # type: ignore
+                    name=data_import.name,
+                    version=data_import.version,
+                )
+            },
+        )
+        import_pipeline = PipelineJob(
+            description=data_import.description or experiment_name,
+            tags=data_import.tags,
+            display_name=experiment_name,
+            experiment_name=experiment_name,
+            properties=data_import.properties or {},
+            settings=PipelineJobSettings(force_rerun=True),
+            jobs={experiment_name: import_job},
+        )
+        import_pipeline.properties["azureml.materializationAssetName"] = data_import.name
+        return self._all_operations.all_operations[AzureMLResourceType.JOB].create_or_update(
+            job=import_pipeline, skip_validation=True, **kwargs
+        )
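+
+    # Editor's usage sketch (connection, query, and datastore path are
+    # illustrative placeholders):
+    #
+    #     from azure.ai.ml.data_transfer import Database
+    #     from azure.ai.ml.entities import DataImport
+    #
+    #     data_import = DataImport(
+    #         name="imported-table",
+    #         source=Database(connection="azureml:my_sql_connection", query="SELECT * FROM my_table"),
+    #         path="azureml://datastores/workspaceblobstore/paths/imports/",
+    #     )
+    #     import_job = ml_client.data.import_data(data_import=data_import)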
+
+    @monitor_with_activity(ops_logger, "Data.ListMaterializationStatus", ActivityType.PUBLICAPI)
+    def list_materialization_status(
+        self,
+        name: str,
+        *,
+        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+        **kwargs: Any,
+    ) -> Iterable[PipelineJob]:
+        """List materialization jobs of the asset.
+
+        :param name: Name of the asset being created by the materialization jobs.
+        :type name: str
+        :keyword list_view_type: View type for including/excluding (for example) archived jobs. Default: ACTIVE_ONLY.
+        :paramtype list_view_type: Optional[ListViewType]
+        :return: An iterator-like instance of Job objects.
+        :rtype: ~azure.core.paging.ItemPaged[PipelineJob]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_list_materialization_status]
+                :end-before: [END data_operations_list_materialization_status]
+                :language: python
+                :dedent: 8
+                :caption: List materialization jobs example.
+        """
+
+        return cast(
+            Iterable[PipelineJob],
+            self._all_operations.all_operations[AzureMLResourceType.JOB].list(
+                job_type="Pipeline",
+                asset_name=name,
+                list_view_type=list_view_type,
+                **kwargs,
+            ),
+        )
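+
+    # Editor's usage sketch:
+    #
+    #     for job in ml_client.data.list_materialization_status("imported-table"):
+    #         print(job.name, job.status)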
+
+    @monitor_with_activity(ops_logger, "Data.Validate", ActivityType.INTERNALCALL)
+    def _validate(self, data: Data) -> Optional[List[str]]:
+        if not data.path:
+            msg = "Missing data path. Path is required for data."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                error_type=ValidationErrorType.MISSING_FIELD,
+                target=ErrorTarget.DATA,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+        asset_path = str(data.path)
+        asset_type = data.type
+        base_path = data.base_path
+
+        if asset_type == AssetTypes.MLTABLE:
+            if is_url(asset_path):
+                try:
+                    metadata_contents = read_remote_mltable_metadata_contents(
+                        base_uri=asset_path,
+                        datastore_operations=self._datastore_operation,
+                        requests_pipeline=self._requests_pipeline,
+                    )
+                    metadata_yaml_path = None
+                except Exception:  # pylint: disable=W0718
+                    # skip validation for remote MLTable when the contents cannot be read
+                    module_logger.info("Unable to access MLTable metadata at path %s", asset_path)
+                    return None
+            else:
+                metadata_contents = read_local_mltable_metadata_contents(path=asset_path)
+                metadata_yaml_path = Path(asset_path, "MLTable")
+            metadata = MLTableMetadata._load(metadata_contents, metadata_yaml_path)
+            mltable_metadata_schema = self._try_get_mltable_metadata_jsonschema(data._mltable_schema_url)
+            if mltable_metadata_schema and not data._skip_validation:
+                validate_mltable_metadata(
+                    mltable_metadata_dict=metadata_contents,
+                    mltable_schema=mltable_metadata_schema,
+                )
+            return cast(Optional[List[str]], metadata.referenced_uris())
+
+        if is_url(asset_path):
+            # skip validation for remote URI_FILE or URI_FOLDER
+            pass
+        elif os.path.isabs(asset_path):
+            _assert_local_path_matches_asset_type(asset_path, asset_type)
+        else:
+            abs_path = Path(base_path, asset_path).resolve()
+            _assert_local_path_matches_asset_type(str(abs_path), asset_type)
+
+        return None
+
+    def _try_get_mltable_metadata_jsonschema(self, mltable_schema_url: Optional[str]) -> Optional[Dict]:
+        if mltable_schema_url is None:
+            mltable_schema_url = MLTABLE_METADATA_SCHEMA_URL_FALLBACK
+        try:
+            return cast(Optional[Dict], download_mltable_metadata_schema(mltable_schema_url, self._requests_pipeline))
+        except Exception:  # pylint: disable=W0718
+            module_logger.info(
+                'Failed to download MLTable metadata jsonschema from "%s", skipping validation',
+                mltable_schema_url,
+            )
+            return None
+
+    @monitor_with_activity(ops_logger, "Data.Archive", ActivityType.PUBLICAPI)
+    def archive(
+        self,
+        name: str,
+        version: Optional[str] = None,
+        label: Optional[str] = None,
+        # pylint:disable=unused-argument
+        **kwargs: Any,
+    ) -> None:
+        """Archive a data asset.
+
+        :param name: Name of data asset.
+        :type name: str
+        :param version: Version of data asset.
+        :type version: str
+        :param label: Label of the data asset. (mutually exclusive with version)
+        :type label: str
+        :return: None
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_archive]
+                :end-before: [END data_operations_archive]
+                :language: python
+                :dedent: 8
+                :caption: Archive data asset example.
+        """
+
+        _archive_or_restore(
+            asset_operations=self,
+            version_operation=self._operation,
+            container_operation=self._container_operation,
+            is_archived=True,
+            name=name,
+            version=version,
+            label=label,
+        )
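+
+    # Editor's usage sketch: archiving hides the asset from default list views.
+    #
+    #     ml_client.data.archive(name="my-data", version="1")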
+
+    @monitor_with_activity(ops_logger, "Data.Restore", ActivityType.PUBLICAPI)
+    def restore(
+        self,
+        name: str,
+        version: Optional[str] = None,
+        label: Optional[str] = None,
+        # pylint:disable=unused-argument
+        **kwargs: Any,
+    ) -> None:
+        """Restore an archived data asset.
+
+        :param name: Name of data asset.
+        :type name: str
+        :param version: Version of data asset.
+        :type version: str
+        :param label: Label of the data asset. (mutually exclusive with version)
+        :type label: str
+        :return: None
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_restore]
+                :end-before: [END data_operations_restore]
+                :language: python
+                :dedent: 8
+                :caption: Restore data asset example.
+        """
+
+        _archive_or_restore(
+            asset_operations=self,
+            version_operation=self._operation,
+            container_operation=self._container_operation,
+            is_archived=False,
+            name=name,
+            version=version,
+            label=label,
+        )
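+
+    # Editor's usage sketch: restore reverses a prior archive.
+    #
+    #     ml_client.data.restore(name="my-data", version="1")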
+
+    def _get_latest_version(self, name: str) -> Data:
+        """Returns the latest version of the asset with the given name. Latest is defined as the most recently created,
+         not the most recently updated.
+
+        :param name: The asset name
+        :type name: str
+        :return: The latest asset
+        :rtype: Data
+        """
+        latest_version = _get_latest_version_from_container(
+            name,
+            self._container_operation,
+            self._resource_group_name,
+            self._workspace_name,
+            self._registry_name,
+        )
+        return self.get(name, version=latest_version)
+
+    @monitor_with_activity(ops_logger, "data.Share", ActivityType.PUBLICAPI)
+    @experimental
+    def share(
+        self,
+        name: str,
+        version: str,
+        *,
+        share_with_name: str,
+        share_with_version: str,
+        registry_name: str,
+        **kwargs: Any,
+    ) -> Data:
+        """Share a data asset from workspace to registry.
+
+        :param name: Name of data asset.
+        :type name: str
+        :param version: Version of data asset.
+        :type version: str
+        :keyword share_with_name: Name for the data asset in the destination registry.
+        :paramtype share_with_name: str
+        :keyword share_with_version: Version for the data asset in the destination registry.
+        :paramtype share_with_version: str
+        :keyword registry_name: Name of the destination registry.
+        :paramtype registry_name: str
+        :return: Data asset object.
+        :rtype: ~azure.ai.ml.entities.Data
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START data_operations_share]
+                :end-before: [END data_operations_share]
+                :language: python
+                :dedent: 8
+                :caption: Share data asset example.
+        """
+
+        # Get workspace info to obtain the workspace GUID
+        workspace = self._service_client.workspaces.get(
+            resource_group_name=self._resource_group_name,
+            workspace_name=self._workspace_name,
+            **kwargs,
+        )
+        workspace_guid = workspace.workspace_id
+        workspace_location = workspace.location
+
+        # Get data asset ID
+        asset_id = ASSET_ID_FORMAT.format(
+            workspace_location,
+            workspace_guid,
+            AzureMLResourceType.DATA,
+            name,
+            version,
+        )
+
+        data_ref = WorkspaceAssetReference(
+            name=share_with_name if share_with_name else name,
+            version=share_with_version if share_with_version else version,
+            asset_id=asset_id,
+        )
+
+        with self._set_registry_client(registry_name):
+            return self.create_or_update(data_ref)
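+
+    # Editor's usage sketch (the registry name is an illustrative placeholder):
+    #
+    #     shared = ml_client.data.share(
+    #         name="my-data",
+    #         version="1",
+    #         share_with_name="my-data",
+    #         share_with_version="1",
+    #         registry_name="my-registry",
+    #     )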
+
+    @monitor_with_activity(ops_logger, "data.Mount", ActivityType.PUBLICAPI)
+    @experimental
+    def mount(
+        self,
+        path: str,
+        *,
+        mount_point: Optional[str] = None,
+        mode: str = "ro_mount",
+        debug: bool = False,
+        persistent: bool = False,
+        **kwargs,
+    ) -> None:
+        """Mount a data asset to a local path, so that you can access data inside it
+        under a local path with any tools of your choice.
+
+        :param path: The data asset path to mount, in the form of `azureml:<name>` or `azureml:<name>:<version>`.
+        :type path: str
+        :keyword mount_point: A local path used as the mount point.
+        :paramtype mount_point: str
+        :keyword mode: Mount mode. Only `ro_mount` (read-only) is supported for data asset mount.
+        :paramtype mode: str
+        :keyword debug: Whether to enable verbose logging.
+        :paramtype debug: bool
+        :keyword persistent: Whether to persist the mount after reboot. Applies only when running on a
+            Compute Instance, where the 'CI_NAME' environment variable is set.
+        :paramtype persistent: bool
+        :return: None
+        """
+
+        assert mode in ["ro_mount", "rw_mount"], "mode should be either `ro_mount` or `rw_mount`"
+        read_only = mode == "ro_mount"
+        assert read_only, "read-write mount for data asset is not supported yet"
+
+        ci_name = os.environ.get("CI_NAME")
+        assert not persistent or (
+            persistent and ci_name is not None
+        ), "persistent mount is only supported on Compute Instance, where the 'CI_NAME' environment variable is set."
+
+        try:
+            from azureml.dataprep import rslex_fuse_subprocess_wrapper
+        except ImportError as exc:
+            raise MlException(
+                "Mount operations requires package azureml-dataprep-rslex installed. "
+                + "You can install it with Azure ML SDK with `pip install azure-ai-ml[mount]`."
+            ) from exc
+
+        uri = rslex_fuse_subprocess_wrapper.build_data_asset_uri(
+            self._operation_scope._subscription_id, self._resource_group_name, self._workspace_name, path
+        )
+        if persistent and ci_name is not None:
+            mount_name = f"unified_mount_{str(uuid.uuid4()).replace('-', '')}"
+            self._compute_operation.update_data_mounts(
+                self._resource_group_name,
+                self._workspace_name,
+                ci_name,
+                [
+                    ComputeInstanceDataMount(
+                        source=uri,
+                        source_type="URI",
+                        mount_name=mount_name,
+                        mount_action="Mount",
+                        mount_path=mount_point or "",
+                    )
+                ],
+                api_version="2021-01-01",
+                **kwargs,
+            )
+            print(f"Mount requested [name: {mount_name}]. Waiting for completion ...")
+            while True:
+                compute = self._compute_operation.get(self._resource_group_name, self._workspace_name, ci_name)
+                mounts = compute.properties.properties.data_mounts
+                try:
+                    mount = [mount for mount in mounts if mount.mount_name == mount_name][0]
+                    if mount.mount_state == "Mounted":
+                        print(f"Mounted [name: {mount_name}].")
+                        break
+                    if mount.mount_state == "MountRequested":
+                        pass
+                    elif mount.mount_state == "MountFailed":
+                        msg = f"Mount failed [name: {mount_name}]: {mount.error}"
+                        raise MlException(message=msg, no_personal_data_message=msg)
+                    else:
+                        msg = f"Got unexpected mount state [name: {mount_name}]: {mount.mount_state}"
+                        raise MlException(message=msg, no_personal_data_message=msg)
+                except IndexError:
+                    pass
+                time.sleep(5)
+
+        else:
+            rslex_fuse_subprocess_wrapper.start_fuse_mount_subprocess(
+                uri, mount_point, read_only, debug, credential=self._operation._config.credential
+            )
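+
+    # Editor's usage sketch (requires `pip install azure-ai-ml[mount]`; the
+    # asset path and mount point are illustrative):
+    #
+    #     ml_client.data.mount("azureml:my-data:1", mount_point="/tmp/my-data", mode="ro_mount")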
+
+    @contextmanager
+    # pylint: disable-next=docstring-missing-return,docstring-missing-rtype
+    def _set_registry_client(self, registry_name: str) -> Generator:
+        """Sets the registry client for the data operations.
+
+        :param registry_name: Name of the registry.
+        :type registry_name: str
+        """
+        rg_ = self._operation_scope._resource_group_name
+        sub_ = self._operation_scope._subscription_id
+        registry_ = self._operation_scope.registry_name
+        client_ = self._service_client
+        data_versions_operation_ = self._operation
+
+        try:
+            _client, _rg, _sub = get_registry_client(self._service_client._config.credential, registry_name)
+            self._operation_scope.registry_name = registry_name
+            self._operation_scope._resource_group_name = _rg
+            self._operation_scope._subscription_id = _sub
+            self._service_client = _client
+            self._operation = _client.data_versions
+            yield
+        finally:
+            self._operation_scope.registry_name = registry_
+            self._operation_scope._resource_group_name = rg_
+            self._operation_scope._subscription_id = sub_
+            self._service_client = client_
+            self._operation = data_versions_operation_
+
+
+def _assert_local_path_matches_asset_type(
+    local_path: str,
+    asset_type: str,
+) -> None:
+    # assert file system type matches asset type
+    if asset_type == AssetTypes.URI_FOLDER and not os.path.isdir(local_path):
+        raise ValidationException(
+            message="File path does not match asset type {}: {}".format(asset_type, local_path),
+            no_personal_data_message="File path does not match asset type {}".format(asset_type),
+            target=ErrorTarget.DATA,
+            error_category=ErrorCategory.USER_ERROR,
+            error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+        )
+    if asset_type == AssetTypes.URI_FILE and not os.path.isfile(local_path):
+        raise ValidationException(
+            message="File path does not match asset type {}: {}".format(asset_type, local_path),
+            no_personal_data_message="File path does not match asset type {}".format(asset_type),
+            target=ErrorTarget.DATA,
+            error_category=ErrorCategory.USER_ERROR,
+            error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+        )