aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_component/spark_component.py
blob: 7da65fb6973295255a41c99e3e671f853a4ddfb1 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os
from typing import Any, Dict, List, Optional, Union

from marshmallow import Schema

from azure.ai.ml._schema.component.spark_component import SparkComponentSchema
from azure.ai.ml.constants._common import COMPONENT_TYPE
from azure.ai.ml.constants._component import NodeType
from azure.ai.ml.constants._job.job import RestSparkConfKey
from azure.ai.ml.entities._assets import Environment
from azure.ai.ml.entities._job.parameterized_spark import ParameterizedSpark

from ..._schema import PathAwareSchema
from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin
from .._util import convert_ordered_dict_to_dict, validate_attribute_type
from .._validation import MutableValidationResult
from ._additional_includes import AdditionalIncludesMixin
from .component import Component


class SparkComponent(
    Component, ParameterizedSpark, SparkJobEntryMixin, AdditionalIncludesMixin
):  # pylint: disable=too-many-instance-attributes
    """Spark component version, used to define a Spark Component or Job.

    :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url pointing
        to a remote location. Defaults to ".", indicating the current directory.
    :paramtype code: Optional[Union[str, os.PathLike]]
    :keyword entry: The file or class entry point.
    :paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]]
    :keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps. Defaults to None.
    :paramtype py_files: Optional[List[str]]
    :keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None.
    :paramtype jars: Optional[List[str]]
    :keyword files: The list of files to be placed in the working directory of each executor. Defaults to None.
    :paramtype files: Optional[List[str]]
    :keyword archives: The list of archives to be extracted into the working directory of each executor.
        Defaults to None.
    :paramtype archives: Optional[List[str]]
    :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
    :paramtype driver_cores: Optional[Union[int, str]]
    :keyword driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit
        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
    :paramtype driver_memory: Optional[str]
    :keyword executor_cores: The number of cores to use on each executor.
    :paramtype executor_cores: Optional[Union[int, str]]
    :keyword executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit
        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
    :paramtype executor_memory: Optional[str]
    :keyword executor_instances: The initial number of executors.
    :paramtype executor_instances: Optional[Union[int, str]]
    :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
        executors registered with this application up and down based on the workload. Defaults to False.
    :paramtype dynamic_allocation_enabled: Optional[Union[bool, str]]
    :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
        enabled.
    :paramtype dynamic_allocation_min_executors: Optional[Union[int, str]]
    :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
        enabled.
    :paramtype dynamic_allocation_max_executors: Optional[Union[int, str]]
    :keyword conf: A dictionary with pre-defined Spark configurations key and values. Defaults to None.
    :paramtype conf: Optional[dict[str, str]]
    :keyword environment: The Azure ML environment to run the job in.
    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
    :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
    :paramtype inputs: Optional[dict[str, Union[
        ~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
        ~azure.ai.ml.Input,
        str,
        bool,
        int,
        float,
        Enum,
        ]]]
    :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
    :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
    :keyword args: The arguments for the job. Defaults to None.
    :paramtype args: Optional[str]
    :keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
    :paramtype additional_includes: Optional[List[str]]

    .. admonition:: Example:

        .. literalinclude:: ../samples/ml_samples_spark_configurations.py
            :start-after: [START spark_component_definition]
            :end-before: [END spark_component_definition]
            :language: python
            :dedent: 8
            :caption: Creating SparkComponent.
    """

    def __init__(
        self,
        *,
        code: Optional[Union[str, os.PathLike]] = ".",
        entry: Optional[Union[Dict[str, str], SparkJobEntry]] = None,
        py_files: Optional[List[str]] = None,
        jars: Optional[List[str]] = None,
        files: Optional[List[str]] = None,
        archives: Optional[List[str]] = None,
        driver_cores: Optional[Union[int, str]] = None,
        driver_memory: Optional[str] = None,
        executor_cores: Optional[Union[int, str]] = None,
        executor_memory: Optional[str] = None,
        executor_instances: Optional[Union[int, str]] = None,
        dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
        dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
        dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
        conf: Optional[Dict[str, str]] = None,
        environment: Optional[Union[str, Environment]] = None,
        inputs: Optional[Dict] = None,
        outputs: Optional[Dict] = None,
        args: Optional[str] = None,
        additional_includes: Optional[List] = None,
        **kwargs: Any,
    ) -> None:
        # Validate init params are of a valid type. This must stay the first statement: it inspects
        # locals(), which at this point holds exactly the constructor arguments.
        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())

        kwargs[COMPONENT_TYPE] = NodeType.SPARK

        super().__init__(
            inputs=inputs,
            outputs=outputs,
            **kwargs,
        )

        self.code: Optional[Union[str, os.PathLike]] = code
        self.entry = entry
        self.py_files = py_files
        self.jars = jars
        self.files = files
        self.archives = archives
        # Stored exactly as passed (may be None); only the lookups below fall back to an empty mapping.
        self.conf = conf
        self.environment = environment
        self.args = args
        self.additional_includes = additional_includes or []

        # For pipeline spark job, we also allow user to set driver_cores, driver_memory and so on by setting conf.
        # If root level fields are not set by user, we promote conf setting to root level to facilitate subsequent
        # verification. This usually happens when we use to_component(SparkJob) or builder function spark() as a node
        # in pipeline sdk
        conf_settings: Dict[str, Any] = conf or {}

        def _explicit_or_conf(explicit: Any, conf_key: str) -> Any:
            # "Not set" means None only. An explicit falsy value such as False or 0 is a deliberate
            # user choice and must not be replaced by the conf entry.
            if explicit is not None:
                return explicit
            return conf_settings.get(conf_key, None)

        self.driver_cores = _explicit_or_conf(driver_cores, RestSparkConfKey.DRIVER_CORES)
        self.driver_memory = _explicit_or_conf(driver_memory, RestSparkConfKey.DRIVER_MEMORY)
        self.executor_cores = _explicit_or_conf(executor_cores, RestSparkConfKey.EXECUTOR_CORES)
        self.executor_memory = _explicit_or_conf(executor_memory, RestSparkConfKey.EXECUTOR_MEMORY)
        self.executor_instances = _explicit_or_conf(executor_instances, RestSparkConfKey.EXECUTOR_INSTANCES)
        self.dynamic_allocation_enabled = _explicit_or_conf(
            dynamic_allocation_enabled, RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED
        )
        self.dynamic_allocation_min_executors = _explicit_or_conf(
            dynamic_allocation_min_executors, RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS
        )
        self.dynamic_allocation_max_executors = _explicit_or_conf(
            dynamic_allocation_max_executors, RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS
        )

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        """Build the schema used to validate this component.

        :param context: The context handed to the schema constructor.
        :type context: Any
        :return: A SparkComponentSchema bound to the given context.
        :rtype: Union[PathAwareSchema, Schema]
        """
        return SparkComponentSchema(context=context)

    @classmethod
    def _attr_type_map(cls) -> dict:
        """Return the accepted types for constructor arguments that are type-checked in ``__init__``.

        :return: A mapping of attribute name to a tuple of accepted types.
        :rtype: dict
        """
        return {
            "environment": (str, Environment),
            "code": (str, os.PathLike),
        }

    def _customized_validate(self) -> MutableValidationResult:
        """Run the parent validation, then append the additional-includes/code diagnostics to its result.

        :return: The validation result produced by the parent class, extended in place.
        :rtype: MutableValidationResult
        """
        validation_result = super()._customized_validate()
        self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
        return validation_result

    def _to_dict(self) -> Dict:
        """Dump the component into a plain dict.

        Entries from ``self._other_parameter`` are merged first, so keys produced by the parent
        ``_to_dict`` take precedence on conflict.

        :return: The component content as a plain dictionary.
        :rtype: Dict
        """
        # TODO: Bug Item number: 2897665
        res: Dict = convert_ordered_dict_to_dict(  # type: ignore
            {**self._other_parameter, **super()._to_dict()}
        )
        return res

    def _to_ordered_dict_for_yaml_dump(self) -> Dict:
        """Dump the component content into an ordered dict suitable for writing out as YAML.

        :return: The ordered dict
        :rtype: Dict
        """

        obj: dict = super()._to_ordered_dict_for_yaml_dump()
        # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value
        if self.code and isinstance(self.code, str):
            obj["code"] = self.code
        return obj

    def _get_environment_id(self) -> Union[str, None]:
        """Return the environment identifier of this component.

        :return: ``environment.id`` when the environment is defined inline as an Environment object,
            otherwise the ``environment`` attribute itself (a string reference or None).
        :rtype: Union[str, None]
        """
        # handle case when environment is defined inline
        if isinstance(self.environment, Environment):
            res: Optional[str] = self.environment.id
            return res
        return self.environment

    def __str__(self) -> str:
        """Return the YAML form of the component, falling back to the parent ``__str__`` if that fails.

        :return: The string representation of the component.
        :rtype: str
        """
        try:
            yaml_text: str = self._to_yaml()
            return yaml_text
        # Catch Exception rather than BaseException so that KeyboardInterrupt and SystemExit raised
        # while rendering are not silently swallowed by the fallback.
        except Exception:  # pylint: disable=W0718
            fallback: str = super().__str__()
            return fallback