diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py | 392 |
1 files changed, 392 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py new file mode 100644 index 00000000..9bde33c8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_batch_deployment_operations.py @@ -0,0 +1,392 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access, too-many-boolean-expressions + +import re +from typing import Any, Optional, TypeVar, Union + +from azure.ai.ml._restclient.v2024_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012024Preview +from azure.ai.ml._scope_dependent_operations import ( + OperationConfig, + OperationsContainer, + OperationScope, + _ScopeDependentOperations, +) +from azure.ai.ml._telemetry import ActivityType, monitor_with_activity +from azure.ai.ml._utils._arm_id_utils import AMLVersionedArmId +from azure.ai.ml._utils._azureml_polling import AzureMLPolling +from azure.ai.ml._utils._endpoint_utils import upload_dependencies, validate_scoring_script +from azure.ai.ml._utils._http_utils import HttpPipeline +from azure.ai.ml._utils._logger_utils import OpsLogger +from azure.ai.ml._utils._package_utils import package_deployment +from azure.ai.ml._utils.utils import _get_mfe_base_url_from_discovery_service, modified_operation_client +from azure.ai.ml.constants._common import ARM_ID_PREFIX, AzureMLResourceType, LROConfigurations +from azure.ai.ml.entities import BatchDeployment, BatchJob, ModelBatchDeployment, PipelineComponent, PipelineJob +from azure.ai.ml.entities._deployment.pipeline_component_batch_deployment import PipelineComponentBatchDeployment +from azure.core.credentials import TokenCredential +from azure.core.exceptions import HttpResponseError, ResourceNotFoundError +from azure.core.paging import ItemPaged +from azure.core.polling import LROPoller +from azure.core.tracing.decorator import distributed_trace + +from ._operation_orchestrator import OperationOrchestrator + +ops_logger = OpsLogger(__name__) +module_logger = ops_logger.module_logger +DeploymentType = TypeVar( + "DeploymentType", bound=Union[BatchDeployment, PipelineComponentBatchDeployment, ModelBatchDeployment] +) + + +class BatchDeploymentOperations(_ScopeDependentOperations): + """BatchDeploymentOperations. + + You should not instantiate this class directly. Instead, you should create an MLClient instance that instantiates it + for you and attaches it as an attribute. + + :param operation_scope: Scope variables for the operations classes of an MLClient object. + :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope + :param operation_config: Common configuration for operations classes of an MLClient object. + :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig + :param service_client_05_2022: Service client to allow end users to operate on Azure Machine Learning Workspace + resources. + :type service_client_05_2022: ~azure.ai.ml._restclient.v2022_05_01._azure_machine_learning_workspaces. + AzureMachineLearningWorkspaces + :param all_operations: All operations classes of an MLClient object. + :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer + :param credentials: Credential to use for authentication. + :type credentials: ~azure.core.credentials.TokenCredential + """ + + def __init__( + self, + operation_scope: OperationScope, + operation_config: OperationConfig, + service_client_01_2024_preview: ServiceClient012024Preview, + all_operations: OperationsContainer, + credentials: Optional[TokenCredential] = None, + **kwargs: Any, + ): + super(BatchDeploymentOperations, self).__init__(operation_scope, operation_config) + ops_logger.update_filter() + self._batch_deployment = service_client_01_2024_preview.batch_deployments + self._batch_job_deployment = kwargs.pop("service_client_09_2020_dataplanepreview").batch_job_deployment + service_client_02_2023_preview = kwargs.pop("service_client_02_2023_preview") + self._component_batch_deployment_operations = service_client_02_2023_preview.batch_deployments + self._batch_endpoint_operations = service_client_01_2024_preview.batch_endpoints + self._component_operations = service_client_02_2023_preview.component_versions + self._all_operations = all_operations + self._credentials = credentials + self._init_kwargs = kwargs + + self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline") + + @distributed_trace + @monitor_with_activity(ops_logger, "BatchDeployment.BeginCreateOrUpdate", ActivityType.PUBLICAPI) + def begin_create_or_update( + self, + deployment: DeploymentType, + *, + skip_script_validation: bool = False, + **kwargs: Any, + ) -> LROPoller[DeploymentType]: + """Create or update a batch deployment. + + :param deployment: The deployment entity. + :type deployment: ~azure.ai.ml.entities.BatchDeployment + :keyword skip_script_validation: If set to True, the script validation will be skipped. Defaults to False. + :paramtype skip_script_validation: bool + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if BatchDeployment cannot be + successfully validated. Details will be provided in the error message. + :raises ~azure.ai.ml.exceptions.AssetException: Raised if BatchDeployment assets + (e.g. Data, Code, Model, Environment) cannot be successfully validated. + Details will be provided in the error message. + :raises ~azure.ai.ml.exceptions.ModelException: Raised if BatchDeployment model + cannot be successfully validated. Details will be provided in the error message. + :return: A poller to track the operation status. + :rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.BatchDeployment] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START batch_deployment_operations_begin_create_or_update] + :end-before: [END batch_deployment_operations_begin_create_or_update] + :language: python + :dedent: 8 + :caption: Create example. + """ + if ( + not skip_script_validation + and not isinstance(deployment, PipelineComponentBatchDeployment) + and deployment + and deployment.code_configuration # type: ignore + and not deployment.code_configuration.code.startswith(ARM_ID_PREFIX) # type: ignore + and not re.match(AMLVersionedArmId.REGEX_PATTERN, deployment.code_configuration.code) # type: ignore + ): + validate_scoring_script(deployment) + module_logger.debug("Checking endpoint %s exists", deployment.endpoint_name) + self._batch_endpoint_operations.get( + endpoint_name=deployment.endpoint_name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + ) + orchestrators = OperationOrchestrator( + operation_container=self._all_operations, + operation_scope=self._operation_scope, + operation_config=self._operation_config, + ) + if isinstance(deployment, PipelineComponentBatchDeployment): + self._validate_component(deployment, orchestrators) # type: ignore + else: + upload_dependencies(deployment, orchestrators) + try: + location = self._get_workspace_location() + if kwargs.pop("package_model", False): + deployment = package_deployment(deployment, self._all_operations.all_operations) + module_logger.info("\nStarting deployment") + deployment_rest = deployment._to_rest_object(location=location) + if isinstance(deployment, PipelineComponentBatchDeployment): # pylint: disable=no-else-return + return self._component_batch_deployment_operations.begin_create_or_update( + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + endpoint_name=deployment.endpoint_name, + deployment_name=deployment.name, + body=deployment_rest, + **self._init_kwargs, + cls=lambda response, deserialized, headers: PipelineComponentBatchDeployment._from_rest_object( + deserialized + ), + ) + else: + return self._batch_deployment.begin_create_or_update( + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + endpoint_name=deployment.endpoint_name, + deployment_name=deployment.name, + body=deployment_rest, + **self._init_kwargs, + cls=lambda response, deserialized, headers: BatchDeployment._from_rest_object(deserialized), + ) + except Exception as ex: + raise ex + + @distributed_trace + @monitor_with_activity(ops_logger, "BatchDeployment.Get", ActivityType.PUBLICAPI) + def get(self, name: str, endpoint_name: str) -> BatchDeployment: + """Get a deployment resource. + + :param name: The name of the deployment + :type name: str + :param endpoint_name: The name of the endpoint + :type endpoint_name: str + :return: A deployment entity + :rtype: ~azure.ai.ml.entities.BatchDeployment + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START batch_deployment_operations_get] + :end-before: [END batch_deployment_operations_get] + :language: python + :dedent: 8 + :caption: Get example. + """ + deployment = BatchDeployment._from_rest_object( + self._batch_deployment.get( + endpoint_name=endpoint_name, + deployment_name=name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + **self._init_kwargs, + ) + ) + deployment.endpoint_name = endpoint_name + return deployment + + @distributed_trace + @monitor_with_activity(ops_logger, "BatchDeployment.BeginDelete", ActivityType.PUBLICAPI) + def begin_delete(self, name: str, endpoint_name: str) -> LROPoller[None]: + """Delete a batch deployment. + + :param name: Name of the batch deployment. + :type name: str + :param endpoint_name: Name of the batch endpoint + :type endpoint_name: str + :return: A poller to track the operation status. + :rtype: ~azure.core.polling.LROPoller[None] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START batch_deployment_operations_delete] + :end-before: [END batch_deployment_operations_delete] + :language: python + :dedent: 8 + :caption: Delete example. + """ + path_format_arguments = { + "endpointName": name, + "resourceGroupName": self._resource_group_name, + "workspaceName": self._workspace_name, + } + + delete_poller = self._batch_deployment.begin_delete( + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + endpoint_name=endpoint_name, + deployment_name=name, + polling=AzureMLPolling( + LROConfigurations.POLL_INTERVAL, + path_format_arguments=path_format_arguments, + **self._init_kwargs, + ), + polling_interval=LROConfigurations.POLL_INTERVAL, + **self._init_kwargs, + ) + return delete_poller + + @distributed_trace + @monitor_with_activity(ops_logger, "BatchDeployment.List", ActivityType.PUBLICAPI) + def list(self, endpoint_name: str) -> ItemPaged[BatchDeployment]: + """List a deployment resource. + + :param endpoint_name: The name of the endpoint + :type endpoint_name: str + :return: An iterator of deployment entities + :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.BatchDeployment] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START batch_deployment_operations_list] + :end-before: [END batch_deployment_operations_list] + :language: python + :dedent: 8 + :caption: List deployment resource example. + """ + return self._batch_deployment.list( + endpoint_name=endpoint_name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + cls=lambda objs: [BatchDeployment._from_rest_object(obj) for obj in objs], + **self._init_kwargs, + ) + + @distributed_trace + @monitor_with_activity(ops_logger, "BatchDeployment.ListJobs", ActivityType.PUBLICAPI) + def list_jobs(self, endpoint_name: str, *, name: Optional[str] = None) -> ItemPaged[BatchJob]: + """List jobs under the provided batch endpoint deployment. This is only valid for batch endpoint. + + :param endpoint_name: Name of endpoint. + :type endpoint_name: str + :keyword name: (Optional) Name of deployment. + :paramtype name: str + :raise: Exception if endpoint_type is not BATCH_ENDPOINT_TYPE + :return: List of jobs + :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.BatchJob] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_misc.py + :start-after: [START batch_deployment_operations_list_jobs] + :end-before: [END batch_deployment_operations_list_jobs] + :language: python + :dedent: 8 + :caption: List jobs example. + """ + + workspace_operations = self._all_operations.all_operations[AzureMLResourceType.WORKSPACE] + mfe_base_uri = _get_mfe_base_url_from_discovery_service( + workspace_operations, self._workspace_name, self._requests_pipeline + ) + + with modified_operation_client(self._batch_job_deployment, mfe_base_uri): + result = self._batch_job_deployment.list( + endpoint_name=endpoint_name, + deployment_name=name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + **self._init_kwargs, + ) + + # This is necessary as the paged result need to be resolved inside the context manager + return list(result) + + def _get_workspace_location(self) -> str: + """Get the workspace location + + TODO[TASK 1260265]: can we cache this information and only refresh when the operation_scope is changed? + + :return: The workspace location + :rtype: str + """ + return str( + self._all_operations.all_operations[AzureMLResourceType.WORKSPACE].get(self._workspace_name).location + ) + + def _validate_component(self, deployment: Any, orchestrators: OperationOrchestrator) -> None: + """Validates that the value provided is associated to an existing component or otherwise we will try to create + an anonymous component that will be use for batch deployment. + + :param deployment: Batch deployment + :type deployment: ~azure.ai.ml.entities._deployment.deployment.Deployment + :param orchestrators: Operation Orchestrator + :type orchestrators: _operation_orchestrator.OperationOrchestrator + """ + if isinstance(deployment.component, PipelineComponent): + try: + registered_component = self._all_operations.all_operations[AzureMLResourceType.COMPONENT].get( + name=deployment.component.name, version=deployment.component.version + ) + deployment.component = registered_component.id + except Exception as err: # pylint: disable=W0718 + if isinstance(err, (ResourceNotFoundError, HttpResponseError)): + deployment.component = self._all_operations.all_operations[ + AzureMLResourceType.COMPONENT + ].create_or_update( + name=deployment.component.name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + component=deployment.component, + version=deployment.component.version, + **self._init_kwargs, + ) + else: + raise err + elif isinstance(deployment.component, str): + component_id = orchestrators.get_asset_arm_id( + deployment.component, azureml_type=AzureMLResourceType.COMPONENT + ) + deployment.component = component_id + elif isinstance(deployment.job_definition, str): + job_component = PipelineComponent(source_job_id=deployment.job_definition) + job_component = self._component_operations.create_or_update( + name=job_component.name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + body=job_component._to_rest_object(), + version=job_component.version, + **self._init_kwargs, + ) + deployment.component = job_component.id + + elif isinstance(deployment.job_definition, PipelineJob): + try: + registered_job = self._all_operations.all_operations[AzureMLResourceType.JOB].get( + name=deployment.job_definition.name + ) + if registered_job: + job_component = PipelineComponent(source_job_id=registered_job.name) + job_component = self._component_operations.create_or_update( + name=job_component.name, + resource_group_name=self._resource_group_name, + workspace_name=self._workspace_name, + body=job_component._to_rest_object(), + version=job_component.version, + **self._init_kwargs, + ) + deployment.component = job_component.id + except ResourceNotFoundError as err: + raise err |