Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py | 114
1 file changed, 114 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
new file mode 100644
index 00000000..86fa8939
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
@@ -0,0 +1,114 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...entities import BatchRetrySettings
+from .._schema.component import NodeType
+from ..entities import Command
+
+
+class Parallel(Command):
+    """Node of parallel components in pipeline with specific run settings."""
+
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Parallel, self).__init__(type=NodeType.PARALLEL, **kwargs)
+        self._init = True
+        self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
+        self._error_threshold = kwargs.pop("error_threshold", None)
+        self._mini_batch_size = kwargs.pop("mini_batch_size", None)
+        self._partition_keys = kwargs.pop("partition_keys", None)
+        self._logging_level = kwargs.pop("logging_level", None)
+        self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
+        self._init = False
+
+    @property
+    def max_concurrency_per_instance(self) -> int:
+        """The maximum parallelism that each compute instance has.
+
+        :return: The maximum concurrency per compute instance
+        :rtype: int
+        """
+        return self._max_concurrency_per_instance
+
+    @max_concurrency_per_instance.setter
+    def max_concurrency_per_instance(self, value: int):
+        self._max_concurrency_per_instance = value
+
+    @property
+    def error_threshold(self) -> int:
+        """The number of record failures for Tabular Dataset and file failures for File Dataset that should be
+        ignored during processing.
+
+        If the error count goes above this value, the job is aborted. The error threshold applies to the entire
+        input rather than to the individual mini-batches sent to the run() method. The range is [-1, int.max];
+        -1 indicates that all failures during processing are ignored.
+
+        :return: The error threshold
+        :rtype: int
+        """
+        return self._error_threshold
+
+    @error_threshold.setter
+    def error_threshold(self, value: int):
+        self._error_threshold = value
+
+    @property
+    def mini_batch_size(self) -> int:
+        """The number of records to be sent to the run() method for each mini-batch.
+
+        :return: The batch size
+        :rtype: int
+        """
+        return self._mini_batch_size
+
+    @mini_batch_size.setter
+    def mini_batch_size(self, value: int):
+        self._mini_batch_size = value
+
+    @property
+    def logging_level(self) -> str:
+        """A string of the logging level name.
+
+        :return: The logging level
+        :rtype: str
+        """
+        return self._logging_level
+
+    @logging_level.setter
+    def logging_level(self, value: str):
+        self._logging_level = value
+
+    @property
+    def retry_settings(self) -> BatchRetrySettings:
+        """Retry settings for a failed parallel job run.
+
+        :return: The retry settings
+        :rtype: BatchRetrySettings
+        """
+        return self._retry_settings
+
+    @retry_settings.setter
+    def retry_settings(self, value: BatchRetrySettings):
+        self._retry_settings = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return Command._picked_fields_from_dict_to_rest_object() + [
+            "max_concurrency_per_instance",
+            "error_threshold",
+            "logging_level",
+            "retry_settings",
+            "mini_batch_size",
+        ]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.command import ParallelSchema
+
+        return ParallelSchema(context=context)
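
For orientation, here is a minimal usage sketch showing how the run settings exposed by this class might be configured through its property setters. This is a sketch under assumptions, not a documented workflow: the class lives in the internal `_internal` package, and `my_parallel_component` is a hypothetical stand-in for a parallel component obtained elsewhere. BatchRetrySettings is the public class already imported in the diff above.

    from azure.ai.ml.entities import BatchRetrySettings

    # Hypothetical construction of the internal node; `my_parallel_component`
    # is a placeholder for a component loaded elsewhere.
    node = Parallel(component=my_parallel_component)

    node.max_concurrency_per_instance = 4  # process up to 4 mini-batches at once per instance
    node.mini_batch_size = 10              # records handed to each run() call
    node.error_threshold = -1              # -1: ignore all record/file failures
    node.logging_level = "INFO"
    node.retry_settings = BatchRetrySettings(max_retries=3, timeout=600)

Note that the settings popped in __init__ are only picked up when the node is built through the normal pipeline flow; setting them afterwards via the property setters, as sketched above, is the safer path for an instance constructed directly.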