aboutsummaryrefslogtreecommitdiff
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,arguments-renamed,unidiomatic-typecheck

import logging
import os
import typing
from abc import abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from azure.ai.ml._restclient.v2023_04_01_preview.models import CodeConfiguration as RestCodeConfiguration
from azure.ai.ml._restclient.v2023_04_01_preview.models import EndpointComputeType
from azure.ai.ml._restclient.v2023_04_01_preview.models import (
    KubernetesOnlineDeployment as RestKubernetesOnlineDeployment,
)
from azure.ai.ml._restclient.v2023_04_01_preview.models import ManagedOnlineDeployment as RestManagedOnlineDeployment
from azure.ai.ml._restclient.v2023_04_01_preview.models import OnlineDeployment as RestOnlineDeploymentData
from azure.ai.ml._restclient.v2023_04_01_preview.models import OnlineDeploymentProperties as RestOnlineDeploymentDetails
from azure.ai.ml._restclient.v2023_04_01_preview.models import Sku as RestSku
from azure.ai.ml._schema._deployment.online.online_deployment import (
    KubernetesOnlineDeploymentSchema,
    ManagedOnlineDeploymentSchema,
)
from azure.ai.ml._utils._arm_id_utils import _parse_endpoint_name_from_deployment_id
from azure.ai.ml._utils.utils import camel_to_snake
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, TYPE, ArmConstants
from azure.ai.ml.constants._endpoint import EndpointYamlFields
from azure.ai.ml.entities._assets import Code
from azure.ai.ml.entities._assets._artifacts.model import Model
from azure.ai.ml.entities._assets.environment import Environment
from azure.ai.ml.entities._deployment.code_configuration import CodeConfiguration
from azure.ai.ml.entities._deployment.data_collector import DataCollector
from azure.ai.ml.entities._deployment.deployment_settings import OnlineRequestSettings, ProbeSettings
from azure.ai.ml.entities._deployment.resource_requirements_settings import ResourceRequirementsSettings
from azure.ai.ml.entities._deployment.scale_settings import (
    DefaultScaleSettings,
    OnlineScaleSettings,
    TargetUtilizationScaleSettings,
)
from azure.ai.ml.entities._endpoint._endpoint_helpers import validate_endpoint_or_deployment_name
from azure.ai.ml.entities._util import load_from_dict
from azure.ai.ml.exceptions import (
    DeploymentException,
    ErrorCategory,
    ErrorTarget,
    ValidationErrorType,
    ValidationException,
)

from .deployment import Deployment

module_logger = logging.getLogger(__name__)


# pylint: disable=too-many-instance-attributes
class OnlineDeployment(Deployment):
    """Online endpoint deployment entity.

    :param name: Name of the deployment resource.
    :type name: str
    :param endpoint_name: Name of the endpoint resource, defaults to None
    :type endpoint_name: typing.Optional[str]
    :param tags: Tag dictionary. Tags can be added, removed, and updated, defaults to None
    :type tags: typing.Optional[typing.Dict[str, typing.Any]]
    :param properties: The asset property dictionary, defaults to None
    :type properties: typing.Optional[typing.Dict[str, typing.Any]]
    :param description: Description of the resource, defaults to None
    :type description: typing.Optional[str]
    :param model: Model entity for the endpoint deployment, defaults to None
    :type model: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Model]]
    :param data_collector: Data Collector entity for the endpoint deployment, defaults to None
    :type data_collector: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.DataCollector]]
    :param code_configuration: Code Configuration, defaults to None
    :type code_configuration: typing.Optional[~azure.ai.ml.entities.CodeConfiguration]
    :param environment: Environment entity for the endpoint deployment, defaults to None
    :type environment: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Environment]]
    :param app_insights_enabled: Is appinsights enabled, defaults to False
    :type app_insights_enabled: typing.Optional[bool]
    :param scale_settings: How the online deployment will scale, defaults to None
    :type scale_settings: typing.Optional[~azure.ai.ml.entities.OnlineScaleSettings]
    :param request_settings: Online Request Settings, defaults to None
    :type request_settings: typing.Optional[~azure.ai.ml.entities.OnlineRequestSettings]
    :param liveness_probe: Liveness probe settings, defaults to None
    :type liveness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
    :param readiness_probe: Readiness probe settings, defaults to None
    :type readiness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
    :param environment_variables: Environment variables that will be set in deployment, defaults to None
    :type environment_variables: typing.Optional[typing.Dict[str, str]]
    :param instance_count: The instance count used for this deployment, defaults to None
    :type instance_count: typing.Optional[int]
    :param instance_type: Azure compute sku, defaults to None
    :type instance_type: typing.Optional[str]
    :param model_mount_path: The path to mount the model in custom container, defaults to None
    :type model_mount_path: typing.Optional[str]
    :param code_path: Equivalent to code_configuration.code, will be ignored if code_configuration is present
        , defaults to None
    :type code_path: typing.Optional[typing.Union[str, os.PathLike]]
    :param scoring_script: Equivalent to code_configuration.code.scoring_script.
        Will be ignored if code_configuration is present, defaults to None
    :type scoring_script: typing.Optional[typing.Union[str, os.PathLike]]
    """

    def __init__(
        self,
        name: str,
        *,
        endpoint_name: Optional[str] = None,
        tags: Optional[Dict[str, typing.Any]] = None,
        properties: Optional[Dict[str, typing.Any]] = None,
        description: Optional[str] = None,
        model: Optional[Union[str, "Model"]] = None,
        data_collector: Optional[DataCollector] = None,
        code_configuration: Optional[CodeConfiguration] = None,
        environment: Optional[Union[str, "Environment"]] = None,
        app_insights_enabled: Optional[bool] = False,
        scale_settings: Optional[OnlineScaleSettings] = None,
        request_settings: Optional[OnlineRequestSettings] = None,
        liveness_probe: Optional[ProbeSettings] = None,
        readiness_probe: Optional[ProbeSettings] = None,
        environment_variables: Optional[Dict[str, str]] = None,
        instance_count: Optional[int] = None,
        instance_type: Optional[str] = None,
        model_mount_path: Optional[str] = None,
        code_path: Optional[Union[str, os.PathLike]] = None,  # promoted property from code_configuration.code
        scoring_script: Optional[Union[str, os.PathLike]] = None,  # promoted property code_configuration.scoring_script
        **kwargs: typing.Any,
    ):
        self._provisioning_state: Optional[str] = kwargs.pop("provisioning_state", None)

        super(OnlineDeployment, self).__init__(
            name=name,
            endpoint_name=endpoint_name,
            tags=tags,
            properties=properties,
            description=description,
            model=model,
            code_configuration=code_configuration,
            environment=environment,
            environment_variables=environment_variables,
            code_path=code_path,
            scoring_script=scoring_script,
            **kwargs,
        )

        self.app_insights_enabled = app_insights_enabled
        self.scale_settings = scale_settings
        self.request_settings = request_settings
        self.liveness_probe = liveness_probe
        self.readiness_probe = readiness_probe
        self.instance_count = instance_count
        self._arm_type = ArmConstants.ONLINE_DEPLOYMENT_TYPE
        self.model_mount_path = model_mount_path
        self.instance_type = instance_type
        self.data_collector: Any = data_collector

    @property
    def provisioning_state(self) -> Optional[str]:
        """Deployment provisioning state, readonly.

        :return: Deployment provisioning state.
        :rtype: typing.Optional[str]
        """
        return self._provisioning_state

    def _generate_dependencies(self) -> Tuple:
        """Convert dependencies into ARM id or REST wrapper.

        :return: A 3-tuple of the code configuration, environment ID, and model ID.
        :rtype: Tuple[RestCodeConfiguration, str, str]
        """
        code = None

        if self.code_configuration:
            self.code_configuration._validate()
            if self.code_configuration.code is not None:
                if isinstance(self.code_configuration.code, str):
                    code_id = self.code_configuration.code
                elif not isinstance(self.code_configuration.code, os.PathLike):
                    code_id = self.code_configuration.code.id

                code = RestCodeConfiguration(
                    code_id=code_id,  # pylint: disable=possibly-used-before-assignment
                    scoring_script=self.code_configuration.scoring_script,
                )

        model_id = None
        if self.model:
            model_id = self.model if isinstance(self.model, str) else self.model.id

        environment_id = None
        if self.environment:
            environment_id = self.environment if isinstance(self.environment, str) else self.environment.id

        return code, environment_id, model_id

    @abstractmethod
    def _to_dict(self) -> Dict:
        pass

    @abstractmethod
    def _to_arm_resource_param(self, **kwargs: Any) -> Dict:
        pass

    @abstractmethod
    def _to_rest_object(self) -> RestOnlineDeploymentData:
        pass

    @classmethod
    def _from_rest_object(cls, deployment: RestOnlineDeploymentData) -> RestOnlineDeploymentDetails:
        if deployment.properties.endpoint_compute_type == EndpointComputeType.KUBERNETES:
            return KubernetesOnlineDeployment._from_rest_object(deployment)
        if deployment.properties.endpoint_compute_type == EndpointComputeType.MANAGED:
            return ManagedOnlineDeployment._from_rest_object(deployment)

        msg = f"Unsupported online endpoint type {deployment.properties.endpoint_compute_type}."
        raise DeploymentException(
            message=msg,
            target=ErrorTarget.ONLINE_DEPLOYMENT,
            no_personal_data_message=msg,
            error_category=ErrorCategory.SYSTEM_ERROR,
        )

    def _get_arm_resource(self, **kwargs: Any) -> Dict:
        resource: dict = super(OnlineDeployment, self)._get_arm_resource(**kwargs)
        depends_on = []
        if self.environment and isinstance(self.environment, Environment):
            depends_on.append(f"{self.environment._arm_type}Deployment")
        if self.code_configuration and self.code_configuration.code and isinstance(self.code_configuration.code, Code):
            depends_on.append(f"{self.code_configuration.code._arm_type}Deployment")
        if self.model and isinstance(self.model, Model):
            depends_on.append(f"{self.model._arm_type}Deployment")
        resource[ArmConstants.DEPENDSON_PARAMETER_NAME] = depends_on
        return resource

    def _get_arm_resource_and_params(self, **kwargs: Any) -> List:
        resource_param_tuple_list = [(self._get_arm_resource(**kwargs), self._to_arm_resource_param(**kwargs))]
        if self.environment and isinstance(self.environment, Environment):
            resource_param_tuple_list.extend(self.environment._get_arm_resource_and_params())
        if self.code_configuration and self.code_configuration.code and isinstance(self.code_configuration.code, Code):
            resource_param_tuple_list.extend(self.code_configuration.code._get_arm_resource_and_params())
        if self.model and isinstance(self.model, Model):
            resource_param_tuple_list.extend(self.model._get_arm_resource_and_params())
        return resource_param_tuple_list

    def _validate_name(self) -> None:
        if self.name:
            validate_endpoint_or_deployment_name(self.name, is_deployment=True)

    def _merge_with(self, other: Any) -> None:
        if other:
            if self.name != other.name:
                msg = "The deployment name: {} and {} are not matched when merging."
                raise ValidationException(
                    message=msg.format(self.name, other.name),
                    target=ErrorTarget.ONLINE_DEPLOYMENT,
                    no_personal_data_message=msg.format("[name1]", "[name2]"),
                    error_category=ErrorCategory.USER_ERROR,
                    error_type=ValidationErrorType.INVALID_VALUE,
                )
            super()._merge_with(other)
            self.app_insights_enabled = other.app_insights_enabled or self.app_insights_enabled
            # Adding noqa: Fix E721 do not compare types, use 'isinstance()'
            # isinstance will include checking for subclasses, which is explicitly undesired by a logic.
            if self.scale_settings and type(self.scale_settings) == type(other.scale_settings):  # noqa
                self.scale_settings._merge_with(other.scale_settings)
            else:
                self.scale_settings = other.scale_settings
            if self.request_settings:
                self.request_settings._merge_with(other.request_settings)
            else:
                self.request_settings = other.request_settings
            if self.liveness_probe:
                self.liveness_probe._merge_with(other.liveness_probe)
            else:
                self.liveness_probe = other.liveness_probe
            if self.readiness_probe:
                self.readiness_probe._merge_with(other.readiness_probe)
            else:
                self.readiness_probe = other.readiness_probe
            self.instance_count = other.instance_count or self.instance_count
            self.instance_type = other.instance_type or self.instance_type

    @classmethod
    def _set_scale_settings(cls, data: dict) -> None:
        if not hasattr(data, EndpointYamlFields.SCALE_SETTINGS):
            return

        scale_settings = data[EndpointYamlFields.SCALE_SETTINGS]
        keyName = TYPE
        if scale_settings and scale_settings[keyName] == "default":
            scale_copy = scale_settings.copy()
            for key in scale_copy:
                if key != keyName:
                    scale_settings.pop(key, None)

    @classmethod
    def _load(
        cls,
        data: Optional[Dict] = None,
        yaml_path: Optional[Union[os.PathLike, str]] = None,
        params_override: Optional[list] = None,
        **kwargs: Any,
    ) -> "OnlineDeployment":
        data = data or {}
        params_override = params_override or []
        context = {
            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path.cwd(),
            PARAMS_OVERRIDE_KEY: params_override,
        }

        deployment_type = data.get("type", None)

        if deployment_type == camel_to_snake(EndpointComputeType.KUBERNETES.value):
            res_kub: OnlineDeployment = load_from_dict(KubernetesOnlineDeploymentSchema, data, context, **kwargs)
            return res_kub

        res_manage: OnlineDeployment = load_from_dict(ManagedOnlineDeploymentSchema, data, context, **kwargs)
        return res_manage


class KubernetesOnlineDeployment(OnlineDeployment):
    """Kubernetes Online endpoint deployment entity.

    :param name: Name of the deployment resource.
    :type name: str
    :param endpoint_name: Name of the endpoint resource, defaults to None
    :type endpoint_name: typing.Optional[str]
    :param tags: Tag dictionary. Tags can be added, removed, and updated., defaults to None
    :type tags: typing.Optional[typing.Dict[str, typing.Any]]
    :param properties: The asset property dictionary, defaults to None
    :type properties: typing.Optional[typing.Dict[str, typing.Any]]
    :param description: Description of the resource, defaults to None
    :type description: typing.Optional[str]
    :param model: Model entity for the endpoint deployment, defaults to None
    :type model: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Model]]
    :param code_configuration: Code Configuration, defaults to None
    :type code_configuration: typing.Optional[~azure.ai.ml.entities.CodeConfiguration]
    :param environment: Environment entity for the endpoint deployment, defaults to None
    :type environment: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Environment]]
    :param app_insights_enabled: Is appinsights enabled, defaults to False
    :type app_insights_enabled: bool
    :param scale_settings: How the online deployment will scale, defaults to None
    :type scale_settings: typing.Optional[typing.Union[~azure.ai.ml.entities.DefaultScaleSettings
        , ~azure.ai.ml.entities.TargetUtilizationScaleSettings]]
    :param request_settings: Online Request Settings, defaults to None
    :type request_settings: typing.Optional[OnlineRequestSettings]
    :param liveness_probe: Liveness probe settings, defaults to None
    :type liveness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
    :param readiness_probe: Readiness probe settings, defaults to None
    :type readiness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
    :param environment_variables: Environment variables that will be set in deployment, defaults to None
    :type environment_variables: typing.Optional[typing.Dict[str, str]]
    :param resources: Resource requirements settings, defaults to None
    :type resources: typing.Optional[~azure.ai.ml.entities.ResourceRequirementsSettings]
    :param instance_count: The instance count used for this deployment, defaults to None
    :type instance_count: typing.Optional[int]
    :param instance_type: The instance type defined by K8S cluster admin, defaults to None
    :type instance_type: typing.Optional[str]
    :param code_path: Equivalent to code_configuration.code, will be ignored if code_configuration is present
        , defaults to None
    :type code_path: typing.Optional[typing.Union[str, os.PathLike]]
    :param scoring_script: Equivalent to code_configuration.code.scoring_script.
        Will be ignored if code_configuration is present, defaults to None
    :type scoring_script: typing.Optional[typing.Union[str, os.PathLike]]
    """

    def __init__(
        self,
        *,
        name: str,
        endpoint_name: Optional[str] = None,
        tags: Optional[Dict[str, typing.Any]] = None,
        properties: Optional[Dict[str, typing.Any]] = None,
        description: Optional[str] = None,
        model: Optional[Union[str, "Model"]] = None,
        code_configuration: Optional[CodeConfiguration] = None,
        environment: Optional[Union[str, "Environment"]] = None,
        app_insights_enabled: bool = False,
        scale_settings: Optional[Union[DefaultScaleSettings, TargetUtilizationScaleSettings]] = None,
        request_settings: Optional[OnlineRequestSettings] = None,
        liveness_probe: Optional[ProbeSettings] = None,
        readiness_probe: Optional[ProbeSettings] = None,
        environment_variables: Optional[Dict[str, str]] = None,
        resources: Optional[ResourceRequirementsSettings] = None,
        instance_count: Optional[int] = None,
        instance_type: Optional[str] = None,
        code_path: Optional[Union[str, os.PathLike]] = None,  # promoted property from code_configuration.code
        scoring_script: Optional[
            Union[str, os.PathLike]
        ] = None,  # promoted property from code_configuration.scoring_script
        **kwargs: Any,
    ):
        kwargs["type"] = EndpointComputeType.KUBERNETES.value
        super(KubernetesOnlineDeployment, self).__init__(
            name=name,
            endpoint_name=endpoint_name,
            tags=tags,
            properties=properties,
            description=description,
            model=model,
            code_configuration=code_configuration,
            environment=environment,
            environment_variables=environment_variables,
            instance_count=instance_count,
            instance_type=instance_type,
            app_insights_enabled=app_insights_enabled,
            scale_settings=scale_settings,
            request_settings=request_settings,
            liveness_probe=liveness_probe,
            readiness_probe=readiness_probe,
            code_path=code_path,
            scoring_script=scoring_script,
            **kwargs,
        )

        self.resources = resources

    def _to_dict(self) -> Dict:
        res: dict = KubernetesOnlineDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
        return res

    # pylint: disable=arguments-differ
    def _to_rest_object(self, location: str) -> RestOnlineDeploymentData:  # type: ignore
        self._validate()
        code, environment, model = self._generate_dependencies()

        properties = RestKubernetesOnlineDeployment(
            code_configuration=code,
            environment_id=environment,
            model=model,
            model_mount_path=self.model_mount_path,
            scale_settings=self.scale_settings._to_rest_object() if self.scale_settings else None,
            properties=self.properties,
            description=self.description,
            environment_variables=self.environment_variables,
            app_insights_enabled=self.app_insights_enabled,
            request_settings=self.request_settings._to_rest_object() if self.request_settings else None,
            liveness_probe=self.liveness_probe._to_rest_object() if self.liveness_probe else None,
            readiness_probe=self.readiness_probe._to_rest_object() if self.readiness_probe else None,
            container_resource_requirements=self.resources._to_rest_object() if self.resources else None,
            instance_type=self.instance_type if self.instance_type else None,
            data_collector=self.data_collector._to_rest_object() if self.data_collector else None,
        )
        sku = RestSku(name="Default", capacity=self.instance_count)

        return RestOnlineDeploymentData(location=location, properties=properties, tags=self.tags, sku=sku)

    def _to_arm_resource_param(self, **kwargs: Any) -> Dict:
        rest_object = self._to_rest_object(**kwargs)
        properties = rest_object.properties
        sku = rest_object.sku
        tags = rest_object.tags

        return {
            self._arm_type: {
                ArmConstants.NAME: self.name,
                ArmConstants.PROPERTIES_PARAMETER_NAME: self._serialize.body(properties, "K8SOnlineDeployment"),
                ArmConstants.SKU: self._serialize.body(sku, "Sku"),
                ArmConstants.TAGS: tags,
            }
        }

    def _merge_with(self, other: Any) -> None:
        if other:
            super()._merge_with(other)
            if self.resources:
                self.resources._merge_with(other.resources)
            else:
                self.resources = other.resources

    def _validate(self) -> None:
        self._validate_name()

    @classmethod
    def _from_rest_object(cls, resource: RestOnlineDeploymentData) -> "KubernetesOnlineDeployment":
        deployment = resource.properties

        code_config = (
            CodeConfiguration(
                code=deployment.code_configuration.code_id,
                scoring_script=deployment.code_configuration.scoring_script,
            )
            if deployment.code_configuration
            else None
        )

        return KubernetesOnlineDeployment(
            id=resource.id,
            name=resource.name,
            tags=resource.tags,
            properties=deployment.properties,
            description=deployment.description,
            request_settings=OnlineRequestSettings._from_rest_object(deployment.request_settings),
            model=deployment.model,
            code_configuration=code_config,
            environment=deployment.environment_id,
            resources=ResourceRequirementsSettings._from_rest_object(deployment.container_resource_requirements),
            app_insights_enabled=deployment.app_insights_enabled,
            scale_settings=cast(
                Optional[Union[DefaultScaleSettings, TargetUtilizationScaleSettings]],
                OnlineScaleSettings._from_rest_object(deployment.scale_settings),
            ),
            liveness_probe=ProbeSettings._from_rest_object(deployment.liveness_probe),
            readiness_probe=ProbeSettings._from_rest_object(deployment.readiness_probe),
            environment_variables=deployment.environment_variables,
            endpoint_name=_parse_endpoint_name_from_deployment_id(resource.id),
            instance_count=resource.sku.capacity if resource.sku else None,
            instance_type=deployment.instance_type,
            data_collector=(
                DataCollector._from_rest_object(deployment.data_collector)
                if hasattr(deployment, "data_collector") and deployment.data_collector
                else None
            ),
            provisioning_state=deployment.provisioning_state if hasattr(deployment, "provisioning_state") else None,
        )


class ManagedOnlineDeployment(OnlineDeployment):
    """Managed Online endpoint deployment entity.

    :param name: Name of the deployment resource
    :type name: str
    :param endpoint_name: Name of the endpoint resource, defaults to None
    :type endpoint_name: typing.Optional[str]
    :param tags: Tag dictionary. Tags can be added, removed, and updated., defaults to None
    :type tags: typing.Optional[typing.Dict[str, typing.Any]]
    :param properties: The asset property dictionary, defaults to None
    :type properties: typing.Optional[typing.Dict[str, typing.Any]]
    :param description: Description of the resource, defaults to None
    :type description: typing.Optional[str]
    :param model: Model entity for the endpoint deployment, defaults to None
    :type model: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Model]]
    :param code_configuration: Code Configuration, defaults to None
    :type code_configuration: typing.Optional[~azure.ai.ml.entities.CodeConfiguration]
    :param environment: Environment entity for the endpoint deployment, defaults to None
    :type environment: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Environment]]
    :param app_insights_enabled: Is appinsights enabled, defaults to False
    :type app_insights_enabled: bool
    :param scale_settings: How the online deployment will scale, defaults to None
    :type scale_settings: typing.Optional[typing.Union[~azure.ai.ml.entities.DefaultScaleSettings
        , ~azure.ai.ml.entities.TargetUtilizationScaleSettings]]
    :param request_settings: Online Request Settings, defaults to None
    :type request_settings: typing.Optional[OnlineRequestSettings]
    :param liveness_probe: Liveness probe settings, defaults to None
    :type liveness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
    :param readiness_probe: Readiness probe settings, defaults to None
    :type readiness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
    :param environment_variables: Environment variables that will be set in deployment, defaults to None
    :type environment_variables: typing.Optional[typing.Dict[str, str]]
    :param instance_type: Azure compute sku, defaults to None
    :type instance_type: typing.Optional[str]
    :param instance_count: The instance count used for this deployment, defaults to None
    :type instance_count: typing.Optional[int]
    :param egress_public_network_access: Whether to restrict communication between a deployment and the
         Azure resources used to by the deployment. Allowed values are: "enabled", "disabled", defaults to None
    :type egress_public_network_access: typing.Optional[str]
    :param code_path: Equivalent to code_configuration.code, will be ignored if code_configuration is present
        , defaults to None
    :type code_path: typing.Optional[typing.Union[str, os.PathLike]]
    :param scoring_script_path: Equivalent to code_configuration.scoring_script, will be ignored if
        code_configuration is present, defaults to None
    :type scoring_script_path: typing.Optional[typing.Union[str, os.PathLike]]
    :param data_collector: Data collector, defaults to None
    :type data_collector: typing.Optional[typing.List[~azure.ai.ml.entities.DataCollector]]
    """

    def __init__(
        self,
        *,
        name: str,
        endpoint_name: Optional[str] = None,
        tags: Optional[Dict[str, typing.Any]] = None,
        properties: Optional[Dict[str, typing.Any]] = None,
        description: Optional[str] = None,
        model: Optional[Union[str, "Model"]] = None,
        code_configuration: Optional[CodeConfiguration] = None,
        environment: Optional[Union[str, "Environment"]] = None,
        app_insights_enabled: bool = False,
        scale_settings: Optional[Union[DefaultScaleSettings, TargetUtilizationScaleSettings]] = None,
        request_settings: Optional[OnlineRequestSettings] = None,
        liveness_probe: Optional[ProbeSettings] = None,
        readiness_probe: Optional[ProbeSettings] = None,
        environment_variables: Optional[Dict[str, str]] = None,
        instance_type: Optional[str] = None,
        instance_count: Optional[int] = None,
        egress_public_network_access: Optional[str] = None,
        code_path: Optional[Union[str, os.PathLike]] = None,  # promoted property from code_configuration.code
        scoring_script: Optional[
            Union[str, os.PathLike]
        ] = None,  # promoted property from code_configuration.scoring_script
        data_collector: Optional[DataCollector] = None,
        **kwargs: Any,
    ):
        kwargs["type"] = EndpointComputeType.MANAGED.value
        self.private_network_connection = kwargs.pop("private_network_connection", None)
        self.package_model = kwargs.pop("package_model", False)

        super(ManagedOnlineDeployment, self).__init__(
            name=name,
            endpoint_name=endpoint_name,
            tags=tags,
            properties=properties,
            description=description,
            model=model,
            code_configuration=code_configuration,
            environment=environment,
            environment_variables=environment_variables,
            app_insights_enabled=app_insights_enabled,
            scale_settings=scale_settings,
            request_settings=request_settings,
            liveness_probe=liveness_probe,
            readiness_probe=readiness_probe,
            instance_count=instance_count,
            instance_type=instance_type,
            code_path=code_path,
            scoring_script=scoring_script,
            data_collector=data_collector,
            **kwargs,
        )

        self.readiness_probe = readiness_probe
        self.egress_public_network_access = egress_public_network_access

    def _to_dict(self) -> Dict:
        res: dict = ManagedOnlineDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
        return res

    # pylint: disable=arguments-differ
    def _to_rest_object(self, location: str) -> RestOnlineDeploymentData:  # type: ignore
        self._validate()
        code, environment, model = self._generate_dependencies()
        properties = RestManagedOnlineDeployment(
            code_configuration=code,
            environment_id=environment,
            model=model,
            model_mount_path=self.model_mount_path,
            scale_settings=self.scale_settings._to_rest_object() if self.scale_settings else None,
            properties=self.properties,
            description=self.description,
            environment_variables=self.environment_variables,
            app_insights_enabled=self.app_insights_enabled,
            request_settings=self.request_settings._to_rest_object() if self.request_settings else None,
            liveness_probe=self.liveness_probe._to_rest_object() if self.liveness_probe else None,
            instance_type=self.instance_type,
            readiness_probe=self.readiness_probe._to_rest_object() if self.readiness_probe else None,
            data_collector=self.data_collector._to_rest_object() if self.data_collector else None,
        )
        # TODO: SKU name is defaulted to value "Default" since service side requires it.
        #  Should be removed once service side defaults it.
        sku = RestSku(name="Default", capacity=self.instance_count)

        # mfe is expecting private network connection to be in both the attribute level
        # as well as in the properties dictionary.
        if hasattr(self, "private_network_connection") and self.private_network_connection:
            properties.private_network_connection = self.private_network_connection
            properties.properties["private-network-connection"] = self.private_network_connection
        if hasattr(self, "egress_public_network_access") and self.egress_public_network_access:
            properties.egress_public_network_access = self.egress_public_network_access
        return RestOnlineDeploymentData(location=location, properties=properties, tags=self.tags, sku=sku)

    def _to_arm_resource_param(self, **kwargs: Any) -> Dict:
        rest_object = self._to_rest_object(**kwargs)
        properties = rest_object.properties
        sku = rest_object.sku
        tags = rest_object.tags

        return {
            self._arm_type: {
                ArmConstants.NAME: self.name,
                ArmConstants.PROPERTIES_PARAMETER_NAME: self._serialize.body(properties, "ManagedOnlineDeployment"),
                ArmConstants.SKU: self._serialize.body(sku, "Sku"),
                ArmConstants.TAGS: tags,
            }
        }

    @classmethod
    def _from_rest_object(cls, resource: RestOnlineDeploymentData) -> "ManagedOnlineDeployment":
        deployment = resource.properties

        code_config = (
            CodeConfiguration(
                code=deployment.code_configuration.code_id,
                scoring_script=deployment.code_configuration.scoring_script,
            )
            if deployment.code_configuration
            else None
        )

        return ManagedOnlineDeployment(
            id=resource.id,
            name=resource.name,
            tags=resource.tags,
            properties=deployment.properties,
            description=deployment.description,
            request_settings=OnlineRequestSettings._from_rest_object(deployment.request_settings),
            model=(deployment.model if deployment.model else None),
            code_configuration=code_config,
            environment=deployment.environment_id,
            app_insights_enabled=deployment.app_insights_enabled,
            scale_settings=OnlineScaleSettings._from_rest_object(deployment.scale_settings),  # type: ignore
            liveness_probe=ProbeSettings._from_rest_object(deployment.liveness_probe),
            environment_variables=deployment.environment_variables,
            readiness_probe=ProbeSettings._from_rest_object(deployment.readiness_probe),
            instance_type=deployment.instance_type,
            endpoint_name=_parse_endpoint_name_from_deployment_id(resource.id),
            instance_count=resource.sku.capacity,
            private_network_connection=(
                deployment.private_network_connection if hasattr(deployment, "private_network_connection") else None
            ),
            egress_public_network_access=deployment.egress_public_network_access,
            data_collector=(
                DataCollector._from_rest_object(deployment.data_collector)
                if hasattr(deployment, "data_collector") and deployment.data_collector
                else None
            ),
            provisioning_state=deployment.provisioning_state if hasattr(deployment, "provisioning_state") else None,
            creation_context=resource.system_data,
        )

    def _merge_with(self, other: Any) -> None:
        if other:
            super()._merge_with(other)
            self.instance_type = other.instance_type or self.instance_type

    def _validate(self) -> None:
        self._validate_name()
        self._validate_scale_settings()

    def _validate_scale_settings(self) -> None:
        if self.scale_settings:
            if not isinstance(self.scale_settings, DefaultScaleSettings):
                msg = "ManagedOnlineEndpoint supports DefaultScaleSettings only."
                raise ValidationException(
                    message=msg,
                    target=ErrorTarget.ONLINE_DEPLOYMENT,
                    no_personal_data_message=msg,
                    error_category=ErrorCategory.USER_ERROR,
                    error_type=ValidationErrorType.INVALID_VALUE,
                )