aboutsummaryrefslogtreecommitdiff
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,disable=docstring-missing-return,docstring-missing-param,docstring-missing-rtype,line-too-long,too-many-statements

import re
from contextlib import contextmanager
from os import PathLike, path
from typing import Any, Dict, Generator, Iterable, Optional, Union, cast

from marshmallow.exceptions import ValidationError as SchemaValidationError

from azure.ai.ml._artifacts._artifact_utilities import (
    _check_and_upload_path,
    _get_default_datastore_info,
    _update_metadata,
)
from azure.ai.ml._artifacts._constants import (
    ASSET_PATH_ERROR,
    CHANGED_ASSET_PATH_MSG,
    CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
)
from azure.ai.ml._exception_helper import log_and_raise_error
from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
    AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
)
from azure.ai.ml._restclient.v2023_08_01_preview import AzureMachineLearningWorkspaces as ServiceClient082023Preview
from azure.ai.ml._restclient.v2023_08_01_preview.models import ListViewType, ModelVersion
from azure.ai.ml._scope_dependent_operations import (
    OperationConfig,
    OperationsContainer,
    OperationScope,
    _ScopeDependentOperations,
)
from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
from azure.ai.ml._utils._arm_id_utils import AMLVersionedArmId, is_ARM_id_for_resource
from azure.ai.ml._utils._asset_utils import (
    _archive_or_restore,
    _get_latest,
    _get_next_version_from_container,
    _resolve_label_to_asset,
)
from azure.ai.ml._utils._experimental import experimental
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml._utils._registry_utils import (
    get_asset_body_for_registry_storage,
    get_registry_client,
    get_sas_uri_for_registry_asset,
    get_storage_details_for_registry_assets,
)
from azure.ai.ml._utils._storage_utils import get_ds_name_and_path_prefix, get_storage_client
from azure.ai.ml._utils.utils import _is_evaluator, resolve_short_datastore_url, validate_ml_flow_folder
from azure.ai.ml.constants._common import ARM_ID_PREFIX, ASSET_ID_FORMAT, REGISTRY_URI_FORMAT, AzureMLResourceType
from azure.ai.ml.entities import AzureDataLakeGen2Datastore
from azure.ai.ml.entities._assets import Environment, Model, ModelPackage
from azure.ai.ml.entities._assets._artifacts.code import Code
from azure.ai.ml.entities._assets.workspace_asset_reference import WorkspaceAssetReference
from azure.ai.ml.entities._credentials import AccountKeyConfiguration
from azure.ai.ml.exceptions import (
    AssetPathException,
    ErrorCategory,
    ErrorTarget,
    ValidationErrorType,
    ValidationException,
)
from azure.ai.ml.operations._datastore_operations import DatastoreOperations
from azure.core.exceptions import ResourceNotFoundError

from ._operation_orchestrator import OperationOrchestrator

ops_logger = OpsLogger(__name__)
module_logger = ops_logger.module_logger


# pylint: disable=too-many-instance-attributes
class ModelOperations(_ScopeDependentOperations):
    """ModelOperations.

    You should not instantiate this class directly. Instead, you should create an MLClient instance that instantiates it
    for you and attaches it as an attribute.

    :param operation_scope: Scope variables for the operations classes of an MLClient object.
    :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
    :param operation_config: Common configuration for operations classes of an MLClient object.
    :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
    :param service_client: Service client to allow end users to operate on Azure Machine Learning Workspace
        resources (ServiceClient082023Preview or ServiceClient102021Dataplane).
    :type service_client: typing.Union[
        azure.ai.ml._restclient.v2023_04_01_preview._azure_machine_learning_workspaces.AzureMachineLearningWorkspaces,
        azure.ai.ml._restclient.v2021_10_01_dataplanepreview._azure_machine_learning_workspaces.
        AzureMachineLearningWorkspaces]
    :param datastore_operations: Represents a client for performing operations on Datastores.
    :type datastore_operations: ~azure.ai.ml.operations._datastore_operations.DatastoreOperations
    :param all_operations: All operations classes of an MLClient object.
    :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
    """

    _IS_EVALUATOR = "__is_evaluator"

    # pylint: disable=unused-argument
    def __init__(
        self,
        operation_scope: OperationScope,
        operation_config: OperationConfig,
        service_client: Union[ServiceClient082023Preview, ServiceClient102021Dataplane],
        datastore_operations: DatastoreOperations,
        all_operations: Optional[OperationsContainer] = None,
        **kwargs,
    ):
        super(ModelOperations, self).__init__(operation_scope, operation_config)
        ops_logger.update_filter()
        self._model_versions_operation = service_client.model_versions
        self._model_container_operation = service_client.model_containers
        self._service_client = service_client
        self._datastore_operation = datastore_operations
        self._all_operations = all_operations
        self._control_plane_client: Any = kwargs.get("control_plane_client", None)
        self._workspace_rg = kwargs.pop("workspace_rg", None)
        self._workspace_sub = kwargs.pop("workspace_sub", None)
        self._registry_reference = kwargs.pop("registry_reference", None)

        # Maps a label to a function which given an asset name,
        # returns the asset associated with the label
        self._managed_label_resolver = {"latest": self._get_latest_version}
        self.__is_evaluator = kwargs.pop(ModelOperations._IS_EVALUATOR, False)

    @monitor_with_activity(ops_logger, "Model.CreateOrUpdate", ActivityType.PUBLICAPI)
    def create_or_update(  # type: ignore
        self, model: Union[Model, WorkspaceAssetReference]
    ) -> Model:  # TODO: Are we going to implement job_name?
        """Returns created or updated model asset.

        :param model: Model asset object.
        :type model: ~azure.ai.ml.entities.Model
        :raises ~azure.ai.ml.exceptions.AssetPathException: Raised when the Model artifact path is
            already linked to another asset
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Model cannot be successfully validated.
            Details will be provided in the error message.
        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
        :return: Model asset object.
        :rtype: ~azure.ai.ml.entities.Model
        """
        # Check if we have the model with the same name and it is an
        # evaluator. In this aces raise the exception do not create the model.
        if not self.__is_evaluator and _is_evaluator(model.properties):
            msg = (
                "Unable to create the evaluator using ModelOperations. To create "
                "evaluator, please use EvaluatorOperations by calling "
                "ml_client.evaluators.create_or_update(model) instead."
            )
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.MODEL,
                error_category=ErrorCategory.USER_ERROR,
            )
        if model.name is not None:
            model_properties = self._get_model_properties(model.name)
            if model_properties is not None and _is_evaluator(model_properties) != _is_evaluator(model.properties):
                if _is_evaluator(model.properties):
                    msg = (
                        f"Unable to create the model with name {model.name} "
                        "because this version of model was marked as promptflow evaluator, but the previous "
                        "version is a regular model. "
                        "Please change the model name and try again."
                    )
                else:
                    msg = (
                        f"Unable to create the model with name {model.name} "
                        "because previous version of model was marked as promptflow evaluator, but this "
                        "version is a regular model. "
                        "Please change the model name and try again."
                    )
                raise ValidationException(
                    message=msg,
                    no_personal_data_message=msg,
                    target=ErrorTarget.MODEL,
                    error_category=ErrorCategory.USER_ERROR,
                )
        try:
            name = model.name
            if not model.version and model._auto_increment_version:
                model.version = _get_next_version_from_container(
                    name=model.name,
                    container_operation=self._model_container_operation,
                    resource_group_name=self._operation_scope.resource_group_name,
                    workspace_name=self._workspace_name,
                    registry_name=self._registry_name,
                )

            version = model.version

            sas_uri = None

            if self._registry_name:
                # Case of copy model to registry
                if isinstance(model, WorkspaceAssetReference):
                    # verify that model is not already in registry
                    try:
                        self._model_versions_operation.get(
                            name=model.name,
                            version=model.version,
                            resource_group_name=self._resource_group_name,
                            registry_name=self._registry_name,
                        )
                    except Exception as err:  # pylint: disable=W0718
                        if isinstance(err, ResourceNotFoundError):
                            pass
                        else:
                            raise err
                    else:
                        msg = "A model with this name and version already exists in registry"
                        raise ValidationException(
                            message=msg,
                            no_personal_data_message=msg,
                            target=ErrorTarget.MODEL,
                            error_category=ErrorCategory.USER_ERROR,
                        )

                    model_rest = model._to_rest_object()
                    result = self._service_client.resource_management_asset_reference.begin_import_method(
                        resource_group_name=self._resource_group_name,
                        registry_name=self._registry_name,
                        body=model_rest,
                    ).result()

                    if not result:
                        model_rest_obj = self._get(name=str(model.name), version=model.version)
                        return Model._from_rest_object(model_rest_obj)

                sas_uri = get_sas_uri_for_registry_asset(
                    service_client=self._service_client,
                    name=model.name,
                    version=model.version,
                    resource_group=self._resource_group_name,
                    registry=self._registry_name,
                    body=get_asset_body_for_registry_storage(self._registry_name, "models", model.name, model.version),
                )

            model, indicator_file = _check_and_upload_path(  # type: ignore[type-var]
                artifact=model,
                asset_operations=self,
                sas_uri=sas_uri,
                artifact_type=ErrorTarget.MODEL,
                show_progress=self._show_progress,
            )

            model.path = resolve_short_datastore_url(model.path, self._operation_scope)  # type: ignore
            validate_ml_flow_folder(model.path, model.type)  # type: ignore
            model_version_resource = model._to_rest_object()
            auto_increment_version = model._auto_increment_version
            try:
                result = (
                    self._model_versions_operation.begin_create_or_update(
                        name=name,
                        version=version,
                        body=model_version_resource,
                        registry_name=self._registry_name,
                        **self._scope_kwargs,
                    ).result()
                    if self._registry_name
                    else self._model_versions_operation.create_or_update(
                        name=name,
                        version=version,
                        body=model_version_resource,
                        workspace_name=self._workspace_name,
                        **self._scope_kwargs,
                    )
                )

                if not result and self._registry_name:
                    result = self._get(name=str(model.name), version=model.version)

            except Exception as e:
                # service side raises an exception if we attempt to update an existing asset's path
                if str(e) == ASSET_PATH_ERROR:
                    raise AssetPathException(
                        message=CHANGED_ASSET_PATH_MSG,
                        target=ErrorTarget.MODEL,
                        no_personal_data_message=CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
                        error_category=ErrorCategory.USER_ERROR,
                    ) from e
                raise e

            model = Model._from_rest_object(result)
            if auto_increment_version and indicator_file:
                datastore_info = _get_default_datastore_info(self._datastore_operation)
                _update_metadata(model.name, model.version, indicator_file, datastore_info)  # update version in storage

            return model
        except Exception as ex:  # pylint: disable=W0718
            if isinstance(ex, SchemaValidationError):
                log_and_raise_error(ex)
            else:
                raise ex

    def _get(self, name: str, version: Optional[str] = None) -> ModelVersion:  # name:latest
        if version:
            return (
                self._model_versions_operation.get(
                    name=name,
                    version=version,
                    registry_name=self._registry_name,
                    **self._scope_kwargs,
                )
                if self._registry_name
                else self._model_versions_operation.get(
                    name=name,
                    version=version,
                    workspace_name=self._workspace_name,
                    **self._scope_kwargs,
                )
            )

        return (
            self._model_container_operation.get(name=name, registry_name=self._registry_name, **self._scope_kwargs)
            if self._registry_name
            else self._model_container_operation.get(
                name=name, workspace_name=self._workspace_name, **self._scope_kwargs
            )
        )

    @monitor_with_activity(ops_logger, "Model.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Model:
        """Returns information about the specified model asset.

        :param name: Name of the model.
        :type name: str
        :param version: Version of the model.
        :type version: str
        :param label: Label of the model. (mutually exclusive with version)
        :type label: str
        :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Model cannot be successfully validated.
            Details will be provided in the error message.
        :return: Model asset object.
        :rtype: ~azure.ai.ml.entities.Model
        """
        if version and label:
            msg = "Cannot specify both version and label."
            raise ValidationException(
                message=msg,
                target=ErrorTarget.MODEL,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
                error_type=ValidationErrorType.INVALID_VALUE,
            )

        if label:
            return _resolve_label_to_asset(self, name, label)

        if not version:
            msg = "Must provide either version or label"
            raise ValidationException(
                message=msg,
                target=ErrorTarget.MODEL,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
                error_type=ValidationErrorType.MISSING_FIELD,
            )
        # TODO: We should consider adding an exception trigger for internal_model=None
        model_version_resource = self._get(name, version)

        return Model._from_rest_object(model_version_resource)

    @monitor_with_activity(ops_logger, "Model.Download", ActivityType.PUBLICAPI)
    def download(self, name: str, version: str, download_path: Union[PathLike, str] = ".") -> None:
        """Download files related to a model.

        :param name: Name of the model.
        :type name: str
        :param version: Version of the model.
        :type version: str
        :param download_path: Local path as download destination, defaults to current working directory of the current
            user. Contents will be overwritten.
        :type download_path: Union[PathLike, str]
        :raises ResourceNotFoundError: if can't find a model matching provided name.
        """

        model_uri = self.get(name=name, version=version).path
        ds_name, path_prefix = get_ds_name_and_path_prefix(model_uri, self._registry_name)
        if self._registry_name:
            sas_uri, auth_type = get_storage_details_for_registry_assets(
                service_client=self._service_client,
                asset_name=name,
                asset_version=version,
                reg_name=self._registry_name,
                asset_type=AzureMLResourceType.MODEL,
                rg_name=self._resource_group_name,
                uri=model_uri,
            )
            if auth_type == "SAS":
                storage_client = get_storage_client(credential=None, storage_account=None, account_url=sas_uri)
            else:
                parts = sas_uri.split("/")
                storage_account = parts[2].split(".")[0]
                container_name = parts[3]
                storage_client = get_storage_client(
                    credential=None,
                    storage_account=storage_account,
                    container_name=container_name,
                )

        else:
            ds = self._datastore_operation.get(ds_name, include_secrets=True)
            acc_name = ds.account_name

            if isinstance(ds.credentials, AccountKeyConfiguration):
                credential = ds.credentials.account_key
            else:
                try:
                    credential = ds.credentials.sas_token
                except Exception as e:  # pylint: disable=W0718
                    if not hasattr(ds.credentials, "sas_token"):
                        credential = self._datastore_operation._credential
                    else:
                        raise e

            if isinstance(ds, AzureDataLakeGen2Datastore):
                container = ds.filesystem
                try:
                    from azure.identity import ClientSecretCredential

                    token_credential = ClientSecretCredential(
                        tenant_id=ds.credentials["tenant_id"],
                        client_id=ds.credentials["client_id"],
                        client_secret=ds.credentials["client_secret"],
                        authority=ds.credentials["authority_url"],
                    )
                    credential = token_credential
                except (KeyError, TypeError):
                    pass

            else:
                container = ds.container_name
            datastore_type = ds.type
            storage_client = get_storage_client(
                credential=credential,
                container_name=container,
                storage_account=acc_name,
                storage_type=datastore_type,
            )

        path_file = "{}{}{}".format(download_path, path.sep, name)
        is_directory = storage_client.exists(f"{path_prefix.rstrip('/')}/")
        if is_directory:
            path_file = path.join(path_file, path.basename(path_prefix.rstrip("/")))
        module_logger.info("Downloading the model %s at %s\n", path_prefix, path_file)
        storage_client.download(starts_with=path_prefix, destination=path_file)

    @monitor_with_activity(ops_logger, "Model.Archive", ActivityType.PUBLICAPI)
    def archive(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Archive a model asset.

        :param name: Name of model asset.
        :type name: str
        :param version: Version of model asset.
        :type version: str
        :param label: Label of the model asset. (mutually exclusive with version)
        :type label: str

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START model_operations_archive]
                :end-before: [END model_operations_archive]
                :language: python
                :dedent: 8
                :caption: Archive a model.
        """
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._model_versions_operation,
            container_operation=self._model_container_operation,
            is_archived=True,
            name=name,
            version=version,
            label=label,
        )

    @monitor_with_activity(ops_logger, "Model.Restore", ActivityType.PUBLICAPI)
    def restore(
        self,
        name: str,
        version: Optional[str] = None,
        label: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Restore an archived model asset.

        :param name: Name of model asset.
        :type name: str
        :param version: Version of model asset.
        :type version: str
        :param label: Label of the model asset. (mutually exclusive with version)
        :type label: str

        .. admonition:: Example:

            .. literalinclude:: ../samples/ml_samples_misc.py
                :start-after: [START model_operations_restore]
                :end-before: [END model_operations_restore]
                :language: python
                :dedent: 8
                :caption: Restore an archived model.
        """
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._model_versions_operation,
            container_operation=self._model_container_operation,
            is_archived=False,
            name=name,
            version=version,
            label=label,
        )

    @monitor_with_activity(ops_logger, "Model.List", ActivityType.PUBLICAPI)
    def list(
        self,
        name: Optional[str] = None,
        stage: Optional[str] = None,
        *,
        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
    ) -> Iterable[Model]:
        """List all model assets in workspace.

        :param name: Name of the model.
        :type name: Optional[str]
        :param stage: The Model stage
        :type stage: Optional[str]
        :keyword list_view_type: View type for including/excluding (for example) archived models.
            Defaults to :attr:`ListViewType.ACTIVE_ONLY`.
        :paramtype list_view_type: ListViewType
        :return: An iterator like instance of Model objects
        :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.Model]
        """
        if name:
            return cast(
                Iterable[Model],
                (
                    self._model_versions_operation.list(
                        name=name,
                        registry_name=self._registry_name,
                        cls=lambda objs: [Model._from_rest_object(obj) for obj in objs],
                        **self._scope_kwargs,
                    )
                    if self._registry_name
                    else self._model_versions_operation.list(
                        name=name,
                        workspace_name=self._workspace_name,
                        cls=lambda objs: [Model._from_rest_object(obj) for obj in objs],
                        list_view_type=list_view_type,
                        stage=stage,
                        **self._scope_kwargs,
                    )
                ),
            )

        return cast(
            Iterable[Model],
            (
                self._model_container_operation.list(
                    registry_name=self._registry_name,
                    cls=lambda objs: [Model._from_container_rest_object(obj) for obj in objs],
                    list_view_type=list_view_type,
                    **self._scope_kwargs,
                )
                if self._registry_name
                else self._model_container_operation.list(
                    workspace_name=self._workspace_name,
                    cls=lambda objs: [Model._from_container_rest_object(obj) for obj in objs],
                    list_view_type=list_view_type,
                    **self._scope_kwargs,
                )
            ),
        )

    @monitor_with_activity(ops_logger, "Model.Share", ActivityType.PUBLICAPI)
    @experimental
    def share(
        self, name: str, version: str, *, share_with_name: str, share_with_version: str, registry_name: str
    ) -> Model:
        """Share a model asset from workspace to registry.

        :param name: Name of model asset.
        :type name: str
        :param version: Version of model asset.
        :type version: str
        :keyword share_with_name: Name of model asset to share with.
        :paramtype share_with_name: str
        :keyword share_with_version: Version of model asset to share with.
        :paramtype share_with_version: str
        :keyword registry_name: Name of the destination registry.
        :paramtype registry_name: str
        :return: Model asset object.
        :rtype: ~azure.ai.ml.entities.Model
        """

        #  Get workspace info to get workspace GUID
        workspace = self._service_client.workspaces.get(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
        )
        workspace_guid = workspace.workspace_id
        workspace_location = workspace.location

        # Get model asset ID
        asset_id = ASSET_ID_FORMAT.format(
            workspace_location,
            workspace_guid,
            AzureMLResourceType.MODEL,
            name,
            version,
        )

        model_ref = WorkspaceAssetReference(
            name=share_with_name if share_with_name else name,
            version=share_with_version if share_with_version else version,
            asset_id=asset_id,
        )

        with self._set_registry_client(registry_name):
            return self.create_or_update(model_ref)

    def _get_latest_version(self, name: str) -> Model:
        """Returns the latest version of the asset with the given name.

        Latest is defined as the most recently created, not the most recently updated.
        """
        result = _get_latest(
            name,
            self._model_versions_operation,
            self._resource_group_name,
            self._workspace_name,
            self._registry_name,
        )
        return Model._from_rest_object(result)

    @contextmanager
    def _set_registry_client(self, registry_name: str) -> Generator:
        """Sets the registry client for the model operations.

        :param registry_name: Name of the registry.
        :type registry_name: str
        """
        rg_ = self._operation_scope._resource_group_name
        sub_ = self._operation_scope._subscription_id
        registry_ = self._operation_scope.registry_name
        client_ = self._service_client
        model_versions_operation_ = self._model_versions_operation

        try:
            _client, _rg, _sub = get_registry_client(self._service_client._config.credential, registry_name)
            self._operation_scope.registry_name = registry_name
            self._operation_scope._resource_group_name = _rg
            self._operation_scope._subscription_id = _sub
            self._service_client = _client
            self._model_versions_operation = _client.model_versions
            yield
        finally:
            self._operation_scope.registry_name = registry_
            self._operation_scope._resource_group_name = rg_
            self._operation_scope._subscription_id = sub_
            self._service_client = client_
            self._model_versions_operation = model_versions_operation_

    @experimental
    @monitor_with_activity(ops_logger, "Model.Package", ActivityType.PUBLICAPI)
    def package(self, name: str, version: str, package_request: ModelPackage, **kwargs: Any) -> Environment:
        """Package a model asset

        :param name: Name of model asset.
        :type name: str
        :param version: Version of model asset.
        :type version: str
        :param package_request: Model package request.
        :type package_request: ~azure.ai.ml.entities.ModelPackage
        :return: Environment object
        :rtype: ~azure.ai.ml.entities.Environment
        """

        is_deployment_flow = kwargs.pop("skip_to_rest", False)
        if not is_deployment_flow:
            orchestrators = OperationOrchestrator(
                operation_container=self._all_operations,  # type: ignore[arg-type]
                operation_scope=self._operation_scope,
                operation_config=self._operation_config,
            )

            # Create a code asset if code is not already an ARM ID
            if hasattr(package_request.inferencing_server, "code_configuration"):
                if package_request.inferencing_server.code_configuration and not is_ARM_id_for_resource(
                    package_request.inferencing_server.code_configuration.code,
                    AzureMLResourceType.CODE,
                ):
                    if package_request.inferencing_server.code_configuration.code.startswith(ARM_ID_PREFIX):
                        package_request.inferencing_server.code_configuration.code = orchestrators.get_asset_arm_id(
                            package_request.inferencing_server.code_configuration.code[len(ARM_ID_PREFIX) :],
                            azureml_type=AzureMLResourceType.CODE,
                        )
                    else:
                        package_request.inferencing_server.code_configuration.code = orchestrators.get_asset_arm_id(
                            Code(
                                base_path=package_request._base_path,
                                path=package_request.inferencing_server.code_configuration.code,
                            ),
                            azureml_type=AzureMLResourceType.CODE,
                        )
                if package_request.inferencing_server.code_configuration and hasattr(
                    package_request.inferencing_server.code_configuration, "code"
                ):
                    package_request.inferencing_server.code_configuration.code = (
                        "azureml:/" + package_request.inferencing_server.code_configuration.code
                    )

            if package_request.base_environment_source and hasattr(
                package_request.base_environment_source, "resource_id"
            ):
                if not package_request.base_environment_source.resource_id.startswith(REGISTRY_URI_FORMAT):
                    package_request.base_environment_source.resource_id = orchestrators.get_asset_arm_id(
                        package_request.base_environment_source.resource_id,
                        azureml_type=AzureMLResourceType.ENVIRONMENT,
                    )

                package_request.base_environment_source.resource_id = (
                    "azureml:/" + package_request.base_environment_source.resource_id
                    if not package_request.base_environment_source.resource_id.startswith(ARM_ID_PREFIX)
                    else package_request.base_environment_source.resource_id
                )

            # create ARM id for the target environment
            if self._operation_scope._workspace_location and self._operation_scope._workspace_id:
                package_request.target_environment_id = f"azureml://locations/{self._operation_scope._workspace_location}/workspaces/{self._operation_scope._workspace_id}/environments/{package_request.target_environment_id}"
            else:
                if self._all_operations is not None:
                    ws: Any = self._all_operations.all_operations.get("workspaces")
                    ws_details = ws.get(self._workspace_name)
                    workspace_location, workspace_id = (
                        ws_details.location,
                        ws_details._workspace_id,
                    )
                    package_request.target_environment_id = f"azureml://locations/{workspace_location}/workspaces/{workspace_id}/environments/{package_request.target_environment_id}"

            if package_request.environment_version is not None:
                package_request.target_environment_id = (
                    package_request.target_environment_id + f"/versions/{package_request.environment_version}"
                )
            package_request = package_request._to_rest_object()

        if self._registry_reference:
            package_request.target_environment_id = f"azureml://locations/{self._operation_scope._workspace_location}/workspaces/{self._operation_scope._workspace_id}/environments/{package_request.target_environment_id}"
        package_out = (
            self._model_versions_operation.begin_package(
                name=name,
                version=version,
                registry_name=self._registry_name if self._registry_name else self._registry_reference,
                body=package_request,
                **self._scope_kwargs,
            ).result()
            if self._registry_name or self._registry_reference
            else self._model_versions_operation.begin_package(
                name=name,
                version=version,
                workspace_name=self._workspace_name,
                body=package_request,
                **self._scope_kwargs,
            ).result()
        )
        if is_deployment_flow:  # No need to go through the schema, as this is for deployment notification only
            return package_out
        if hasattr(package_out, "target_environment_id"):
            environment_id = package_out.target_environment_id
        else:
            environment_id = package_out.additional_properties["targetEnvironmentId"]

        pattern = r"azureml://locations/(\w+)/workspaces/([\w-]+)/environments/([\w.-]+)/versions/(\d+)"
        parsed_id: Any = re.search(pattern, environment_id)

        if parsed_id:
            environment_name = parsed_id.group(3)
            environment_version = parsed_id.group(4)
        else:
            parsed_id = AMLVersionedArmId(environment_id)
            environment_name = parsed_id.asset_name
            environment_version = parsed_id.asset_version

        module_logger.info("\nPackage Created")
        if package_out is not None and package_out.__class__.__name__ == "PackageResponse":
            if self._registry_name:
                current_rg = self._scope_kwargs.pop("resource_group_name", None)
                self._scope_kwargs["resource_group_name"] = self._workspace_rg
                self._control_plane_client._config.subscription_id = self._workspace_sub
                env_out = self._control_plane_client.environment_versions.get(
                    name=environment_name,
                    version=environment_version,
                    workspace_name=self._workspace_name,
                    **self._scope_kwargs,
                )
                package_out = Environment._from_rest_object(env_out)
                self._scope_kwargs["resource_group_name"] = current_rg
            else:
                if self._all_operations is not None:
                    environment_operation = self._all_operations.all_operations[AzureMLResourceType.ENVIRONMENT]
                    package_out = environment_operation.get(name=environment_name, version=environment_version)

        return package_out

    def _get_model_properties(
        self, name: str, version: Optional[str] = None, label: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Return the model properties if the model with this name exists.

        :param name: Model name.
        :type name: str
        :param version: Model version.
        :type version: Optional[str]
        :param label: model label.
        :type label: Optional[str]
        :return: Model properties, if the model exists, or None.
        """
        try:
            if version or label:
                return self.get(name, version, label).properties
            return self._get_latest_version(name).properties
        except (ResourceNotFoundError, ValidationException):
            return None