# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import json
import os
import re
from typing import Any, Dict, List, Optional, Union, cast
from marshmallow import Schema
from azure.ai.ml._restclient.v2022_10_01.models import ComponentVersion
from azure.ai.ml._schema.component.parallel_component import ParallelComponentSchema
from azure.ai.ml.constants._common import COMPONENT_TYPE
from azure.ai.ml.constants._component import NodeType
from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration
from azure.ai.ml.entities._job.parallel.parallel_task import ParallelTask
from azure.ai.ml.entities._job.parallel.parameterized_parallel import ParameterizedParallel
from azure.ai.ml.entities._job.parallel.retry_settings import RetrySettings
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
from ..._schema import PathAwareSchema
from .._util import validate_attribute_type
from .._validation import MutableValidationResult
from .code import ComponentCodeMixin
from .component import Component
class ParallelComponent(
Component, ParameterizedParallel, ComponentCodeMixin
): # pylint: disable=too-many-instance-attributes
"""Parallel component version, used to define a parallel component.
:param name: Name of the component. Defaults to None
:type name: str
:param version: Version of the component. Defaults to None
:type version: str
:param description: Description of the component. Defaults to None
:type description: str
:param tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None
:type tags: dict
:param display_name: Display name of the component. Defaults to None
:type display_name: str
    :param retry_settings: Retry settings for a failed parallel component run. Defaults to None
    :type retry_settings: RetrySettings
:param logging_level: A string of the logging level name. Defaults to None
:type logging_level: str
    :param max_concurrency_per_instance: The maximum parallelism that each compute instance supports. Defaults to None
:type max_concurrency_per_instance: int
    :param error_threshold: The number of item processing failures that should be ignored. Defaults to None
:type error_threshold: int
    :param mini_batch_error_threshold: The number of mini-batch processing failures that should be ignored. Defaults to None
:type mini_batch_error_threshold: int
:param task: The parallel task. Defaults to None
:type task: ParallelTask
:param mini_batch_size: For FileDataset input, this field is the number of files a user script can process
in one run() call. For TabularDataset input, this field is the approximate size of data the user script
can process in one run() call. Example values are 1024, 1024KB, 10MB, and 1GB.
        (Optional; the default value is 10 files for FileDataset and 1MB for TabularDataset.) This value can be
        set through a PipelineParameter.
:type mini_batch_size: str
    :param partition_keys: The keys used to partition the dataset into mini-batches. Defaults to None.
        If specified, data with the same key will be partitioned into the same mini-batch.
        If both partition_keys and mini_batch_size are specified, partition_keys takes precedence.
        The input(s) must be partitioned dataset(s),
        and the partition_keys must be a subset of the keys of every input dataset for this to work.
:type partition_keys: list
:param input_data: The input data. Defaults to None
:type input_data: str
    :param resources: Compute resource configuration for the component. Defaults to None
:type resources: Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration]
:param inputs: Inputs of the component. Defaults to None
:type inputs: dict
:param outputs: Outputs of the component. Defaults to None
:type outputs: dict
    :param code: Promoted property from task.code. Defaults to None
    :type code: str
    :param instance_count: Promoted property from resources.instance_count. Defaults to None
:type instance_count: int
:param is_deterministic: Whether the parallel component is deterministic. Defaults to True
:type is_deterministic: bool
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if ParallelComponent cannot be successfully validated.
Details will be provided in the error message.
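
    Below is a minimal construction sketch (illustrative only; the code folder, entry script,
    and environment reference are hypothetical placeholders, not values defined in this module):

    .. code-block:: python

        from azure.ai.ml.entities._job.parallel.parallel_task import ParallelTask

        task = ParallelTask(
            type="run_function",
            code="./src",  # hypothetical local code folder
            entry_script="score.py",  # hypothetical entry script
            environment="azureml:my-env:1",  # hypothetical environment reference
        )
        component = ParallelComponent(
            name="batch_score",
            version="1",
            task=task,
            input_data="${{inputs.input_data}}",
            mini_batch_size="10MB",
            instance_count=2,
        )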
"""
def __init__( # pylint: disable=too-many-locals
self,
*,
name: Optional[str] = None,
version: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
display_name: Optional[str] = None,
retry_settings: Optional[RetrySettings] = None,
logging_level: Optional[str] = None,
max_concurrency_per_instance: Optional[int] = None,
error_threshold: Optional[int] = None,
mini_batch_error_threshold: Optional[int] = None,
task: Optional[ParallelTask] = None,
mini_batch_size: Optional[str] = None,
partition_keys: Optional[List] = None,
input_data: Optional[str] = None,
resources: Optional[JobResourceConfiguration] = None,
inputs: Optional[Dict] = None,
outputs: Optional[Dict] = None,
code: Optional[str] = None, # promoted property from task.code
instance_count: Optional[int] = None, # promoted property from resources.instance_count
is_deterministic: bool = True,
**kwargs: Any,
):
# validate init params are valid type
validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
kwargs[COMPONENT_TYPE] = NodeType.PARALLEL
super().__init__(
name=name,
version=version,
description=description,
tags=tags,
display_name=display_name,
inputs=inputs,
outputs=outputs,
is_deterministic=is_deterministic,
**kwargs,
)
        # No validation on the values passed here because in a pipeline job, the required
        # code & environment may be absent and filled in later with job defaults.
self.task = task
self.mini_batch_size: int = 0
self.partition_keys = partition_keys
self.input_data = input_data
self.retry_settings = retry_settings
self.logging_level = logging_level
self.max_concurrency_per_instance = max_concurrency_per_instance
self.error_threshold = error_threshold
self.mini_batch_error_threshold = mini_batch_error_threshold
self.resources = resources
# check mutual exclusivity of promoted properties
if self.resources is not None and instance_count is not None:
msg = "instance_count and resources are mutually exclusive"
raise ValidationException(
message=msg,
target=ErrorTarget.COMPONENT,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
)
self.instance_count = instance_count
self.code = code
if mini_batch_size is not None:
# Convert str to int.
            pattern = re.compile(r"^\d+([kKmMgG][bB])?$")
            if not pattern.match(mini_batch_size):
                raise ValueError(r"Parameter mini_batch_size must follow regex rule ^\d+([kKmMgG][bB])?$")
try:
self.mini_batch_size = int(mini_batch_size)
except ValueError as e:
unit = mini_batch_size[-2:].lower()
if unit == "kb":
self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024
elif unit == "mb":
self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024
elif unit == "gb":
self.mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 * 1024
else:
raise ValueError("mini_batch_size unit must be kb, mb or gb") from e
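        # Worked examples of the conversion above (a sketch of the expected behavior,
        # not executed here):
        #   "1024" -> 1024                 (no unit suffix: parsed directly as an int)
        #   "1KB"  -> 1 * 1024      = 1024
        #   "10MB" -> 10 * 1024**2  = 10485760
        #   "1GB"  -> 1 * 1024**3   = 1073741824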
@property
def instance_count(self) -> Optional[int]:
"""Return value of promoted property resources.instance_count.
:return: Value of resources.instance_count.
:rtype: Optional[int]
"""
return self.resources.instance_count if self.resources and not isinstance(self.resources, dict) else None
@instance_count.setter
def instance_count(self, value: int) -> None:
"""Set the value of the promoted property resources.instance_count.
:param value: The value to set for resources.instance_count.
:type value: int
"""
if not value:
return
if not self.resources:
self.resources = JobResourceConfiguration(instance_count=value)
else:
if not isinstance(self.resources, dict):
self.resources.instance_count = value
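    # Sketch of the promoted-property behavior above (hypothetical usage, not executed here):
    #   component.instance_count = 4        # creates JobResourceConfiguration(instance_count=4)
    #                                       # when resources is unset, otherwise writes through
    #   component.resources.instance_count  # -> 4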
@property
def code(self) -> Optional[str]:
"""Return value of promoted property task.code, which is a local or
remote path pointing at source code.
:return: Value of task.code.
:rtype: Optional[str]
"""
return self.task.code if self.task else None
@code.setter
def code(self, value: str) -> None:
"""Set the value of the promoted property task.code.
:param value: The value to set for task.code.
:type value: str
"""
if not value:
return
if not self.task:
self.task = ParallelTask(code=value)
else:
self.task.code = value
def _to_ordered_dict_for_yaml_dump(self) -> Dict:
"""Dump the component content into a sorted yaml string.
:return: The ordered dict
:rtype: Dict
"""
obj: dict = super()._to_ordered_dict_for_yaml_dump()
        # The dict dumped based on the schema converts code to an absolute path, while we want to keep its
        # original value.
if self.code and isinstance(self.code, str):
obj["task"]["code"] = self.code
return obj
@property
def environment(self) -> Optional[str]:
"""Return value of promoted property task.environment, indicate the
environment that training job will run in.
:return: Value of task.environment.
:rtype: Optional[Environment, str]
"""
if self.task:
return cast(Optional[str], self.task.environment)
return None
@environment.setter
def environment(self, value: str) -> None:
"""Set the value of the promoted property task.environment.
:param value: The value to set for task.environment.
:type value: str
"""
if not value:
return
if not self.task:
self.task = ParallelTask(environment=value)
else:
self.task.environment = value
def _customized_validate(self) -> MutableValidationResult:
validation_result = super()._customized_validate()
self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
return validation_result
@classmethod
def _attr_type_map(cls) -> dict:
return {
"retry_settings": (dict, RetrySettings),
"task": (dict, ParallelTask),
"logging_level": str,
"max_concurrency_per_instance": int,
"input_data": str,
"error_threshold": int,
"mini_batch_error_threshold": int,
"code": (str, os.PathLike),
"resources": (dict, JobResourceConfiguration),
}
def _to_rest_object(self) -> ComponentVersion:
rest_object = super()._to_rest_object()
        # The schema requires a list while the backend accepts a JSON string.
if self.partition_keys:
rest_object.properties.component_spec["partition_keys"] = json.dumps(self.partition_keys)
return rest_object
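    # Round-trip sketch (hypothetical values): partition_keys ["user_id", "region"] is dumped
    # as the JSON string '["user_id", "region"]' here, then parsed back into a list in
    # _from_rest_object_to_init_params below.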
@classmethod
def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
        # The schema requires a list while the backend accepts a JSON string, so parse the
        # string back into a list and update the rest object in place before deserialization.
partition_keys = obj.properties.component_spec.get("partition_keys", None)
if partition_keys:
obj.properties.component_spec["partition_keys"] = json.loads(partition_keys)
res: dict = super()._from_rest_object_to_init_params(obj)
return res
@classmethod
def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
return ParallelComponentSchema(context=context)
    def __str__(self) -> str:
        try:
            return self._to_yaml()
        except BaseException:  # pylint: disable=W0718
            return super(ParallelComponent, self).__str__()