author     S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py  114
1 file changed, 114 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
new file mode 100644
index 00000000..86fa8939
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
@@ -0,0 +1,114 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...entities import BatchRetrySettings
+from .._schema.component import NodeType
+from ..entities import Command
+
+
+class Parallel(Command):
+    """Node of internal parallel components in pipeline with specific run settings."""
+
+    def __init__(self, **kwargs):
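+        # This node always runs as NodeType.PARALLEL, so any caller-supplied
+        # "type" is discarded before delegating to Command.__init__.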
+        kwargs.pop("type", None)
+        super(Parallel, self).__init__(type=NodeType.PARALLEL, **kwargs)
+        self._init = True
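+        # Pull the parallel run settings out of kwargs; retry_settings falls
+        # back to a default BatchRetrySettings() when none is provided.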
+        self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
+        self._error_threshold = kwargs.pop("error_threshold", None)
+        self._mini_batch_size = kwargs.pop("mini_batch_size", None)
+        self._partition_keys = kwargs.pop("partition_keys", None)
+        self._logging_level = kwargs.pop("logging_level", None)
+        self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
+        self._init = False
+
+    @property
+    def max_concurrency_per_instance(self) -> int:
+        """The maximum parallelism that each compute instance has.
+
+        :return: The max concurrency per compute instance
+        :rtype: int
+        """
+        return self._max_concurrency_per_instance
+
+    @max_concurrency_per_instance.setter
+    def max_concurrency_per_instance(self, value: int):
+        self._max_concurrency_per_instance = value
+
+    @property
+    def error_threshold(self) -> int:
+        """The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored
+        during processing.
+
+        If the error count goes above this value, the job will be aborted. The error threshold applies to the entire
+        input rather than to the individual mini-batch sent to the run() method. The range is [-1, int.max]; -1
+        indicates ignoring all failures during processing.
+
+        :return: The error threshold
+        :rtype: int
+        """
+        return self._error_threshold
+
+    @error_threshold.setter
+    def error_threshold(self, value: int):
+        self._error_threshold = value
+
+    @property
+    def mini_batch_size(self) -> int:
+        """The number of records to be sent to the run() method for each mini-batch.
+
+        :return: The batch size
+        :rtype: int
+        """
+        return self._mini_batch_size
+
+    @mini_batch_size.setter
+    def mini_batch_size(self, value: int):
+        self._mini_batch_size = value
+
+    @property
+    def logging_level(self) -> str:
+        """A string of the logging level name.
+
+        :return: The logging level
+        :rtype: str
+        """
+        return self._logging_level
+
+    @logging_level.setter
+    def logging_level(self, value: str):
+        self._logging_level = value
+
+    @property
+    def retry_settings(self) -> BatchRetrySettings:
+        """The retry settings for failed runs of the parallel job.
+
+        :return: The retry settings
+        :rtype: BatchRetrySettings
+        """
+        return self._retry_settings
+
+    @retry_settings.setter
+    def retry_settings(self, value: BatchRetrySettings):
+        self._retry_settings = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return Command._picked_fields_from_dict_to_rest_object() + [
+            "max_concurrency_per_instance",
+            "error_threshold",
+            "logging_level",
+            "retry_settings",
+            "mini_batch_size",
+        ]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.command import ParallelSchema
+
+        return ParallelSchema(context=context)
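
A minimal usage sketch (not part of the change above; the node, helper name, and values are illustrative) of how the run settings exposed by Parallel might be tuned:

    from azure.ai.ml.entities import BatchRetrySettings

    def tune_parallel_run_settings(parallel_node):
        # parallel_node is assumed to be a Parallel node like the class above,
        # e.g. obtained by invoking an internal parallel component in a pipeline.
        parallel_node.max_concurrency_per_instance = 4   # workers per compute instance
        parallel_node.mini_batch_size = 10               # records per run() mini-batch
        parallel_node.error_threshold = -1               # -1: ignore all record/file failures
        parallel_node.logging_level = "INFO"
        parallel_node.retry_settings = BatchRetrySettings(max_retries=3, timeout=60)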