path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py
author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two version of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py  306
1 file changed, 306 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py
new file mode 100644
index 00000000..342f8c44
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py
@@ -0,0 +1,306 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, too-many-locals
+
+import os
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import AmlToken, ManagedIdentity, UserIdentity
+from azure.ai.ml.constants._common import AssetTypes
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities import Environment
+from azure.ai.ml.entities._component.spark_component import SparkComponent
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+from azure.ai.ml.entities._job.spark_job_entry import SparkJobEntry
+from azure.ai.ml.entities._job.spark_resource_configuration import SparkResourceConfiguration
+from azure.ai.ml.exceptions import ErrorTarget, ValidationException
+
+from .spark import Spark
+
+SUPPORTED_INPUTS = [AssetTypes.URI_FILE, AssetTypes.URI_FOLDER, AssetTypes.MLTABLE]
+
+
+def _parse_input(input_value: Union[Input, dict, str, bool, int, float]) -> Tuple:
+    component_input = None
+    job_input: Union[Input, dict, str, bool, int, float] = ""
+
+    if isinstance(input_value, Input):
+        component_input = Input(**input_value._to_dict())
+        input_type = input_value.type
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value._to_dict())
+    elif isinstance(input_value, dict):
+        # if user provided dict, we try to parse it to Input.
+        # for job input, only parse for path type
+        input_type = input_value.get("type", None)
+        if input_type in SUPPORTED_INPUTS:
+            job_input = Input(**input_value)
+        component_input = Input(**input_value)
+    elif isinstance(input_value, (str, bool, int, float)):
+        # Input bindings are not supported
+        component_input = ComponentTranslatableMixin._to_input_builder_function(input_value)
+        job_input = input_value
+    else:
+        msg = f"Unsupported input type: {type(input_value)}, only Input, dict, str, bool, int and float are supported."
+        raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB)
+    return component_input, job_input
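+
+# Illustrative sketch (not part of the original module): _parse_input splits one user-supplied
+# value into a component-side Input definition and a job-side binding. The values below are
+# hypothetical examples, not taken from this file.
+#
+#     component_in, job_in = _parse_input(Input(type=AssetTypes.URI_FILE, path="./data/sample.csv"))
+#     # uri_file is in SUPPORTED_INPUTS, so both sides become Input copies of the original
+#
+#     component_lit, job_lit = _parse_input(42)
+#     # a literal is wrapped into an Input builder on the component side; the job side keeps 42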
+
+
+def _parse_output(output_value: Union[Output, dict, str]) -> Tuple:
+    component_output = None
+    job_output: Union[Output, dict, str] = {}
+
+    if isinstance(output_value, Output):
+        component_output = Output(**output_value._to_dict())
+        job_output = Output(**output_value._to_dict())
+    elif not output_value:
+        # output value can be None or empty dictionary
+        # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder
+        component_output = ComponentTranslatableMixin._to_output(output_value)
+        job_output = output_value
+    elif isinstance(output_value, dict):  # When output value is a non-empty dictionary
+        job_output = Output(**output_value)
+        component_output = Output(**output_value)
+    elif isinstance(output_value, str):  # When output is passed in from pipeline job yaml
+        job_output = output_value
+    else:
+        msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported."
+        raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB)
+    return component_output, job_output
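+
+# Illustrative sketch (not part of the original module): _parse_output mirrors _parse_input for
+# outputs. The values below are hypothetical examples, not taken from this file.
+#
+#     component_out, job_out = _parse_output(None)
+#     # the component side gets a default uri_folder/ReadWriteMount Output; the job side stays None
+#
+#     component_dir, job_dir = _parse_output({"type": AssetTypes.URI_FOLDER, "path": "azureml://datastores/workspaceblobstore/paths/out/"})
+#     # a non-empty dictionary is expanded into Output objects on both sides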
+
+
+def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]:
+    component_io_dict, job_io_dict = {}, {}
+    if io_dict:
+        for key, val in io_dict.items():
+            component_io, job_io = parse_func(val)
+            component_io_dict[key] = component_io
+            job_io_dict[key] = job_io
+    return component_io_dict, job_io_dict
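+
+# Illustrative sketch (not part of the original module): _parse_inputs_outputs applies the chosen
+# parse function to a whole mapping; the name and path below are hypothetical.
+#
+#     component_inputs, job_inputs = _parse_inputs_outputs(
+#         {"raw_data": Input(type=AssetTypes.URI_FILE, path="./data/sample.csv")}, parse_func=_parse_input
+#     )
+#     # -> two dictionaries keyed by "raw_data": the component-side Input and the job-side Input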
+
+
+def spark(
+    *,
+    experiment_name: Optional[str] = None,
+    name: Optional[str] = None,
+    display_name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    code: Optional[Union[str, os.PathLike]] = None,
+    entry: Union[Dict[str, str], SparkJobEntry, None] = None,
+    py_files: Optional[List[str]] = None,
+    jars: Optional[List[str]] = None,
+    files: Optional[List[str]] = None,
+    archives: Optional[List[str]] = None,
+    identity: Optional[Union[Dict[str, str], ManagedIdentity, AmlToken, UserIdentity]] = None,
+    driver_cores: Optional[int] = None,
+    driver_memory: Optional[str] = None,
+    executor_cores: Optional[int] = None,
+    executor_memory: Optional[str] = None,
+    executor_instances: Optional[int] = None,
+    dynamic_allocation_enabled: Optional[bool] = None,
+    dynamic_allocation_min_executors: Optional[int] = None,
+    dynamic_allocation_max_executors: Optional[int] = None,
+    conf: Optional[Dict[str, str]] = None,
+    environment: Optional[Union[str, Environment]] = None,
+    inputs: Optional[Dict] = None,
+    outputs: Optional[Dict] = None,
+    args: Optional[str] = None,
+    compute: Optional[str] = None,
+    resources: Optional[Union[Dict, SparkResourceConfiguration]] = None,
+    **kwargs: Any,
+) -> Spark:
+    """Creates a Spark object which can be used inside a dsl.pipeline function or used as a standalone Spark job.
+
+    :keyword experiment_name:  The name of the experiment the job will be created under.
+    :paramtype experiment_name: Optional[str]
+    :keyword name: The name of the job.
+    :paramtype name: Optional[str]
+    :keyword display_name: The job display name.
+    :paramtype display_name: Optional[str]
+    :keyword description: The description of the job. Defaults to None.
+    :paramtype description: Optional[str]
+    :keyword tags: The dictionary of tags for the job. Tags can be added, removed, and updated. Defaults to None.
+    :paramtype tags: Optional[dict[str, str]]
+    :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url
+        pointing to a remote location.
+    :paramtype code: Optional[Union[str, os.PathLike]]
+    :keyword entry: The file or class entry point.
+    :paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]]
+    :keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps.
+        Defaults to None.
+    :paramtype py_files: Optional[List[str]]
+    :keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None.
+    :paramtype jars: Optional[List[str]]
+    :keyword files: The list of files to be placed in the working directory of each executor. Defaults to None.
+    :paramtype files: Optional[List[str]]
+    :keyword archives: The list of archives to be extracted into the working directory of each executor.
+        Defaults to None.
+    :paramtype archives: Optional[List[str]]
+    :keyword identity: The identity that the Spark job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        dict[str, str],
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
+    :paramtype driver_cores: Optional[int]
+    :keyword driver_memory: The amount of memory to use for the driver process, formatted as a string with a size
+        unit suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype driver_memory: Optional[str]
+    :keyword executor_cores: The number of cores to use on each executor.
+    :paramtype executor_cores: Optional[int]
+    :keyword executor_memory: The amount of memory to use per executor process, formatted as a string with a size
+        unit suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype executor_memory: Optional[str]
+    :keyword executor_instances: The initial number of executors.
+    :paramtype executor_instances: Optional[int]
+    :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
+        executors registered with this application up and down based on the workload.
+    :paramtype dynamic_allocation_enabled: Optional[bool]
+    :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_min_executors: Optional[int]
+    :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_max_executors: Optional[int]
+    :keyword conf: A dictionary with pre-defined Spark configuration keys and values. Defaults to None.
+    :paramtype conf: Optional[dict[str, str]]
+    :keyword environment: The Azure ML environment to run the job in.
+    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
+    :keyword inputs: A mapping of input names to input data used in the job. Defaults to None.
+    :paramtype inputs: Optional[dict[str, ~azure.ai.ml.Input]]
+    :keyword outputs: A mapping of output names to output data used in the job. Defaults to None.
+    :paramtype outputs: Optional[dict[str, ~azure.ai.ml.Output]]
+    :keyword args: The arguments for the job.
+    :paramtype args: Optional[str]
+    :keyword compute: The compute resource the job runs on.
+    :paramtype compute: Optional[str]
+    :keyword resources: The compute resource configuration for the job.
+    :paramtype resources: Optional[Union[dict, ~azure.ai.ml.entities.SparkResourceConfiguration]]
+    :return: A Spark object.
+    :rtype: ~azure.ai.ml.entities.Spark
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_function_configuration_1]
+            :end-before: [END spark_function_configuration_1]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a SparkJob.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_function_configuration_2]
+            :end-before: [END spark_function_configuration_2]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a SparkJob.
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_dsl_pipeline]
+            :end-before: [END spark_dsl_pipeline]
+            :language: python
+            :dedent: 8
+            :caption: Building a Spark pipeline using the DSL pipeline decorator.
+    """
+
+    inputs = inputs or {}
+    outputs = outputs or {}
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+    component = kwargs.pop("component", None)
+
+    if component is None:
+        component = SparkComponent(
+            name=name,
+            display_name=display_name,
+            tags=tags,
+            description=description,
+            code=code,
+            entry=entry,
+            py_files=py_files,
+            jars=jars,
+            files=files,
+            archives=archives,
+            driver_cores=driver_cores,
+            driver_memory=driver_memory,
+            executor_cores=executor_cores,
+            executor_memory=executor_memory,
+            executor_instances=executor_instances,
+            dynamic_allocation_enabled=dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=dynamic_allocation_max_executors,
+            conf=conf,
+            environment=environment,
+            inputs=component_inputs,
+            outputs=component_outputs,
+            args=args,
+            _source=ComponentSource.BUILDER,
+            **kwargs,
+        )
+    if isinstance(component, SparkComponent):
+        spark_obj = Spark(
+            experiment_name=experiment_name,
+            name=name,
+            display_name=display_name,
+            tags=tags,
+            description=description,
+            component=component,
+            identity=identity,
+            driver_cores=driver_cores,
+            driver_memory=driver_memory,
+            executor_cores=executor_cores,
+            executor_memory=executor_memory,
+            executor_instances=executor_instances,
+            dynamic_allocation_enabled=dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=dynamic_allocation_max_executors,
+            conf=conf,
+            inputs=job_inputs,
+            outputs=job_outputs,
+            compute=compute,
+            resources=resources,
+            **kwargs,
+        )
+    else:
+        # when we load a remote job, the component is an ARM id; we need to get the entry from the node-level
+        # information returned by the service
+        spark_obj = Spark(
+            experiment_name=experiment_name,
+            name=name,
+            display_name=display_name,
+            tags=tags,
+            description=description,
+            component=component,
+            identity=identity,
+            driver_cores=driver_cores,
+            driver_memory=driver_memory,
+            executor_cores=executor_cores,
+            executor_memory=executor_memory,
+            executor_instances=executor_instances,
+            dynamic_allocation_enabled=dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=dynamic_allocation_max_executors,
+            conf=conf,
+            inputs=job_inputs,
+            outputs=job_outputs,
+            compute=compute,
+            resources=resources,
+            entry=entry,
+            py_files=py_files,
+            jars=jars,
+            files=files,
+            archives=archives,
+            args=args,
+            **kwargs,
+        )
+    return spark_obj
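
A minimal usage sketch for the builder above, assuming the spark function is imported from the
public azure.ai.ml namespace and targets serverless Spark compute; the entry script, paths,
instance type, and runtime version are placeholders rather than values taken from this diff:

    from azure.ai.ml import Input, Output, spark

    # Hypothetical standalone Spark job assembled with the spark(...) builder defined above.
    spark_job = spark(
        display_name="wrangle-sample-data",
        code="./src",                          # local folder containing the entry script
        entry={"file": "wrangle.py"},          # file entry point, parsed into a SparkJobEntry
        driver_cores=1,
        driver_memory="2g",
        executor_cores=2,
        executor_memory="2g",
        executor_instances=2,
        inputs={
            "raw_data": Input(
                type="uri_file",
                path="azureml://datastores/workspaceblobstore/paths/data/sample.csv",
                mode="direct",
            )
        },
        outputs={
            "wrangled_data": Output(
                type="uri_folder",
                path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
                mode="direct",
            )
        },
        args="--raw_data ${{inputs.raw_data}} --wrangled_data ${{outputs.wrangled_data}}",
        resources={"instance_type": "Standard_E8S_V3", "runtime_version": "3.4"},
    )

The returned Spark node can then be submitted as a standalone job or used inside a dsl.pipeline
function, as the docstring examples above indicate.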