aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py392
1 files changed, 392 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py
new file mode 100644
index 00000000..9bde33c8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py
@@ -0,0 +1,392 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access, too-many-boolean-expressions
+
+import re
+from typing import Any, Optional, TypeVar, Union
+
+from azure.ai.ml._restclient.v2024_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012024Preview
+from azure.ai.ml._scope_dependent_operations import (
+ OperationConfig,
+ OperationsContainer,
+ OperationScope,
+ _ScopeDependentOperations,
+)
+from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
+from azure.ai.ml._utils._arm_id_utils import AMLVersionedArmId
+from azure.ai.ml._utils._azureml_polling import AzureMLPolling
+from azure.ai.ml._utils._endpoint_utils import upload_dependencies, validate_scoring_script
+from azure.ai.ml._utils._http_utils import HttpPipeline
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils._package_utils import package_deployment
+from azure.ai.ml._utils.utils import _get_mfe_base_url_from_discovery_service, modified_operation_client
+from azure.ai.ml.constants._common import ARM_ID_PREFIX, AzureMLResourceType, LROConfigurations
+from azure.ai.ml.entities import BatchDeployment, BatchJob, ModelBatchDeployment, PipelineComponent, PipelineJob
+from azure.ai.ml.entities._deployment.pipeline_component_batch_deployment import PipelineComponentBatchDeployment
+from azure.core.credentials import TokenCredential
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+from azure.core.paging import ItemPaged
+from azure.core.polling import LROPoller
+from azure.core.tracing.decorator import distributed_trace
+
+from ._operation_orchestrator import OperationOrchestrator
+
+ops_logger = OpsLogger(__name__)
+module_logger = ops_logger.module_logger
+DeploymentType = TypeVar(
+ "DeploymentType", bound=Union[BatchDeployment, PipelineComponentBatchDeployment, ModelBatchDeployment]
+)
+
+
+class BatchDeploymentOperations(_ScopeDependentOperations):
+ """BatchDeploymentOperations.
+
+ You should not instantiate this class directly. Instead, you should create an MLClient instance that instantiates it
+ for you and attaches it as an attribute.
+
+ :param operation_scope: Scope variables for the operations classes of an MLClient object.
+ :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
+ :param operation_config: Common configuration for operations classes of an MLClient object.
+ :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
+ :param service_client_05_2022: Service client to allow end users to operate on Azure Machine Learning Workspace
+ resources.
+ :type service_client_05_2022: ~azure.ai.ml._restclient.v2022_05_01._azure_machine_learning_workspaces.
+ AzureMachineLearningWorkspaces
+ :param all_operations: All operations classes of an MLClient object.
+ :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
+ :param credentials: Credential to use for authentication.
+ :type credentials: ~azure.core.credentials.TokenCredential
+ """
+
+ def __init__(
+ self,
+ operation_scope: OperationScope,
+ operation_config: OperationConfig,
+ service_client_01_2024_preview: ServiceClient012024Preview,
+ all_operations: OperationsContainer,
+ credentials: Optional[TokenCredential] = None,
+ **kwargs: Any,
+ ):
+ super(BatchDeploymentOperations, self).__init__(operation_scope, operation_config)
+ ops_logger.update_filter()
+ self._batch_deployment = service_client_01_2024_preview.batch_deployments
+ self._batch_job_deployment = kwargs.pop("service_client_09_2020_dataplanepreview").batch_job_deployment
+ service_client_02_2023_preview = kwargs.pop("service_client_02_2023_preview")
+ self._component_batch_deployment_operations = service_client_02_2023_preview.batch_deployments
+ self._batch_endpoint_operations = service_client_01_2024_preview.batch_endpoints
+ self._component_operations = service_client_02_2023_preview.component_versions
+ self._all_operations = all_operations
+ self._credentials = credentials
+ self._init_kwargs = kwargs
+
+ self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
+
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "BatchDeployment.BeginCreateOrUpdate", ActivityType.PUBLICAPI)
+ def begin_create_or_update(
+ self,
+ deployment: DeploymentType,
+ *,
+ skip_script_validation: bool = False,
+ **kwargs: Any,
+ ) -> LROPoller[DeploymentType]:
+ """Create or update a batch deployment.
+
+ :param deployment: The deployment entity.
+ :type deployment: ~azure.ai.ml.entities.BatchDeployment
+ :keyword skip_script_validation: If set to True, the script validation will be skipped. Defaults to False.
+ :paramtype skip_script_validation: bool
+ :raises ~azure.ai.ml.exceptions.ValidationException: Raised if BatchDeployment cannot be
+ successfully validated. Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.AssetException: Raised if BatchDeployment assets
+ (e.g. Data, Code, Model, Environment) cannot be successfully validated.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.ModelException: Raised if BatchDeployment model
+ cannot be successfully validated. Details will be provided in the error message.
+ :return: A poller to track the operation status.
+ :rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.BatchDeployment]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START batch_deployment_operations_begin_create_or_update]
+ :end-before: [END batch_deployment_operations_begin_create_or_update]
+ :language: python
+ :dedent: 8
+ :caption: Create example.
+ """
+ if (
+ not skip_script_validation
+ and not isinstance(deployment, PipelineComponentBatchDeployment)
+ and deployment
+ and deployment.code_configuration # type: ignore
+ and not deployment.code_configuration.code.startswith(ARM_ID_PREFIX) # type: ignore
+ and not re.match(AMLVersionedArmId.REGEX_PATTERN, deployment.code_configuration.code) # type: ignore
+ ):
+ validate_scoring_script(deployment)
+ module_logger.debug("Checking endpoint %s exists", deployment.endpoint_name)
+ self._batch_endpoint_operations.get(
+ endpoint_name=deployment.endpoint_name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ )
+ orchestrators = OperationOrchestrator(
+ operation_container=self._all_operations,
+ operation_scope=self._operation_scope,
+ operation_config=self._operation_config,
+ )
+ if isinstance(deployment, PipelineComponentBatchDeployment):
+ self._validate_component(deployment, orchestrators) # type: ignore
+ else:
+ upload_dependencies(deployment, orchestrators)
+ try:
+ location = self._get_workspace_location()
+ if kwargs.pop("package_model", False):
+ deployment = package_deployment(deployment, self._all_operations.all_operations)
+ module_logger.info("\nStarting deployment")
+ deployment_rest = deployment._to_rest_object(location=location)
+ if isinstance(deployment, PipelineComponentBatchDeployment): # pylint: disable=no-else-return
+ return self._component_batch_deployment_operations.begin_create_or_update(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ endpoint_name=deployment.endpoint_name,
+ deployment_name=deployment.name,
+ body=deployment_rest,
+ **self._init_kwargs,
+ cls=lambda response, deserialized, headers: PipelineComponentBatchDeployment._from_rest_object(
+ deserialized
+ ),
+ )
+ else:
+ return self._batch_deployment.begin_create_or_update(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ endpoint_name=deployment.endpoint_name,
+ deployment_name=deployment.name,
+ body=deployment_rest,
+ **self._init_kwargs,
+ cls=lambda response, deserialized, headers: BatchDeployment._from_rest_object(deserialized),
+ )
+ except Exception as ex:
+ raise ex
+
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "BatchDeployment.Get", ActivityType.PUBLICAPI)
+ def get(self, name: str, endpoint_name: str) -> BatchDeployment:
+ """Get a deployment resource.
+
+ :param name: The name of the deployment
+ :type name: str
+ :param endpoint_name: The name of the endpoint
+ :type endpoint_name: str
+ :return: A deployment entity
+ :rtype: ~azure.ai.ml.entities.BatchDeployment
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START batch_deployment_operations_get]
+ :end-before: [END batch_deployment_operations_get]
+ :language: python
+ :dedent: 8
+ :caption: Get example.
+ """
+ deployment = BatchDeployment._from_rest_object(
+ self._batch_deployment.get(
+ endpoint_name=endpoint_name,
+ deployment_name=name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._init_kwargs,
+ )
+ )
+ deployment.endpoint_name = endpoint_name
+ return deployment
+
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "BatchDeployment.BeginDelete", ActivityType.PUBLICAPI)
+ def begin_delete(self, name: str, endpoint_name: str) -> LROPoller[None]:
+ """Delete a batch deployment.
+
+ :param name: Name of the batch deployment.
+ :type name: str
+ :param endpoint_name: Name of the batch endpoint
+ :type endpoint_name: str
+ :return: A poller to track the operation status.
+ :rtype: ~azure.core.polling.LROPoller[None]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START batch_deployment_operations_delete]
+ :end-before: [END batch_deployment_operations_delete]
+ :language: python
+ :dedent: 8
+ :caption: Delete example.
+ """
+ path_format_arguments = {
+ "endpointName": name,
+ "resourceGroupName": self._resource_group_name,
+ "workspaceName": self._workspace_name,
+ }
+
+ delete_poller = self._batch_deployment.begin_delete(
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ endpoint_name=endpoint_name,
+ deployment_name=name,
+ polling=AzureMLPolling(
+ LROConfigurations.POLL_INTERVAL,
+ path_format_arguments=path_format_arguments,
+ **self._init_kwargs,
+ ),
+ polling_interval=LROConfigurations.POLL_INTERVAL,
+ **self._init_kwargs,
+ )
+ return delete_poller
+
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "BatchDeployment.List", ActivityType.PUBLICAPI)
+ def list(self, endpoint_name: str) -> ItemPaged[BatchDeployment]:
+ """List a deployment resource.
+
+ :param endpoint_name: The name of the endpoint
+ :type endpoint_name: str
+ :return: An iterator of deployment entities
+ :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.BatchDeployment]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START batch_deployment_operations_list]
+ :end-before: [END batch_deployment_operations_list]
+ :language: python
+ :dedent: 8
+ :caption: List deployment resource example.
+ """
+ return self._batch_deployment.list(
+ endpoint_name=endpoint_name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ cls=lambda objs: [BatchDeployment._from_rest_object(obj) for obj in objs],
+ **self._init_kwargs,
+ )
+
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "BatchDeployment.ListJobs", ActivityType.PUBLICAPI)
+ def list_jobs(self, endpoint_name: str, *, name: Optional[str] = None) -> ItemPaged[BatchJob]:
+ """List jobs under the provided batch endpoint deployment. This is only valid for batch endpoint.
+
+ :param endpoint_name: Name of endpoint.
+ :type endpoint_name: str
+ :keyword name: (Optional) Name of deployment.
+ :paramtype name: str
+ :raise: Exception if endpoint_type is not BATCH_ENDPOINT_TYPE
+ :return: List of jobs
+ :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.BatchJob]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START batch_deployment_operations_list_jobs]
+ :end-before: [END batch_deployment_operations_list_jobs]
+ :language: python
+ :dedent: 8
+ :caption: List jobs example.
+ """
+
+ workspace_operations = self._all_operations.all_operations[AzureMLResourceType.WORKSPACE]
+ mfe_base_uri = _get_mfe_base_url_from_discovery_service(
+ workspace_operations, self._workspace_name, self._requests_pipeline
+ )
+
+ with modified_operation_client(self._batch_job_deployment, mfe_base_uri):
+ result = self._batch_job_deployment.list(
+ endpoint_name=endpoint_name,
+ deployment_name=name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._init_kwargs,
+ )
+
+ # This is necessary as the paged result need to be resolved inside the context manager
+ return list(result)
+
+ def _get_workspace_location(self) -> str:
+ """Get the workspace location
+
+ TODO[TASK 1260265]: can we cache this information and only refresh when the operation_scope is changed?
+
+ :return: The workspace location
+ :rtype: str
+ """
+ return str(
+ self._all_operations.all_operations[AzureMLResourceType.WORKSPACE].get(self._workspace_name).location
+ )
+
+ def _validate_component(self, deployment: Any, orchestrators: OperationOrchestrator) -> None:
+ """Validates that the value provided is associated to an existing component or otherwise we will try to create
+ an anonymous component that will be use for batch deployment.
+
+ :param deployment: Batch deployment
+ :type deployment: ~azure.ai.ml.entities._deployment.deployment.Deployment
+ :param orchestrators: Operation Orchestrator
+ :type orchestrators: _operation_orchestrator.OperationOrchestrator
+ """
+ if isinstance(deployment.component, PipelineComponent):
+ try:
+ registered_component = self._all_operations.all_operations[AzureMLResourceType.COMPONENT].get(
+ name=deployment.component.name, version=deployment.component.version
+ )
+ deployment.component = registered_component.id
+ except Exception as err: # pylint: disable=W0718
+ if isinstance(err, (ResourceNotFoundError, HttpResponseError)):
+ deployment.component = self._all_operations.all_operations[
+ AzureMLResourceType.COMPONENT
+ ].create_or_update(
+ name=deployment.component.name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ component=deployment.component,
+ version=deployment.component.version,
+ **self._init_kwargs,
+ )
+ else:
+ raise err
+ elif isinstance(deployment.component, str):
+ component_id = orchestrators.get_asset_arm_id(
+ deployment.component, azureml_type=AzureMLResourceType.COMPONENT
+ )
+ deployment.component = component_id
+ elif isinstance(deployment.job_definition, str):
+ job_component = PipelineComponent(source_job_id=deployment.job_definition)
+ job_component = self._component_operations.create_or_update(
+ name=job_component.name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ body=job_component._to_rest_object(),
+ version=job_component.version,
+ **self._init_kwargs,
+ )
+ deployment.component = job_component.id
+
+ elif isinstance(deployment.job_definition, PipelineJob):
+ try:
+ registered_job = self._all_operations.all_operations[AzureMLResourceType.JOB].get(
+ name=deployment.job_definition.name
+ )
+ if registered_job:
+ job_component = PipelineComponent(source_job_id=registered_job.name)
+ job_component = self._component_operations.create_or_update(
+ name=job_component.name,
+ resource_group_name=self._resource_group_name,
+ workspace_name=self._workspace_name,
+ body=job_component._to_rest_object(),
+ version=job_component.version,
+ **self._init_kwargs,
+ )
+ deployment.component = job_component.id
+ except ResourceNotFoundError as err:
+ raise err