1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from typing import List, Union
from marshmallow import Schema
from ..._schema import PathAwareSchema
from ...entities import BatchRetrySettings
from .._schema.component import NodeType
from ..entities import Command
class Parallel(Command):
    """Node of parallel components in pipeline with specific run settings.

    Extends :class:`Command` with parallel-run settings (concurrency, error threshold, mini-batch size,
    partition keys, logging level and retry settings). The node ``type`` is always ``NodeType.PARALLEL``;
    any ``type`` supplied by the caller is discarded.
    """

    def __init__(self, **kwargs):
        # The node type is fixed for this class, so drop any caller-supplied value before delegating.
        kwargs.pop("type", None)
        # ``**kwargs`` unpacking hands the parent its own dict, so the pops below are unaffected by
        # whatever the parent constructor does with the same keys.
        super().__init__(type=NodeType.PARALLEL, **kwargs)
        # ``_init`` is True only while the parallel-specific attributes are assigned.
        # NOTE(review): presumably the base class checks this flag in its attribute hooks - confirm.
        self._init = True
        self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
        self._error_threshold = kwargs.pop("error_threshold", None)
        self._mini_batch_size = kwargs.pop("mini_batch_size", None)
        self._partition_keys = kwargs.pop("partition_keys", None)
        self._logging_level = kwargs.pop("logging_level", None)
        # A fresh BatchRetrySettings per instance, so nodes never share one default object.
        self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
        self._init = False

    @property
    def max_concurrency_per_instance(self) -> int:
        """The max parallelism that each compute instance has.

        :return: The max concurrency per compute instance
        :rtype: int
        """
        return self._max_concurrency_per_instance

    @max_concurrency_per_instance.setter
    def max_concurrency_per_instance(self, value: int):
        self._max_concurrency_per_instance = value

    @property
    def error_threshold(self) -> int:
        """The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored
        during processing.

        If the error count goes above this value, then the job will be aborted. Error threshold is for the entire input
        rather than the individual mini-batch sent to run() method. The range is [-1, int.max]. -1 indicates ignore all
        failures during processing.

        :return: The error threshold
        :rtype: int
        """
        return self._error_threshold

    @error_threshold.setter
    def error_threshold(self, value: int):
        self._error_threshold = value

    @property
    def mini_batch_size(self) -> int:
        """The number of records to be sent to run() method for each mini-batch.

        :return: The batch size
        :rtype: int
        """
        return self._mini_batch_size

    @mini_batch_size.setter
    def mini_batch_size(self, value: int):
        self._mini_batch_size = value

    @property
    def partition_keys(self):
        """The value passed as ``partition_keys`` at construction (``None`` if omitted).

        NOTE(review): presumably the keys used to group input data into mini-batches - confirm
        against the parallel component contract.

        :return: The partition keys
        """
        return self._partition_keys

    @partition_keys.setter
    def partition_keys(self, value):
        self._partition_keys = value

    @property
    def logging_level(self) -> str:
        """A string of the logging level name.

        :return: The logging level
        :rtype: str
        """
        return self._logging_level

    @logging_level.setter
    def logging_level(self, value: str):
        self._logging_level = value

    @property
    def retry_settings(self) -> BatchRetrySettings:
        """Retry settings applied when the parallel job run fails.

        :return: The retry settings
        :rtype: BatchRetrySettings
        """
        return self._retry_settings

    @retry_settings.setter
    def retry_settings(self, value: BatchRetrySettings):
        self._retry_settings = value

    @classmethod
    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
        """Return the field names copied into the REST object: the Command fields plus the parallel settings.

        :return: The field names to pick
        :rtype: List[str]
        """
        # NOTE(review): ``partition_keys`` is stored on the node but not listed here, so it is not
        # picked for the REST object; confirm ParallelSchema supports it before adding it.
        return Command._picked_fields_from_dict_to_rest_object() + [
            "max_concurrency_per_instance",
            "error_threshold",
            "logging_level",
            "retry_settings",
            "mini_batch_size",
        ]

    @classmethod
    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
        """Build the schema used to validate this node.

        :param context: Schema context forwarded to ``ParallelSchema``
        :return: The validation schema
        :rtype: Union[PathAwareSchema, Schema]
        """
        # Imported lazily, as in the original.
        from .._schema.command import ParallelSchema

        return ParallelSchema(context=context)
|