commit 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) (HEAD, master)
author:    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer: S. Solomon Darnell  2025-03-28 21:52:21 -0500
tree:      ee3dc5af3b6313e921cd920906356f5d4febc4ed
parent:    cc961e04ba734dd72309fb548a2f97d67d578813

    two versions of R2R are here
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py')

 .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py | 210
 1 file changed, 210 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py
new file mode 100644
index 00000000..d3fdf9dc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py
@@ -0,0 +1,210 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+import re
+from typing import Any
+
+from azure.ai.ml.constants import InputOutputModes
+from azure.ai.ml.constants._component import ComponentJobConstants
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.pipeline._io import NodeInput, NodeOutput
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+
+def _validate_spark_configurations(obj: Any) -> None:
+    # Skip validation when the node's component is a remote reference (specified as a string)
+    if hasattr(obj, "component") and isinstance(obj.component, str):
+        return
+    if obj.dynamic_allocation_enabled in ["True", "true", True]:
+        if (
+            obj.driver_cores is None
+            or obj.driver_memory is None
+            or obj.executor_cores is None
+            or obj.executor_memory is None
+        ):
+            msg = (
+                "spark.driver.cores, spark.driver.memory, spark.executor.cores and spark.executor.memory are "
+                "mandatory fields."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if obj.dynamic_allocation_min_executors is None or obj.dynamic_allocation_max_executors is None:
+            msg = (
+                "spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors are required "
+                "when dynamic allocation is enabled."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if not (
+            obj.dynamic_allocation_min_executors > 0
+            and obj.dynamic_allocation_min_executors <= obj.dynamic_allocation_max_executors
+        ):
+            msg = (
+                "spark.dynamicAllocation.minExecutors must be greater than 0 and less than or equal to "
+                "spark.dynamicAllocation.maxExecutors."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if obj.executor_instances and (
+            obj.executor_instances > obj.dynamic_allocation_max_executors
+            or obj.executor_instances < obj.dynamic_allocation_min_executors
+        ):
+            msg = (
+                "spark.executor.instances must be a non-negative integer between "
+                "spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+    else:
+        if (
+            obj.driver_cores is None
+            or obj.driver_memory is None
+            or obj.executor_cores is None
+            or obj.executor_memory is None
+            or obj.executor_instances is None
+        ):
+            msg = (
+                "spark.driver.cores, spark.driver.memory, spark.executor.cores, spark.executor.memory and "
+                "spark.executor.instances are mandatory fields."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if obj.dynamic_allocation_min_executors is not None or obj.dynamic_allocation_max_executors is not None:
+            msg = "Should not specify min or max executors when dynamic allocation is disabled."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+
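As a quick illustration of the branches above, here is a minimal, hedged sketch that drives the validator with a types.SimpleNamespace stand-in rather than a real SparkJob entity, so the only assumption carried over is the set of attribute names this module itself reads:

    from types import SimpleNamespace

    from azure.ai.ml.exceptions import ValidationException

    # Attributes mirror exactly what _validate_spark_configurations inspects.
    conf = SimpleNamespace(
        dynamic_allocation_enabled=True,
        driver_cores=1,
        driver_memory="2g",
        executor_cores=2,
        executor_memory="2g",
        executor_instances=2,
        dynamic_allocation_min_executors=1,
        dynamic_allocation_max_executors=3,
    )
    _validate_spark_configurations(conf)  # passes: 1 <= 2 <= 3

    conf.executor_instances = 5  # outside the [min, max] window
    try:
        _validate_spark_configurations(conf)
    except ValidationException as exc:
        print(exc)  # executor instances must fall within the allocation bounds

Note that executor_instances is optional while dynamic allocation is enabled and is only range-checked when present; with dynamic allocation disabled, it becomes mandatory and the min/max executor settings must be omitted.
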
+def _validate_compute_or_resources(compute: Any, resources: Any) -> None:
+    # compute and resources are mutually exclusive:
+    # exactly one of them must be specified
+    if compute is None and resources is None:
+        msg = "One of either compute or resources must be specified for Spark job"
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.SPARK_JOB,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+    if compute and resources:
+        msg = "Only one of either compute or resources may be specified for Spark job"
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.SPARK_JOB,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+
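A hedged usage sketch of the mutual-exclusion contract; the resources dict shape below is illustrative only, since this helper checks presence rather than content:

    from azure.ai.ml.exceptions import ValidationException

    _validate_compute_or_resources("cpu-cluster", None)  # ok: compute only
    _validate_compute_or_resources(None, {"instance_type": "standard_e4s_v3"})  # ok: resources only

    # neither set, or both set -> user error
    for compute, resources in [(None, None), ("cpu-cluster", {"instance_type": "standard_e4s_v3"})]:
        try:
            _validate_compute_or_resources(compute, resources)
        except ValidationException as exc:
            print(exc)
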
+# Only "direct" mode is supported for Spark job inputs and outputs
+# pylint: disable=no-else-raise, too-many-boolean-expressions
+def _validate_input_output_mode(inputs: Any, outputs: Any) -> None:
+    for input_name, input_value in inputs.items():
+        if isinstance(input_value, Input) and input_value.mode != InputOutputModes.DIRECT:
+            # For standalone job input
+            msg = "Input '{}' is using '{}' mode, only '{}' is supported for Spark job"
+            raise ValidationException(
+                message=msg.format(input_name, input_value.mode, InputOutputModes.DIRECT),
+                no_personal_data_message=msg.format("[input_name]", "[input_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        elif (
+            isinstance(input_value, NodeInput)
+            and (
+                isinstance(input_value._data, Input)
+                and not (
+                    isinstance(input_value._data.path, str)
+                    and bool(re.search(ComponentJobConstants.INPUT_PATTERN, input_value._data.path))
+                )
+                and input_value._data.mode != InputOutputModes.DIRECT
+            )
+            and (isinstance(input_value._meta, Input) and input_value._meta.mode != InputOutputModes.DIRECT)
+        ):
+            # For a node input in a pipeline job, the client side can only validate node inputs that are not
+            # bound to a pipeline input or to another node's output.
+            # 1. If the node input is bound to a pipeline input, the pipeline-level input mode is not available
+            # during node-level validation. Even though the component-level input mode (_meta) can be inspected,
+            # the pipeline-level input mode takes priority over the component level: a component input set to
+            # "mount" can still run successfully when the pipeline input is "direct".
+            # 2. If the node input is bound to a previous node's output, the input mode is decoupled from the
+            # output mode, so the node-level mode is always None. In that case, a "direct" mode defined in the
+            # component YAML takes effect and the job runs successfully; otherwise the mode must be set at the
+            # node level, e.g. input1: {path: ${{parent.jobs.sample_word.outputs.output1}}, mode: direct}.
+            msg = "Input '{}' is using '{}' mode; only '{}' is supported for Spark jobs"
+            raise ValidationException(
+                message=msg.format(
+                    input_name, input_value._data.mode or input_value._meta.mode, InputOutputModes.DIRECT
+                ),
+                no_personal_data_message=msg.format("[input_name]", "[input_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    for output_name, output_value in outputs.items():
+        if (
+            isinstance(output_value, Output)
+            and output_name != "default"
+            and output_value.mode != InputOutputModes.DIRECT
+        ):
+            # For standalone job output
+            msg = "Output '{}' is using '{}' mode, only '{}' is supported for Spark job"
+            raise ValidationException(
+                message=msg.format(output_name, output_value.mode, InputOutputModes.DIRECT),
+                no_personal_data_message=msg.format("[output_name]", "[output_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        elif (
+            isinstance(output_value, NodeOutput)
+            and output_name != "default"
+            and (
+                isinstance(output_value._data, Output)
+                and not (
+                    isinstance(output_value._data.path, str)
+                    and bool(re.search(ComponentJobConstants.OUTPUT_PATTERN, output_value._data.path))
+                )
+                and output_value._data.mode != InputOutputModes.DIRECT
+            )
+            and (isinstance(output_value._meta, Output) and output_value._meta.mode != InputOutputModes.DIRECT)
+        ):
+            # For a node output in a pipeline job, the client side can only validate node outputs that are not
+            # bound to a pipeline output.
+            # 1. If the node output is bound to a pipeline output, the pipeline-level output mode is not available
+            # during node-level validation. Even though the component-level output mode (_meta) can be inspected,
+            # the pipeline-level output mode takes priority over the component level: a component output set to
+            # "upload" can still run successfully when the pipeline output is "direct".
+            msg = "Output '{}' is using '{}' mode; only '{}' is supported for Spark jobs"
+            raise ValidationException(
+                message=msg.format(
+                    output_name, output_value._data.mode or output_value._meta.mode, InputOutputModes.DIRECT
+                ),
+                no_personal_data_message=msg.format("[output_name]", "[output_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
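
Finally, a hedged sketch of the standalone-job branch; the abfss paths are placeholders, and NodeInput/NodeOutput bindings inside pipeline jobs follow the more involved rules described in the comments above:

    from azure.ai.ml.entities._inputs_outputs import Input, Output
    from azure.ai.ml.exceptions import ValidationException

    inputs = {
        "file_input": Input(
            type="uri_file",
            path="abfss://container@account.dfs.core.windows.net/data.csv",  # placeholder path
            mode="direct",
        )
    }
    outputs = {
        "result": Output(
            type="uri_folder",
            path="abfss://container@account.dfs.core.windows.net/out",  # placeholder path
            mode="direct",
        )
    }
    _validate_input_output_mode(inputs, outputs)  # passes: everything is "direct"

    inputs["file_input"].mode = "ro_mount"
    try:
        _validate_input_output_mode(inputs, outputs)
    except ValidationException as exc:
        print(exc)  # only "direct" is supported for Spark jobs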