author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py  192
1 file changed, 192 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py
new file mode 100644
index 00000000..345fa5f2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py
@@ -0,0 +1,192 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...constants._job.job import RestSparkConfKey
+from ...entities import Environment, SparkJobEntry
+from ...entities._job.parameterized_spark import DUMMY_IMAGE, ParameterizedSpark
+from ...entities._job.spark_job_entry_mixin import SparkJobEntryMixin
+from .._schema.component import InternalSparkComponentSchema
+from ..entities import InternalComponent
+from .environment import InternalEnvironment
+
+
+class InternalSparkComponent(
+ InternalComponent, ParameterizedSpark, SparkJobEntryMixin
+): # pylint: disable=too-many-instance-attributes, too-many-ancestors
+ """Internal Spark Component
+ This class is used to handle internal spark component.
+ It can be loaded from internal spark component yaml or from rest object of an internal spark component.
+ But after loaded, its structure will be the same as spark component.
+ """
+
+ def __init__(
+ self,
+ entry: Union[Dict[str, str], SparkJobEntry, None] = None,
+ py_files: Optional[List[str]] = None,
+ jars: Optional[List[str]] = None,
+ files: Optional[List[str]] = None,
+ archives: Optional[List[str]] = None,
+ driver_cores: Optional[int] = None,
+ driver_memory: Optional[str] = None,
+ executor_cores: Optional[int] = None,
+ executor_memory: Optional[str] = None,
+ executor_instances: Optional[int] = None,
+ dynamic_allocation_enabled: Optional[bool] = None,
+ dynamic_allocation_min_executors: Optional[int] = None,
+ dynamic_allocation_max_executors: Optional[int] = None,
+ conf: Optional[Dict[str, str]] = None,
+ args: Optional[str] = None,
+ **kwargs,
+ ):
+ SparkJobEntryMixin.__init__(self, entry=entry, **kwargs)
+ # environment.setter has been overridden in ParameterizedSpark, so we need to pop it out here
+ environment = kwargs.pop("environment", None)
+ InternalComponent.__init__(self, **kwargs)
+ # Pop it to avoid passing multiple values for code in ParameterizedSpark.__init__
+ code = kwargs.pop("code", None)
+ ParameterizedSpark.__init__(
+ self,
+ code=self.base_path,
+ entry=entry,
+ py_files=py_files,
+ jars=jars,
+ files=files,
+ archives=archives,
+ conf=conf,
+ environment=environment,
+ args=args,
+ **kwargs,
+ )
+ self.code = code
+ # For a pipeline spark job, we also allow the user to set driver_cores, driver_memory and so on
+ # through conf. If the root-level fields are not set by the user, we promote the conf settings to
+ # root level to facilitate subsequent validation. This usually happens when to_component(SparkJob)
+ # or the builder function spark() is used as a node in the pipeline SDK.
+ conf = conf or {}
+ self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)
+ self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None)
+ self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None)
+ self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None)
+ self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None)
+ self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None
+ )
+ self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None
+ )
+ self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None
+ )
+
+ self.conf = conf
+ self.args = args
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ return InternalSparkComponentSchema(context=context)
+
+ @property # type: ignore[override]
+ def environment(self) -> Optional[Union[Environment, str]]:
+ """Get the environment of the component.
+
+ :return: The environment of the component.
+ :rtype: Optional[Union[Environment, str]]
+ """
+ if isinstance(self._environment, Environment) and self._environment.image is None:
+ return Environment(conda_file=self._environment.conda_file, image=DUMMY_IMAGE)
+ return self._environment
+
+ @environment.setter
+ def environment(self, value):
+ """Set the environment of the component.
+
+ :param value: The environment of the component.
+ :type value: Union[str, Environment, dict]
+ :return: No return
+ :rtype: None
+ """
+ if value is None or isinstance(value, (str, Environment)):
+ self._environment = value
+ elif isinstance(value, dict):
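+ # A dict value typically comes from internal component YAML; resolve it against base_path
+ # and rebuild it as a regular Environment entity.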
+ internal_environment = InternalEnvironment(**value)
+ internal_environment.resolve(self.base_path)
+ self._environment = Environment(
+ name=internal_environment.name,
+ version=internal_environment.version,
+ )
+ if internal_environment.conda:
+ self._environment.conda_file = {
+ "dependencies": internal_environment.conda[InternalEnvironment.CONDA_DEPENDENCIES]
+ }
+ if internal_environment.docker:
+ self._environment.image = internal_environment.docker["image"]
+ # We assume a loaded internal spark component won't be used to create another internal spark
+ # component, so the environment construction here can be simplified.
+ else:
+ raise ValueError(f"Unsupported environment type: {type(value)}")
+
+ @property
+ def jars(self) -> Optional[List[str]]:
+ """Get the jars of the component.
+
+ :return: The jars of the component.
+ :rtype: Optional[List[str]]
+ """
+ return self._jars
+
+ @jars.setter
+ def jars(self, value: Union[str, List[str]]):
+ """Set the jars of the component.
+
+ :param value: The jars of the component.
+ :type value: Union[str, List[str]]
+ :return: No return
+ :rtype: None
+ """
+ if isinstance(value, str):
+ value = [value]
+ self._jars = value
+
+ @property
+ def py_files(self) -> Optional[List[str]]:
+ """Get the py_files of the component.
+
+ :return: The py_files of the component.
+ :rtype: Optional[List[str]]
+ """
+ return self._py_files
+
+ @py_files.setter
+ def py_files(self, value):
+ """Set the py_files of the component.
+
+ :param value: The py_files of the component.
+ :type value: Union[str, List[str]]
+ :return: No return
+ :rtype: None
+ """
+ if isinstance(value, str):
+ value = [value]
+ self._py_files = value
+
+ def _to_dict(self) -> Dict:
+ result = super()._to_dict()
+ return result
+
+ def _to_rest_object(self):
+ result = super()._to_rest_object()
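+ # Rename the camelCase "pyFiles" key produced by the base serialization to the snake_case
+ # "py_files" key used in the internal component spec.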
+ if "pyFiles" in result.properties.component_spec:
+ result.properties.component_spec["py_files"] = result.properties.component_spec.pop("pyFiles")
+ return result
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj) -> Dict:
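+ # Reverse the key rename done in _to_rest_object before delegating to the base implementation.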
+ if "py_files" in obj.properties.component_spec:
+ obj.properties.component_spec["pyFiles"] = obj.properties.component_spec.pop("py_files")
+ result = super()._from_rest_object_to_init_params(obj)
+ return result
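The constructor above promotes Spark conf entries to root-level fields such as driver_cores when those fields are not passed explicitly. Below is a minimal standalone sketch of that fallback rule, using plain dicts instead of the actual class and assuming the familiar spark.* key names purely for illustration (the real code reads them through the RestSparkConfKey constants):

from typing import Dict, Optional, Union

def promote(explicit: Optional[Union[int, str]], conf: Dict[str, str], key: str):
    # Explicitly passed value wins; otherwise fall back to the matching conf entry.
    # Close to `driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)` in the diff,
    # except that an explicit None check is used here instead of truthiness.
    return explicit if explicit is not None else conf.get(key)

conf = {"spark.driver.cores": "2", "spark.executor.memory": "4g"}
print(promote(None, conf, "spark.driver.cores"))   # -> "2"  (promoted from conf)
print(promote(4, conf, "spark.driver.cores"))      # -> 4    (explicit argument wins)
print(promote(None, conf, "spark.driver.memory"))  # -> None (not set anywhere)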