# --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- # pylint: disable=protected-access from typing import Any, Dict, Iterable, Optional, cast from azure.ai.ml._restclient.v2023_08_01_preview import AzureMachineLearningWorkspaces as ServiceClient022023Preview from azure.ai.ml._restclient.v2024_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042024Preview from azure.ai.ml._restclient.v2024_04_01_preview.models import SsoSetting from azure.ai.ml._scope_dependent_operations import OperationConfig, OperationScope, _ScopeDependentOperations from azure.ai.ml._telemetry import ActivityType, monitor_with_activity from azure.ai.ml._utils._experimental import experimental from azure.ai.ml._utils._logger_utils import OpsLogger from azure.ai.ml.constants._common import COMPUTE_UPDATE_ERROR from azure.ai.ml.constants._compute import ComputeType from azure.ai.ml.entities import AmlComputeNodeInfo, Compute, Usage, VmSize from azure.core.polling import LROPoller from azure.core.tracing.decorator import distributed_trace ops_logger = OpsLogger(__name__) module_logger = ops_logger.module_logger class ComputeOperations(_ScopeDependentOperations): """ComputeOperations. This class should not be instantiated directly. Instead, use the `compute` attribute of an MLClient object. :param operation_scope: Scope variables for the operations classes of an MLClient object. :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope :param operation_config: Common configuration for operations classes of an MLClient object. :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig :param service_client: Service client to allow end users to operate on Azure Machine Learning Workspace resources. :type service_client: ~azure.ai.ml._restclient.v2023_02_01_preview.AzureMachineLearningWorkspaces """ def __init__( self, operation_scope: OperationScope, operation_config: OperationConfig, service_client: ServiceClient022023Preview, service_client_2024: ServiceClient042024Preview, **kwargs: Dict, ) -> None: super(ComputeOperations, self).__init__(operation_scope, operation_config) ops_logger.update_filter() self._operation = service_client.compute self._operation2024 = service_client_2024.compute self._workspace_operations = service_client.workspaces self._vmsize_operations = service_client.virtual_machine_sizes self._usage_operations = service_client.usages self._init_kwargs = kwargs @distributed_trace @monitor_with_activity(ops_logger, "Compute.List", ActivityType.PUBLICAPI) def list(self, *, compute_type: Optional[str] = None) -> Iterable[Compute]: """List computes of the workspace. :keyword compute_type: The type of the compute to be listed, case-insensitive. Defaults to AMLCompute. :paramtype compute_type: Optional[str] :return: An iterator like instance of Compute objects. :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.Compute] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_list] :end-before: [END compute_operations_list] :language: python :dedent: 8 :caption: Retrieving a list of the AzureML Kubernetes compute resources in a workspace. """ return cast( Iterable[Compute], self._operation.list( self._operation_scope.resource_group_name, self._workspace_name, cls=lambda objs: [ Compute._from_rest_object(obj) for obj in objs if compute_type is None or str(Compute._from_rest_object(obj).type).lower() == compute_type.lower() ], ), ) @distributed_trace @monitor_with_activity(ops_logger, "Compute.Get", ActivityType.PUBLICAPI) def get(self, name: str) -> Compute: """Get a compute resource. :param name: Name of the compute resource. :type name: str :return: A Compute object. :rtype: ~azure.ai.ml.entities.Compute .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_get] :end-before: [END compute_operations_get] :language: python :dedent: 8 :caption: Retrieving a compute resource from a workspace. """ rest_obj = self._operation.get( self._operation_scope.resource_group_name, self._workspace_name, name, ) return Compute._from_rest_object(rest_obj) @distributed_trace @monitor_with_activity(ops_logger, "Compute.ListNodes", ActivityType.PUBLICAPI) def list_nodes(self, name: str) -> Iterable[AmlComputeNodeInfo]: """Retrieve a list of a compute resource's nodes. :param name: Name of the compute resource. :type name: str :return: An iterator-like instance of AmlComputeNodeInfo objects. :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.AmlComputeNodeInfo] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_list_nodes] :end-before: [END compute_operations_list_nodes] :language: python :dedent: 8 :caption: Retrieving a list of nodes from a compute resource. """ return cast( Iterable[AmlComputeNodeInfo], self._operation.list_nodes( self._operation_scope.resource_group_name, self._workspace_name, name, cls=lambda objs: [AmlComputeNodeInfo._from_rest_object(obj) for obj in objs], ), ) @distributed_trace @monitor_with_activity(ops_logger, "Compute.BeginCreateOrUpdate", ActivityType.PUBLICAPI) def begin_create_or_update(self, compute: Compute) -> LROPoller[Compute]: """Create and register a compute resource. :param compute: The compute resource definition. :type compute: ~azure.ai.ml.entities.Compute :return: An instance of LROPoller that returns a Compute object once the long-running operation is complete. :rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.Compute] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_create_update] :end-before: [END compute_operations_create_update] :language: python :dedent: 8 :caption: Creating and registering a compute resource. """ if compute.type != ComputeType.AMLCOMPUTE: if compute.location: module_logger.warning( "Warning: 'Location' is not supported for compute type %s and will not be used.", compute.type, ) compute.location = self._get_workspace_location() if not compute.location: compute.location = self._get_workspace_location() compute._set_full_subnet_name( self._operation_scope.subscription_id, self._operation_scope.resource_group_name, ) compute_rest_obj = compute._to_rest_object() poller = self._operation.begin_create_or_update( self._operation_scope.resource_group_name, self._workspace_name, compute_name=compute.name, parameters=compute_rest_obj, polling=True, cls=lambda response, deserialized, headers: Compute._from_rest_object(deserialized), ) return poller @distributed_trace @monitor_with_activity(ops_logger, "Compute.Attach", ActivityType.PUBLICAPI) def begin_attach(self, compute: Compute, **kwargs: Any) -> LROPoller[Compute]: """Attach a compute resource to the workspace. :param compute: The compute resource definition. :type compute: ~azure.ai.ml.entities.Compute :return: An instance of LROPoller that returns a Compute object once the long-running operation is complete. :rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.Compute] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_attach] :end-before: [END compute_operations_attach] :language: python :dedent: 8 :caption: Attaching a compute resource to the workspace. """ return self.begin_create_or_update(compute=compute, **kwargs) @distributed_trace @monitor_with_activity(ops_logger, "Compute.BeginUpdate", ActivityType.PUBLICAPI) def begin_update(self, compute: Compute) -> LROPoller[Compute]: """Update a compute resource. Currently only valid for AmlCompute resource types. :param compute: The compute resource definition. :type compute: ~azure.ai.ml.entities.Compute :return: An instance of LROPoller that returns a Compute object once the long-running operation is complete. :rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.Compute] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_update] :end-before: [END compute_operations_update] :language: python :dedent: 8 :caption: Updating an AmlCompute resource. """ if not compute.type == ComputeType.AMLCOMPUTE: COMPUTE_UPDATE_ERROR.format(compute.name, compute.type) compute_rest_obj = compute._to_rest_object() poller = self._operation.begin_create_or_update( self._operation_scope.resource_group_name, self._workspace_name, compute_name=compute.name, parameters=compute_rest_obj, polling=True, cls=lambda response, deserialized, headers: Compute._from_rest_object(deserialized), ) return poller @distributed_trace @monitor_with_activity(ops_logger, "Compute.BeginDelete", ActivityType.PUBLICAPI) def begin_delete(self, name: str, *, action: str = "Delete") -> LROPoller[None]: """Delete or detach a compute resource. :param name: The name of the compute resource. :type name: str :keyword action: Action to perform. Possible values: ["Delete", "Detach"]. Defaults to "Delete". :type action: str :return: A poller to track the operation status. :rtype: ~azure.core.polling.LROPoller[None] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_delete] :end-before: [END compute_operations_delete] :language: python :dedent: 8 :caption: Delete compute example. """ return self._operation.begin_delete( resource_group_name=self._operation_scope.resource_group_name, workspace_name=self._workspace_name, compute_name=name, underlying_resource_action=action, **self._init_kwargs, ) @distributed_trace @monitor_with_activity(ops_logger, "Compute.BeginStart", ActivityType.PUBLICAPI) def begin_start(self, name: str) -> LROPoller[None]: """Start a compute instance. :param name: The name of the compute instance. :type name: str :return: A poller to track the operation status. :rtype: azure.core.polling.LROPoller[None] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_start] :end-before: [END compute_operations_start] :language: python :dedent: 8 :caption: Starting a compute instance. """ return self._operation.begin_start( self._operation_scope.resource_group_name, self._workspace_name, name, ) @distributed_trace @monitor_with_activity(ops_logger, "Compute.BeginStop", ActivityType.PUBLICAPI) def begin_stop(self, name: str) -> LROPoller[None]: """Stop a compute instance. :param name: The name of the compute instance. :type name: str :return: A poller to track the operation status. :rtype: azure.core.polling.LROPoller[None] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_stop] :end-before: [END compute_operations_stop] :language: python :dedent: 8 :caption: Stopping a compute instance. """ return self._operation.begin_stop( self._operation_scope.resource_group_name, self._workspace_name, name, ) @distributed_trace @monitor_with_activity(ops_logger, "Compute.BeginRestart", ActivityType.PUBLICAPI) def begin_restart(self, name: str) -> LROPoller[None]: """Restart a compute instance. :param name: The name of the compute instance. :type name: str :return: A poller to track the operation status. :rtype: azure.core.polling.LROPoller[None] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_restart] :end-before: [END compute_operations_restart] :language: python :dedent: 8 :caption: Restarting a stopped compute instance. """ return self._operation.begin_restart( self._operation_scope.resource_group_name, self._workspace_name, name, ) @distributed_trace @monitor_with_activity(ops_logger, "Compute.ListUsage", ActivityType.PUBLICAPI) def list_usage(self, *, location: Optional[str] = None) -> Iterable[Usage]: """List the current usage information as well as AzureML resource limits for the given subscription and location. :keyword location: The location for which resource usage is queried. Defaults to workspace location. :paramtype location: Optional[str] :return: An iterator over current usage info objects. :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.Usage] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_list_usage] :end-before: [END compute_operations_list_usage] :language: python :dedent: 8 :caption: Listing resource usage for the workspace location. """ if not location: location = self._get_workspace_location() return cast( Iterable[Usage], self._usage_operations.list( location=location, cls=lambda objs: [Usage._from_rest_object(obj) for obj in objs], ), ) @distributed_trace @monitor_with_activity(ops_logger, "Compute.ListSizes", ActivityType.PUBLICAPI) def list_sizes(self, *, location: Optional[str] = None, compute_type: Optional[str] = None) -> Iterable[VmSize]: """List the supported VM sizes in a location. :keyword location: The location upon which virtual-machine-sizes is queried. Defaults to workspace location. :paramtype location: str :keyword compute_type: The type of the compute to be listed, case-insensitive. Defaults to AMLCompute. :paramtype compute_type: Optional[str] :return: An iterator over virtual machine size objects. :rtype: Iterable[~azure.ai.ml.entities.VmSize] .. admonition:: Example: .. literalinclude:: ../samples/ml_samples_compute.py :start-after: [START compute_operations_list_sizes] :end-before: [END compute_operations_list_sizes] :language: python :dedent: 8 :caption: Listing the supported VM sizes in the workspace location. """ if not location: location = self._get_workspace_location() size_list = self._vmsize_operations.list(location=location) if not size_list: return [] if compute_type: return [ VmSize._from_rest_object(item) for item in size_list.value if compute_type.lower() in (supported_type.lower() for supported_type in item.supported_compute_types) ] return [VmSize._from_rest_object(item) for item in size_list.value] @distributed_trace @monitor_with_activity(ops_logger, "Compute.enablesso", ActivityType.PUBLICAPI) @experimental def enable_sso(self, *, name: str, enable_sso: bool = True, **kwargs: Any) -> None: """enable sso for a compute instance. :keyword name: Name of the compute instance. :paramtype name: str :keyword enable_sso: enable sso bool flag Default to True :paramtype enable_sso: bool """ self._operation2024.update_sso_settings( self._operation_scope.resource_group_name, self._workspace_name, name, parameters=SsoSetting(enable_sso=enable_sso), **kwargs, ) def _get_workspace_location(self) -> str: workspace = self._workspace_operations.get(self._resource_group_name, self._workspace_name) return str(workspace.location)