aboutsummaryrefslogtreecommitdiff
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,no-value-for-parameter

import os
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Generator, Iterable, List, Optional, Union, cast

from marshmallow.exceptions import ValidationError as SchemaValidationError

from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
from azure.ai.ml._artifacts._constants import (
    ASSET_PATH_ERROR,
    CHANGED_ASSET_PATH_MSG,
    CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
)
from azure.ai.ml._exception_helper import log_and_raise_error
from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
    AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
)
from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042023_preview
from azure.ai.ml._restclient.v2023_04_01_preview.models import ListViewType
from azure.ai.ml._restclient.v2024_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012024_preview
from azure.ai.ml._restclient.v2024_01_01_preview.models import ComputeInstanceDataMount
from azure.ai.ml._scope_dependent_operations import (
    OperationConfig,
    OperationsContainer,
    OperationScope,
    _ScopeDependentOperations,
)
from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
from azure.ai.ml._utils._asset_utils import (
    _archive_or_restore,
    _check_or_modify_auto_delete_setting,
    _create_or_update_autoincrement,
    _get_latest_version_from_container,
    _resolve_label_to_asset,
    _validate_auto_delete_setting_in_data_output,
    _validate_workspace_managed_datastore,
)
from azure.ai.ml._utils._data_utils import (
    download_mltable_metadata_schema,
    read_local_mltable_metadata_contents,
    read_remote_mltable_metadata_contents,
    validate_mltable_metadata,
)
from azure.ai.ml._utils._experimental import experimental
from azure.ai.ml._utils._http_utils import HttpPipeline
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml._utils._registry_utils import (
    get_asset_body_for_registry_storage,
    get_registry_client,
    get_sas_uri_for_registry_asset,
)
from azure.ai.ml._utils.utils import is_url
from azure.ai.ml.constants._common import (
    ASSET_ID_FORMAT,
    MLTABLE_METADATA_SCHEMA_URL_FALLBACK,
    AssetTypes,
    AzureMLResourceType,
)
from azure.ai.ml.data_transfer import import_data as import_data_func
from azure.ai.ml.entities import PipelineJob, PipelineJobSettings
from azure.ai.ml.entities._assets import Data, WorkspaceAssetReference
from azure.ai.ml.entities._data.mltable_metadata import MLTableMetadata
from azure.ai.ml.entities._data_import.data_import import DataImport
from azure.ai.ml.entities._inputs_outputs import Output
from azure.ai.ml.entities._inputs_outputs.external_data import Database
from azure.ai.ml.exceptions import (
    AssetPathException,
    ErrorCategory,
    ErrorTarget,
    MlException,
    ValidationErrorType,
    ValidationException,
)
from azure.ai.ml.operations._datastore_operations import DatastoreOperations
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from azure.core.paging import ItemPaged

# Operations logger for this module; it is handed to ``monitor_with_activity`` on the
# methods below.
ops_logger = OpsLogger(__name__)
# Module-level logger used for informational messages emitted from this file.
module_logger = ops_logger.module_logger


class DataOperations(_ScopeDependentOperations):
    """DataOperations.

    You should not instantiate this class directly. Instead, you should
    create an MLClient instance that instantiates it for you and
    attaches it as an attribute.

    :param operation_scope: Scope variables for the operations classes of an MLClient object.
    :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
    :param operation_config: Common configuration for operations classes of an MLClient object.
    :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
    :param service_client: Service client to allow end users to operate on Azure Machine Learning Workspace
        resources (ServiceClient042023Preview or ServiceClient102021Dataplane).
    :type service_client: typing.Union[
        ~azure.ai.ml._restclient.v2023_04_01_preview._azure_machine_learning_workspaces.AzureMachineLearningWorkspaces,
        ~azure.ai.ml._restclient.v2021_10_01_dataplanepreview._azure_machine_learning_workspaces.
        AzureMachineLearningWorkspaces]
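    :param service_client_012024_preview: Service client for the 2024-01-01-preview API; its ``compute``
        operations are used by this class.
    :type service_client_012024_preview:
        ~azure.ai.ml._restclient.v2024_01_01_preview.AzureMachineLearningWorkspaces
    :param datastore_operations: Represents a client for performing operations on Datastores.
    :type datastore_operations: ~azure.ai.ml.operations._datastore_operations.DatastoreOperations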
    """

    def __init__(
        self,
        operation_scope: OperationScope,
        operation_config: OperationConfig,
        service_client: Union[ServiceClient042023_preview, ServiceClient102021Dataplane],
        service_client_012024_preview: ServiceClient012024_preview,
        datastore_operations: DatastoreOperations,
        **kwargs: Any,
    ):
        """Wire up the REST operation groups and shared helpers used by the data operations.

        :param operation_scope: Scope variables for the operations classes of an MLClient object.
        :param operation_config: Common configuration for operations classes of an MLClient object.
        :param service_client: Client providing the ``data_versions`` and ``data_containers`` operation groups.
        :param service_client_012024_preview: Client providing the ``compute`` operation group.
        :param datastore_operations: Client for performing operations on Datastores.
        :keyword requests_pipeline: Required. HTTP pipeline stored on the instance.
        :keyword all_operations: Required. Container holding the sibling operations objects.
        :raises KeyError: If ``requests_pipeline`` or ``all_operations`` is not supplied.
        """
        super().__init__(operation_scope, operation_config)
        ops_logger.update_filter()

        # REST clients and the operation groups derived from them.
        self._service_client = service_client
        self._operation = service_client.data_versions
        self._container_operation = service_client.data_containers
        self._compute_operation = service_client_012024_preview.compute
        self._datastore_operation = datastore_operations

        # The two required keywords are removed from ``kwargs`` so that what remains
        # (kept as ``_init_kwargs``) no longer contains them.
        self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
        self._all_operations: OperationsContainer = kwargs.pop("all_operations")
        self._init_kwargs = kwargs

        # Label -> resolver. Each resolver takes an asset name and returns the asset
        # that the label currently points at.
        self._managed_label_resolver = {"latest": self._get_latest_version}

    @monitor_with_activity(ops_logger, "Data.List", ActivityType.PUBLICAPI)
    def list(
        self,
        name: Optional[str] = None,
        *,
        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
    ) -> ItemPaged[Data]:
        """List the data assets of the workspace.

        With ``name`` the versions of that one asset are listed; without it the data
        containers are listed instead. When the client is registry-scoped the registry
        is queried rather than the workspace.

        :param name: Name of a specific data asset, optional.
        :type name: Optional[str]
        :keyword list_view_type: View type for including/excluding (for example) archived data assets.
            Default: ACTIVE_ONLY.
        :type list_view_type: Optional[ListViewType]
        :return: An iterator like instance of Data objects
        :rtype: ~azure.core.paging.ItemPaged[Data]

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_list]
                :end-before: [END data_operations_list]
                :language: python
                :dedent: 8
                :caption: List data assets example.
        """

        def _versions_to_data(objs: Any) -> List[Data]:
            return [Data._from_rest_object(obj) for obj in objs]

        def _containers_to_data(objs: Any) -> List[Data]:
            return [Data._from_container_rest_object(obj) for obj in objs]

        # Exactly one of registry_name / workspace_name is sent, depending on the client scope.
        if self._registry_name:
            target = {"registry_name": self._registry_name}
        else:
            target = {"workspace_name": self._workspace_name}

        if name:
            return self._operation.list(
                name=name,
                cls=_versions_to_data,
                list_view_type=list_view_type,
                **target,
                **self._scope_kwargs,
            )

        return self._container_operation.list(
            cls=_containers_to_data,
            list_view_type=list_view_type,
            **target,
            **self._scope_kwargs,
        )

    def _get(self, name: Optional[str], version: Optional[str] = None) -> Data:
        """Fetch the REST resource for a data asset version, or for its container.

        A truthy ``version`` routes the call to the version operations; otherwise the
        container operations are used. Registry-scoped clients pass ``registry_name``
        plus the scope kwargs, workspace-scoped clients pass the resource group and
        workspace explicitly.

        NOTE(review): annotated as returning ``Data``, but callers in this class feed the
        result into ``Data._from_rest_object`` - it appears to be a REST object; confirm.

        :param name: Name of the data asset.
        :type name: Optional[str]
        :param version: Version of the data asset, optional.
        :type version: Optional[str]
        :return: Whatever the selected operation's ``get`` returns.
        """
        if version:
            operation = self._operation
            version_arg = {"version": version}
        else:
            operation = self._container_operation
            version_arg = {}

        if self._registry_name:
            return operation.get(
                name=name,
                **version_arg,
                registry_name=self._registry_name,
                **self._scope_kwargs,
                **self._init_kwargs,
            )

        return operation.get(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            name=name,
            **version_arg,
            **self._init_kwargs,
        )

    @monitor_with_activity(ops_logger, "Data.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Data:  # type: ignore
        """Get the specified data asset.

        Exactly one of ``version`` or ``label`` has to be supplied.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully
            identified and retrieved. Details will be provided in the error message.
        :return: Data asset object.
        :rtype: ~azure.ai.ml.entities.Data

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_get]
                :end-before: [END data_operations_get]
                :language: python
                :dedent: 8
                :caption: Get data assets example.
        """

        def _user_error(text: str, kind: ValidationErrorType) -> ValidationException:
            # Both argument errors share everything except the message and error type.
            return ValidationException(
                message=text,
                target=ErrorTarget.DATA,
                no_personal_data_message=text,
                error_category=ErrorCategory.USER_ERROR,
                error_type=kind,
            )

        try:
            if label:
                if version:
                    raise _user_error("Cannot specify both version and label.", ValidationErrorType.INVALID_VALUE)
                return _resolve_label_to_asset(self, name, label)

            if not version:
                raise _user_error("Must provide either version or label.", ValidationErrorType.MISSING_FIELD)

            return Data._from_rest_object(self._get(name, version))
        except (ValidationException, SchemaValidationError) as ex:
            log_and_raise_error(ex)

    @monitor_with_activity(ops_logger, "Data.CreateOrUpdate", ActivityType.PUBLICAPI)
    def create_or_update(self, data: Data) -> Data:
        """Returns created or updated data asset.

        If not already in storage, asset will be uploaded to the workspace's blob storage.

        When the client is registry-scoped and ``data`` is a ``WorkspaceAssetReference``,
        the referenced workspace asset is first imported into the registry; a name/version
        that already exists in the registry is rejected.

        :param data: Data asset object.
        :type data: azure.ai.ml.entities.Data
        :raises ~azure.ai.ml.exceptions.AssetPathException: Raised when the Data artifact path is
            already linked to another asset
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Data cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
        :return: Data asset object.
        :rtype: ~azure.ai.ml.entities.Data

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_create_or_update]
                :end-before: [END data_operations_create_or_update]
                :language: python
                :dedent: 8
                :caption: Create data assets example.
        """
        try:
            name = data.name
            version = data.version
            if not version and self._registry_name:
                msg = "Data asset version is required for registry"
                raise ValidationException(
                    message=msg,
                    no_personal_data_message=msg,
                    target=ErrorTarget.DATA,
                    error_category=ErrorCategory.USER_ERROR,
                    error_type=ValidationErrorType.MISSING_FIELD,
                )

            sas_uri = None
            if self._registry_name:
                # If the data asset is a workspace asset, promote to registry
                if isinstance(data, WorkspaceAssetReference):
                    try:
                        self._operation.get(
                            name=data.name,
                            version=data.version,
                            resource_group_name=self._resource_group_name,
                            registry_name=self._registry_name,
                        )
                    except ResourceNotFoundError:
                        # Expected path: the name/version is still free in the registry.
                        pass
                    else:
                        msg = "An data asset with this name and version already exists in registry"
                        raise ValidationException(
                            message=msg,
                            no_personal_data_message=msg,
                            target=ErrorTarget.DATA,
                            error_category=ErrorCategory.USER_ERROR,
                        )
                    data_res_obj = data._to_rest_object()
                    result = self._service_client.resource_management_asset_reference.begin_import_method(
                        resource_group_name=self._resource_group_name,
                        registry_name=self._registry_name,
                        body=data_res_obj,
                    ).result()

                    # The import poller may yield nothing; in that case read the asset back.
                    if not result:
                        data_res_obj = self._get(name=data.name, version=data.version)
                        return Data._from_rest_object(data_res_obj)

                sas_uri = get_sas_uri_for_registry_asset(
                    service_client=self._service_client,
                    name=name,
                    version=version,
                    resource_group=self._resource_group_name,
                    registry=self._registry_name,
                    body=get_asset_body_for_registry_storage(self._registry_name, "data", name, version),
                )

            referenced_uris = self._validate(data)
            if referenced_uris:
                data._referenced_uris = referenced_uris

            data, _ = _check_and_upload_path(
                artifact=data,
                asset_operations=self,
                sas_uri=sas_uri,
                artifact_type=ErrorTarget.DATA,
                show_progress=self._show_progress,
            )

            _check_or_modify_auto_delete_setting(data.auto_delete_setting)

            data_version_resource = data._to_rest_object()

            if data._auto_increment_version:
                result = _create_or_update_autoincrement(
                    name=data.name,
                    body=data_version_resource,
                    version_operation=self._operation,
                    container_operation=self._container_operation,
                    resource_group_name=self._operation_scope.resource_group_name,
                    workspace_name=self._workspace_name,
                    **self._init_kwargs,
                )
            elif self._registry_name:
                # Registry writes are long-running operations; block until completion.
                result = self._operation.begin_create_or_update(
                    name=name,
                    version=version,
                    registry_name=self._registry_name,
                    body=data_version_resource,
                    **self._scope_kwargs,
                ).result()
            else:
                result = self._operation.create_or_update(
                    name=name,
                    version=version,
                    workspace_name=self._workspace_name,
                    body=data_version_resource,
                    **self._scope_kwargs,
                )

            if not result and self._registry_name:
                result = self._get(name=name, version=version)

            return Data._from_rest_object(result)
        except (ValidationException, SchemaValidationError) as ex:
            log_and_raise_error(ex)
            # Fallback in case the helper returns instead of raising.
            raise
        except HttpResponseError as ex:
            # service side raises an exception if we attempt to update an existing asset's asset path
            if str(ex) == ASSET_PATH_ERROR:
                raise AssetPathException(
                    message=CHANGED_ASSET_PATH_MSG,
                    target=ErrorTarget.DATA,
                    no_personal_data_message=CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
                    error_category=ErrorCategory.USER_ERROR,
                ) from ex
            raise

    @monitor_with_activity(ops_logger, "Data.ImportData", ActivityType.PUBLICAPI)
    @experimental
    def import_data(self, data_import: DataImport, **kwargs: Any) -> PipelineJob:
        """Returns the data import job that is creating the data asset.

        The import is wrapped in a single-step pipeline job that is submitted through
        the job operations. ``data_import.type`` and ``data_import.path`` are updated
        in place as part of the call.

        :param data_import: DataImport object.
        :type data_import: azure.ai.ml.entities.DataImport
        :return: data import job object.
        :rtype: ~azure.ai.ml.entities.PipelineJob

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_import_data]
                :end-before: [END data_operations_import_data]
                :language: python
                :dedent: 8
                :caption: Import data assets example.
        """

        experiment_name = f"data_import_{data_import.name!s}"

        # Database sources are materialized as MLTable, every other source as a folder.
        if isinstance(data_import.source, Database):
            data_import.type = AssetTypes.MLTABLE
        else:
            data_import.type = AssetTypes.URI_FOLDER

        # avoid specifying auto_delete_setting in job output now
        _validate_auto_delete_setting_in_data_output(data_import.auto_delete_setting)

        # block customer specified path on managed datastore
        data_import.path = _validate_workspace_managed_datastore(data_import.path)

        # Make sure the output path carries the ${{name}} placeholder.
        if "${{name}}" not in str(data_import.path):
            data_import.path = data_import.path.rstrip("/") + "/${{name}}"  # type: ignore

        job_description = data_import.description or experiment_name

        sink_output = Output(
            type=data_import.type,
            path=data_import.path,  # type: ignore
            name=data_import.name,
            version=data_import.version,
        )
        import_job = import_data_func(
            description=job_description,
            display_name=experiment_name,
            experiment_name=experiment_name,
            compute="serverless",
            source=data_import.source,
            outputs={"sink": sink_output},
        )

        import_pipeline = PipelineJob(
            description=job_description,
            tags=data_import.tags,
            display_name=experiment_name,
            experiment_name=experiment_name,
            properties=data_import.properties or {},
            settings=PipelineJobSettings(force_rerun=True),
            jobs={experiment_name: import_job},
        )
        # Tag the pipeline with the asset name it materializes.
        import_pipeline.properties["azureml.materializationAssetName"] = data_import.name

        job_operations = self._all_operations.all_operations[AzureMLResourceType.JOB]
        return job_operations.create_or_update(job=import_pipeline, skip_validation=True, **kwargs)

    @monitor_with_activity(ops_logger, "Data.ListMaterializationStatus", ActivityType.PUBLICAPI)
    def list_materialization_status(
        self,
        name: str,
        *,
        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
        **kwargs: Any,
    ) -> Iterable[PipelineJob]:
        """List materialization jobs of the asset.

        Delegates to the job operations' ``list`` with ``job_type="Pipeline"`` and the
        asset name as filter; extra keyword arguments are forwarded unchanged.

        :param name: name of asset being created by the materialization jobs.
        :type name: str
        :keyword list_view_type: View type for including/excluding (for example) archived jobs. Default: ACTIVE_ONLY.
        :paramtype list_view_type: Optional[ListViewType]
        :return: An iterator like instance of Job objects.
        :rtype: ~azure.core.paging.ItemPaged[PipelineJob]

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_list_materialization_status]
                :end-before: [END data_operations_list_materialization_status]
                :language: python
                :dedent: 8
                :caption: List materialization jobs example.
        """

        job_operations = self._all_operations.all_operations[AzureMLResourceType.JOB]
        pipeline_jobs = job_operations.list(
            job_type="Pipeline",
            asset_name=name,
            list_view_type=list_view_type,
            **kwargs,
        )
        return cast(Iterable[PipelineJob], pipeline_jobs)

    @monitor_with_activity(ops_logger, "Data.Validate", ActivityType.INTERNALCALL)
    def _validate(self, data: Data) -> Optional[List[str]]:
        """Validate a data asset's path and, for MLTable assets, collect referenced URIs.

        * MLTable: the metadata is read (remotely or locally), optionally validated
          against the downloaded JSON schema, and the URIs it references are returned.
          If remote metadata cannot be read, validation is skipped.
        * Other types: remote paths are not checked; local paths (relative ones are
          resolved against ``data.base_path``) are checked against the asset type.

        :param data: The data asset to validate.
        :type data: ~azure.ai.ml.entities.Data
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if ``data.path`` is empty.
        :return: Referenced URIs for MLTable assets, otherwise ``None``.
        :rtype: Optional[List[str]]
        """
        if not data.path:
            missing_path_msg = "Missing data path. Path is required for data."
            raise ValidationException(
                message=missing_path_msg,
                no_personal_data_message=missing_path_msg,
                error_type=ValidationErrorType.MISSING_FIELD,
                target=ErrorTarget.DATA,
                error_category=ErrorCategory.USER_ERROR,
            )

        asset_path, asset_type, base_path = str(data.path), data.type, data.base_path
        is_remote = is_url(asset_path)

        if asset_type == AssetTypes.MLTABLE:
            if is_remote:
                yaml_path = None
                try:
                    contents = read_remote_mltable_metadata_contents(
                        base_uri=asset_path,
                        datastore_operations=self._datastore_operation,
                        requests_pipeline=self._requests_pipeline,
                    )
                except Exception:  # pylint: disable=W0718
                    # skip validation for remote MLTable when the contents cannot be read
                    module_logger.info("Unable to access MLTable metadata at path %s", asset_path)
                    return None
            else:
                contents = read_local_mltable_metadata_contents(path=asset_path)
                yaml_path = Path(asset_path, "MLTable")

            metadata = MLTableMetadata._load(contents, yaml_path)
            schema = self._try_get_mltable_metadata_jsonschema(data._mltable_schema_url)
            # Schema validation only runs when a schema was obtained and the asset did not opt out.
            if schema:
                if not data._skip_validation:
                    validate_mltable_metadata(
                        mltable_metadata_dict=contents,
                        mltable_schema=schema,
                    )
            return cast(Optional[List[str]], metadata.referenced_uris())

        # URI_FILE / URI_FOLDER: remote locations are not validated.
        if not is_remote:
            if os.path.isabs(asset_path):
                local_path = asset_path
            else:
                local_path = str(Path(base_path, asset_path).resolve())
            _assert_local_path_matches_asset_type(local_path, asset_type)

        return None

    def _try_get_mltable_metadata_jsonschema(self, mltable_schema_url: Optional[str]) -> Optional[Dict]:
        """Best-effort download of the MLTable metadata JSON schema.

        :param mltable_schema_url: Schema location; ``None`` selects the fallback URL.
        :type mltable_schema_url: Optional[str]
        :return: The downloaded schema, or ``None`` if the download raised for any reason
            (the failure is only logged).
        :rtype: Optional[Dict]
        """
        schema_url = MLTABLE_METADATA_SCHEMA_URL_FALLBACK if mltable_schema_url is None else mltable_schema_url
        try:
            schema = download_mltable_metadata_schema(schema_url, self._requests_pipeline)
        except Exception:  # pylint: disable=W0718
            module_logger.info(
                'Failed to download MLTable metadata jsonschema from "%s", skipping validation',
                schema_url,
            )
            return None
        return cast(Optional[Dict], schema)

    @monitor_with_activity(ops_logger, "Data.Archive", ActivityType.PUBLICAPI)
    def archive(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Archive a data asset.

        Delegates to the shared archive/restore helper with ``is_archived=True``.
        Extra keyword arguments are accepted but not used.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :return: None

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_archive]
                :end-before: [END data_operations_archive]
                :language: python
                :dedent: 8
                :caption: Archive data asset example.
        """

        _archive_or_restore(
            name=name,
            version=version,
            label=label,
            is_archived=True,
            asset_operations=self,
            version_operation=self._operation,
            container_operation=self._container_operation,
        )

    @monitor_with_activity(ops_logger, "Data.Restore", ActivityType.PUBLICAPI)
    def restore(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        # pylint:disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Restore an archived data asset.

        Delegates to the shared archive/restore helper with ``is_archived=False``.
        Extra keyword arguments are accepted but not used.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :return: None

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_restore]
                :end-before: [END data_operations_restore]
                :language: python
                :dedent: 8
                :caption: Restore data asset example.
        """

        _archive_or_restore(
            name=name,
            version=version,
            label=label,
            is_archived=False,
            asset_operations=self,
            version_operation=self._operation,
            container_operation=self._container_operation,
        )

    def _get_latest_version(self, name: str) -> Data:
        """Resolve the ``latest`` label: look up the newest version of ``name`` and fetch it.

        Latest is defined as the most recently created, not the most recently updated.

        :param name: The asset name
        :type name: str
        :return: The latest asset
        :rtype: Data
        """
        # The container lookup yields the version; the asset itself is then
        # retrieved through the public ``get``.
        newest_version = _get_latest_version_from_container(
            name,
            self._container_operation,
            self._resource_group_name,
            self._workspace_name,
            self._registry_name,
        )
        return self.get(name, version=newest_version)

    @monitor_with_activity(ops_logger, "data.Share", ActivityType.PUBLICAPI)
    @experimental
    def share(
        self,
        name: str,
        version: str,
        *,
        share_with_name: str,
        share_with_version: str,
        registry_name: str,
        **kwargs: Any,
    ) -> Data:
        """Share a data asset from workspace to registry.

        The workspace is looked up to build the asset ID, a ``WorkspaceAssetReference``
        is created for it, and that reference is passed to ``create_or_update`` while
        the registry client is active. Empty ``share_with_name`` / ``share_with_version``
        fall back to ``name`` / ``version``.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :keyword share_with_name: Name of data asset to share with.
        :paramtype share_with_name: str
        :keyword share_with_version: Version of data asset to share with.
        :paramtype share_with_version: str
        :keyword registry_name: Name of the destination registry.
        :paramtype registry_name: str
        :return: Data asset object.
        :rtype: ~azure.ai.ml.entities.Data

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START data_operations_share]
                :end-before: [END data_operations_share]
                :language: python
                :dedent: 8
                :caption: Share data asset example.
        """

        # The asset ID embeds the workspace's location and GUID, so fetch the workspace first.
        workspace = self._service_client.workspaces.get(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            **kwargs,
        )
        source_asset_id = ASSET_ID_FORMAT.format(
            workspace.location,
            workspace.workspace_id,
            AzureMLResourceType.DATA,
            name,
            version,
        )

        reference = WorkspaceAssetReference(
            name=share_with_name or name,
            version=share_with_version or version,
            asset_id=source_asset_id,
        )

        with self._set_registry_client(registry_name):
            return self.create_or_update(reference)

    @monitor_with_activity(ops_logger, "data.Mount", ActivityType.PUBLICAPI)
    @experimental
    def mount(
        self,
        path: str,
        *,
        mount_point: Optional[str] = None,
        mode: str = "ro_mount",
        debug: bool = False,
        persistent: bool = False,
        **kwargs,
    ) -> None:
        """Mount a data asset to a local path, so that you can access data inside it
        under a local path with any tools of your choice.

        :param path: The data asset path to mount, in the form of `azureml:<name>` or `azureml:<name>:<version>`.
        :type path: str
        :keyword mount_point: A local path used as mount point.
        :type mount_point: str
        :keyword mode: Mount mode. Only `ro_mount` (read-only) is supported for data asset mount.
        :type mode: str
        :keyword debug: Whether to enable verbose logging.
        :type debug: bool
        :keyword persistent: Whether to persist the mount after reboot. Applies only when running on Compute Instance,
                where the 'CI_NAME' environment variable is set.
        :type persistent: bool
        :return: None
        """

        assert mode in ["ro_mount", "rw_mount"], "mode should be either `ro_mount` or `rw_mount`"
        read_only = mode == "ro_mount"
        assert read_only, "read-write mount for data asset is not supported yet"

        ci_name = os.environ.get("CI_NAME")
        assert not persistent or (
            persistent and ci_name is not None
        ), "persistent mount is only supported on Compute Instance, where the 'CI_NAME' environment variable is set."

        try:
            from azureml.dataprep import rslex_fuse_subprocess_wrapper
        except ImportError as exc:
            raise MlException(
                "Mount operations requires package azureml-dataprep-rslex installed. "
                + "You can install it with Azure ML SDK with `pip install azure-ai-ml[mount]`."
            ) from exc

        uri = rslex_fuse_subprocess_wrapper.build_data_asset_uri(
            self._operation_scope._subscription_id, self._resource_group_name, self._workspace_name, path
        )
        if persistent and ci_name is not None:
            mount_name = f"unified_mount_{str(uuid.uuid4()).replace('-', '')}"
            self._compute_operation.update_data_mounts(
                self._resource_group_name,
                self._workspace_name,
                ci_name,
                [
                    ComputeInstanceDataMount(
                        source=uri,
                        source_type="URI",
                        mount_name=mount_name,
                        mount_action="Mount",
                        mount_path=mount_point or "",
                    )
                ],
                api_version="2021-01-01",
                **kwargs,
            )
            print(f"Mount requested [name: {mount_name}]. Waiting for completion ...")
            while True:
                compute = self._compute_operation.get(self._resource_group_name, self._workspace_name, ci_name)
                mounts = compute.properties.properties.data_mounts
                try:
                    mount = [mount for mount in mounts if mount.mount_name == mount_name][0]
                    if mount.mount_state == "Mounted":
                        print(f"Mounted [name: {mount_name}].")
                        break
                    if mount.mount_state == "MountRequested":
                        pass
                    elif mount.mount_state == "MountFailed":
                        msg = f"Mount failed [name: {mount_name}]: {mount.error}"
                        raise MlException(message=msg, no_personal_data_message=msg)
                    else:
                        msg = f"Got unexpected mount state [name: {mount_name}]: {mount.mount_state}"
                        raise MlException(message=msg, no_personal_data_message=msg)
                except IndexError:
                    pass
                time.sleep(5)

        else:
            rslex_fuse_subprocess_wrapper.start_fuse_mount_subprocess(
                uri, mount_point, read_only, debug, credential=self._operation._config.credential
            )

    @contextmanager
    # pylint: disable-next=docstring-missing-return,docstring-missing-rtype
    def _set_registry_client(self, registry_name: str) -> Generator:
        """Temporarily retarget the data operations at a registry.

        While the context is active, the operation scope (registry name, resource group and
        subscription), the service client and the data-versions operation all refer to the
        registry resolved by ``get_registry_client``. The previous values are put back when the
        context exits, whether or not an exception was raised.

        :param registry_name: Name of the registry.
        :type registry_name: str
        """
        scope = self._operation_scope

        # Snapshot everything that is about to be overwritten so it can be restored afterwards.
        saved_resource_group = scope._resource_group_name
        saved_subscription = scope._subscription_id
        saved_registry = scope.registry_name
        saved_client = self._service_client
        saved_operation = self._operation

        try:
            registry_client, registry_resource_group, registry_subscription = get_registry_client(
                saved_client._config.credential, registry_name
            )
            scope.registry_name = registry_name
            scope._resource_group_name = registry_resource_group
            scope._subscription_id = registry_subscription
            self._service_client = registry_client
            self._operation = registry_client.data_versions
            yield
        finally:
            # Always restore the workspace-scoped state, even if the body or the lookup failed.
            scope.registry_name = saved_registry
            scope._resource_group_name = saved_resource_group
            scope._subscription_id = saved_subscription
            self._service_client = saved_client
            self._operation = saved_operation


def _assert_local_path_matches_asset_type(
    local_path: str,
    asset_type: str,
) -> None:
    """Verify that what exists at a local path agrees with the declared asset type.

    An ``AssetTypes.URI_FOLDER`` asset must point at an existing directory and an
    ``AssetTypes.URI_FILE`` asset at an existing file. Any other asset type is accepted
    without touching the file system.

    :param local_path: Local path to check.
    :type local_path: str
    :param asset_type: Declared type of the asset.
    :type asset_type: str
    :raises ValidationException: If the path does not match the declared asset type.
    """
    if asset_type == AssetTypes.URI_FOLDER:
        kind_matches = os.path.isdir(local_path)
    elif asset_type == AssetTypes.URI_FILE:
        kind_matches = os.path.isfile(local_path)
    else:
        # No file-system expectation is enforced for other asset types.
        return

    if kind_matches:
        return

    # Both mismatch cases report the same error; only the interpolated values differ.
    raise ValidationException(
        message="File path does not match asset type {}: {}".format(asset_type, local_path),
        no_personal_data_message="File path does not match asset type {}".format(asset_type),
        target=ErrorTarget.DATA,
        error_category=ErrorCategory.USER_ERROR,
        error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
    )