| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed | .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_job.py |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_job.py')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_job.py | 393 |

1 file changed, 393 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_job.py
new file mode 100644
index 00000000..10930fb4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_job/spark_job.py
@@ -0,0 +1,393 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, too-many-instance-attributes
+
+import copy
+import logging
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, Optional, Union
+
+from marshmallow import INCLUDE
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase
+from azure.ai.ml._restclient.v2023_04_01_preview.models import SparkJob as RestSparkJob
+from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
+from azure.ai.ml._schema.job.parameterized_spark import CONF_KEY_MAP
+from azure.ai.ml._schema.job.spark_job import SparkJobSchema
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, TYPE
+from azure.ai.ml.constants._job.job import SparkConfKey
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+    _BaseJobIdentityConfiguration,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job._input_output_helpers import (
+    from_rest_data_outputs,
+    from_rest_inputs_to_dataset_literal,
+    to_rest_data_outputs,
+    to_rest_dataset_literal_inputs,
+    validate_inputs_for_args,
+)
+from azure.ai.ml.entities._job.parameterized_spark import ParameterizedSpark
+from azure.ai.ml.entities._util import load_from_dict
+
+from ..._schema import NestedField, UnionField
+from .job import Job
+from .job_io_mixin import JobIOMixin
+from .spark_helpers import _validate_compute_or_resources, _validate_input_output_mode, _validate_spark_configurations
+from .spark_job_entry import SparkJobEntry
+from .spark_job_entry_mixin import SparkJobEntryMixin
+from .spark_resource_configuration import SparkResourceConfiguration
+
+# avoid circular import error
+if TYPE_CHECKING:
+    from azure.ai.ml.entities import SparkComponent
+    from azure.ai.ml.entities._builders import Spark
+
+module_logger = logging.getLogger(__name__)
+
+
+class SparkJob(Job, ParameterizedSpark, JobIOMixin, SparkJobEntryMixin):
+    """A standalone Spark job.
+
+    :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
+    :paramtype driver_cores: Optional[int]
+    :keyword driver_memory: The amount of memory to use for the driver process, formatted as a string with a size
+        unit suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype driver_memory: Optional[str]
+    :keyword executor_cores: The number of cores to use on each executor.
+    :paramtype executor_cores: Optional[int]
+    :keyword executor_memory: The amount of memory to use per executor process, formatted as a string with a size
+        unit suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
+    :paramtype executor_memory: Optional[str]
+    :keyword executor_instances: The initial number of executors.
+    :paramtype executor_instances: Optional[int]
+    :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
+        executors registered with this application up and down based on the workload.
+    :paramtype dynamic_allocation_enabled: Optional[bool]
+    :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_min_executors: Optional[int]
+    :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
+        enabled.
+    :paramtype dynamic_allocation_max_executors: Optional[int]
+    :keyword inputs: The mapping of input data bindings used in the job.
+    :paramtype inputs: Optional[dict[str, ~azure.ai.ml.Input]]
+    :keyword outputs: The mapping of output data bindings used in the job.
+    :paramtype outputs: Optional[dict[str, ~azure.ai.ml.Output]]
+    :keyword compute: The compute resource the job runs on.
+    :paramtype compute: Optional[str]
+    :keyword identity: The identity that the Spark job will use while running on compute.
+    :paramtype identity: Optional[Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration,
+        ~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration]]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
+            :start-after: [START spark_job_configuration]
+            :end-before: [END spark_job_configuration]
+            :language: python
+            :dedent: 8
+            :caption: Configuring a SparkJob.
+    """
+
+    def __init__(
+        self,
+        *,
+        driver_cores: Optional[Union[int, str]] = None,
+        driver_memory: Optional[str] = None,
+        executor_cores: Optional[Union[int, str]] = None,
+        executor_memory: Optional[str] = None,
+        executor_instances: Optional[Union[int, str]] = None,
+        dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
+        dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
+        dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
+        inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None,
+        outputs: Optional[Dict[str, Output]] = None,
+        compute: Optional[str] = None,
+        identity: Optional[
+            Union[Dict[str, str], ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
+        ] = None,
+        resources: Optional[Union[Dict, SparkResourceConfiguration]] = None,
+        **kwargs: Any,
+    ) -> None:
+        kwargs[TYPE] = JobType.SPARK
+
+        super().__init__(**kwargs)
+        self.conf: Dict = self.conf or {}
+        self.properties_sparkJob = self.properties or {}
+        self.driver_cores = driver_cores
+        self.driver_memory = driver_memory
+        self.executor_cores = executor_cores
+        self.executor_memory = executor_memory
+        self.executor_instances = executor_instances
+        self.dynamic_allocation_enabled = dynamic_allocation_enabled
+        self.dynamic_allocation_min_executors = dynamic_allocation_min_executors
+        self.dynamic_allocation_max_executors = dynamic_allocation_max_executors
+        self.inputs = inputs  # type: ignore[assignment]
+        self.outputs = outputs  # type: ignore[assignment]
+        self.compute = compute
+        self.resources = resources
+        self.identity = identity
+        if self.executor_instances is None and str(self.dynamic_allocation_enabled).lower() == "true":
+            self.executor_instances = self.dynamic_allocation_min_executors
+
+    @property
+    def resources(self) -> Optional[Union[Dict, SparkResourceConfiguration]]:
+        """The compute resource configuration for the job.
+
+        :return: The compute resource configuration for the job.
+        :rtype: Optional[~azure.ai.ml.entities.SparkResourceConfiguration]
+        """
+        return self._resources
+
+    @resources.setter
+    def resources(self, value: Optional[Union[Dict[str, str], SparkResourceConfiguration]]) -> None:
+        """Sets the compute resource configuration for the job.
+
+        :param value: The compute resource configuration for the job.
+        :type value: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkResourceConfiguration]]
+        """
+        if isinstance(value, dict):
+            value = SparkResourceConfiguration(**value)
+        self._resources = value
+
+    @property
+    def identity(
+        self,
+    ) -> Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]:
+        """The identity that the Spark job will use while running on compute.
+
+        :return: The identity that the Spark job will use while running on compute.
+        :rtype: Optional[Union[~azure.ai.ml.ManagedIdentityConfiguration, ~azure.ai.ml.AmlTokenConfiguration,
+            ~azure.ai.ml.UserIdentityConfiguration]]
+        """
+        return self._identity
+
+    @identity.setter
+    def identity(
+        self,
+        value: Optional[
+            Union[Dict[str, str], ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
+        ],
+    ) -> None:
+        """Sets the identity that the Spark job will use while running on compute.
+
+        :param value: The identity that the Spark job will use while running on compute.
+        :type value: Optional[Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration,
+            ~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration]]
+        """
+        if isinstance(value, dict):
+            identity_schema = UnionField(
+                [
+                    NestedField(ManagedIdentitySchema, unknown=INCLUDE),
+                    NestedField(AMLTokenIdentitySchema, unknown=INCLUDE),
+                    NestedField(UserIdentitySchema, unknown=INCLUDE),
+                ]
+            )
+            value = identity_schema._deserialize(value=value, attr=None, data=None)
+        self._identity = value
+
+    def _to_dict(self) -> Dict:
+        res: dict = SparkJobSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
+        return res
+
+    def filter_conf_fields(self) -> Dict[str, str]:
+        """Extracts the entries of the conf attribute whose keys are not among the Spark configuration fields
+        listed in ~azure.ai.ml._schema.job.parameterized_spark.CONF_KEY_MAP and returns them in their own dictionary.
+
+        :return: A dictionary of the conf fields that are not Spark configuration fields.
+        :rtype: dict[str, str]
+        """
+        if self.conf is None:
+            return {}
+        data_conf = {}
+        for conf_key, conf_val in self.conf.items():
+            if conf_key not in CONF_KEY_MAP:
+                data_conf[conf_key] = conf_val
+        return data_conf
+
+    def _to_rest_object(self) -> JobBase:
+        self._validate()
+        conf = {
+            **(self.filter_conf_fields()),
+            "spark.driver.cores": self.driver_cores,
+            "spark.driver.memory": self.driver_memory,
+            "spark.executor.cores": self.executor_cores,
+            "spark.executor.memory": self.executor_memory,
+        }
+        if self.dynamic_allocation_enabled in ["True", "true", True]:
+            conf["spark.dynamicAllocation.enabled"] = True
+            conf["spark.dynamicAllocation.minExecutors"] = self.dynamic_allocation_min_executors
+            conf["spark.dynamicAllocation.maxExecutors"] = self.dynamic_allocation_max_executors
+        if self.executor_instances is not None:
+            conf["spark.executor.instances"] = self.executor_instances
+
+        properties = RestSparkJob(
+            experiment_name=self.experiment_name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            code_id=self.code,
+            entry=self.entry._to_rest_object() if self.entry is not None and not isinstance(self.entry, dict) else None,
+            py_files=self.py_files,
+            jars=self.jars,
+            files=self.files,
+            archives=self.archives,
+            identity=(
+                self.identity._to_job_rest_object() if self.identity and not isinstance(self.identity, dict) else None
+            ),
+            conf=conf,
+            properties=self.properties_sparkJob,
+            environment_id=self.environment,
+            inputs=to_rest_dataset_literal_inputs(self.inputs, job_type=self.type),
+            outputs=to_rest_data_outputs(self.outputs),
+            args=self.args,
+            compute_id=self.compute,
+            resources=(
+                self.resources._to_rest_object() if self.resources and not isinstance(self.resources, Dict) else None
+            ),
+        )
+        result = JobBase(properties=properties)
+        result.name = self.name
+        return result
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "SparkJob":
+        loaded_data = load_from_dict(SparkJobSchema, data, context, additional_message, **kwargs)
+        return SparkJob(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+    @classmethod
+    def _load_from_rest(cls, obj: JobBase) -> "SparkJob":
+        rest_spark_job: RestSparkJob = obj.properties
+        rest_spark_conf = copy.copy(rest_spark_job.conf) or {}
+        spark_job = SparkJob(
+            name=obj.name,
+            entry=SparkJobEntry._from_rest_object(rest_spark_job.entry),
+            experiment_name=rest_spark_job.experiment_name,
+            id=obj.id,
+            display_name=rest_spark_job.display_name,
+            description=rest_spark_job.description,
+            tags=rest_spark_job.tags,
+            properties=rest_spark_job.properties,
+            services=rest_spark_job.services,
+            status=rest_spark_job.status,
+            creation_context=obj.system_data,
+            code=rest_spark_job.code_id,
+            compute=rest_spark_job.compute_id,
+            environment=rest_spark_job.environment_id,
+            identity=(
+                _BaseJobIdentityConfiguration._from_rest_object(rest_spark_job.identity)
+                if rest_spark_job.identity
+                else None
+            ),
+            args=rest_spark_job.args,
+            conf=rest_spark_conf,
+            # copy the promoted fields from conf into the matching SparkJob attributes
+            driver_cores=rest_spark_conf.get(SparkConfKey.DRIVER_CORES, None),
+            driver_memory=rest_spark_conf.get(SparkConfKey.DRIVER_MEMORY, None),
+            executor_cores=rest_spark_conf.get(SparkConfKey.EXECUTOR_CORES, None),
+            executor_memory=rest_spark_conf.get(SparkConfKey.EXECUTOR_MEMORY, None),
+            executor_instances=rest_spark_conf.get(SparkConfKey.EXECUTOR_INSTANCES, None),
+            dynamic_allocation_enabled=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None),
+            dynamic_allocation_min_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None),
+            dynamic_allocation_max_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None),
+            resources=SparkResourceConfiguration._from_rest_object(rest_spark_job.resources),
+            inputs=from_rest_inputs_to_dataset_literal(rest_spark_job.inputs),
+            outputs=from_rest_data_outputs(rest_spark_job.outputs),
+        )
+        return spark_job
+
+    def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> "SparkComponent":
+        """Translate a spark job to a component.
+
+        :param context: Context of spark job YAML file.
+        :type context: dict
+        :return: Translated spark component.
+        :rtype: SparkComponent
+        """
+        from azure.ai.ml.entities import SparkComponent
+
+        pipeline_job_dict = kwargs.get("pipeline_job_dict", {})
+        context = context or {BASE_PATH_CONTEXT_KEY: Path("./")}
+
+        # Create anonymous spark component with default version as 1
+        return SparkComponent(
+            tags=self.tags,
+            is_anonymous=True,
+            base_path=context[BASE_PATH_CONTEXT_KEY],
+            description=self.description,
+            code=self.code,
+            entry=self.entry,
+            py_files=self.py_files,
+            jars=self.jars,
+            files=self.files,
+            archives=self.archives,
+            driver_cores=self.driver_cores,
+            driver_memory=self.driver_memory,
+            executor_cores=self.executor_cores,
+            executor_memory=self.executor_memory,
+            executor_instances=self.executor_instances,
+            dynamic_allocation_enabled=self.dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
+            conf=self.conf,
+            properties=self.properties_sparkJob,
+            environment=self.environment,
+            inputs=self._to_inputs(inputs=self.inputs, pipeline_job_dict=pipeline_job_dict),
+            outputs=self._to_outputs(outputs=self.outputs, pipeline_job_dict=pipeline_job_dict),
+            args=self.args,
+        )
+
+    def _to_node(self, context: Optional[Dict] = None, **kwargs: Any) -> "Spark":
+        """Translate a spark job to a pipeline node.
+
+        :param context: Context of spark job YAML file.
+        :type context: dict
+        :return: Translated Spark node.
+        :rtype: Spark
+        """
+        from azure.ai.ml.entities._builders import Spark
+
+        component = self._to_component(context, **kwargs)
+
+        return Spark(
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            # code, entry, py_files, jars, files, archives, environment and args are static and not
+            # allowed to be overwritten; they are always taken from the component.
+            component=component,
+            identity=self.identity,
+            driver_cores=self.driver_cores,
+            driver_memory=self.driver_memory,
+            executor_cores=self.executor_cores,
+            executor_memory=self.executor_memory,
+            executor_instances=self.executor_instances,
+            dynamic_allocation_enabled=self.dynamic_allocation_enabled,
+            dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
+            dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
+            conf=self.conf,
+            inputs=self.inputs,  # type: ignore[arg-type]
+            outputs=self.outputs,  # type: ignore[arg-type]
+            compute=self.compute,
+            resources=self.resources,
+            properties=self.properties_sparkJob,
+        )
+
+    def _validate(self) -> None:
+        # TODO: make spark job schema validatable?
+        if self.resources and not isinstance(self.resources, Dict):
+            self.resources._validate()
+        _validate_compute_or_resources(self.compute, self.resources)
+        _validate_input_output_mode(self.inputs, self.outputs)
+        _validate_spark_configurations(self)
+        self._validate_entry()
+
+        if self.args:
+            validate_inputs_for_args(self.args, self.inputs)
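
For orientation, here is a minimal sketch of how the constructor above behaves in practice. It assumes the `azure-ai-ml` package is installed; the code folder, entry script, instance type, and datastore paths are hypothetical placeholders, not values taken from this diff.

```python
from azure.ai.ml import Input, Output
from azure.ai.ml.entities import SparkJob
from azure.ai.ml.entities._job.spark_job_entry import SparkJobEntry
from azure.ai.ml.entities._job.spark_resource_configuration import SparkResourceConfiguration

job = SparkJob(
    code="./src",  # hypothetical folder containing the entry script
    entry=SparkJobEntry(entry="main.py"),  # hypothetical entry point
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    dynamic_allocation_enabled=True,
    dynamic_allocation_min_executors=1,
    dynamic_allocation_max_executors=4,
    resources=SparkResourceConfiguration(instance_type="Standard_E8S_V3", runtime_version="3.3"),
    inputs={"raw": Input(type="uri_folder", path="azureml://datastores/blobstore/paths/raw", mode="direct")},
    outputs={"out": Output(type="uri_folder", path="azureml://datastores/blobstore/paths/out", mode="direct")},
)

# __init__ falls back to the dynamic-allocation lower bound when
# executor_instances is unset and dynamic allocation is enabled:
assert job.executor_instances == 1
```

The `direct` input/output mode shown is the mode conventionally used for Spark jobs; `_validate_input_output_mode` is what checks the bindings at submission time.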
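`filter_conf_fields` and `_to_rest_object` together decide which `conf` entries travel to the service as free-form Spark settings and which are overwritten by the promoted attributes (`driver_cores`, `executor_memory`, and so on). The pure-Python sketch below mirrors that filtering; the key set is an assumption standing in for `CONF_KEY_MAP`, whose exact contents live in `azure.ai.ml._schema.job.parameterized_spark`.

```python
# Hypothetical stand-ins for the promoted Spark conf keys; the real mapping
# is azure.ai.ml._schema.job.parameterized_spark.CONF_KEY_MAP.
PROMOTED_KEYS = {
    "spark.driver.cores",
    "spark.driver.memory",
    "spark.executor.cores",
    "spark.executor.memory",
    "spark.executor.instances",
    "spark.dynamicAllocation.enabled",
    "spark.dynamicAllocation.minExecutors",
    "spark.dynamicAllocation.maxExecutors",
}

def filter_conf_fields(conf: dict) -> dict:
    """Keep only the free-form entries, mirroring SparkJob.filter_conf_fields."""
    return {k: v for k, v in conf.items() if k not in PROMOTED_KEYS}

conf = {
    "spark.driver.cores": 1,                       # promoted: surfaces as driver_cores
    "spark.jars.packages": "com.example:lib:1.0",  # free-form: passed through verbatim
}
assert filter_conf_fields(conf) == {"spark.jars.packages": "com.example:lib:1.0"}
```

This is why `_to_rest_object` can safely splat `filter_conf_fields()` into its `conf` dict before writing the promoted `spark.*` keys: the two sets of keys never collide.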
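The `identity` setter likewise accepts either a configuration object or a plain dict, deserializing the dict through the marshmallow union schema shown above. A hedged sketch, assuming a bare `SparkJob()` constructs and that `"user_identity"` is the serialized `type` discriminator (as in AzureML job YAML):

```python
from azure.ai.ml.entities import SparkJob
from azure.ai.ml.entities._credentials import UserIdentityConfiguration

job = SparkJob()  # assumption: no arguments are required just to construct
# The dict is routed through the ManagedIdentity/AMLToken/UserIdentity union
# schema; the "type" key selects which configuration class is built.
job.identity = {"type": "user_identity"}
assert isinstance(job.identity, UserIdentityConfiguration)
```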
