| Field | Value | Timestamp |
|---|---|---|
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py')
| Mode | Path | Lines |
|---|---|---|
| -rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py | 210 |

1 file changed, 210 insertions(+), 0 deletions(-)
```diff
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py
new file mode 100644
index 00000000..d3fdf9dc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_helpers.py
@@ -0,0 +1,210 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+import re
+from typing import Any
+
+from azure.ai.ml.constants import InputOutputModes
+from azure.ai.ml.constants._component import ComponentJobConstants
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.pipeline._io import NodeInput, NodeOutput
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
+
+
+def _validate_spark_configurations(obj: Any) -> None:
+    # skip validation when component of node is from remote
+    if hasattr(obj, "component") and isinstance(obj.component, str):
+        return
+    if obj.dynamic_allocation_enabled in ["True", "true", True]:
+        if (
+            obj.driver_cores is None
+            or obj.driver_memory is None
+            or obj.executor_cores is None
+            or obj.executor_memory is None
+        ):
+            msg = (
+                "spark.driver.cores, spark.driver.memory, spark.executor.cores and spark.executor.memory are "
+                "mandatory fields."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if obj.dynamic_allocation_min_executors is None or obj.dynamic_allocation_max_executors is None:
+            msg = (
+                "spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors are required "
+                "when dynamic allocation is enabled."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if not (
+            obj.dynamic_allocation_min_executors > 0
+            and obj.dynamic_allocation_min_executors <= obj.dynamic_allocation_max_executors
+        ):
+            msg = (
+                "Dynamic min executors should be bigger than 0 and min executors should be equal or less than "
+                "max executors."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if obj.executor_instances and (
+            obj.executor_instances > obj.dynamic_allocation_max_executors
+            or obj.executor_instances < obj.dynamic_allocation_min_executors
+        ):
+            msg = (
+                "Executor instances must be a valid non-negative integer and must be between "
+                "spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors"
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+    else:
+        if (
+            obj.driver_cores is None
+            or obj.driver_memory is None
+            or obj.executor_cores is None
+            or obj.executor_memory is None
+            or obj.executor_instances is None
+        ):
+            msg = (
+                "spark.driver.cores, spark.driver.memory, spark.executor.cores, spark.executor.memory and "
+                "spark.executor.instances are mandatory fields."
+            )
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        if obj.dynamic_allocation_min_executors is not None or obj.dynamic_allocation_max_executors is not None:
+            msg = "Should not specify min or max executors when dynamic allocation is disabled."
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+
+def _validate_compute_or_resources(compute: Any, resources: Any) -> None:
+    # if resources is set, then ensure it is valid before
+    # checking mutual exclusiveness against compute existence
+    if compute is None and resources is None:
+        msg = "One of either compute or resources must be specified for Spark job"
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.SPARK_JOB,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+    if compute and resources:
+        msg = "Only one of either compute or resources may be specified for Spark job"
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.SPARK_JOB,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+
+# Only "direct" mode is supported for spark job inputs and outputs
+# pylint: disable=no-else-raise, too-many-boolean-expressions
+def _validate_input_output_mode(inputs: Any, outputs: Any) -> None:
+    for input_name, input_value in inputs.items():
+        if isinstance(input_value, Input) and input_value.mode != InputOutputModes.DIRECT:
+            # For standalone job input
+            msg = "Input '{}' is using '{}' mode, only '{}' is supported for Spark job"
+            raise ValidationException(
+                message=msg.format(input_name, input_value.mode, InputOutputModes.DIRECT),
+                no_personal_data_message=msg.format("[input_name]", "[input_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        elif (
+            isinstance(input_value, NodeInput)
+            and (
+                isinstance(input_value._data, Input)
+                and not (
+                    isinstance(input_value._data.path, str)
+                    and bool(re.search(ComponentJobConstants.INPUT_PATTERN, input_value._data.path))
+                )
+                and input_value._data.mode != InputOutputModes.DIRECT
+            )
+            and (isinstance(input_value._meta, Input) and input_value._meta.mode != InputOutputModes.DIRECT)
+        ):
+            # For node input in pipeline job, client side can only validate node input which isn't bound to pipeline
+            # input or node output.
+            # 1. If node input is bound to pipeline input, we can't get pipeline level input mode in node level
+            # validate. Even if we can judge through component input mode (_meta), we should note that pipeline level
+            # input mode has higher priority than component level. So component input can be set "Mount", but it can
+            # run successfully when pipeline input is "Direct".
+            # 2. If node input is bound to last node output, input mode should be decoupled with output mode, so we
+            # always get None mode in node level. In this case, if we define correct "Direct" mode in component yaml,
+            # component level mode will take effect and run successfully. Otherwise, it needs to set mode in node
+            # level like input1: path: ${{parent.jobs.sample_word.outputs.output1}} mode: direct.
+            msg = "Input '{}' is using '{}' mode, only '{}' is supported for Spark job"
+            raise ValidationException(
+                message=msg.format(
+                    input_name, input_value._data.mode or input_value._meta.mode, InputOutputModes.DIRECT
+                ),
+                no_personal_data_message=msg.format("[input_name]", "[input_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+    for output_name, output_value in outputs.items():
+        if (
+            isinstance(output_value, Output)
+            and output_name != "default"
+            and output_value.mode != InputOutputModes.DIRECT
+        ):
+            # For standalone job output
+            msg = "Output '{}' is using '{}' mode, only '{}' is supported for Spark job"
+            raise ValidationException(
+                message=msg.format(output_name, output_value.mode, InputOutputModes.DIRECT),
+                no_personal_data_message=msg.format("[output_name]", "[output_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+        elif (
+            isinstance(output_value, NodeOutput)
+            and output_name != "default"
+            and (
+                isinstance(output_value._data, Output)
+                and not (
+                    isinstance(output_value._data.path, str)
+                    and bool(re.search(ComponentJobConstants.OUTPUT_PATTERN, output_value._data.path))
+                )
+                and output_value._data.mode != InputOutputModes.DIRECT
+            )
+            and (isinstance(output_value._meta, Output) and output_value._meta.mode != InputOutputModes.DIRECT)
+        ):
+            # For node output in pipeline job, client side can only validate node output which isn't bound to
+            # pipeline output.
+            # 1. If node output is bound to pipeline output, we can't get pipeline level output mode in node level
+            # validate. Even if we can judge through component output mode (_meta), we should note that pipeline
+            # level output mode has higher priority than component level. So component output can be set "upload",
+            # but it can run successfully when pipeline output is "Direct".
+            msg = "Output '{}' is using '{}' mode, only '{}' is supported for Spark job"
+            raise ValidationException(
+                message=msg.format(
+                    output_name, output_value._data.mode or output_value._meta.mode, InputOutputModes.DIRECT
+                ),
+                no_personal_data_message=msg.format("[output_name]", "[output_value.mode]", "direct"),
+                target=ErrorTarget.SPARK_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+            )
```
