1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
|
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from pathlib import Path
from typing import Any, Dict, NoReturn, Optional, Union, cast
from marshmallow import Schema
from azure.ai.ml._schema.component.data_transfer_component import (
DataTransferCopyComponentSchema,
DataTransferExportComponentSchema,
DataTransferImportComponentSchema,
)
from azure.ai.ml._utils._experimental import experimental
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, COMPONENT_TYPE, AssetTypes
from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType
from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
from azure.ai.ml.entities._inputs_outputs.output import Output
from azure.ai.ml.entities._validation.core import MutableValidationResult
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
from ..._schema import PathAwareSchema
from .._util import convert_ordered_dict_to_dict, validate_attribute_type
from .component import Component
class DataTransferComponent(Component):
    """DataTransfer component version, used to define a data transfer component.

    :param task: Task type in the data transfer component. Possible values are "copy_data",
        "import_data", and "export_data".
    :type task: str
    :param inputs: Mapping of input data bindings used in the job.
    :type inputs: dict
    :param outputs: Mapping of output data bindings used in the job.
    :type outputs: dict
    :param kwargs: Additional parameters for the data transfer component.
    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
        Details will be provided in the error message.
    """

    def __init__(
        self,
        *,
        task: Optional[str] = None,
        inputs: Optional[Dict] = None,
        outputs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> None:
        # Validate that init params have valid types. This must remain the first statement:
        # ``locals()`` is captured here, so any local defined before it would be checked too.
        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
        # All data transfer tasks share the DATA_TRANSFER component type; the concrete task
        # is stored separately in ``_task``.
        kwargs[COMPONENT_TYPE] = NodeType.DATA_TRANSFER
        # Default the base path context to the current directory when the caller gave none.
        if BASE_PATH_CONTEXT_KEY not in kwargs:
            kwargs[BASE_PATH_CONTEXT_KEY] = Path(".")
        super().__init__(
            inputs=inputs,
            outputs=outputs,
            **kwargs,
        )
        self._task = task

    @classmethod
    def _attr_type_map(cls) -> dict:
        """Return the attribute/type map passed to ``validate_attribute_type`` in ``__init__``.

        The map is empty here; presumably that means no init attribute is type-checked
        (semantics of ``validate_attribute_type`` not visible here — confirm).

        :return: An empty mapping.
        :rtype: dict
        """
        return {}

    @property
    def task(self) -> Optional[str]:
        """Task type of the component.

        :return: Task type of the component.
        :rtype: str
        """
        return self._task

    def _to_dict(self) -> Dict:
        """Serialize the component to a plain dict.

        Extra parameters from ``_other_parameter`` are merged first, so fields produced by the
        base component's ``_to_dict`` win on key collisions. Ordered dicts are converted to
        plain dicts.

        :return: The serialized component.
        :rtype: dict
        """
        return cast(
            dict,
            convert_ordered_dict_to_dict({**self._other_parameter, **super(DataTransferComponent, self)._to_dict()}),
        )

    def __str__(self) -> str:
        """Return the YAML form of the component, or the base string form if YAML dumping fails.

        :return: String representation of the component.
        :rtype: str
        """
        try:
            yaml_text: str = self._to_yaml()
            return yaml_text
        except Exception:  # pylint: disable=broad-exception-caught
            # Best-effort: str() must not fail just because YAML serialization does. Catch
            # Exception (not BaseException) so KeyboardInterrupt / SystemExit still propagate.
            fallback: str = super(DataTransferComponent, self).__str__()
            return fallback

    @classmethod
    def _build_source_sink(cls, io_dict: Union[Dict, Database, FileSystem]) -> Union[Database, FileSystem]:
        """Build an empty component-level source/sink of the matching external data type.

        Only the *kind* of external data is kept: the result is always a freshly constructed,
        argument-less ``Database()`` or ``FileSystem()``. Any other fields are not copied.

        :param io_dict: A ``Database`` or ``FileSystem`` instance, or a dict whose "type" key
            is ``ExternalDataType.DATABASE`` or ``ExternalDataType.FILE_SYSTEM``. The dict is
            not modified.
        :type io_dict: Union[Dict, Database, FileSystem]
        :return: A new Database or FileSystem.
        :rtype: Union[Database, FileSystem]
        :raises ~azure.ai.ml.exceptions.ValidationException: If ``io_dict`` has an unsupported
            type, or is a dict whose "type" is missing or unsupported.
        """
        if isinstance(io_dict, Database):
            return Database()
        if isinstance(io_dict, FileSystem):
            return FileSystem()
        if not isinstance(io_dict, dict):
            msg = "Source or sink only support dict, Database and FileSystem"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.COMPONENT,
                error_category=ErrorCategory.USER_ERROR,
                error_type=ValidationErrorType.INVALID_VALUE,
            )

        # Read rather than pop "type". Popping mutated the caller's dict, so reusing the same
        # dict for a second component made that construction fail with "currently got None".
        data_type = io_dict.get("type", None)
        if data_type == ExternalDataType.DATABASE:
            return Database()
        if data_type == ExternalDataType.FILE_SYSTEM:
            return FileSystem()

        msg = "Type in source or sink only support {} and {}, currently got {}."
        raise ValidationException(
            message=msg.format(
                ExternalDataType.DATABASE,
                ExternalDataType.FILE_SYSTEM,
                data_type,
            ),
            no_personal_data_message=msg.format(
                ExternalDataType.DATABASE,
                ExternalDataType.FILE_SYSTEM,
                "data_type",
            ),
            target=ErrorTarget.COMPONENT,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.INVALID_VALUE,
        )
@experimental
class DataTransferCopyComponent(DataTransferComponent):
    """Data transfer component whose task is fixed to ``copy_data``.

    :param data_copy_mode: Data copy mode in the copy task.
        Possible values are "merge_with_overwrite" and "fail_if_conflict".
    :type data_copy_mode: str
    :param inputs: Mapping of input data bindings used in the job.
    :type inputs: dict
    :param outputs: Mapping of output data bindings used in the job.
    :type outputs: dict
    :param kwargs: Additional parameters for the data transfer copy component.
    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
        Details will be provided in the error message.
    """

    def __init__(
        self,
        *,
        data_copy_mode: Optional[str] = None,
        inputs: Optional[Dict] = None,
        outputs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> None:
        # The task is dictated by this subclass; a caller-supplied "task" is overwritten.
        kwargs["task"] = DataTransferTaskType.COPY_DATA
        super().__init__(inputs=inputs, outputs=outputs, **kwargs)
        self._data_copy_mode = data_copy_mode

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        return DataTransferCopyComponentSchema(context=context)

    @property
    def data_copy_mode(self) -> Optional[str]:
        """Copy mode configured for this component.

        :return: The data copy mode, or None if it was not set.
        :rtype: str
        """
        return self._data_copy_mode

    def _customized_validate(self) -> MutableValidationResult:
        """Run the inherited validation and append the copy-specific input/output checks."""
        outcome = super()._customized_validate()
        outcome.merge_with(self._validate_input_output_mapping())
        return outcome

    def _validate_input_output_mapping(self) -> MutableValidationResult:
        """Check the input/output layout required by the copy task.

        Rules, in order:
        - exactly one output must be declared;
        - at least one input must be declared;
        - with a single input, its type must equal the output's type;
        - with several inputs, the output type must be ``AssetTypes.URI_FOLDER``.

        :return: A validation result holding any violations found.
        :rtype: MutableValidationResult
        """
        outcome = self._create_empty_validation_result()
        n_inputs = len(self.inputs)
        n_outputs = len(self.outputs)
        task_name = DataTransferTaskType.COPY_DATA

        if n_outputs != 1:
            outcome.append_error(
                message=f"Only support single output in {task_name}, but there're {n_outputs} outputs.",
                yaml_path="outputs",
            )
        elif n_inputs == 0:
            outcome.append_error(
                message=f"Inputs must be set in task {task_name}.",
                yaml_path="inputs",
            )
        elif n_inputs == 1:
            # Each mapping has exactly one entry here, so the first value is the only one.
            src_type = next(iter(self.inputs.values())).type
            dst_type = next(iter(self.outputs.values())).type
            if src_type is None or dst_type is None or src_type != dst_type:
                outcome.append_error(
                    message=(
                        f"Input type {src_type} doesn't exactly match with output type {dst_type} "
                        f"in task {task_name}"
                    ),
                    yaml_path="outputs",
                )
        else:
            # Several inputs: they are copied into one output, which must be a folder.
            dst_type = next(iter(self.outputs.values())).type
            if dst_type is None or dst_type != AssetTypes.URI_FOLDER:
                outcome.append_error(
                    message=f"output type {dst_type} need to be {AssetTypes.URI_FOLDER} in task {task_name}",
                    yaml_path="outputs",
                )
        return outcome
@experimental
class DataTransferImportComponent(DataTransferComponent):
    """Data transfer component whose task is fixed to ``import_data``.

    Instances are not callable; ``__call__`` always raises.

    :param source: The data source of the file system or database: a dict carrying a "type"
        key, a Database, or a FileSystem. When omitted it is treated as an empty dict, which
        has no "type" and is therefore rejected with a ValidationException.
    :type source: dict
    :param outputs: Mapping of output data bindings used in the job.
        Default value is an output port with the key "sink" and the type "mltable".
    :type outputs: dict
    :param kwargs: Additional parameters for the data transfer import component.
    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
        Details will be provided in the error message.
    """

    def __init__(
        self,
        *,
        source: Optional[Dict] = None,
        outputs: Optional[Dict] = None,
        **kwargs: Any,
    ) -> None:
        if not outputs:
            # No outputs supplied: expose a single MLTable output named "sink".
            outputs = {"sink": Output(type=AssetTypes.MLTABLE)}
        kwargs["task"] = DataTransferTaskType.IMPORT_DATA
        super().__init__(outputs=outputs, **kwargs)
        # A falsy source is normalized to {} before being handed to the shared builder.
        self.source = self._build_source_sink(source or {})

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        return DataTransferImportComponentSchema(context=context)

    # pylint: disable-next=docstring-missing-param
    def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:
        """Reject invocation: an import component cannot be called as a function.

        :raises ~azure.ai.ml.exceptions.ValidationException: Always.
        """
        reason = "DataTransfer component is not callable for import task."
        raise ValidationException(
            message=reason,
            no_personal_data_message=reason,
            target=ErrorTarget.COMPONENT,
            error_category=ErrorCategory.USER_ERROR,
        )
@experimental
class DataTransferExportComponent(DataTransferComponent):
    """Data transfer component whose task is fixed to ``export_data``.

    Instances are not callable; ``__call__`` always raises.

    :param sink: The sink of external data and databases: a dict carrying a "type" key, a
        Database, or a FileSystem. When omitted it is treated as an empty dict, which has no
        "type" and is therefore rejected with a ValidationException.
    :type sink: Union[Dict, Database, FileSystem]
    :param inputs: Mapping of input data bindings used in the job.
    :type inputs: dict
    :param kwargs: Additional parameters for the data transfer export component.
    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if the component cannot be successfully validated.
        Details will be provided in the error message.
    """

    def __init__(
        self,
        *,
        inputs: Optional[Dict] = None,
        sink: Optional[Dict] = None,
        **kwargs: Any,
    ) -> None:
        kwargs["task"] = DataTransferTaskType.EXPORT_DATA
        super().__init__(inputs=inputs, **kwargs)
        # A falsy sink is normalized to {} before being handed to the shared builder.
        self.sink = self._build_source_sink(sink or {})

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        return DataTransferExportComponentSchema(context=context)

    # pylint: disable-next=docstring-missing-param
    def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:
        """Reject invocation: an export component cannot be called as a function.

        :raises ~azure.ai.ml.exceptions.ValidationException: Always.
        """
        reason = "DataTransfer component is not callable for export task."
        raise ValidationException(
            message=reason,
            no_personal_data_message=reason,
            target=ErrorTarget.COMPONENT,
            error_category=ErrorCategory.USER_ERROR,
        )
|