about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/__init__.py5
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_deployment.py356
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_job.py38
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/code_configuration.py93
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/container_resource_settings.py74
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_asset.py38
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_collector.py84
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment.py213
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_collection.py62
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_settings.py200
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/event_hub.py32
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/job_definition.py58
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment.py207
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment_settings.py81
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/online_deployment.py742
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/oversize_data_config.py25
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/payload_response.py26
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/pipeline_component_batch_deployment.py150
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/request_logging.py39
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/resource_requirements_settings.py84
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/run_settings.py50
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/scale_settings.py173
22 files changed, 2830 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/__init__.py
new file mode 100644
index 00000000..fdf8caba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/__init__.py
@@ -0,0 +1,5 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_deployment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_deployment.py
new file mode 100644
index 00000000..59b23eb8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_deployment.py
@@ -0,0 +1,356 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+from os import PathLike
+from pathlib import Path
+from typing import Any, Dict, Optional, Union
+
+from azure.ai.ml._restclient.v2024_01_01_preview.models import BatchDeployment as BatchDeploymentData
+from azure.ai.ml._restclient.v2024_01_01_preview.models import BatchDeploymentProperties as RestBatchDeployment
+from azure.ai.ml._restclient.v2024_01_01_preview.models import BatchOutputAction
+from azure.ai.ml._restclient.v2024_01_01_preview.models import CodeConfiguration as RestCodeConfiguration
+from azure.ai.ml._restclient.v2024_01_01_preview.models import IdAssetReference
+from azure.ai.ml._schema._deployment.batch.batch_deployment import BatchDeploymentSchema
+from azure.ai.ml._utils._arm_id_utils import _parse_endpoint_name_from_deployment_id
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY
+from azure.ai.ml.constants._deployment import BatchDeploymentOutputAction
+from azure.ai.ml.entities._assets import Environment, Model
+from azure.ai.ml.entities._deployment.deployment_settings import BatchRetrySettings
+from azure.ai.ml.entities._job.resource_configuration import ResourceConfiguration
+from azure.ai.ml.entities._system_data import SystemData
+from azure.ai.ml.entities._util import load_from_dict
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from .code_configuration import CodeConfiguration
+from .deployment import Deployment
+
+module_logger = logging.getLogger(__name__)
+
+
+class BatchDeployment(Deployment):
+    """Batch endpoint deployment entity.
+
+    :param name: the name of the batch deployment
+    :type name: str
+    :param description: Description of the resource.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param properties: The asset property dictionary.
+    :type properties: dict[str, str]
+    :param model: Model entity for the endpoint deployment, defaults to None
+    :type model: Union[str, Model]
+    :param code_configuration: defaults to None
+    :type code_configuration: CodeConfiguration
+    :param environment: Environment entity for the endpoint deployment., defaults to None
+    :type environment: Union[str, Environment]
+    :param compute: Compute target for batch inference operation.
+    :type compute: str
+    :param output_action: Indicates how the output will be organized. Possible values include:
+     "summary_only", "append_row". Defaults to "append_row"
+    :type output_action: str or ~azure.ai.ml.constants._deployment.BatchDeploymentOutputAction
+    :param output_file_name: Customized output file name for append_row output action, defaults to "predictions.csv"
+    :type output_file_name: str
+    :param max_concurrency_per_instance: Indicates maximum number of parallelism per instance, defaults to 1
+    :type max_concurrency_per_instance: int
+    :param error_threshold: Error threshold, if the error count for the entire input goes above
+        this value,
+        the batch inference will be aborted. Range is [-1, int.MaxValue]
+        -1 value indicates, ignore all failures during batch inference
+        For FileDataset count of file failures
+        For TabularDataset, this is the count of record failures, defaults to -1
+    :type error_threshold: int
+    :param retry_settings: Retry settings for a batch inference operation, defaults to None
+    :type retry_settings: BatchRetrySettings
+    :param resources: Indicates compute configuration for the job.
+    :type resources: ~azure.mgmt.machinelearningservices.models.ResourceConfiguration
+    :param logging_level: Logging level for batch inference operation, defaults to "info"
+    :type logging_level: str
+    :param mini_batch_size: Size of the mini-batch passed to each batch invocation, defaults to 10
+    :type mini_batch_size: int
+    :param environment_variables: Environment variables that will be set in deployment.
+    :type environment_variables: dict
+    :param code_path: Folder path to local code assets. Equivalent to code_configuration.code.
+    :type code_path: Union[str, PathLike]
+    :param scoring_script: Scoring script name. Equivalent to code_configuration.code.scoring_script.
+    :type scoring_script: Union[str, PathLike]
+    :param instance_count: Number of instances the interfering will run on. Equivalent to resources.instance_count.
+    :type instance_count: int
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if BatchDeployment cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        endpoint_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict[str, Any]] = None,
+        properties: Optional[Dict[str, str]] = None,
+        model: Optional[Union[str, Model]] = None,
+        code_configuration: Optional[CodeConfiguration] = None,
+        environment: Optional[Union[str, Environment]] = None,
+        compute: Optional[str] = None,
+        resources: Optional[ResourceConfiguration] = None,
+        output_file_name: Optional[str] = None,
+        output_action: Optional[Union[BatchDeploymentOutputAction, str]] = None,
+        error_threshold: Optional[int] = None,
+        retry_settings: Optional[BatchRetrySettings] = None,
+        logging_level: Optional[str] = None,
+        mini_batch_size: Optional[int] = None,
+        max_concurrency_per_instance: Optional[int] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        code_path: Optional[Union[str, PathLike]] = None,  # promoted property from code_configuration.code
+        scoring_script: Optional[
+            Union[str, PathLike]
+        ] = None,  # promoted property from code_configuration.scoring_script
+        instance_count: Optional[int] = None,  # promoted property from resources.instance_count
+        **kwargs: Any,
+    ) -> None:
+        self._provisioning_state: Optional[str] = kwargs.pop("provisioning_state", None)
+
+        super(BatchDeployment, self).__init__(
+            name=name,
+            endpoint_name=endpoint_name,
+            properties=properties,
+            tags=tags,
+            description=description,
+            model=model,
+            code_configuration=code_configuration,
+            environment=environment,
+            environment_variables=environment_variables,
+            code_path=code_path,
+            scoring_script=scoring_script,
+            **kwargs,
+        )
+
+        self.compute = compute
+        self.resources = resources
+        self.output_action = output_action
+        self.output_file_name = output_file_name
+        self.error_threshold = error_threshold
+        self.retry_settings = retry_settings
+        self.logging_level = logging_level
+        self.mini_batch_size = mini_batch_size
+        self.max_concurrency_per_instance = max_concurrency_per_instance
+
+        if self.resources and instance_count:
+            msg = "Can't set instance_count when resources is provided."
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.BATCH_DEPLOYMENT,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+        if not self.resources and instance_count:
+            self.resources = ResourceConfiguration(instance_count=instance_count)
+
+    @property
+    def instance_count(self) -> Optional[int]:
+        return self.resources.instance_count if self.resources else None
+
+    @instance_count.setter
+    def instance_count(self, value: int) -> None:
+        if not self.resources:
+            self.resources = ResourceConfiguration()
+
+        self.resources.instance_count = value
+
+    @property
+    def provisioning_state(self) -> Optional[str]:
+        """Batch deployment provisioning state, readonly.
+
+        :return: Batch deployment provisioning state.
+        :rtype: Optional[str]
+        """
+        return self._provisioning_state
+
+    def _to_dict(self) -> Dict:
+        res: dict = BatchDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
+
+    @classmethod
+    def _rest_output_action_to_yaml_output_action(cls, rest_output_action: str) -> str:
+        output_switcher = {
+            BatchOutputAction.APPEND_ROW: BatchDeploymentOutputAction.APPEND_ROW,
+            BatchOutputAction.SUMMARY_ONLY: BatchDeploymentOutputAction.SUMMARY_ONLY,
+        }
+
+        return output_switcher.get(rest_output_action, rest_output_action)
+
+    @classmethod
+    def _yaml_output_action_to_rest_output_action(cls, yaml_output_action: Any) -> str:
+        output_switcher = {
+            BatchDeploymentOutputAction.APPEND_ROW: BatchOutputAction.APPEND_ROW,
+            BatchDeploymentOutputAction.SUMMARY_ONLY: BatchOutputAction.SUMMARY_ONLY,
+        }
+
+        return output_switcher.get(yaml_output_action, yaml_output_action)
+
+    # pylint: disable=arguments-differ
+    def _to_rest_object(self, location: str) -> BatchDeploymentData:  # type: ignore
+        self._validate()
+        code_config = (
+            RestCodeConfiguration(
+                code_id=self.code_configuration.code,
+                scoring_script=self.code_configuration.scoring_script,
+            )
+            if self.code_configuration
+            else None
+        )
+        model = IdAssetReference(asset_id=self.model) if self.model else None
+        environment = self.environment
+
+        batch_deployment: RestBatchDeployment = None
+        if isinstance(self.output_action, str):
+            batch_deployment = RestBatchDeployment(
+                compute=self.compute,
+                description=self.description,
+                resources=self.resources._to_rest_object() if self.resources else None,
+                code_configuration=code_config,
+                environment_id=environment,
+                model=model,
+                output_file_name=self.output_file_name,
+                output_action=BatchDeployment._yaml_output_action_to_rest_output_action(self.output_action),
+                error_threshold=self.error_threshold,
+                retry_settings=self.retry_settings._to_rest_object() if self.retry_settings else None,
+                logging_level=self.logging_level,
+                mini_batch_size=self.mini_batch_size,
+                max_concurrency_per_instance=self.max_concurrency_per_instance,
+                environment_variables=self.environment_variables,
+                properties=self.properties,
+            )
+        else:
+            batch_deployment = RestBatchDeployment(
+                compute=self.compute,
+                description=self.description,
+                resources=self.resources._to_rest_object() if self.resources else None,
+                code_configuration=code_config,
+                environment_id=environment,
+                model=model,
+                output_file_name=self.output_file_name,
+                output_action=None,
+                error_threshold=self.error_threshold,
+                retry_settings=self.retry_settings._to_rest_object() if self.retry_settings else None,
+                logging_level=self.logging_level,
+                mini_batch_size=self.mini_batch_size,
+                max_concurrency_per_instance=self.max_concurrency_per_instance,
+                environment_variables=self.environment_variables,
+                properties=self.properties,
+            )
+
+        return BatchDeploymentData(location=location, properties=batch_deployment, tags=self.tags)
+
+    @classmethod
+    def _from_rest_object(  # pylint: disable=arguments-renamed
+        cls, deployment: BatchDeploymentData
+    ) -> BatchDeploymentData:
+        modelId = deployment.properties.model.asset_id if deployment.properties.model else None
+
+        if (
+            hasattr(deployment.properties, "deployment_configuration")
+            and deployment.properties.deployment_configuration is not None
+        ):
+            settings = deployment.properties.deployment_configuration.settings
+            deployment_comp_settings = {
+                "deployment_configuration_type": deployment.properties.deployment_configuration.deployment_configuration_type,  # pylint: disable=line-too-long
+                "componentDeployment.Settings.continue_on_step_failure": settings.get(
+                    "ComponentDeployment.Settings.continue_on_step_failure", None
+                ),
+                "default_datastore": settings.get("default_datastore", None),
+                "default_compute": settings.get("default_compute", None),
+            }
+            properties = {}
+            if deployment.properties.properties:
+                properties.update(deployment.properties.properties)
+            properties.update(deployment_comp_settings)
+        else:
+            properties = deployment.properties.properties
+
+        code_configuration = (
+            CodeConfiguration._from_rest_code_configuration(deployment.properties.code_configuration)
+            if deployment.properties.code_configuration
+            else None
+        )
+        deployment = BatchDeployment(
+            name=deployment.name,
+            description=deployment.properties.description,
+            id=deployment.id,
+            tags=deployment.tags,
+            model=modelId,
+            environment=deployment.properties.environment_id,
+            code_configuration=code_configuration,
+            output_file_name=(
+                deployment.properties.output_file_name
+                if cls._rest_output_action_to_yaml_output_action(deployment.properties.output_action)
+                == BatchDeploymentOutputAction.APPEND_ROW
+                else None
+            ),
+            output_action=cls._rest_output_action_to_yaml_output_action(deployment.properties.output_action),
+            error_threshold=deployment.properties.error_threshold,
+            retry_settings=BatchRetrySettings._from_rest_object(deployment.properties.retry_settings),
+            logging_level=deployment.properties.logging_level,
+            mini_batch_size=deployment.properties.mini_batch_size,
+            compute=deployment.properties.compute,
+            resources=ResourceConfiguration._from_rest_object(deployment.properties.resources),
+            environment_variables=deployment.properties.environment_variables,
+            max_concurrency_per_instance=deployment.properties.max_concurrency_per_instance,
+            endpoint_name=_parse_endpoint_name_from_deployment_id(deployment.id),
+            properties=properties,
+            creation_context=SystemData._from_rest_object(deployment.system_data),
+            provisioning_state=deployment.properties.provisioning_state,
+        )
+
+        return deployment
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "BatchDeployment":
+        data = data or {}
+        params_override = params_override or []
+        cls._update_params(params_override)
+
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path.cwd(),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        res: BatchDeployment = load_from_dict(BatchDeploymentSchema, data, context, **kwargs)
+        return res
+
+    def _validate(self) -> None:
+        self._validate_output_action()
+
+    @classmethod
+    def _update_params(cls, params_override: Any) -> None:
+        for param in params_override:
+            endpoint_name = param.get("endpoint_name")
+            if isinstance(endpoint_name, str):
+                param["endpoint_name"] = endpoint_name.lower()
+
+    def _validate_output_action(self) -> None:
+        if (
+            self.output_action
+            and self.output_action == BatchDeploymentOutputAction.SUMMARY_ONLY
+            and self.output_file_name
+        ):
+            msg = "When output_action is set to {}, the output_file_name need not to be specified."
+            msg = msg.format(BatchDeploymentOutputAction.SUMMARY_ONLY)
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.BATCH_DEPLOYMENT,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_job.py
new file mode 100644
index 00000000..c078f479
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/batch_job.py
@@ -0,0 +1,38 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict
+
+from azure.ai.ml._restclient.v2020_09_01_dataplanepreview.models import BatchJobResource
+
+
+class BatchJob(object):
+    """Batch jobs that are created with batch deployments/endpoints invocation.
+
+    This class shouldn't be instantiated directly. Instead, it is used as the return type of batch deployment/endpoint
+    invocation and job listing.
+    """
+
+    def __init__(self, **kwargs: Any):
+        self.id = kwargs.get("id", None)
+        self.name = kwargs.get("name", None)
+        self.type = kwargs.get("type", None)
+        self.status = kwargs.get("status", None)
+
+    def _to_dict(self) -> Dict:
+        return {
+            "id": self.id,
+            "name": self.name,
+            "type": self.type,
+            "status": self.status,
+        }
+
+    @classmethod
+    def _from_rest_object(cls, obj: BatchJobResource) -> "BatchJob":
+        return cls(
+            id=obj.id,
+            name=obj.name,
+            type=obj.type,
+            status=obj.properties.status,
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/code_configuration.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/code_configuration.py
new file mode 100644
index 00000000..cbae647d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/code_configuration.py
@@ -0,0 +1,93 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import logging
+import os
+from typing import Optional, Union
+
+from azure.ai.ml._restclient.v2022_05_01.models import CodeConfiguration as RestCodeConfiguration
+from azure.ai.ml.entities._assets import Code
+from azure.ai.ml.entities._mixins import DictMixin
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+module_logger = logging.getLogger(__name__)
+
+
+class CodeConfiguration(DictMixin):
+    """Code configuration for a scoring job.
+
+    :param code: The code directory containing the scoring script. The code can be an Code object, an ARM resource ID
+        of an existing code asset, a local path, or "http:", "https:", or "azureml:" url pointing to a remote location.
+    :type code: Optional[Union[~azure.ai.ml.entities.Code, str]]
+    :param scoring_script: The scoring script file path relative to the code directory.
+    :type scoring_script: Optional[str]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START code_configuration]
+            :end-before: [END code_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Creating a CodeConfiguration for a BatchDeployment.
+    """
+
+    def __init__(
+        self,
+        code: Optional[Union[str, os.PathLike]] = None,
+        scoring_script: Optional[Union[str, os.PathLike]] = None,
+    ) -> None:
+        self.code: Optional[Union[str, os.PathLike]] = code
+        self._scoring_script: Optional[Union[str, os.PathLike]] = scoring_script
+
+    @property
+    def scoring_script(self) -> Optional[Union[str, os.PathLike]]:
+        """The scoring script file path relative to the code directory.
+
+        :rtype: str
+        """
+        return self._scoring_script
+
+    def _to_rest_code_configuration(self) -> RestCodeConfiguration:
+        return RestCodeConfiguration(code_id=self.code, scoring_script=self.scoring_script)
+
+    def _validate(self) -> None:
+        if self.code and not self.scoring_script:
+            msg = "scoring script can't be empty"
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.CODE,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.MISSING_FIELD,
+            )
+
+    @staticmethod
+    def _from_rest_code_configuration(code_configuration: RestCodeConfiguration) -> Optional["CodeConfiguration"]:
+        if code_configuration:
+            return CodeConfiguration(
+                code=code_configuration.code_id,
+                scoring_script=code_configuration.scoring_script,
+            )
+        return None
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, CodeConfiguration):
+            return NotImplemented
+        if not other:
+            return False
+        # only compare mutable fields
+        return (
+            self.scoring_script == other.scoring_script
+            and (
+                isinstance(self.code, Code)
+                and isinstance(other.code, Code)
+                or isinstance(self.code, str)
+                and isinstance(other.code, str)
+            )
+            and self.code == other.code
+        )
+
+    def __ne__(self, other: object) -> bool:
+        return not self.__eq__(other)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/container_resource_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/container_resource_settings.py
new file mode 100644
index 00000000..0d0bc15d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/container_resource_settings.py
@@ -0,0 +1,74 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=arguments-renamed
+
+import logging
+from typing import Optional
+
+from azure.ai.ml._restclient.v2022_05_01.models import ContainerResourceSettings
+from azure.ai.ml.entities._mixins import RestTranslatableMixin
+
+module_logger = logging.getLogger(__name__)
+
+
+class ResourceSettings(RestTranslatableMixin):
+    """Resource settings for a container.
+
+    This class uses Kubernetes Resource unit formats. For more information, see
+    https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/.
+
+    :param cpu: The CPU resource settings for a container.
+    :type cpu: Optional[str]
+    :param memory: The memory resource settings for a container.
+    :type memory: Optional[str]
+    :param gpu: The GPU resource settings for a container.
+    :type gpu: Optional[str]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START resource_requirements_configuration]
+            :end-before: [END resource_requirements_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring ResourceSettings for a Kubernetes deployment.
+    """
+
+    def __init__(self, cpu: Optional[str] = None, memory: Optional[str] = None, gpu: Optional[str] = None) -> None:
+        self.cpu = cpu
+        self.memory = memory
+        self.gpu = gpu
+
+    def _to_rest_object(self) -> ContainerResourceSettings:
+        return ContainerResourceSettings(cpu=self.cpu, memory=self.memory, gpu=self.gpu)
+
+    @classmethod
+    def _from_rest_object(cls, settings: ContainerResourceSettings) -> Optional["ResourceSettings"]:
+        return (
+            ResourceSettings(
+                cpu=settings.cpu,
+                memory=settings.memory,
+                gpu=settings.gpu,
+            )
+            if settings
+            else None
+        )
+
+    def _merge_with(self, other: Optional["ResourceSettings"]) -> None:
+        if other:
+            self.cpu = other.cpu or self.cpu
+            self.memory = other.memory or self.memory
+            self.gpu = other.gpu or self.gpu
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, ResourceSettings):
+            return NotImplemented
+        if not other:
+            return False
+        # only compare mutable fields
+        return self.cpu == other.cpu and self.memory == other.memory and self.gpu == other.gpu
+
+    def __ne__(self, other: object) -> bool:
+        return not self.__eq__(other)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_asset.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_asset.py
new file mode 100644
index 00000000..72d24131
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_asset.py
@@ -0,0 +1,38 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Dict, Optional
+
+from azure.ai.ml._schema._deployment.online.data_asset_schema import DataAssetSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+
+
+@experimental
+class DataAsset:
+    """Data Asset entity
+
+    :keyword Optional[str] data_id: Arm id of registered data asset
+    :keyword Optional[str] name: Name of data asset
+    :keyword Optional[str] path: Path where the data asset is stored.
+    :keyword Optional[int] version: Version of data asset.
+    """
+
+    def __init__(
+        self,
+        *,
+        data_id: Optional[str] = None,
+        name: Optional[str] = None,
+        path: Optional[str] = None,
+        version: Optional[int] = None,
+    ):
+        self.data_id = data_id
+        self.name = name
+        self.path = path
+        self.version = version
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = DataAssetSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_collector.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_collector.py
new file mode 100644
index 00000000..74277c61
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/data_collector.py
@@ -0,0 +1,84 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+from typing import Any, Dict, Optional
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import DataCollector as RestDataCollector
+from azure.ai.ml._schema._deployment.online.data_collector_schema import DataCollectorSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.entities._deployment.deployment_collection import DeploymentCollection
+from azure.ai.ml.entities._deployment.request_logging import RequestLogging
+
+
+@experimental
+class DataCollector:
+    """Data Capture deployment entity.
+
+    :param collections: Mapping dictionary of strings mapped to DeploymentCollection entities.
+    :type collections: Mapping[str, DeploymentCollection]
+    :param rolling_rate: The rolling rate of mdc files, possible values: ["minute", "hour", "day"].
+    :type rolling_rate: str
+    :param sampling_rate: The sampling rate of mdc files, possible values: [0.0, 1.0].
+    :type sampling_rate: float
+    :param request_logging: Logging of request payload parameters.
+    :type request_logging: RequestLogging
+    """
+
+    def __init__(
+        self,
+        collections: Dict[str, DeploymentCollection],
+        *,
+        rolling_rate: Optional[str] = None,
+        sampling_rate: Optional[float] = None,
+        request_logging: Optional[RequestLogging] = None,
+        **kwargs: Any,
+    ):  # pylint: disable=unused-argument
+        self.collections = collections
+        self.rolling_rate = rolling_rate
+        self.sampling_rate = sampling_rate
+        self.request_logging = request_logging
+
+        if self.sampling_rate:
+            for collection in self.collections.values():
+                collection.sampling_rate = self.sampling_rate
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = DataCollectorSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
+
+    @classmethod
+    def _from_rest_object(cls, rest_obj: RestDataCollector) -> "DataCollector":
+        collections = {}
+        sampling_rate = None
+        for k, v in rest_obj.collections.items():
+            sampling_rate = v.sampling_rate
+            collections[k] = DeploymentCollection._from_rest_object(v)
+            delattr(collections[k], "sampling_rate")
+
+        return DataCollector(
+            collections=collections,
+            rolling_rate=rest_obj.rolling_rate,
+            request_logging=(
+                RequestLogging._from_rest_object(rest_obj.request_logging) if rest_obj.request_logging else None
+            ),
+            sampling_rate=sampling_rate,
+        )
+
+    def _to_rest_object(self) -> RestDataCollector:
+        rest_collections: dict = {}
+        for collection in self.collections.values():
+            collection.sampling_rate = self.sampling_rate
+        delattr(self, "sampling_rate")
+        if self.request_logging:
+            self.request_logging = self.request_logging._to_rest_object()
+        if self.collections:
+            rest_collections = {}
+            for k, v in self.collections.items():
+                rest_collections[k] = v._to_rest_object()
+        return RestDataCollector(
+            collections=rest_collections, rolling_rate=self.rolling_rate, request_logging=self.request_logging
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment.py
new file mode 100644
index 00000000..2f857cfa
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment.py
@@ -0,0 +1,213 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access,arguments-renamed
+
+import logging
+from abc import abstractmethod
+from os import PathLike
+from typing import IO, TYPE_CHECKING, Any, AnyStr, Dict, Optional, Union
+
+from azure.ai.ml._restclient.v2022_02_01_preview.models import BatchDeploymentData
+from azure.ai.ml._restclient.v2022_05_01.models import OnlineDeploymentData
+from azure.ai.ml._utils.utils import dump_yaml_to_file
+from azure.ai.ml.entities._mixins import RestTranslatableMixin
+from azure.ai.ml.entities._resource import Resource
+from azure.ai.ml.exceptions import (
+    DeploymentException,
+    ErrorCategory,
+    ErrorTarget,
+    ValidationErrorType,
+    ValidationException,
+)
+
+from .code_configuration import CodeConfiguration
+
+# avoid circular import error
+if TYPE_CHECKING:
+    from azure.ai.ml.entities._assets._artifacts.model import Model
+    from azure.ai.ml.entities._assets.environment import Environment
+
+module_logger = logging.getLogger(__name__)
+
+
+class Deployment(Resource, RestTranslatableMixin):
+    """Endpoint Deployment base class.
+
+    :param name: Name of the deployment resource, defaults to None
+    :type name: typing.Optional[str]
+    :param endpoint_name: Name of the Endpoint resource, defaults to None
+    :type endpoint_name: typing.Optional[str]
+    :param description: Description of the deployment resource, defaults to None
+    :type description: typing.Optional[str]
+    :param tags: Tag dictionary. Tags can be added, removed, and updated, defaults to None
+    :type tags: typing.Optional[typing.Dict[str, typing.Any]]
+    :param properties: The asset property dictionary, defaults to None
+    :type properties: typing.Optional[typing.Dict[str, typing.Any]]
+    :param model: The Model entity, defaults to None
+    :type model: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Model]]
+    :param code_configuration: Code Configuration, defaults to None
+    :type code_configuration: typing.Optional[CodeConfiguration]
+    :param environment: The Environment entity, defaults to None
+    :type environment: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Environment]]
+    :param environment_variables: Environment variables that will be set in deployment, defaults to None
+    :type environment_variables: typing.Optional[typing.Dict[str, str]]
+    :param code_path: Folder path to local code assets. Equivalent to code_configuration.code.path
+        , defaults to None
+    :type code_path: typing.Optional[typing.Union[str, PathLike]]
+    :param scoring_script: Scoring script name. Equivalent to code_configuration.code.scoring_script
+        , defaults to None
+    :type scoring_script: typing.Optional[typing.Union[str, PathLike]]
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Deployment cannot be successfully validated.
+        Exception details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        name: Optional[str] = None,
+        *,
+        endpoint_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict[str, Any]] = None,
+        properties: Optional[Dict[str, Any]] = None,
+        model: Optional[Union[str, "Model"]] = None,
+        code_configuration: Optional[CodeConfiguration] = None,
+        environment: Optional[Union[str, "Environment"]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        code_path: Optional[Union[str, PathLike]] = None,
+        scoring_script: Optional[Union[str, PathLike]] = None,
+        **kwargs: Any,
+    ):
+        # MFE is case-insensitive for Name. So convert the name into lower case here.
+        name = name.lower() if name else None
+        self.endpoint_name = endpoint_name
+        self._type: Optional[str] = kwargs.pop("type", None)
+
+        if code_configuration and (code_path or scoring_script):
+            msg = "code_path and scoring_script are not allowed if code_configuration is provided."
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.DEPLOYMENT,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+        super().__init__(name, description, tags, properties, **kwargs)
+
+        self.model = model
+        self.code_configuration = code_configuration
+        if not self.code_configuration and (code_path or scoring_script):
+            self.code_configuration = CodeConfiguration(code=code_path, scoring_script=scoring_script)
+
+        self.environment = environment
+        self.environment_variables = dict(environment_variables) if environment_variables else {}
+
+    @property
+    def type(self) -> Optional[str]:
+        """
+        Type of deployment.
+
+        :rtype: str
+        """
+        return self._type
+
+    @property
+    def code_path(self) -> Optional[Union[str, PathLike]]:
+        """
+        The code directory containing the scoring script.
+
+        :rtype: Union[str, PathLike]
+        """
+        return self.code_configuration.code if self.code_configuration and self.code_configuration.code else None
+
+    @code_path.setter
+    def code_path(self, value: Union[str, PathLike]) -> None:
+        if not self.code_configuration:
+            self.code_configuration = CodeConfiguration()
+
+        self.code_configuration.code = value
+
+    @property
+    def scoring_script(self) -> Optional[Union[str, PathLike]]:
+        """
+        The scoring script file path relative to the code directory.
+
+        :rtype: Union[str, PathLike]
+        """
+        return self.code_configuration.scoring_script if self.code_configuration else None
+
+    @scoring_script.setter
+    def scoring_script(self, value: Union[str, PathLike]) -> None:
+        if not self.code_configuration:
+            self.code_configuration = CodeConfiguration()
+
+        self.code_configuration.scoring_script = value  # type: ignore[misc]
+
+    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+        """Dump the deployment content into a file in yaml format.
+
+        :param dest: The destination to receive this deployment's content.
+            Must be either a path to a local file, or an already-open file stream.
+            If dest is a file path, a new file will be created,
+            and an exception is raised if the file exists.
+            If dest is an open file, the file will be written to directly,
+            and an exception will be raised if the file is not writable.
+        :type dest: typing.Union[os.PathLike, str, typing.IO[typing.AnyStr]]
+        """
+        path = kwargs.pop("path", None)
+        yaml_serialized = self._to_dict()
+        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
+    @abstractmethod
+    def _to_dict(self) -> Dict:
+        pass
+
+    @classmethod
+    def _from_rest_object(
+        cls, deployment_rest_object: Union[OnlineDeploymentData, BatchDeploymentData]
+    ) -> Union[OnlineDeploymentData, BatchDeploymentData]:
+        from azure.ai.ml.entities._deployment.batch_deployment import BatchDeployment
+        from azure.ai.ml.entities._deployment.online_deployment import OnlineDeployment
+
+        if isinstance(deployment_rest_object, OnlineDeploymentData):
+            return OnlineDeployment._from_rest_object(deployment_rest_object)
+        if isinstance(deployment_rest_object, BatchDeploymentData):
+            return BatchDeployment._from_rest_object(deployment_rest_object)
+
+        msg = f"Unsupported deployment type {type(deployment_rest_object)}"
+        raise DeploymentException(
+            message=msg,
+            target=ErrorTarget.DEPLOYMENT,
+            no_personal_data_message=msg,
+            error_category=ErrorCategory.SYSTEM_ERROR,
+        )
+
+    def _to_rest_object(self) -> Any:
+        pass
+
+    def _merge_with(self, other: "Deployment") -> None:
+        if other:
+            if self.name != other.name:
+                msg = "The deployment name: {} and {} are not matched when merging."
+                raise ValidationException(
+                    message=msg.format(self.name, other.name),
+                    target=ErrorTarget.DEPLOYMENT,
+                    no_personal_data_message=msg.format("[name1]", "[name2]"),
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
+            if other.tags:
+                self.tags: dict = {**self.tags, **other.tags}
+            if other.properties:
+                self.properties: dict = {**self.properties, **other.properties}
+            if other.environment_variables:
+                self.environment_variables = {
+                    **self.environment_variables,
+                    **other.environment_variables,
+                }
+            self.code_configuration = other.code_configuration or self.code_configuration
+            self.model = other.model or self.model
+            self.environment = other.environment or self.environment
+            self.endpoint_name = other.endpoint_name or self.endpoint_name
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_collection.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_collection.py
new file mode 100644
index 00000000..c1b1c750
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_collection.py
@@ -0,0 +1,62 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Optional, Union
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import Collection as RestCollection
+from azure.ai.ml._schema._deployment.online.deployment_collection_schema import DeploymentCollectionSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from .data_asset import DataAsset
+
+
+@experimental
+class DeploymentCollection:
+    """Collection entity
+
+    :param enabled: Is logging for this collection enabled. Possible values include: 'true', 'false'.
+    :type enabled: str
+    :param data: Data asset id associated with collection logging.
+    :type data: str
+    :param client_id: Client ID associated with collection logging.
+    :type client_id: str
+
+    """
+
+    def __init__(
+        self,
+        *,
+        enabled: Optional[str] = None,
+        data: Optional[Union[str, DataAsset]] = None,
+        client_id: Optional[str] = None,
+        **kwargs: Any
+    ):
+        self.enabled = enabled  # maps to data_collection_mode
+        self.data = data  # maps to data_id
+        self.sampling_rate = kwargs.get(
+            "sampling_rate", None
+        )  # maps to sampling_rate, but it has to be passed from the data_collector root
+        self.client_id = client_id
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = DeploymentCollectionSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
+
+    @classmethod
+    def _from_rest_object(cls, rest_obj: RestCollection) -> "DeploymentCollection":
+        return DeploymentCollection(
+            enabled="true" if rest_obj.data_collection_mode == "Enabled" else "false",
+            sampling_rate=rest_obj.sampling_rate,
+            data=rest_obj.data_id,
+            client_id=rest_obj.client_id,
+        )
+
+    def _to_rest_object(self) -> RestCollection:
+        return RestCollection(
+            data_collection_mode="enabled" if str(self.enabled).lower() == "true" else "disabled",
+            sampling_rate=self.sampling_rate,
+            data_id=self.data,
+            client_id=self.client_id,
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_settings.py
new file mode 100644
index 00000000..0dbfc8fc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/deployment_settings.py
@@ -0,0 +1,200 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=arguments-renamed
+
+import logging
+from typing import Optional
+
+from azure.ai.ml._restclient.v2022_05_01.models import BatchRetrySettings as RestBatchRetrySettings
+from azure.ai.ml._restclient.v2022_05_01.models import OnlineRequestSettings as RestOnlineRequestSettings
+from azure.ai.ml._restclient.v2022_05_01.models import ProbeSettings as RestProbeSettings
+from azure.ai.ml._utils.utils import (
+    from_iso_duration_format,
+    from_iso_duration_format_ms,
+    to_iso_duration_format,
+    to_iso_duration_format_ms,
+)
+from azure.ai.ml.entities._mixins import RestTranslatableMixin
+
+module_logger = logging.getLogger(__name__)
+
+
+class BatchRetrySettings(RestTranslatableMixin):
+    """Retry settings for batch deployment.
+
+    :param max_retries: Number of retries in failure, defaults to 3
+    :type max_retries: int
+    :param timeout: Timeout in seconds, defaults to 30
+    :type timeout: int
+    """
+
+    def __init__(self, *, max_retries: Optional[int] = None, timeout: Optional[int] = None):
+        self.max_retries = max_retries
+        self.timeout = timeout
+
+    def _to_rest_object(self) -> RestBatchRetrySettings:
+        return RestBatchRetrySettings(
+            max_retries=self.max_retries,
+            timeout=to_iso_duration_format(self.timeout),
+        )
+
+    @classmethod
+    def _from_rest_object(cls, settings: RestBatchRetrySettings) -> Optional["BatchRetrySettings"]:
+        return (
+            BatchRetrySettings(
+                max_retries=settings.max_retries,
+                timeout=from_iso_duration_format(settings.timeout),
+            )
+            if settings
+            else None
+        )
+
+    def _merge_with(self, other: "BatchRetrySettings") -> None:
+        if other:
+            self.timeout = other.timeout or self.timeout
+            self.max_retries = other.max_retries or self.max_retries
+
+
+class OnlineRequestSettings(RestTranslatableMixin):
+    """Request Settings entity.
+
+    :param request_timeout_ms: defaults to 5000
+    :type request_timeout_ms: int
+    :param max_concurrent_requests_per_instance: defaults to 1
+    :type max_concurrent_requests_per_instance: int
+    :param max_queue_wait_ms: defaults to 500
+    :type max_queue_wait_ms: int
+    """
+
+    def __init__(
+        self,
+        max_concurrent_requests_per_instance: Optional[int] = None,
+        request_timeout_ms: Optional[int] = None,
+        max_queue_wait_ms: Optional[int] = None,
+    ):
+        self.request_timeout_ms = request_timeout_ms
+        self.max_concurrent_requests_per_instance = max_concurrent_requests_per_instance
+        self.max_queue_wait_ms = max_queue_wait_ms
+
+    def _to_rest_object(self) -> RestOnlineRequestSettings:
+        return RestOnlineRequestSettings(
+            max_queue_wait=to_iso_duration_format_ms(self.max_queue_wait_ms),
+            max_concurrent_requests_per_instance=self.max_concurrent_requests_per_instance,
+            request_timeout=to_iso_duration_format_ms(self.request_timeout_ms),
+        )
+
+    def _merge_with(self, other: Optional["OnlineRequestSettings"]) -> None:
+        if other:
+            self.max_concurrent_requests_per_instance = (
+                other.max_concurrent_requests_per_instance or self.max_concurrent_requests_per_instance
+            )
+            self.request_timeout_ms = other.request_timeout_ms or self.request_timeout_ms
+            self.max_queue_wait_ms = other.max_queue_wait_ms or self.max_queue_wait_ms
+
+    @classmethod
+    def _from_rest_object(cls, settings: RestOnlineRequestSettings) -> Optional["OnlineRequestSettings"]:
+        return (
+            OnlineRequestSettings(
+                request_timeout_ms=from_iso_duration_format_ms(settings.request_timeout),
+                max_concurrent_requests_per_instance=settings.max_concurrent_requests_per_instance,
+                max_queue_wait_ms=from_iso_duration_format_ms(settings.max_queue_wait),
+            )
+            if settings
+            else None
+        )
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, OnlineRequestSettings):
+            return NotImplemented
+        if not other:
+            return False
+        # only compare mutable fields
+        return (
+            self.max_concurrent_requests_per_instance == other.max_concurrent_requests_per_instance
+            and self.request_timeout_ms == other.request_timeout_ms
+            and self.max_queue_wait_ms == other.max_queue_wait_ms
+        )
+
+    def __ne__(self, other: object) -> bool:
+        return not self.__eq__(other)
+
+
+class ProbeSettings(RestTranslatableMixin):
+    def __init__(
+        self,
+        *,
+        failure_threshold: Optional[int] = None,
+        success_threshold: Optional[int] = None,
+        timeout: Optional[int] = None,
+        period: Optional[int] = None,
+        initial_delay: Optional[int] = None,
+    ):
+        """Settings on how to probe an endpoint.
+
+        :param failure_threshold: Threshold for probe failures, defaults to 30
+        :type failure_threshold: int
+        :param success_threshold: Threshold for probe success, defaults to 1
+        :type success_threshold: int
+        :param timeout: timeout in seconds, defaults to 2
+        :type timeout: int
+        :param period: How often (in seconds) to perform the probe, defaults to 10
+        :type period: int
+        :param initial_delay: How long (in seconds) to wait for the first probe, defaults to 10
+        :type initial_delay: int
+        """
+
+        self.failure_threshold = failure_threshold
+        self.success_threshold = success_threshold
+        self.timeout = timeout
+        self.period = period
+        self.initial_delay = initial_delay
+
+    def _to_rest_object(self) -> RestProbeSettings:
+        return RestProbeSettings(
+            failure_threshold=self.failure_threshold,
+            success_threshold=self.success_threshold,
+            timeout=to_iso_duration_format(self.timeout),
+            period=to_iso_duration_format(self.period),
+            initial_delay=to_iso_duration_format(self.initial_delay),
+        )
+
+    def _merge_with(self, other: Optional["ProbeSettings"]) -> None:
+        if other:
+            self.failure_threshold = other.failure_threshold or self.failure_threshold
+            self.success_threshold = other.success_threshold or self.success_threshold
+            self.timeout = other.timeout or self.timeout
+            self.period = other.period or self.period
+            self.initial_delay = other.initial_delay or self.initial_delay
+
+    @classmethod
+    def _from_rest_object(cls, settings: RestProbeSettings) -> Optional["ProbeSettings"]:
+        return (
+            ProbeSettings(
+                failure_threshold=settings.failure_threshold,
+                success_threshold=settings.success_threshold,
+                timeout=from_iso_duration_format(settings.timeout),
+                period=from_iso_duration_format(settings.period),
+                initial_delay=from_iso_duration_format(settings.initial_delay),
+            )
+            if settings
+            else None
+        )
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, ProbeSettings):
+            return NotImplemented
+        if not other:
+            return False
+        # only compare mutable fields
+        return (
+            self.failure_threshold == other.failure_threshold
+            and self.success_threshold == other.success_threshold
+            and self.timeout == other.timeout
+            and self.period == other.period
+            and self.initial_delay == other.initial_delay
+        )
+
+    def __ne__(self, other: object) -> bool:
+        return not self.__eq__(other)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/event_hub.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/event_hub.py
new file mode 100644
index 00000000..2729fa50
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/event_hub.py
@@ -0,0 +1,32 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Optional
+
+from azure.ai.ml._schema._deployment.online.event_hub_schema import EventHubSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.entities._deployment.oversize_data_config import OversizeDataConfig
+
+
+class EventHub:
+    """Event Hub deployment entity
+
+    :param namespace: Name space of eventhub, provided in format of "{namespace}.{name}".
+    :type namespace: str
+    :param oversize_data_config: Oversized payload body configurations.
+    :type oversize_data_config: OversizeDataConfig
+
+    """
+
+    # pylint: disable=unused-argument
+    def __init__(
+        self, namespace: Optional[str] = None, oversize_data_config: Optional[OversizeDataConfig] = None, **kwargs: Any
+    ):
+        self.namespace = namespace
+        self.oversize_data_config = oversize_data_config
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = EventHubSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/job_definition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/job_definition.py
new file mode 100644
index 00000000..56bebebc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/job_definition.py
@@ -0,0 +1,58 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Optional, Union
+
+from azure.ai.ml._schema._deployment.batch.job_definition_schema import JobDefinitionSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._job.job import Job
+
+
+@experimental
+class JobDefinition:
+    """Job Definition entity.
+
+    :param type: Job definition type. Allowed value is: pipeline
+    :type type: str
+    :param name: Job name
+    :type name: str
+    :param job: Job definition
+    :type job: Union[Job, str]
+    :param component: Component definition
+    :type component: Union[Component, str]
+    :param settings: Job settings
+    :type settings: Dict[str, Any]
+    :param description: Job description.
+    :type description: str
+    :param tags: Job tags
+    :type tags: Dict[str, Any]
+    """
+
+    def __init__(
+        self,
+        # pylint: disable=redefined-builtin
+        type: str,
+        name: Optional[str] = None,
+        job: Optional[Union[Job, str]] = None,
+        component: Optional[Union[Component, str]] = None,
+        settings: Optional[Dict[str, Any]] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict[str, Any]] = None,
+        # pylint: disable=unused-argument
+        **kwargs: Any,
+    ):
+        self.type = type
+        self.name = name
+        self.job = job
+        self.component = component
+        self.settings = settings
+        self.tags = tags
+        self.description = description
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = JobDefinitionSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment.py
new file mode 100644
index 00000000..0ad4fd6f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment.py
@@ -0,0 +1,207 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from os import PathLike
+from pathlib import Path
+from typing import Any, Dict, Optional, Union
+
+from azure.ai.ml._restclient.v2022_05_01.models import BatchDeploymentData
+from azure.ai.ml._restclient.v2022_05_01.models import BatchDeploymentDetails as RestBatchDeployment
+from azure.ai.ml._restclient.v2022_05_01.models import BatchOutputAction
+from azure.ai.ml._restclient.v2022_05_01.models import CodeConfiguration as RestCodeConfiguration
+from azure.ai.ml._restclient.v2022_05_01.models import IdAssetReference
+from azure.ai.ml._schema._deployment.batch.model_batch_deployment import ModelBatchDeploymentSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY
+from azure.ai.ml.constants._deployment import BatchDeploymentOutputAction
+from azure.ai.ml.entities._assets import Environment, Model
+from azure.ai.ml.entities._deployment.batch_deployment import BatchDeployment
+from azure.ai.ml.entities._deployment.deployment import Deployment
+from azure.ai.ml.entities._job.resource_configuration import ResourceConfiguration
+from azure.ai.ml.entities._util import load_from_dict
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from .code_configuration import CodeConfiguration
+from .model_batch_deployment_settings import ModelBatchDeploymentSettings
+
+
+@experimental
+class ModelBatchDeployment(Deployment):
+    """Job Definition entity.
+
+    :param type: Job definition type. Allowed value is: pipeline
+    :type type: str
+    :param name: Job name
+    :type name: str
+    :param job: Job definition
+    :type job: Union[Job, str]
+    :param component: Component definition
+    :type component: Union[Component, str]
+    :param settings: Job settings
+    :type settings: Dict[str, Any]
+    :param description: Job description.
+    :type description: str
+    :param tags: Job tags
+    :type tags: Dict[str, Any]
+    :param properties: The asset property dictionary.
+    :type properties: dict[str, str]
+    """
+
+    def __init__(
+        self,
+        *,
+        name: Optional[str],
+        endpoint_name: Optional[str] = None,
+        environment: Optional[Union[str, Environment]] = None,
+        properties: Optional[Dict[str, str]] = None,
+        model: Optional[Union[str, Model]] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict[str, Any]] = None,
+        settings: Optional[ModelBatchDeploymentSettings] = None,
+        resources: Optional[ResourceConfiguration] = None,
+        compute: Optional[str] = None,
+        code_configuration: Optional[CodeConfiguration] = None,
+        code_path: Optional[Union[str, PathLike]] = None,  # promoted property from code_configuration.code
+        scoring_script: Optional[
+            Union[str, PathLike]
+        ] = None,  # promoted property from code_configuration.scoring_script
+        **kwargs: Any,
+    ):
+        self._provisioning_state: Optional[str] = kwargs.pop("provisioning_state", None)
+        super().__init__(
+            name=name,
+            endpoint_name=endpoint_name,
+            properties=properties,
+            code_path=code_path,
+            scoring_script=scoring_script,
+            environment=environment,
+            model=model,
+            description=description,
+            tags=tags,
+            code_configuration=code_configuration,
+            **kwargs,
+        )
+        self.compute = compute
+        self.resources = resources
+        if settings is not None:
+            self.settings = ModelBatchDeploymentSettings(
+                mini_batch_size=settings.mini_batch_size,
+                instance_count=settings.instance_count,
+                max_concurrency_per_instance=settings.max_concurrency_per_instance,
+                output_action=settings.output_action,
+                output_file_name=settings.output_file_name,
+                retry_settings=settings.retry_settings,
+                environment_variables=settings.environment_variables,
+                error_threshold=settings.error_threshold,
+                logging_level=settings.logging_level,
+            )
+            if self.resources is not None:
+                if self.resources.instance_count is None and settings.instance_count is not None:
+                    self.resources.instance_count = settings.instance_count
+            if self.resources is None and settings.instance_count is not None:
+                self.resources = ResourceConfiguration(instance_count=settings.instance_count)
+
+    # pylint: disable=arguments-differ
+    def _to_rest_object(self, location: str) -> BatchDeploymentData:  # type: ignore
+        self._validate()
+        code_config = (
+            RestCodeConfiguration(
+                code_id=self.code_configuration.code,
+                scoring_script=self.code_configuration.scoring_script,
+            )
+            if self.code_configuration
+            else None
+        )
+        deployment_settings = self.settings
+        model = IdAssetReference(asset_id=self.model) if self.model else None
+        batch_deployment = RestBatchDeployment(
+            description=self.description,
+            environment_id=self.environment,
+            model=model,
+            code_configuration=code_config,
+            output_file_name=deployment_settings.output_file_name,
+            output_action=BatchDeployment._yaml_output_action_to_rest_output_action(  # pylint: disable=protected-access
+                deployment_settings.output_action
+            ),
+            error_threshold=deployment_settings.error_threshold,
+            resources=self.resources._to_rest_object() if self.resources else None,  # pylint: disable=protected-access
+            retry_settings=(
+                deployment_settings.retry_settings._to_rest_object()  # pylint: disable=protected-access
+                if deployment_settings.retry_settings
+                else None
+            ),
+            logging_level=deployment_settings.logging_level,
+            mini_batch_size=deployment_settings.mini_batch_size,
+            max_concurrency_per_instance=deployment_settings.max_concurrency_per_instance,
+            environment_variables=deployment_settings.environment_variables,
+            compute=self.compute,
+            properties=self.properties,
+        )
+        return BatchDeploymentData(location=location, properties=batch_deployment, tags=self.tags)
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "ModelBatchDeployment":
+        data = data or {}
+        params_override = params_override or []
+        cls._update_params(params_override)
+
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path.cwd(),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        res: ModelBatchDeployment = load_from_dict(ModelBatchDeploymentSchema, data, context, **kwargs)
+        return res
+
+    @classmethod
+    def _update_params(cls, params_override: Any) -> None:
+        for param in params_override:
+            endpoint_name = param.get("endpoint_name")
+            if isinstance(endpoint_name, str):
+                param["endpoint_name"] = endpoint_name.lower()
+
+    @classmethod
+    def _yaml_output_action_to_rest_output_action(cls, yaml_output_action: str) -> str:
+        output_switcher = {
+            BatchDeploymentOutputAction.APPEND_ROW: BatchOutputAction.APPEND_ROW,
+            BatchDeploymentOutputAction.SUMMARY_ONLY: BatchOutputAction.SUMMARY_ONLY,
+        }
+        return output_switcher.get(yaml_output_action, yaml_output_action)
+
+    @property
+    def provisioning_state(self) -> Optional[str]:
+        """Batch deployment provisioning state, readonly.
+
+        :return: Batch deployment provisioning state.
+        :rtype: Optional[str]
+        """
+        return self._provisioning_state
+
+    def _validate(self) -> None:
+        self._validate_output_action()
+
+    def _validate_output_action(self) -> None:
+        if (
+            self.settings.output_action
+            and self.settings.output_action == BatchDeploymentOutputAction.SUMMARY_ONLY
+            and self.settings.output_file_name
+        ):
+            msg = "When output_action is set to {}, the output_file_name need not to be specified."
+            msg = msg.format(BatchDeploymentOutputAction.SUMMARY_ONLY)
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.BATCH_DEPLOYMENT,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+    def _to_dict(self) -> Dict:
+        res: dict = ModelBatchDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment_settings.py
new file mode 100644
index 00000000..36151019
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/model_batch_deployment_settings.py
@@ -0,0 +1,81 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Optional
+
+from azure.ai.ml._schema._deployment.batch.model_batch_deployment_settings import ModelBatchDeploymentSettingsSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.constants._deployment import BatchDeploymentOutputAction
+from azure.ai.ml.entities._deployment.deployment_settings import BatchRetrySettings
+
+
+@experimental
+class ModelBatchDeploymentSettings:
+    """Model Batch Deployment Settings entity.
+
+    :param mini_batch_size: Size of the mini-batch passed to each batch invocation, defaults to 10
+    :type mini_batch_size: int
+    :param instance_count: Number of instances the interfering will run on. Equivalent to resources.instance_count.
+    :type instance_count: int
+    :param output_action: Indicates how the output will be organized. Possible values include:
+     "summary_only", "append_row". Defaults to "append_row"
+    :type output_action: str or ~azure.ai.ml.constants._deployment.BatchDeploymentOutputAction
+    :param output_file_name: Customized output file name for append_row output action, defaults to "predictions.csv"
+    :type output_file_name: str
+    :param max_concurrency_per_instance: Indicates maximum number of parallelism per instance, defaults to 1
+    :type max_concurrency_per_instance: int
+    :param retry_settings: Retry settings for a batch inference operation, defaults to None
+    :type retry_settings: BatchRetrySettings
+    :param environment_variables: Environment variables that will be set in deployment.
+    :type environment_variables: dict
+    :param error_threshold: Error threshold, if the error count for the entire input goes above
+        this value,
+        the batch inference will be aborted. Range is [-1, int.MaxValue]
+        -1 value indicates, ignore all failures during batch inference
+        For FileDataset count of file failures
+        For TabularDataset, this is the count of record failures, defaults to -1
+    :type error_threshold: int
+    :param logging_level: Logging level for batch inference operation, defaults to "info"
+    :type logging_level: str
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START model_batch_deployment_settings_entity_create]
+            :end-before: [END model_batch_deployment_settings_entity_create]
+            :language: python
+            :dedent: 8
+            :caption: Creating a Model Batch Deployment Settings object.
+    """
+
+    def __init__(
+        self,
+        *,
+        mini_batch_size: Optional[int],
+        instance_count: Optional[int] = None,
+        max_concurrency_per_instance: Optional[int] = None,
+        output_action: Optional[BatchDeploymentOutputAction] = None,
+        output_file_name: Optional[str] = None,
+        retry_settings: Optional[BatchRetrySettings] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        error_threshold: Optional[int] = None,
+        logging_level: Optional[str] = None,
+        # pylint: disable=unused-argument
+        **kwargs: Any,
+    ):
+        self.mini_batch_size = mini_batch_size
+        self.instance_count = instance_count
+        self.max_concurrency_per_instance = max_concurrency_per_instance
+        self.output_action = output_action
+        self.output_file_name = output_file_name
+        self.retry_settings = retry_settings
+        self.environment_variables = environment_variables
+        self.error_threshold = error_threshold
+        self.logging_level = logging_level
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = ModelBatchDeploymentSettingsSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/online_deployment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/online_deployment.py
new file mode 100644
index 00000000..131d3293
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/online_deployment.py
@@ -0,0 +1,742 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access,arguments-renamed,unidiomatic-typecheck
+
+import logging
+import os
+import typing
+from abc import abstractmethod
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import CodeConfiguration as RestCodeConfiguration
+from azure.ai.ml._restclient.v2023_04_01_preview.models import EndpointComputeType
+from azure.ai.ml._restclient.v2023_04_01_preview.models import (
+    KubernetesOnlineDeployment as RestKubernetesOnlineDeployment,
+)
+from azure.ai.ml._restclient.v2023_04_01_preview.models import ManagedOnlineDeployment as RestManagedOnlineDeployment
+from azure.ai.ml._restclient.v2023_04_01_preview.models import OnlineDeployment as RestOnlineDeploymentData
+from azure.ai.ml._restclient.v2023_04_01_preview.models import OnlineDeploymentProperties as RestOnlineDeploymentDetails
+from azure.ai.ml._restclient.v2023_04_01_preview.models import Sku as RestSku
+from azure.ai.ml._schema._deployment.online.online_deployment import (
+    KubernetesOnlineDeploymentSchema,
+    ManagedOnlineDeploymentSchema,
+)
+from azure.ai.ml._utils._arm_id_utils import _parse_endpoint_name_from_deployment_id
+from azure.ai.ml._utils.utils import camel_to_snake
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY, TYPE, ArmConstants
+from azure.ai.ml.constants._endpoint import EndpointYamlFields
+from azure.ai.ml.entities._assets import Code
+from azure.ai.ml.entities._assets._artifacts.model import Model
+from azure.ai.ml.entities._assets.environment import Environment
+from azure.ai.ml.entities._deployment.code_configuration import CodeConfiguration
+from azure.ai.ml.entities._deployment.data_collector import DataCollector
+from azure.ai.ml.entities._deployment.deployment_settings import OnlineRequestSettings, ProbeSettings
+from azure.ai.ml.entities._deployment.resource_requirements_settings import ResourceRequirementsSettings
+from azure.ai.ml.entities._deployment.scale_settings import (
+    DefaultScaleSettings,
+    OnlineScaleSettings,
+    TargetUtilizationScaleSettings,
+)
+from azure.ai.ml.entities._endpoint._endpoint_helpers import validate_endpoint_or_deployment_name
+from azure.ai.ml.entities._util import load_from_dict
+from azure.ai.ml.exceptions import (
+    DeploymentException,
+    ErrorCategory,
+    ErrorTarget,
+    ValidationErrorType,
+    ValidationException,
+)
+
+from .deployment import Deployment
+
+module_logger = logging.getLogger(__name__)
+
+
+# pylint: disable=too-many-instance-attributes
+class OnlineDeployment(Deployment):
+    """Online endpoint deployment entity.
+
+    :param name: Name of the deployment resource.
+    :type name: str
+    :param endpoint_name: Name of the endpoint resource, defaults to None
+    :type endpoint_name: typing.Optional[str]
+    :param tags: Tag dictionary. Tags can be added, removed, and updated, defaults to None
+    :type tags: typing.Optional[typing.Dict[str, typing.Any]]
+    :param properties: The asset property dictionary, defaults to None
+    :type properties: typing.Optional[typing.Dict[str, typing.Any]]
+    :param description: Description of the resource, defaults to None
+    :type description: typing.Optional[str]
+    :param model: Model entity for the endpoint deployment, defaults to None
+    :type model: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Model]]
+    :param data_collector: Data Collector entity for the endpoint deployment, defaults to None
+    :type data_collector: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.DataCollector]]
+    :param code_configuration: Code Configuration, defaults to None
+    :type code_configuration: typing.Optional[~azure.ai.ml.entities.CodeConfiguration]
+    :param environment: Environment entity for the endpoint deployment, defaults to None
+    :type environment: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Environment]]
+    :param app_insights_enabled: Is appinsights enabled, defaults to False
+    :type app_insights_enabled: typing.Optional[bool]
+    :param scale_settings: How the online deployment will scale, defaults to None
+    :type scale_settings: typing.Optional[~azure.ai.ml.entities.OnlineScaleSettings]
+    :param request_settings: Online Request Settings, defaults to None
+    :type request_settings: typing.Optional[~azure.ai.ml.entities.OnlineRequestSettings]
+    :param liveness_probe: Liveness probe settings, defaults to None
+    :type liveness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
+    :param readiness_probe: Readiness probe settings, defaults to None
+    :type readiness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
+    :param environment_variables: Environment variables that will be set in deployment, defaults to None
+    :type environment_variables: typing.Optional[typing.Dict[str, str]]
+    :param instance_count: The instance count used for this deployment, defaults to None
+    :type instance_count: typing.Optional[int]
+    :param instance_type: Azure compute sku, defaults to None
+    :type instance_type: typing.Optional[str]
+    :param model_mount_path: The path to mount the model in custom container, defaults to None
+    :type model_mount_path: typing.Optional[str]
+    :param code_path: Equivalent to code_configuration.code, will be ignored if code_configuration is present
+        , defaults to None
+    :type code_path: typing.Optional[typing.Union[str, os.PathLike]]
+    :param scoring_script: Equivalent to code_configuration.code.scoring_script.
+        Will be ignored if code_configuration is present, defaults to None
+    :type scoring_script: typing.Optional[typing.Union[str, os.PathLike]]
+    """
+
+    def __init__(
+        self,
+        name: str,
+        *,
+        endpoint_name: Optional[str] = None,
+        tags: Optional[Dict[str, typing.Any]] = None,
+        properties: Optional[Dict[str, typing.Any]] = None,
+        description: Optional[str] = None,
+        model: Optional[Union[str, "Model"]] = None,
+        data_collector: Optional[DataCollector] = None,
+        code_configuration: Optional[CodeConfiguration] = None,
+        environment: Optional[Union[str, "Environment"]] = None,
+        app_insights_enabled: Optional[bool] = False,
+        scale_settings: Optional[OnlineScaleSettings] = None,
+        request_settings: Optional[OnlineRequestSettings] = None,
+        liveness_probe: Optional[ProbeSettings] = None,
+        readiness_probe: Optional[ProbeSettings] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        instance_count: Optional[int] = None,
+        instance_type: Optional[str] = None,
+        model_mount_path: Optional[str] = None,
+        code_path: Optional[Union[str, os.PathLike]] = None,  # promoted property from code_configuration.code
+        scoring_script: Optional[Union[str, os.PathLike]] = None,  # promoted property code_configuration.scoring_script
+        **kwargs: typing.Any,
+    ):
+        self._provisioning_state: Optional[str] = kwargs.pop("provisioning_state", None)
+
+        super(OnlineDeployment, self).__init__(
+            name=name,
+            endpoint_name=endpoint_name,
+            tags=tags,
+            properties=properties,
+            description=description,
+            model=model,
+            code_configuration=code_configuration,
+            environment=environment,
+            environment_variables=environment_variables,
+            code_path=code_path,
+            scoring_script=scoring_script,
+            **kwargs,
+        )
+
+        self.app_insights_enabled = app_insights_enabled
+        self.scale_settings = scale_settings
+        self.request_settings = request_settings
+        self.liveness_probe = liveness_probe
+        self.readiness_probe = readiness_probe
+        self.instance_count = instance_count
+        self._arm_type = ArmConstants.ONLINE_DEPLOYMENT_TYPE
+        self.model_mount_path = model_mount_path
+        self.instance_type = instance_type
+        self.data_collector: Any = data_collector
+
+    @property
+    def provisioning_state(self) -> Optional[str]:
+        """Deployment provisioning state, readonly.
+
+        :return: Deployment provisioning state.
+        :rtype: typing.Optional[str]
+        """
+        return self._provisioning_state
+
+    def _generate_dependencies(self) -> Tuple:
+        """Convert dependencies into ARM id or REST wrapper.
+
+        :return: A 3-tuple of the code configuration, environment ID, and model ID.
+        :rtype: Tuple[RestCodeConfiguration, str, str]
+        """
+        code = None
+
+        if self.code_configuration:
+            self.code_configuration._validate()
+            if self.code_configuration.code is not None:
+                if isinstance(self.code_configuration.code, str):
+                    code_id = self.code_configuration.code
+                elif not isinstance(self.code_configuration.code, os.PathLike):
+                    code_id = self.code_configuration.code.id
+
+                code = RestCodeConfiguration(
+                    code_id=code_id,  # pylint: disable=possibly-used-before-assignment
+                    scoring_script=self.code_configuration.scoring_script,
+                )
+
+        model_id = None
+        if self.model:
+            model_id = self.model if isinstance(self.model, str) else self.model.id
+
+        environment_id = None
+        if self.environment:
+            environment_id = self.environment if isinstance(self.environment, str) else self.environment.id
+
+        return code, environment_id, model_id
+
+    @abstractmethod
+    def _to_dict(self) -> Dict:
+        pass
+
+    @abstractmethod
+    def _to_arm_resource_param(self, **kwargs: Any) -> Dict:
+        pass
+
+    @abstractmethod
+    def _to_rest_object(self) -> RestOnlineDeploymentData:
+        pass
+
+    @classmethod
+    def _from_rest_object(cls, deployment: RestOnlineDeploymentData) -> RestOnlineDeploymentDetails:
+        if deployment.properties.endpoint_compute_type == EndpointComputeType.KUBERNETES:
+            return KubernetesOnlineDeployment._from_rest_object(deployment)
+        if deployment.properties.endpoint_compute_type == EndpointComputeType.MANAGED:
+            return ManagedOnlineDeployment._from_rest_object(deployment)
+
+        msg = f"Unsupported online endpoint type {deployment.properties.endpoint_compute_type}."
+        raise DeploymentException(
+            message=msg,
+            target=ErrorTarget.ONLINE_DEPLOYMENT,
+            no_personal_data_message=msg,
+            error_category=ErrorCategory.SYSTEM_ERROR,
+        )
+
+    def _get_arm_resource(self, **kwargs: Any) -> Dict:
+        resource: dict = super(OnlineDeployment, self)._get_arm_resource(**kwargs)
+        depends_on = []
+        if self.environment and isinstance(self.environment, Environment):
+            depends_on.append(f"{self.environment._arm_type}Deployment")
+        if self.code_configuration and self.code_configuration.code and isinstance(self.code_configuration.code, Code):
+            depends_on.append(f"{self.code_configuration.code._arm_type}Deployment")
+        if self.model and isinstance(self.model, Model):
+            depends_on.append(f"{self.model._arm_type}Deployment")
+        resource[ArmConstants.DEPENDSON_PARAMETER_NAME] = depends_on
+        return resource
+
+    def _get_arm_resource_and_params(self, **kwargs: Any) -> List:
+        resource_param_tuple_list = [(self._get_arm_resource(**kwargs), self._to_arm_resource_param(**kwargs))]
+        if self.environment and isinstance(self.environment, Environment):
+            resource_param_tuple_list.extend(self.environment._get_arm_resource_and_params())
+        if self.code_configuration and self.code_configuration.code and isinstance(self.code_configuration.code, Code):
+            resource_param_tuple_list.extend(self.code_configuration.code._get_arm_resource_and_params())
+        if self.model and isinstance(self.model, Model):
+            resource_param_tuple_list.extend(self.model._get_arm_resource_and_params())
+        return resource_param_tuple_list
+
+    def _validate_name(self) -> None:
+        if self.name:
+            validate_endpoint_or_deployment_name(self.name, is_deployment=True)
+
+    def _merge_with(self, other: Any) -> None:
+        if other:
+            if self.name != other.name:
+                msg = "The deployment name: {} and {} are not matched when merging."
+                raise ValidationException(
+                    message=msg.format(self.name, other.name),
+                    target=ErrorTarget.ONLINE_DEPLOYMENT,
+                    no_personal_data_message=msg.format("[name1]", "[name2]"),
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
+            super()._merge_with(other)
+            self.app_insights_enabled = other.app_insights_enabled or self.app_insights_enabled
+            # Adding noqa: Fix E721 do not compare types, use 'isinstance()'
+            # isinstance will include checking for subclasses, which is explicitly undesired by a logic.
+            if self.scale_settings and type(self.scale_settings) == type(other.scale_settings):  # noqa
+                self.scale_settings._merge_with(other.scale_settings)
+            else:
+                self.scale_settings = other.scale_settings
+            if self.request_settings:
+                self.request_settings._merge_with(other.request_settings)
+            else:
+                self.request_settings = other.request_settings
+            if self.liveness_probe:
+                self.liveness_probe._merge_with(other.liveness_probe)
+            else:
+                self.liveness_probe = other.liveness_probe
+            if self.readiness_probe:
+                self.readiness_probe._merge_with(other.readiness_probe)
+            else:
+                self.readiness_probe = other.readiness_probe
+            self.instance_count = other.instance_count or self.instance_count
+            self.instance_type = other.instance_type or self.instance_type
+
+    @classmethod
+    def _set_scale_settings(cls, data: dict) -> None:
+        if not hasattr(data, EndpointYamlFields.SCALE_SETTINGS):
+            return
+
+        scale_settings = data[EndpointYamlFields.SCALE_SETTINGS]
+        keyName = TYPE
+        if scale_settings and scale_settings[keyName] == "default":
+            scale_copy = scale_settings.copy()
+            for key in scale_copy:
+                if key != keyName:
+                    scale_settings.pop(key, None)
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[os.PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "OnlineDeployment":
+        data = data or {}
+        params_override = params_override or []
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path.cwd(),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+
+        deployment_type = data.get("type", None)
+
+        if deployment_type == camel_to_snake(EndpointComputeType.KUBERNETES.value):
+            res_kub: OnlineDeployment = load_from_dict(KubernetesOnlineDeploymentSchema, data, context, **kwargs)
+            return res_kub
+
+        res_manage: OnlineDeployment = load_from_dict(ManagedOnlineDeploymentSchema, data, context, **kwargs)
+        return res_manage
+
+
+class KubernetesOnlineDeployment(OnlineDeployment):
+    """Kubernetes Online endpoint deployment entity.
+
+    :param name: Name of the deployment resource.
+    :type name: str
+    :param endpoint_name: Name of the endpoint resource, defaults to None
+    :type endpoint_name: typing.Optional[str]
+    :param tags: Tag dictionary. Tags can be added, removed, and updated., defaults to None
+    :type tags: typing.Optional[typing.Dict[str, typing.Any]]
+    :param properties: The asset property dictionary, defaults to None
+    :type properties: typing.Optional[typing.Dict[str, typing.Any]]
+    :param description: Description of the resource, defaults to None
+    :type description: typing.Optional[str]
+    :param model: Model entity for the endpoint deployment, defaults to None
+    :type model: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Model]]
+    :param code_configuration: Code Configuration, defaults to None
+    :type code_configuration: typing.Optional[~azure.ai.ml.entities.CodeConfiguration]
+    :param environment: Environment entity for the endpoint deployment, defaults to None
+    :type environment: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Environment]]
+    :param app_insights_enabled: Is appinsights enabled, defaults to False
+    :type app_insights_enabled: bool
+    :param scale_settings: How the online deployment will scale, defaults to None
+    :type scale_settings: typing.Optional[typing.Union[~azure.ai.ml.entities.DefaultScaleSettings
+        , ~azure.ai.ml.entities.TargetUtilizationScaleSettings]]
+    :param request_settings: Online Request Settings, defaults to None
+    :type request_settings: typing.Optional[OnlineRequestSettings]
+    :param liveness_probe: Liveness probe settings, defaults to None
+    :type liveness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
+    :param readiness_probe: Readiness probe settings, defaults to None
+    :type readiness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
+    :param environment_variables: Environment variables that will be set in deployment, defaults to None
+    :type environment_variables: typing.Optional[typing.Dict[str, str]]
+    :param resources: Resource requirements settings, defaults to None
+    :type resources: typing.Optional[~azure.ai.ml.entities.ResourceRequirementsSettings]
+    :param instance_count: The instance count used for this deployment, defaults to None
+    :type instance_count: typing.Optional[int]
+    :param instance_type: The instance type defined by K8S cluster admin, defaults to None
+    :type instance_type: typing.Optional[str]
+    :param code_path: Equivalent to code_configuration.code, will be ignored if code_configuration is present
+        , defaults to None
+    :type code_path: typing.Optional[typing.Union[str, os.PathLike]]
+    :param scoring_script: Equivalent to code_configuration.code.scoring_script.
+        Will be ignored if code_configuration is present, defaults to None
+    :type scoring_script: typing.Optional[typing.Union[str, os.PathLike]]
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        endpoint_name: Optional[str] = None,
+        tags: Optional[Dict[str, typing.Any]] = None,
+        properties: Optional[Dict[str, typing.Any]] = None,
+        description: Optional[str] = None,
+        model: Optional[Union[str, "Model"]] = None,
+        code_configuration: Optional[CodeConfiguration] = None,
+        environment: Optional[Union[str, "Environment"]] = None,
+        app_insights_enabled: bool = False,
+        scale_settings: Optional[Union[DefaultScaleSettings, TargetUtilizationScaleSettings]] = None,
+        request_settings: Optional[OnlineRequestSettings] = None,
+        liveness_probe: Optional[ProbeSettings] = None,
+        readiness_probe: Optional[ProbeSettings] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        resources: Optional[ResourceRequirementsSettings] = None,
+        instance_count: Optional[int] = None,
+        instance_type: Optional[str] = None,
+        code_path: Optional[Union[str, os.PathLike]] = None,  # promoted property from code_configuration.code
+        scoring_script: Optional[
+            Union[str, os.PathLike]
+        ] = None,  # promoted property from code_configuration.scoring_script
+        **kwargs: Any,
+    ):
+        kwargs["type"] = EndpointComputeType.KUBERNETES.value
+        super(KubernetesOnlineDeployment, self).__init__(
+            name=name,
+            endpoint_name=endpoint_name,
+            tags=tags,
+            properties=properties,
+            description=description,
+            model=model,
+            code_configuration=code_configuration,
+            environment=environment,
+            environment_variables=environment_variables,
+            instance_count=instance_count,
+            instance_type=instance_type,
+            app_insights_enabled=app_insights_enabled,
+            scale_settings=scale_settings,
+            request_settings=request_settings,
+            liveness_probe=liveness_probe,
+            readiness_probe=readiness_probe,
+            code_path=code_path,
+            scoring_script=scoring_script,
+            **kwargs,
+        )
+
+        self.resources = resources
+
+    def _to_dict(self) -> Dict:
+        res: dict = KubernetesOnlineDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
+
+    # pylint: disable=arguments-differ
+    def _to_rest_object(self, location: str) -> RestOnlineDeploymentData:  # type: ignore
+        self._validate()
+        code, environment, model = self._generate_dependencies()
+
+        properties = RestKubernetesOnlineDeployment(
+            code_configuration=code,
+            environment_id=environment,
+            model=model,
+            model_mount_path=self.model_mount_path,
+            scale_settings=self.scale_settings._to_rest_object() if self.scale_settings else None,
+            properties=self.properties,
+            description=self.description,
+            environment_variables=self.environment_variables,
+            app_insights_enabled=self.app_insights_enabled,
+            request_settings=self.request_settings._to_rest_object() if self.request_settings else None,
+            liveness_probe=self.liveness_probe._to_rest_object() if self.liveness_probe else None,
+            readiness_probe=self.readiness_probe._to_rest_object() if self.readiness_probe else None,
+            container_resource_requirements=self.resources._to_rest_object() if self.resources else None,
+            instance_type=self.instance_type if self.instance_type else None,
+            data_collector=self.data_collector._to_rest_object() if self.data_collector else None,
+        )
+        sku = RestSku(name="Default", capacity=self.instance_count)
+
+        return RestOnlineDeploymentData(location=location, properties=properties, tags=self.tags, sku=sku)
+
+    def _to_arm_resource_param(self, **kwargs: Any) -> Dict:
+        rest_object = self._to_rest_object(**kwargs)
+        properties = rest_object.properties
+        sku = rest_object.sku
+        tags = rest_object.tags
+
+        return {
+            self._arm_type: {
+                ArmConstants.NAME: self.name,
+                ArmConstants.PROPERTIES_PARAMETER_NAME: self._serialize.body(properties, "K8SOnlineDeployment"),
+                ArmConstants.SKU: self._serialize.body(sku, "Sku"),
+                ArmConstants.TAGS: tags,
+            }
+        }
+
+    def _merge_with(self, other: Any) -> None:
+        if other:
+            super()._merge_with(other)
+            if self.resources:
+                self.resources._merge_with(other.resources)
+            else:
+                self.resources = other.resources
+
+    def _validate(self) -> None:
+        self._validate_name()
+
+    @classmethod
+    def _from_rest_object(cls, resource: RestOnlineDeploymentData) -> "KubernetesOnlineDeployment":
+        deployment = resource.properties
+
+        code_config = (
+            CodeConfiguration(
+                code=deployment.code_configuration.code_id,
+                scoring_script=deployment.code_configuration.scoring_script,
+            )
+            if deployment.code_configuration
+            else None
+        )
+
+        return KubernetesOnlineDeployment(
+            id=resource.id,
+            name=resource.name,
+            tags=resource.tags,
+            properties=deployment.properties,
+            description=deployment.description,
+            request_settings=OnlineRequestSettings._from_rest_object(deployment.request_settings),
+            model=deployment.model,
+            code_configuration=code_config,
+            environment=deployment.environment_id,
+            resources=ResourceRequirementsSettings._from_rest_object(deployment.container_resource_requirements),
+            app_insights_enabled=deployment.app_insights_enabled,
+            scale_settings=cast(
+                Optional[Union[DefaultScaleSettings, TargetUtilizationScaleSettings]],
+                OnlineScaleSettings._from_rest_object(deployment.scale_settings),
+            ),
+            liveness_probe=ProbeSettings._from_rest_object(deployment.liveness_probe),
+            readiness_probe=ProbeSettings._from_rest_object(deployment.readiness_probe),
+            environment_variables=deployment.environment_variables,
+            endpoint_name=_parse_endpoint_name_from_deployment_id(resource.id),
+            instance_count=resource.sku.capacity if resource.sku else None,
+            instance_type=deployment.instance_type,
+            data_collector=(
+                DataCollector._from_rest_object(deployment.data_collector)
+                if hasattr(deployment, "data_collector") and deployment.data_collector
+                else None
+            ),
+            provisioning_state=deployment.provisioning_state if hasattr(deployment, "provisioning_state") else None,
+        )
+
+
+class ManagedOnlineDeployment(OnlineDeployment):
+    """Managed Online endpoint deployment entity.
+
+    :param name: Name of the deployment resource
+    :type name: str
+    :param endpoint_name: Name of the endpoint resource, defaults to None
+    :type endpoint_name: typing.Optional[str]
+    :param tags: Tag dictionary. Tags can be added, removed, and updated., defaults to None
+    :type tags: typing.Optional[typing.Dict[str, typing.Any]]
+    :param properties: The asset property dictionary, defaults to None
+    :type properties: typing.Optional[typing.Dict[str, typing.Any]]
+    :param description: Description of the resource, defaults to None
+    :type description: typing.Optional[str]
+    :param model: Model entity for the endpoint deployment, defaults to None
+    :type model: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Model]]
+    :param code_configuration: Code Configuration, defaults to None
+    :type code_configuration: typing.Optional[~azure.ai.ml.entities.CodeConfiguration]
+    :param environment: Environment entity for the endpoint deployment, defaults to None
+    :type environment: typing.Optional[typing.Union[str, ~azure.ai.ml.entities.Environment]]
+    :param app_insights_enabled: Is appinsights enabled, defaults to False
+    :type app_insights_enabled: bool
+    :param scale_settings: How the online deployment will scale, defaults to None
+    :type scale_settings: typing.Optional[typing.Union[~azure.ai.ml.entities.DefaultScaleSettings
+        , ~azure.ai.ml.entities.TargetUtilizationScaleSettings]]
+    :param request_settings: Online Request Settings, defaults to None
+    :type request_settings: typing.Optional[OnlineRequestSettings]
+    :param liveness_probe: Liveness probe settings, defaults to None
+    :type liveness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
+    :param readiness_probe: Readiness probe settings, defaults to None
+    :type readiness_probe: typing.Optional[~azure.ai.ml.entities.ProbeSettings]
+    :param environment_variables: Environment variables that will be set in deployment, defaults to None
+    :type environment_variables: typing.Optional[typing.Dict[str, str]]
+    :param instance_type: Azure compute sku, defaults to None
+    :type instance_type: typing.Optional[str]
+    :param instance_count: The instance count used for this deployment, defaults to None
+    :type instance_count: typing.Optional[int]
+    :param egress_public_network_access: Whether to restrict communication between a deployment and the
+         Azure resources used to by the deployment. Allowed values are: "enabled", "disabled", defaults to None
+    :type egress_public_network_access: typing.Optional[str]
+    :param code_path: Equivalent to code_configuration.code, will be ignored if code_configuration is present
+        , defaults to None
+    :type code_path: typing.Optional[typing.Union[str, os.PathLike]]
+    :param scoring_script_path: Equivalent to code_configuration.scoring_script, will be ignored if
+        code_configuration is present, defaults to None
+    :type scoring_script_path: typing.Optional[typing.Union[str, os.PathLike]]
+    :param data_collector: Data collector, defaults to None
+    :type data_collector: typing.Optional[typing.List[~azure.ai.ml.entities.DataCollector]]
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        endpoint_name: Optional[str] = None,
+        tags: Optional[Dict[str, typing.Any]] = None,
+        properties: Optional[Dict[str, typing.Any]] = None,
+        description: Optional[str] = None,
+        model: Optional[Union[str, "Model"]] = None,
+        code_configuration: Optional[CodeConfiguration] = None,
+        environment: Optional[Union[str, "Environment"]] = None,
+        app_insights_enabled: bool = False,
+        scale_settings: Optional[Union[DefaultScaleSettings, TargetUtilizationScaleSettings]] = None,
+        request_settings: Optional[OnlineRequestSettings] = None,
+        liveness_probe: Optional[ProbeSettings] = None,
+        readiness_probe: Optional[ProbeSettings] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
+        instance_type: Optional[str] = None,
+        instance_count: Optional[int] = None,
+        egress_public_network_access: Optional[str] = None,
+        code_path: Optional[Union[str, os.PathLike]] = None,  # promoted property from code_configuration.code
+        scoring_script: Optional[
+            Union[str, os.PathLike]
+        ] = None,  # promoted property from code_configuration.scoring_script
+        data_collector: Optional[DataCollector] = None,
+        **kwargs: Any,
+    ):
+        kwargs["type"] = EndpointComputeType.MANAGED.value
+        self.private_network_connection = kwargs.pop("private_network_connection", None)
+        self.package_model = kwargs.pop("package_model", False)
+
+        super(ManagedOnlineDeployment, self).__init__(
+            name=name,
+            endpoint_name=endpoint_name,
+            tags=tags,
+            properties=properties,
+            description=description,
+            model=model,
+            code_configuration=code_configuration,
+            environment=environment,
+            environment_variables=environment_variables,
+            app_insights_enabled=app_insights_enabled,
+            scale_settings=scale_settings,
+            request_settings=request_settings,
+            liveness_probe=liveness_probe,
+            readiness_probe=readiness_probe,
+            instance_count=instance_count,
+            instance_type=instance_type,
+            code_path=code_path,
+            scoring_script=scoring_script,
+            data_collector=data_collector,
+            **kwargs,
+        )
+
+        self.readiness_probe = readiness_probe
+        self.egress_public_network_access = egress_public_network_access
+
+    def _to_dict(self) -> Dict:
+        res: dict = ManagedOnlineDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
+
+    # pylint: disable=arguments-differ
+    def _to_rest_object(self, location: str) -> RestOnlineDeploymentData:  # type: ignore
+        self._validate()
+        code, environment, model = self._generate_dependencies()
+        properties = RestManagedOnlineDeployment(
+            code_configuration=code,
+            environment_id=environment,
+            model=model,
+            model_mount_path=self.model_mount_path,
+            scale_settings=self.scale_settings._to_rest_object() if self.scale_settings else None,
+            properties=self.properties,
+            description=self.description,
+            environment_variables=self.environment_variables,
+            app_insights_enabled=self.app_insights_enabled,
+            request_settings=self.request_settings._to_rest_object() if self.request_settings else None,
+            liveness_probe=self.liveness_probe._to_rest_object() if self.liveness_probe else None,
+            instance_type=self.instance_type,
+            readiness_probe=self.readiness_probe._to_rest_object() if self.readiness_probe else None,
+            data_collector=self.data_collector._to_rest_object() if self.data_collector else None,
+        )
+        # TODO: SKU name is defaulted to value "Default" since service side requires it.
+        #  Should be removed once service side defaults it.
+        sku = RestSku(name="Default", capacity=self.instance_count)
+
+        # mfe is expecting private network connection to be in both the attribute level
+        # as well as in the properties dictionary.
+        if hasattr(self, "private_network_connection") and self.private_network_connection:
+            properties.private_network_connection = self.private_network_connection
+            properties.properties["private-network-connection"] = self.private_network_connection
+        if hasattr(self, "egress_public_network_access") and self.egress_public_network_access:
+            properties.egress_public_network_access = self.egress_public_network_access
+        return RestOnlineDeploymentData(location=location, properties=properties, tags=self.tags, sku=sku)
+
+    def _to_arm_resource_param(self, **kwargs: Any) -> Dict:
+        rest_object = self._to_rest_object(**kwargs)
+        properties = rest_object.properties
+        sku = rest_object.sku
+        tags = rest_object.tags
+
+        return {
+            self._arm_type: {
+                ArmConstants.NAME: self.name,
+                ArmConstants.PROPERTIES_PARAMETER_NAME: self._serialize.body(properties, "ManagedOnlineDeployment"),
+                ArmConstants.SKU: self._serialize.body(sku, "Sku"),
+                ArmConstants.TAGS: tags,
+            }
+        }
+
+    @classmethod
+    def _from_rest_object(cls, resource: RestOnlineDeploymentData) -> "ManagedOnlineDeployment":
+        deployment = resource.properties
+
+        code_config = (
+            CodeConfiguration(
+                code=deployment.code_configuration.code_id,
+                scoring_script=deployment.code_configuration.scoring_script,
+            )
+            if deployment.code_configuration
+            else None
+        )
+
+        return ManagedOnlineDeployment(
+            id=resource.id,
+            name=resource.name,
+            tags=resource.tags,
+            properties=deployment.properties,
+            description=deployment.description,
+            request_settings=OnlineRequestSettings._from_rest_object(deployment.request_settings),
+            model=(deployment.model if deployment.model else None),
+            code_configuration=code_config,
+            environment=deployment.environment_id,
+            app_insights_enabled=deployment.app_insights_enabled,
+            scale_settings=OnlineScaleSettings._from_rest_object(deployment.scale_settings),  # type: ignore
+            liveness_probe=ProbeSettings._from_rest_object(deployment.liveness_probe),
+            environment_variables=deployment.environment_variables,
+            readiness_probe=ProbeSettings._from_rest_object(deployment.readiness_probe),
+            instance_type=deployment.instance_type,
+            endpoint_name=_parse_endpoint_name_from_deployment_id(resource.id),
+            instance_count=resource.sku.capacity,
+            private_network_connection=(
+                deployment.private_network_connection if hasattr(deployment, "private_network_connection") else None
+            ),
+            egress_public_network_access=deployment.egress_public_network_access,
+            data_collector=(
+                DataCollector._from_rest_object(deployment.data_collector)
+                if hasattr(deployment, "data_collector") and deployment.data_collector
+                else None
+            ),
+            provisioning_state=deployment.provisioning_state if hasattr(deployment, "provisioning_state") else None,
+            creation_context=resource.system_data,
+        )
+
+    def _merge_with(self, other: Any) -> None:
+        if other:
+            super()._merge_with(other)
+            self.instance_type = other.instance_type or self.instance_type
+
+    def _validate(self) -> None:
+        self._validate_name()
+        self._validate_scale_settings()
+
+    def _validate_scale_settings(self) -> None:
+        if self.scale_settings:
+            if not isinstance(self.scale_settings, DefaultScaleSettings):
+                msg = "ManagedOnlineEndpoint supports DefaultScaleSettings only."
+                raise ValidationException(
+                    message=msg,
+                    target=ErrorTarget.ONLINE_DEPLOYMENT,
+                    no_personal_data_message=msg,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/oversize_data_config.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/oversize_data_config.py
new file mode 100644
index 00000000..80338c39
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/oversize_data_config.py
@@ -0,0 +1,25 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Optional
+
+from azure.ai.ml._schema._deployment.online.oversize_data_config_schema import OversizeDataConfigSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+
+
+class OversizeDataConfig:
+    """Oversize Data Config deployment entity.
+
+    :param path: Blob path for Model Data Collector file.
+    :type path: str
+    """
+
+    # pylint: disable=unused-argument
+    def __init__(self, path: Optional[str] = None, **kwargs: Any):
+        self.path = path
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = OversizeDataConfigSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/payload_response.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/payload_response.py
new file mode 100644
index 00000000..b67d46c7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/payload_response.py
@@ -0,0 +1,26 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Optional
+
+from azure.ai.ml._schema._deployment.online.payload_response_schema import PayloadResponseSchema
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+
+
+class PayloadResponse:
+    """Response deployment entity
+
+    :param enabled: Is response logging enabled.
+    :type enabled: str
+
+    """
+
+    # pylint: disable=unused-argument
+    def __init__(self, enabled: Optional[str] = None, **kwargs: Any):
+        self.enabled = enabled
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = PayloadResponseSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/pipeline_component_batch_deployment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/pipeline_component_batch_deployment.py
new file mode 100644
index 00000000..730bc39e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/pipeline_component_batch_deployment.py
@@ -0,0 +1,150 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from os import PathLike
+from pathlib import Path
+from typing import IO, Any, AnyStr, Dict, Optional, Union
+
+from azure.ai.ml._restclient.v2024_01_01_preview.models import BatchDeployment as RestBatchDeployment
+from azure.ai.ml._restclient.v2024_01_01_preview.models import (
+    BatchDeploymentProperties,
+    BatchPipelineComponentDeploymentConfiguration,
+    IdAssetReference,
+)
+from azure.ai.ml._schema._deployment.batch.pipeline_component_batch_deployment_schema import (
+    PipelineComponentBatchDeploymentSchema,
+)
+from azure.ai.ml._utils._arm_id_utils import _parse_endpoint_name_from_deployment_id
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml._utils.utils import dump_yaml_to_file
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY
+from azure.ai.ml.entities import PipelineComponent
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._resource import Resource
+from azure.ai.ml.entities._util import load_from_dict
+
+
+@experimental
+class PipelineComponentBatchDeployment(Resource):
+    """Pipeline Component Batch Deployment entity.
+
+    :param type: Job definition type. Allowed value: "pipeline"
+    :type type: Optional[str]
+    :param name: Name of the deployment resource.
+    :type name: Optional[str]
+    :param description: Description of the deployment resource.
+    :type description: Optional[str]
+    :param component: Component definition.
+    :type component: Optional[Union[Component, str]]
+    :param settings: Run-time settings for the pipeline job.
+    :type settings: Optional[Dict[str, Any]]
+    :param tags: A set of tags. The tags which will be applied to the job.
+    :type tags: Optional[Dict[str, Any]]
+    :param job_definition: Arm ID or PipelineJob entity of an existing pipeline job.
+    :type job_definition: Optional[Dict[str, ~azure.ai.ml.entities._builders.BaseNode]]
+    :param endpoint_name: Name of the Endpoint resource, defaults to None.
+    :type endpoint_name: Optional[str]
+    """
+
+    def __init__(
+        self,
+        *,
+        name: Optional[str],
+        endpoint_name: Optional[str] = None,
+        component: Optional[Union[Component, str]] = None,
+        settings: Optional[Dict[str, str]] = None,
+        job_definition: Optional[Dict[str, BaseNode]] = None,
+        tags: Optional[Dict] = None,
+        description: Optional[str] = None,
+        **kwargs: Any,
+    ):
+        self._type = kwargs.pop("type", None)
+        super().__init__(name=name, tags=tags, description=description, **kwargs)
+        self.component = component
+        self.endpoint_name = endpoint_name
+        self.settings = settings
+        self.job_definition = job_definition
+
+    def _to_rest_object(self, location: str) -> "RestBatchDeployment":
+        if isinstance(self.component, PipelineComponent):
+            id_asset_ref = IdAssetReference(asset_id=self.component.id)
+
+            batch_pipeline_config = BatchPipelineComponentDeploymentConfiguration(
+                settings=self.settings,
+                tags=self.component.tags,
+                description=self.component.description,
+                component_id=id_asset_ref,
+            )
+        else:
+            id_asset_ref = IdAssetReference(asset_id=self.component)
+            batch_pipeline_config = BatchPipelineComponentDeploymentConfiguration(
+                settings=self.settings, component_id=id_asset_ref
+            )
+        return RestBatchDeployment(
+            location=location,
+            tags=self.tags,
+            properties=BatchDeploymentProperties(
+                deployment_configuration=batch_pipeline_config,
+                description=self.description,
+            ),
+        )
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs: Any,
+    ) -> "PipelineComponentBatchDeployment":
+        data = data or {}
+        params_override = params_override or []
+        cls._update_params(params_override)
+
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path.cwd(),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        res: PipelineComponentBatchDeployment = load_from_dict(
+            PipelineComponentBatchDeploymentSchema, data, context, **kwargs
+        )
+        return res
+
+    @classmethod
+    def _update_params(cls, params_override: Any) -> None:
+        for param in params_override:
+            endpoint_name = param.get("endpoint_name")
+            if isinstance(endpoint_name, str):
+                param["endpoint_name"] = endpoint_name.lower()
+
+    @classmethod
+    def _from_rest_object(cls, deployment: RestBatchDeployment) -> "PipelineComponentBatchDeployment":
+        return PipelineComponentBatchDeployment(
+            name=deployment.name,
+            tags=deployment.tags,
+            component=deployment.properties.additional_properties["deploymentConfiguration"]["componentId"]["assetId"],
+            settings=deployment.properties.additional_properties["deploymentConfiguration"]["settings"],
+            endpoint_name=_parse_endpoint_name_from_deployment_id(deployment.id),
+        )
+
+    def dump(self, dest: Union[str, PathLike, IO[AnyStr]], **kwargs: Any) -> None:
+        """Dump the deployment content into a file in yaml format.
+
+        :param dest: The destination to receive this deployment's content.
+            Must be either a path to a local file, or an already-open file stream.
+            If dest is a file path, a new file will be created,
+            and an exception is raised if the file exists.
+            If dest is an open file, the file will be written to directly,
+            and an exception will be raised if the file is not writable.
+        :type dest: typing.Union[os.PathLike, str, typing.IO[typing.AnyStr]]
+        """
+        path = kwargs.pop("path", None)
+        yaml_serialized = self._to_dict()
+        dump_yaml_to_file(dest, yaml_serialized, default_flow_style=False, path=path, **kwargs)
+
+    def _to_dict(self) -> Dict:
+        res: dict = PipelineComponentBatchDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/request_logging.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/request_logging.py
new file mode 100644
index 00000000..20cc83fe
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/request_logging.py
@@ -0,0 +1,39 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, List, Optional
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import RequestLogging as RestRequestLogging
+from azure.ai.ml._schema._deployment.online.request_logging_schema import RequestLoggingSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+
+
+@experimental
+class RequestLogging:
+    """Request Logging deployment entity.
+
+    :param capture_headers: Request payload header.
+    :type capture_headers: list[str]
+    """
+
+    def __init__(
+        self,
+        *,
+        capture_headers: Optional[List[str]] = None,
+        **kwargs: Any,
+    ):  # pylint: disable=unused-argument
+        self.capture_headers = capture_headers
+
+    @classmethod
+    def _from_rest_object(cls, rest_obj: RestRequestLogging) -> "RequestLogging":
+        return RequestLogging(capture_headers=rest_obj.capture_headers)
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = RequestLoggingSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
+
+    def _to_rest_object(self) -> RestRequestLogging:
+        return RestRequestLogging(capture_headers=self.capture_headers)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/resource_requirements_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/resource_requirements_settings.py
new file mode 100644
index 00000000..9db61aae
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/resource_requirements_settings.py
@@ -0,0 +1,84 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+from typing import Optional
+
+from azure.ai.ml._restclient.v2022_05_01.models import ContainerResourceRequirements
+from azure.ai.ml.entities._deployment.container_resource_settings import ResourceSettings
+from azure.ai.ml.entities._mixins import RestTranslatableMixin
+
+module_logger = logging.getLogger(__name__)
+
+
+class ResourceRequirementsSettings(RestTranslatableMixin):
+    """Resource requirements settings for a container.
+
+    :param requests: The minimum resource requests for a container.
+    :type requests: Optional[~azure.ai.ml.entities.ResourceSettings]
+    :param limits: The resource limits for a container.
+    :type limits: Optional[~azure.ai.ml.entities.ResourceSettings]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_misc.py
+            :start-after: [START resource_requirements_configuration]
+            :end-before: [END resource_requirements_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring ResourceRequirementSettings for a Kubernetes deployment.
+    """
+
+    def __init__(
+        self,
+        requests: Optional[ResourceSettings] = None,
+        limits: Optional[ResourceSettings] = None,
+    ) -> None:
+        self.requests = requests
+        self.limits = limits
+
+    def _to_rest_object(self) -> ContainerResourceRequirements:
+        return ContainerResourceRequirements(
+            container_resource_requests=self.requests._to_rest_object() if self.requests else None,
+            container_resource_limits=self.limits._to_rest_object() if self.limits else None,
+        )
+
+    @classmethod
+    def _from_rest_object(  # pylint: disable=arguments-renamed
+        cls, settings: ContainerResourceRequirements
+    ) -> Optional["ResourceRequirementsSettings"]:
+        requests = settings.container_resource_requests
+        limits = settings.container_resource_limits
+        return (
+            ResourceRequirementsSettings(
+                requests=ResourceSettings._from_rest_object(requests),
+                limits=ResourceSettings._from_rest_object(limits),
+            )
+            if settings
+            else None
+        )
+
+    def _merge_with(self, other: Optional["ResourceRequirementsSettings"]) -> None:
+        if other:
+            if self.requests:
+                self.requests._merge_with(other.requests)
+            else:
+                self.requests = other.requests
+            if self.limits:
+                self.limits._merge_with(other.limits)
+            else:
+                self.limits = other.limits
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, ResourceRequirementsSettings):
+            return NotImplemented
+        if not other:
+            return False
+        # only compare mutable fields
+        return self.requests == other.requests and self.limits == other.limits
+
+    def __ne__(self, other: object) -> bool:
+        return not self.__eq__(other)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/run_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/run_settings.py
new file mode 100644
index 00000000..f1deac83
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/run_settings.py
@@ -0,0 +1,50 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, Optional
+
+from azure.ai.ml._schema._deployment.batch.run_settings_schema import RunSettingsSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+
+
+@experimental
+class RunSettings:
+    """Run Settings entity.
+
+    :param name: Run settings name
+    :type name: str
+    :param display_name: Run settings display name
+    :type display_name: str
+    :param experiment_name: Run settings experiment name
+    :type experiment_name: str
+    :param description: Run settings description
+    :type description: str
+    :param tags: Run settings tags
+    :type tags: Dict[str, Any]
+    :param settings: Run settings - settings
+    :type settings: Dict[str, Any]
+    """
+
+    def __init__(
+        self,
+        name: Optional[str] = None,
+        display_name: Optional[str] = None,
+        experiment_name: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict[str, Any]] = None,
+        settings: Optional[Dict[str, Any]] = None,
+        **kwargs: Any,
+    ):  # pylint: disable=unused-argument
+        self.name = name
+        self.display_name = display_name
+        self.experiment_name = experiment_name
+        self.description = description
+        self.tags = tags
+        self.settings = settings
+
+    def _to_dict(self) -> Dict:
+        # pylint: disable=no-member
+        res: dict = RunSettingsSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/scale_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/scale_settings.py
new file mode 100644
index 00000000..85535ca0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_deployment/scale_settings.py
@@ -0,0 +1,173 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+from abc import abstractmethod
+from typing import Any, Optional
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import DefaultScaleSettings as RestDefaultScaleSettings
+from azure.ai.ml._restclient.v2023_04_01_preview.models import OnlineScaleSettings as RestOnlineScaleSettings
+from azure.ai.ml._restclient.v2023_04_01_preview.models import ScaleType
+from azure.ai.ml._restclient.v2023_04_01_preview.models import (
+    TargetUtilizationScaleSettings as RestTargetUtilizationScaleSettings,
+)
+from azure.ai.ml._utils.utils import camel_to_snake, from_iso_duration_format, to_iso_duration_format
+from azure.ai.ml.entities._mixins import RestTranslatableMixin
+from azure.ai.ml.exceptions import DeploymentException, ErrorCategory, ErrorTarget
+
+module_logger = logging.getLogger(__name__)
+
+
+class OnlineScaleSettings(RestTranslatableMixin):
+    """Scale settings for online deployment.
+
+    :param type: Type of the scale settings, allowed values are "default" and "target_utilization".
+    :type type: str
+    """
+
+    def __init__(
+        self,
+        # pylint: disable=redefined-builtin
+        type: str,
+        # pylint: disable=unused-argument
+        **kwargs: Any,
+    ):
+        self.type = camel_to_snake(type)
+
+    @abstractmethod
+    def _to_rest_object(self) -> RestOnlineScaleSettings:
+        pass
+
+    def _merge_with(self, other: Any) -> None:
+        if other:
+            self.type = other.type or self.type
+
+    @classmethod
+    def _from_rest_object(  # pylint: disable=arguments-renamed
+        cls, settings: RestOnlineScaleSettings
+    ) -> "OnlineScaleSettings":
+        if settings.scale_type == "Default":
+            return DefaultScaleSettings._from_rest_object(settings)
+        if settings.scale_type == "TargetUtilization":
+            return TargetUtilizationScaleSettings._from_rest_object(settings)
+
+        msg = f"Unsupported online scale setting type {settings.scale_type}."
+        raise DeploymentException(
+            message=msg,
+            target=ErrorTarget.ONLINE_DEPLOYMENT,
+            no_personal_data_message=msg,
+            error_category=ErrorCategory.SYSTEM_ERROR,
+        )
+
+
+class DefaultScaleSettings(OnlineScaleSettings):
+    """Default scale settings.
+
+    :ivar type: Default scale settings type. Set automatically to "default" for this class.
+    :vartype type: str
+    """
+
+    def __init__(self, **kwargs: Any):
+        super(DefaultScaleSettings, self).__init__(
+            type=ScaleType.DEFAULT.value,
+        )
+
+    def _to_rest_object(self) -> RestDefaultScaleSettings:
+        return RestDefaultScaleSettings()
+
+    @classmethod
+    def _from_rest_object(cls, settings: RestDefaultScaleSettings) -> "DefaultScaleSettings":
+        return DefaultScaleSettings()
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, DefaultScaleSettings):
+            return NotImplemented
+        if not other:
+            return False
+        # only compare mutable fields
+        res: bool = self.type.lower() == other.type.lower()
+        return res
+
+    def __ne__(self, other: object) -> bool:
+        return not self.__eq__(other)
+
+
+class TargetUtilizationScaleSettings(OnlineScaleSettings):
+    """Auto scale settings.
+
+    :param min_instances: Minimum number of the instances
+    :type min_instances: int
+    :param max_instances: Maximum number of the instances
+    :type max_instances: int
+    :param polling_interval: The polling interval in ISO 8691 format. Only supports duration with
+     precision as low as Seconds.
+    :type polling_interval: str
+    :param target_utilization_percentage:
+    :type target_utilization_percentage: int
+    :ivar type: Target utilization scale settings type. Set automatically to "target_utilization" for this class.
+    :vartype type: str
+    """
+
+    def __init__(
+        self,
+        *,
+        min_instances: Optional[int] = None,
+        max_instances: Optional[int] = None,
+        polling_interval: Optional[int] = None,
+        target_utilization_percentage: Optional[int] = None,
+        **kwargs: Any,
+    ):
+        super(TargetUtilizationScaleSettings, self).__init__(
+            type=ScaleType.TARGET_UTILIZATION.value,
+        )
+        self.min_instances = min_instances
+        self.max_instances = max_instances
+        self.polling_interval = polling_interval
+        self.target_utilization_percentage = target_utilization_percentage
+
+    def _to_rest_object(self) -> RestTargetUtilizationScaleSettings:
+        return RestTargetUtilizationScaleSettings(
+            min_instances=self.min_instances,
+            max_instances=self.max_instances,
+            polling_interval=to_iso_duration_format(self.polling_interval),
+            target_utilization_percentage=self.target_utilization_percentage,
+        )
+
+    def _merge_with(self, other: Optional["TargetUtilizationScaleSettings"]) -> None:
+        if other:
+            super()._merge_with(other)
+            self.min_instances = other.min_instances or self.min_instances
+            self.max_instances = other.max_instances or self.max_instances
+            self.polling_interval = other.polling_interval or self.polling_interval
+            self.target_utilization_percentage = (
+                other.target_utilization_percentage or self.target_utilization_percentage
+            )
+
+    @classmethod
+    def _from_rest_object(cls, settings: RestTargetUtilizationScaleSettings) -> "TargetUtilizationScaleSettings":
+        return cls(
+            min_instances=settings.min_instances,
+            max_instances=settings.max_instances,
+            polling_interval=from_iso_duration_format(settings.polling_interval),
+            target_utilization_percentage=settings.target_utilization_percentage,
+        )
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, TargetUtilizationScaleSettings):
+            return NotImplemented
+        if not other:
+            return False
+        # only compare mutable fields
+        return (
+            self.type.lower() == other.type.lower()
+            and self.min_instances == other.min_instances
+            and self.max_instances == other.max_instances
+            and self.polling_interval == other.polling_interval
+            and self.target_utilization_percentage == other.target_utilization_percentage
+        )
+
+    def __ne__(self, other: object) -> bool:
+        return not self.__eq__(other)