aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
blob: 86fa893954304382e81cd4e8f8704150a2f663c0 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from typing import List, Union

from marshmallow import Schema

from ..._schema import PathAwareSchema
from ...entities import BatchRetrySettings
from .._schema.component import NodeType
from ..entities import Command


class Parallel(Command):
    """Node of scope components in pipeline with specific run settings."""

    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(Parallel, self).__init__(type=NodeType.PARALLEL, **kwargs)
        self._init = True
        self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
        self._error_threshold = kwargs.pop("error_threshold", None)
        self._mini_batch_size = kwargs.pop("mini_batch_size", None)
        self._partition_keys = kwargs.pop("partition_keys", None)
        self._logging_level = kwargs.pop("logging_level", None)
        self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
        self._init = False

    @property
    def max_concurrency_per_instance(self) -> int:
        """The max parallellism that each compute instance has.

        :return: The max concurrence per compute instance
        :rtype: int
        """
        return self._max_concurrency_per_instance

    @max_concurrency_per_instance.setter
    def max_concurrency_per_instance(self, value: int):
        self._max_concurrency_per_instance = value

    @property
    def error_threshold(self) -> int:
        """The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored
        during processing.

        If the error count goes above this value, then the job will be aborted. Error threshold is for the entire input
        rather than the individual mini-batch sent to run() method. The range is [-1, int.max]. -1 indicates ignore all
        failures during processing.

        :return: The error threshold
        :rtype: int
        """
        return self._error_threshold

    @error_threshold.setter
    def error_threshold(self, value: int):
        self._error_threshold = value

    @property
    def mini_batch_size(self) -> int:
        """The number of records to be sent to run() method for each mini-batch.

        :return: The batch size
        :rtype: int
        """
        return self._mini_batch_size

    @mini_batch_size.setter
    def mini_batch_size(self, value: int):
        self._mini_batch_size = value

    @property
    def logging_level(self) -> str:
        """A string of the logging level name.

        :return: The loggin level
        :rtype: str
        """
        return self._logging_level

    @logging_level.setter
    def logging_level(self, value: str):
        self._logging_level = value

    @property
    def retry_settings(self) -> BatchRetrySettings:
        """Parallel job run failed retry.

        :return: The retry settings
        :rtype: BatchRetrySettings
        """
        return self._retry_settings

    @retry_settings.setter
    def retry_settings(self, value: BatchRetrySettings):
        self._retry_settings = value

    @classmethod
    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
        return Command._picked_fields_from_dict_to_rest_object() + [
            "max_concurrency_per_instance",
            "error_threshold",
            "logging_level",
            "retry_settings",
            "mini_batch_size",
        ]

    @classmethod
    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
        from .._schema.command import ParallelSchema

        return ParallelSchema(context=context)