1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
|
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
from abc import ABC
from typing import Any, Optional, Union
from azure.ai.ml._restclient.v2023_04_01_preview.models import CommandJobLimits as RestCommandJobLimits
from azure.ai.ml._restclient.v2023_08_01_preview.models import SweepJobLimits as RestSweepJobLimits
from azure.ai.ml._utils.utils import from_iso_duration_format, is_data_binding_expression, to_iso_duration_format
from azure.ai.ml.constants import JobType
from azure.ai.ml.entities._mixins import RestTranslatableMixin
module_logger = logging.getLogger(__name__)
class JobLimits(RestTranslatableMixin, ABC):
"""Base class for Job limits.
This class should not be instantiated directly. Instead, one of its child classes should be used.
"""
def __init__(
self,
) -> None:
self.type: Any = None
def __eq__(self, other: Any) -> bool:
if not isinstance(other, JobLimits):
return NotImplemented
res: bool = self._to_rest_object() == other._to_rest_object()
return res
class CommandJobLimits(JobLimits):
"""Limits for Command Jobs.
:keyword timeout: The maximum run duration, in seconds, after which the job will be cancelled.
:paramtype timeout: Optional[Union[int, str]]
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_command_configurations.py
:start-after: [START command_job_definition]
:end-before: [END command_job_definition]
:language: python
:dedent: 8
:caption: Configuring a CommandJob with CommandJobLimits.
"""
def __init__(self, *, timeout: Optional[Union[int, str]] = None) -> None:
super().__init__()
self.type = JobType.COMMAND
self.timeout = timeout
def _to_rest_object(self) -> RestCommandJobLimits:
if is_data_binding_expression(self.timeout):
return RestCommandJobLimits(timeout=self.timeout)
return RestCommandJobLimits(timeout=to_iso_duration_format(self.timeout))
@classmethod
def _from_rest_object(cls, obj: Union[RestCommandJobLimits, dict]) -> Optional["CommandJobLimits"]:
if not obj:
return None
if isinstance(obj, dict):
timeout_value = obj.get("timeout", None)
# if timeout value is a binding string
if is_data_binding_expression(timeout_value):
return cls(timeout=timeout_value)
# if response timeout is a normal iso date string
obj = RestCommandJobLimits.from_dict(obj)
return cls(timeout=from_iso_duration_format(obj.timeout))
class SweepJobLimits(JobLimits):
"""Limits for Sweep Jobs.
:keyword max_concurrent_trials: The maximum number of concurrent trials for the Sweep Job.
:paramtype max_concurrent_trials: Optional[int]
:keyword max_total_trials: The maximum number of total trials for the Sweep Job.
:paramtype max_total_trials: Optional[int]
:keyword timeout: The maximum run duration, in seconds, after which the job will be cancelled.
:paramtype timeout: Optional[int]
:keyword trial_timeout: The timeout value, in seconds, for each Sweep Job trial.
:paramtype trial_timeout: Optional[int]
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_sweep_configurations.py
:start-after: [START configure_sweep_job_bayesian_sampling_algorithm]
:end-before: [END configure_sweep_job_bayesian_sampling_algorithm]
:language: python
:dedent: 8
:caption: Assigning limits to a SweepJob
"""
def __init__(
self,
*,
max_concurrent_trials: Optional[int] = None,
max_total_trials: Optional[int] = None,
timeout: Optional[int] = None,
trial_timeout: Optional[Union[int, str]] = None,
) -> None:
super().__init__()
self.type = JobType.SWEEP
self.max_concurrent_trials = max_concurrent_trials
self.max_total_trials = max_total_trials
self._timeout = _get_floored_timeout(timeout)
self._trial_timeout = _get_floored_timeout(trial_timeout)
@property
def timeout(self) -> Optional[Union[int, str]]:
"""The maximum run duration, in seconds, after which the job will be cancelled.
:return: The maximum run duration, in seconds, after which the job will be cancelled.
:rtype: int
"""
return self._timeout
@timeout.setter
def timeout(self, value: int) -> None:
"""Sets the maximum run duration.
:param value: The maximum run duration, in seconds, after which the job will be cancelled.
:type value: int
"""
self._timeout = _get_floored_timeout(value)
@property
def trial_timeout(self) -> Optional[Union[int, str]]:
"""The timeout value, in seconds, for each Sweep Job trial.
:return: The timeout value, in seconds, for each Sweep Job trial.
:rtype: int
"""
return self._trial_timeout
@trial_timeout.setter
def trial_timeout(self, value: int) -> None:
"""Sets the timeout value for each Sweep Job trial.
:param value: The timeout value, in seconds, for each Sweep Job trial.
:type value: int
"""
self._trial_timeout = _get_floored_timeout(value)
def _to_rest_object(self) -> RestSweepJobLimits:
return RestSweepJobLimits(
max_concurrent_trials=self.max_concurrent_trials,
max_total_trials=self.max_total_trials,
timeout=to_iso_duration_format(self.timeout),
trial_timeout=to_iso_duration_format(self.trial_timeout),
)
@classmethod
def _from_rest_object(cls, obj: RestSweepJobLimits) -> Optional["SweepJobLimits"]:
if not obj:
return None
return cls(
max_concurrent_trials=obj.max_concurrent_trials,
max_total_trials=obj.max_total_trials,
timeout=from_iso_duration_format(obj.timeout),
trial_timeout=from_iso_duration_format(obj.trial_timeout),
)
def _get_floored_timeout(value: Optional[Union[int, str]]) -> Optional[Union[int, str]]:
# Bug 1335978: Service rounds durations less than 60 seconds to 60 days.
# If duration is non-0 and less than 60, set to 60.
if isinstance(value, int):
return value if not value or value > 60 else 60
return None
class DoWhileJobLimits(JobLimits):
"""DoWhile Job limit class.
:keyword max_iteration_count: The maximum number of iterations for the DoWhile Job.
:paramtype max_iteration_count: Optional[int]
"""
def __init__(
self, # pylint: disable=unused-argument
*,
max_iteration_count: Optional[int] = None,
**kwargs: Any,
) -> None:
super().__init__()
self._max_iteration_count = max_iteration_count
@property
def max_iteration_count(self) -> Optional[int]:
"""The maximum number of iterations for the DoWhile Job.
:rtype: int
"""
return self._max_iteration_count
|