aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_compute/compute_instance.py
blob: 9cbb2528277ffdff7eff9beda0b8c4c49aaced49 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,too-many-instance-attributes

import logging
import re
import warnings
from typing import Any, Dict, List, Optional

from azure.ai.ml._restclient.v2022_10_01_preview.models import AssignedUser
from azure.ai.ml._restclient.v2023_08_01_preview.models import ComputeInstance as CIRest
from azure.ai.ml._restclient.v2023_08_01_preview.models import ComputeInstanceProperties
from azure.ai.ml._restclient.v2023_08_01_preview.models import ComputeInstanceSshSettings as CiSShSettings
from azure.ai.ml._restclient.v2023_08_01_preview.models import (
    ComputeResource,
    PersonalComputeInstanceSettings,
    ResourceId,
)
from azure.ai.ml._schema._utils.utils import get_subnet_str
from azure.ai.ml._schema.compute.compute_instance import ComputeInstanceSchema
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, TYPE
from azure.ai.ml.constants._compute import ComputeDefaults, ComputeType
from azure.ai.ml.entities._compute.compute import Compute, NetworkSettings
from azure.ai.ml.entities._credentials import IdentityConfiguration
from azure.ai.ml.entities._mixins import DictMixin
from azure.ai.ml.entities._util import load_from_dict

from ._custom_applications import CustomApplications, validate_custom_applications
from ._image_metadata import ImageMetadata
from ._schedule import ComputeSchedules
from ._setup_scripts import SetupScripts

# Logger for this module, named after the module's import path (``__name__``).
module_logger = logging.getLogger(__name__)


class ComputeInstanceSshSettings:
    """Credentials for an administrator user account to SSH into the compute node.

    Can only be configured if `ssh_public_access_enabled` is set to true on compute
    resource.

    :param ssh_key_value: The SSH public key of the administrator user account.
    :type ssh_key_value: Optional[str]

    .. admonition:: Example:

        .. literalinclude:: ../samples/ml_samples_compute.py
            :start-after: [START compute_instance_ssh_settings]
            :end-before: [END compute_instance_ssh_settings]
            :language: python
            :dedent: 8
            :caption: Configuring ComputeInstanceSshSettings object.
    """

    def __init__(
        self,
        *,
        ssh_key_value: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self.ssh_key_value = ssh_key_value
        # ``ssh_port`` and ``admin_username`` are exposed through read-only properties and are
        # accepted only via ``kwargs``. Both are ``None`` when not supplied, hence ``Optional``.
        # Any other keyword arguments are accepted and ignored.
        self._ssh_port: Optional[str] = kwargs.pop("ssh_port", None)
        self._admin_username: Optional[str] = kwargs.pop("admin_username", None)

    @property
    def admin_username(self) -> Optional[str]:
        """The name of the administrator user account which can be used to SSH into nodes.

        :return: The name of the administrator user account, or None if it was not provided.
        :rtype: Optional[str]
        """
        return self._admin_username

    @property
    def ssh_port(self) -> Optional[str]:
        """SSH port.

        :return: SSH port, or None if it was not provided.
        :rtype: Optional[str]
        """
        return self._ssh_port


class AssignedUserConfiguration(DictMixin):
    """Identifies the user on whose behalf a compute resource is created.

    :param user_tenant_id: Tenant ID of the user to assign the compute target to.
    :type user_tenant_id: str
    :param user_object_id: Object ID of the user to assign the compute target to.
    :type user_object_id: str

    .. admonition:: Example:

        .. literalinclude:: ../samples/ml_samples_compute.py
            :start-after: [START assigned_user_configuration]
            :end-before: [END assigned_user_configuration]
            :language: python
            :dedent: 8
            :caption: Creating an AssignedUserConfiguration.
    """

    def __init__(self, *, user_tenant_id: str, user_object_id: str) -> None:
        # Targets are bound left to right, so the tenant ID is stored before the object ID.
        self.user_tenant_id, self.user_object_id = user_tenant_id, user_object_id


class ComputeInstance(Compute):
    """Compute Instance resource.

    :param name: Name of the compute.
    :type name: str
    :param location: The resource location.
    :type location: Optional[str]
    :param description: Description of the resource.
    :type description: Optional[str]
    :param size: Compute size.
    :type size: Optional[str]
    :param tags: A set of tags. Contains resource tags defined as key/value pairs.
    :type tags: Optional[dict[str, str]]
    :param create_on_behalf_of: Configuration to create resource on behalf of another user. Defaults to None.
    :type create_on_behalf_of: Optional[~azure.ai.ml.entities.AssignedUserConfiguration]
    :ivar state: State of the resource.
    :type state: Optional[str]
    :ivar last_operation: The last operation.
    :type last_operation: Optional[Dict[str, str]]
    :ivar services: Services (applications) associated with the compute instance.
    :type services: Optional[List[Dict[str, str]]]
    :ivar os_image_metadata: Metadata about the operating system image of the compute instance.
    :type os_image_metadata: Optional[~azure.ai.ml.entities.ImageMetadata]
    :param network_settings: Network settings for the compute instance.
    :type network_settings: Optional[~azure.ai.ml.entities.NetworkSettings]
    :param ssh_settings: SSH settings for the compute instance.
    :type ssh_settings: Optional[~azure.ai.ml.entities.ComputeInstanceSshSettings]
    :param ssh_public_access_enabled: State of the public SSH port. Defaults to None.
        Possible values are:

        * False - Indicates that the public ssh port is closed on all nodes of the cluster.
        * True - Indicates that the public ssh port is open on all nodes of the cluster.
        * None - Indicates that the public ssh port is closed on all nodes of the cluster if VNet is defined,
            else is open all public nodes. It can be default only during cluster creation time, after
            creation it will be either True or False.

    :type ssh_public_access_enabled: Optional[bool]
    :param schedules: Compute instance schedules. Defaults to None.
    :type schedules: Optional[~azure.ai.ml.entities.ComputeSchedules]
    :param identity: The identities that are associated with the compute instance.
    :type identity: ~azure.ai.ml.entities.IdentityConfiguration
    :param idle_time_before_shutdown: Deprecated. Use the `idle_time_before_shutdown_minutes` parameter instead.
        Stops compute instance after user defined period of inactivity.
        Time is defined in ISO8601 format. Minimum is 15 minutes, maximum is 3 days.
    :type idle_time_before_shutdown: Optional[str]
    :param idle_time_before_shutdown_minutes: Stops compute instance after a user defined period of
        inactivity in minutes. Minimum is 15 minutes, maximum is 3 days.
    :type idle_time_before_shutdown_minutes: Optional[int]
    :param enable_node_public_ip: Enable or disable node public IP address provisioning. Defaults to True.
        Possible values are:

            * True - Indicates that the compute nodes will have public IPs provisioned.
            * False - Indicates that the compute nodes will have a private endpoint and no public IPs.

    :type enable_node_public_ip: Optional[bool]
    :param setup_scripts: Details of customized scripts to execute for setting up the cluster.
    :type setup_scripts: Optional[~azure.ai.ml.entities.SetupScripts]
    :param custom_applications: List of custom applications and their endpoints for the compute instance.
    :type custom_applications: Optional[List[~azure.ai.ml.entities.CustomApplications]]
    :param enable_sso: Enable or disable single sign-on. Defaults to True.
    :type enable_sso: bool
    :param enable_root_access: Enable or disable root access. Defaults to True.
    :type enable_root_access: bool
    :param release_quota_on_stop: Release quota on stop for the compute instance. Defaults to False.
    :type release_quota_on_stop: bool
    :param enable_os_patching: Enable or disable OS patching for the compute instance. Defaults to False.
    :type enable_os_patching: bool

    .. admonition:: Example:

        .. literalinclude:: ../samples/ml_samples_compute.py
            :start-after: [START compute_instance]
            :end-before: [END compute_instance]
            :language: python
            :dedent: 8
            :caption: Creating a ComputeInstance object.
    """

    def __init__(
        self,
        *,
        name: str,
        description: Optional[str] = None,
        size: Optional[str] = None,
        tags: Optional[dict] = None,
        ssh_public_access_enabled: Optional[bool] = None,
        create_on_behalf_of: Optional[AssignedUserConfiguration] = None,
        network_settings: Optional[NetworkSettings] = None,
        ssh_settings: Optional[ComputeInstanceSshSettings] = None,
        schedules: Optional[ComputeSchedules] = None,
        identity: Optional[IdentityConfiguration] = None,
        idle_time_before_shutdown: Optional[str] = None,
        idle_time_before_shutdown_minutes: Optional[int] = None,
        setup_scripts: Optional[SetupScripts] = None,
        enable_node_public_ip: bool = True,
        custom_applications: Optional[List[CustomApplications]] = None,
        enable_sso: bool = True,
        enable_root_access: bool = True,
        release_quota_on_stop: bool = False,
        enable_os_patching: bool = False,
        **kwargs: Any,
    ) -> None:
        kwargs[TYPE] = ComputeType.COMPUTEINSTANCE
        # Read-only values are taken out of ``kwargs`` before the remainder is forwarded to the
        # base class. They are ``None`` unless supplied (``_load_from_rest`` supplies them).
        self._state: Optional[str] = kwargs.pop("state", None)
        self._last_operation: Optional[dict] = kwargs.pop("last_operation", None)
        self._os_image_metadata: Optional[ImageMetadata] = kwargs.pop("os_image_metadata", None)
        self._services: Optional[list] = kwargs.pop("services", None)
        super().__init__(
            name=name,
            location=kwargs.pop("location", None),
            resource_id=kwargs.pop("resource_id", None),
            description=description,
            tags=tags,
            **kwargs,
        )
        self.size = size
        self.ssh_public_access_enabled = ssh_public_access_enabled
        self.create_on_behalf_of = create_on_behalf_of
        self.network_settings = network_settings
        self.ssh_settings = ssh_settings
        self.schedules = schedules
        self.identity = identity
        self.idle_time_before_shutdown = idle_time_before_shutdown
        self.idle_time_before_shutdown_minutes = idle_time_before_shutdown_minutes
        self.setup_scripts = setup_scripts
        self.enable_node_public_ip = enable_node_public_ip
        self.enable_sso = enable_sso
        self.enable_root_access = enable_root_access
        self.release_quota_on_stop = release_quota_on_stop
        self.enable_os_patching = enable_os_patching
        self.custom_applications = custom_applications
        # Populated later by ``_set_full_subnet_name``.
        self.subnet: Optional[str] = None

    @property
    def services(self) -> Optional[List[Dict[str, str]]]:
        """The compute instance's services.

        :return: The compute instance's services, or None if not available.
        :rtype: Optional[List[Dict[str, str]]]
        """
        return self._services

    @property
    def last_operation(self) -> Optional[Dict[str, str]]:
        """The last operation.

        :return: The last operation, or None if not available.
        :rtype: Optional[Dict[str, str]]
        """
        return self._last_operation

    @property
    def state(self) -> Optional[str]:
        """The state of the compute.

        :return: The state of the compute, or None if not available.
        :rtype: Optional[str]
        """
        return self._state

    @property
    def os_image_metadata(self) -> Optional[ImageMetadata]:
        """Metadata about the operating system image for this compute instance.

        :return: Operating system image metadata, or None if not available.
        :rtype: Optional[~azure.ai.ml.entities.ImageMetadata]
        """
        return self._os_image_metadata

    def _to_rest_object(self) -> ComputeResource:
        """Build the REST ``ComputeResource`` representation of this compute instance.

        ``idle_time_before_shutdown_minutes`` takes precedence over the deprecated
        ``idle_time_before_shutdown``; using only the latter emits a ``DeprecationWarning``.
        When ``size`` is not set, ``ComputeDefaults.VMSIZE`` is used.

        :return: The REST object for this compute instance.
        :rtype: ComputeResource
        """
        subnet_resource = None
        if self.network_settings and self.network_settings.subnet:
            # ``self.subnet`` is only set by ``_set_full_subnet_name``. Fall back to the value held
            # on ``network_settings`` so the subnet id is not sent as None when that has not run.
            subnet_resource = ResourceId(id=self.subnet or self.network_settings.subnet)

        ssh_settings = None
        if self.ssh_public_access_enabled is not None or self.ssh_settings is not None:
            ssh_settings = CiSShSettings()
            ssh_settings.ssh_public_access = "Enabled" if self.ssh_public_access_enabled else "Disabled"
            ssh_settings.admin_public_key = (
                self.ssh_settings.ssh_key_value if self.ssh_settings and self.ssh_settings.ssh_key_value else None
            )

        personal_compute_instance_settings = None
        if self.create_on_behalf_of:
            personal_compute_instance_settings = PersonalComputeInstanceSettings(
                assigned_user=AssignedUser(
                    object_id=self.create_on_behalf_of.user_object_id,
                    tenant_id=self.create_on_behalf_of.user_tenant_id,
                )
            )

        idle_time_before_shutdown = None
        if self.idle_time_before_shutdown_minutes:
            idle_time_before_shutdown = f"PT{self.idle_time_before_shutdown_minutes}M"
        elif self.idle_time_before_shutdown:
            # Adjacent single-line literals keep the message free of the newline and source
            # indentation that a triple-quoted literal would embed.
            warnings.warn(
                "The property 'idle_time_before_shutdown' is deprecated. "
                "Please use 'idle_time_before_shutdown_minutes' instead.",
                DeprecationWarning,
            )
            idle_time_before_shutdown = self.idle_time_before_shutdown

        compute_instance_prop = ComputeInstanceProperties(
            vm_size=self.size if self.size else ComputeDefaults.VMSIZE,
            subnet=subnet_resource,
            ssh_settings=ssh_settings,
            personal_compute_instance_settings=personal_compute_instance_settings,
            idle_time_before_shutdown=idle_time_before_shutdown,
            enable_node_public_ip=self.enable_node_public_ip,
            enable_sso=self.enable_sso,
            enable_root_access=self.enable_root_access,
            release_quota_on_stop=self.release_quota_on_stop,
            enable_os_patching=self.enable_os_patching,
        )
        compute_instance_prop.schedules = self.schedules._to_rest_object() if self.schedules else None
        compute_instance_prop.setup_scripts = self.setup_scripts._to_rest_object() if self.setup_scripts else None
        if self.custom_applications:
            # Validate before converting so nothing is attached to the payload on failure.
            validate_custom_applications(self.custom_applications)
            compute_instance_prop.custom_services = [app._to_rest_object() for app in self.custom_applications]
        compute_instance = CIRest(
            description=self.description,
            compute_type=self.type,
            properties=compute_instance_prop,
        )
        return ComputeResource(
            location=self.location,
            properties=compute_instance,
            identity=(self.identity._to_compute_rest_object() if self.identity else None),
            tags=self.tags,
        )

    def _to_dict(self) -> Dict:
        """Serialize this compute instance with ``ComputeInstanceSchema``.

        :return: The dumped representation of this compute instance.
        :rtype: Dict
        """
        res: dict = ComputeInstanceSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
        return res

    def _set_full_subnet_name(self, subscription_id: str, rg: str) -> None:
        """Compute and store ``self.subnet`` from the network settings via ``get_subnet_str``.

        Does nothing when there are no network settings, or when neither a vnet name nor a
        subnet is set on them.

        :param subscription_id: Subscription ID passed through to ``get_subnet_str``.
        :type subscription_id: str
        :param rg: Resource group name passed through to ``get_subnet_str``.
        :type rg: str
        """
        if self.network_settings and (self.network_settings.vnet_name or self.network_settings.subnet):
            self.subnet = get_subnet_str(
                self.network_settings.vnet_name,
                self.network_settings.subnet,
                subscription_id,
                rg,
            )

    @classmethod
    def _load_from_rest(cls, rest_obj: ComputeResource) -> "ComputeInstance":
        """Create a ``ComputeInstance`` from its REST ``ComputeResource`` representation.

        Every instance-level field is read defensively: when the nested properties object is
        missing, the corresponding value falls back to None or to the constructor default.

        :param rest_obj: The REST object to convert.
        :type rest_obj: ComputeResource
        :return: The corresponding compute instance entity.
        :rtype: ComputeInstance
        """
        prop = rest_obj.properties
        # Instance-specific properties nested under the compute object; may be absent.
        ci_props = prop.properties

        def _flag(name: str, default: bool) -> bool:
            # Use the REST value unless the properties object or the value itself is missing.
            value = getattr(ci_props, name) if ci_props else None
            return default if value is None else value

        create_on_behalf_of = None
        personal_settings = ci_props.personal_compute_instance_settings if ci_props else None
        # Also guard on ``assigned_user`` so settings without an assigned user do not raise
        # AttributeError when its IDs are read.
        if personal_settings and personal_settings.assigned_user:
            create_on_behalf_of = AssignedUserConfiguration(
                user_tenant_id=personal_settings.assigned_user.tenant_id,
                user_object_id=personal_settings.assigned_user.object_id,
            )

        ssh_settings = None
        rest_ssh = ci_props.ssh_settings if ci_props else None
        if rest_ssh:
            ssh_settings = ComputeInstanceSshSettings(
                ssh_key_value=rest_ssh.admin_public_key,
                ssh_port=rest_ssh.ssh_port,
                admin_username=rest_ssh.admin_user_name,
            )

        network_settings = None
        rest_subnet = ci_props.subnet if ci_props else None
        endpoints = ci_props.connectivity_endpoints if ci_props else None
        if rest_subnet or (endpoints and (endpoints.private_ip_address or endpoints.public_ip_address)):
            network_settings = NetworkSettings(
                subnet=rest_subnet.id if rest_subnet else None,
                public_ip_address=(endpoints.public_ip_address if endpoints and endpoints.public_ip_address else None),
                private_ip_address=(
                    endpoints.private_ip_address if endpoints and endpoints.private_ip_address else None
                ),
            )

        os_image_metadata = None
        metadata = ci_props.os_image_metadata if ci_props else None
        if metadata:
            os_image_metadata = ImageMetadata(
                is_latest_os_image_version=metadata.is_latest_os_image_version,
                # Falsy version strings (e.g. "") are normalized to None.
                current_image_version=metadata.current_image_version or None,
                latest_image_version=metadata.latest_image_version or None,
            )

        idle_time_before_shutdown = None
        idle_time_before_shutdown_minutes = None
        if ci_props and ci_props.idle_time_before_shutdown:
            idle_time_before_shutdown = ci_props.idle_time_before_shutdown
            # ``fullmatch`` rather than ``match``: a duration with extra components such as
            # "PT15M30S" must not be read as 15 minutes. Such values are kept only in the raw
            # ``idle_time_before_shutdown`` string.
            idle_time_match = re.fullmatch(r"PT([0-9]+)M", idle_time_before_shutdown)
            idle_time_before_shutdown_minutes = int(idle_time_match[1]) if idle_time_match else None

        custom_applications = None
        if ci_props and ci_props.custom_services:
            custom_applications = [CustomApplications._from_rest_object(app) for app in ci_props.custom_services]

        response = ComputeInstance(
            name=rest_obj.name,
            id=rest_obj.id,
            description=prop.description,
            location=rest_obj.location,
            resource_id=prop.resource_id,
            tags=rest_obj.tags if rest_obj.tags else None,
            provisioning_state=prop.provisioning_state,
            # Only the error code of the first provisioning error is surfaced.
            provisioning_errors=(prop.provisioning_errors[0].error.code if prop.provisioning_errors else None),
            size=ci_props.vm_size if ci_props else None,
            state=ci_props.state if ci_props else None,
            last_operation=(ci_props.last_operation.as_dict() if ci_props and ci_props.last_operation else None),
            services=(
                [app.as_dict() for app in ci_props.applications] if ci_props and ci_props.applications else None
            ),
            created_on=(
                prop.created_on.strftime("%Y-%m-%dT%H:%M:%S.%f%z") if prop.created_on is not None else None
            ),
            create_on_behalf_of=create_on_behalf_of,
            network_settings=network_settings,
            ssh_settings=ssh_settings,
            ssh_public_access_enabled=(
                _ssh_public_access_to_bool(rest_ssh.ssh_public_access)
                if rest_ssh and rest_ssh.ssh_public_access
                else None
            ),
            schedules=(
                ComputeSchedules._from_rest_object(ci_props.schedules)
                if ci_props and ci_props.schedules and ci_props.schedules.compute_start_stop
                else None
            ),
            identity=IdentityConfiguration._from_compute_rest_object(rest_obj.identity) if rest_obj.identity else None,
            setup_scripts=(
                SetupScripts._from_rest_object(ci_props.setup_scripts)
                if ci_props and ci_props.setup_scripts
                else None
            ),
            idle_time_before_shutdown=idle_time_before_shutdown,
            idle_time_before_shutdown_minutes=idle_time_before_shutdown_minutes,
            os_image_metadata=os_image_metadata,
            enable_node_public_ip=_flag("enable_node_public_ip", True),
            custom_applications=custom_applications,
            enable_sso=_flag("enable_sso", True),
            enable_root_access=_flag("enable_root_access", True),
            release_quota_on_stop=_flag("release_quota_on_stop", False),
            enable_os_patching=_flag("enable_os_patching", False),
        )
        return response

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, **kwargs: Any) -> "ComputeInstance":
        """Create a ``ComputeInstance`` from a dictionary loaded through ``ComputeInstanceSchema``.

        :param data: The dictionary describing the compute instance.
        :type data: Dict
        :param context: The schema loading context.
        :type context: Dict
        :return: The loaded compute instance entity.
        :rtype: ComputeInstance
        """
        loaded_data = load_from_dict(ComputeInstanceSchema, data, context, **kwargs)
        return ComputeInstance(**loaded_data)


def _ssh_public_access_to_bool(value: str) -> Optional[bool]:
    if value.lower() == "disabled":
        return False
    if value.lower() == "enabled":
        return True
    return None