aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
new file mode 100644
index 00000000..86fa8939
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
@@ -0,0 +1,114 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...entities import BatchRetrySettings
+from .._schema.component import NodeType
+from ..entities import Command
+
+
+class Parallel(Command):
+ """Node of scope components in pipeline with specific run settings."""
+
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Parallel, self).__init__(type=NodeType.PARALLEL, **kwargs)
+ self._init = True
+ self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
+ self._error_threshold = kwargs.pop("error_threshold", None)
+ self._mini_batch_size = kwargs.pop("mini_batch_size", None)
+ self._partition_keys = kwargs.pop("partition_keys", None)
+ self._logging_level = kwargs.pop("logging_level", None)
+ self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
+ self._init = False
+
+ @property
+ def max_concurrency_per_instance(self) -> int:
+ """The max parallellism that each compute instance has.
+
+ :return: The max concurrence per compute instance
+ :rtype: int
+ """
+ return self._max_concurrency_per_instance
+
+ @max_concurrency_per_instance.setter
+ def max_concurrency_per_instance(self, value: int):
+ self._max_concurrency_per_instance = value
+
+ @property
+ def error_threshold(self) -> int:
+ """The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored
+ during processing.
+
+ If the error count goes above this value, then the job will be aborted. Error threshold is for the entire input
+ rather than the individual mini-batch sent to run() method. The range is [-1, int.max]. -1 indicates ignore all
+ failures during processing.
+
+ :return: The error threshold
+ :rtype: int
+ """
+ return self._error_threshold
+
+ @error_threshold.setter
+ def error_threshold(self, value: int):
+ self._error_threshold = value
+
+ @property
+ def mini_batch_size(self) -> int:
+ """The number of records to be sent to run() method for each mini-batch.
+
+ :return: The batch size
+ :rtype: int
+ """
+ return self._mini_batch_size
+
+ @mini_batch_size.setter
+ def mini_batch_size(self, value: int):
+ self._mini_batch_size = value
+
+ @property
+ def logging_level(self) -> str:
+ """A string of the logging level name.
+
+ :return: The loggin level
+ :rtype: str
+ """
+ return self._logging_level
+
+ @logging_level.setter
+ def logging_level(self, value: str):
+ self._logging_level = value
+
+ @property
+ def retry_settings(self) -> BatchRetrySettings:
+ """Parallel job run failed retry.
+
+ :return: The retry settings
+ :rtype: BatchRetrySettings
+ """
+ return self._retry_settings
+
+ @retry_settings.setter
+ def retry_settings(self, value: BatchRetrySettings):
+ self._retry_settings = value
+
+ @classmethod
+ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+ return Command._picked_fields_from_dict_to_rest_object() + [
+ "max_concurrency_per_instance",
+ "error_threshold",
+ "logging_level",
+ "retry_settings",
+ "mini_batch_size",
+ ]
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ from .._schema.command import ParallelSchema
+
+ return ParallelSchema(context=context)