# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access, too-many-instance-attributes
import copy
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from marshmallow import INCLUDE
from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase
from azure.ai.ml._restclient.v2023_04_01_preview.models import SparkJob as RestSparkJob
from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
from azure.ai.ml._schema.job.parameterized_spark import CONF_KEY_MAP
from azure.ai.ml._schema.job.spark_job import SparkJobSchema
from azure.ai.ml.constants import JobType
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, TYPE
from azure.ai.ml.constants._job.job import SparkConfKey
from azure.ai.ml.entities._credentials import (
AmlTokenConfiguration,
ManagedIdentityConfiguration,
UserIdentityConfiguration,
_BaseJobIdentityConfiguration,
)
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._job._input_output_helpers import (
from_rest_data_outputs,
from_rest_inputs_to_dataset_literal,
to_rest_data_outputs,
to_rest_dataset_literal_inputs,
validate_inputs_for_args,
)
from azure.ai.ml.entities._job.parameterized_spark import ParameterizedSpark
from azure.ai.ml.entities._util import load_from_dict
from ..._schema import NestedField, UnionField
from .job import Job
from .job_io_mixin import JobIOMixin
from .spark_helpers import _validate_compute_or_resources, _validate_input_output_mode, _validate_spark_configurations
from .spark_job_entry import SparkJobEntry
from .spark_job_entry_mixin import SparkJobEntryMixin
from .spark_resource_configuration import SparkResourceConfiguration

# avoid circular import error
if TYPE_CHECKING:
    from azure.ai.ml.entities import SparkComponent
    from azure.ai.ml.entities._builders import Spark

module_logger = logging.getLogger(__name__)


class SparkJob(Job, ParameterizedSpark, JobIOMixin, SparkJobEntryMixin):
"""A standalone Spark job.
:keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
:paramtype driver_cores: Optional[int]
:keyword driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit
suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
:paramtype driver_memory: Optional[str]
:keyword executor_cores: The number of cores to use on each executor.
:paramtype executor_cores: Optional[int]
:keyword executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit
suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
:paramtype executor_memory: Optional[str]
:keyword executor_instances: The initial number of executors.
:paramtype executor_instances: Optional[int]
:keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
executors registered with this application up and down based on the workload.
:paramtype dynamic_allocation_enabled: Optional[bool]
:keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
enabled.
:paramtype dynamic_allocation_min_executors: Optional[int]
:keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
enabled.
:paramtype dynamic_allocation_max_executors: Optional[int]
:keyword inputs: The mapping of input data bindings used in the job.
:paramtype inputs: Optional[dict[str, ~azure.ai.ml.Input]]
:keyword outputs: The mapping of output data bindings used in the job.
:paramtype outputs: Optional[dict[str, ~azure.ai.ml.Output]]
:keyword compute: The compute resource the job runs on.
:paramtype compute: Optional[str]
:keyword identity: The identity that the Spark job will use while running on compute.
:paramtype identity: Optional[Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration,
        ~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration]]

    .. admonition:: Example:

.. literalinclude:: ../samples/ml_samples_spark_configurations.py
:start-after: [START spark_job_configuration]
:end-before: [END spark_job_configuration]
:language: python
:dedent: 8
:caption: Configuring a SparkJob.
"""
def __init__(
self,
*,
driver_cores: Optional[Union[int, str]] = None,
driver_memory: Optional[str] = None,
executor_cores: Optional[Union[int, str]] = None,
executor_memory: Optional[str] = None,
executor_instances: Optional[Union[int, str]] = None,
dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None,
outputs: Optional[Dict[str, Output]] = None,
compute: Optional[str] = None,
identity: Optional[
Union[Dict[str, str], ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
] = None,
resources: Optional[Union[Dict, SparkResourceConfiguration]] = None,
**kwargs: Any,
) -> None:
kwargs[TYPE] = JobType.SPARK
super().__init__(**kwargs)
self.conf: Dict = self.conf or {}
self.properties_sparkJob = self.properties or {}
self.driver_cores = driver_cores
self.driver_memory = driver_memory
self.executor_cores = executor_cores
self.executor_memory = executor_memory
self.executor_instances = executor_instances
self.dynamic_allocation_enabled = dynamic_allocation_enabled
self.dynamic_allocation_min_executors = dynamic_allocation_min_executors
self.dynamic_allocation_max_executors = dynamic_allocation_max_executors
self.inputs = inputs # type: ignore[assignment]
self.outputs = outputs # type: ignore[assignment]
self.compute = compute
self.resources = resources
self.identity = identity
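        # If executor instances were not provided but dynamic allocation is enabled,
        # default the initial executor count to the dynamic allocation minimum.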
if self.executor_instances is None and str(self.dynamic_allocation_enabled).lower() == "true":
self.executor_instances = self.dynamic_allocation_min_executors
@property
def resources(self) -> Optional[Union[Dict, SparkResourceConfiguration]]:
"""The compute resource configuration for the job.
:return: The compute resource configuration for the job.
:rtype: Optional[~azure.ai.ml.entities.SparkResourceConfiguration]
"""
return self._resources
@resources.setter
def resources(self, value: Optional[Union[Dict[str, str], SparkResourceConfiguration]]) -> None:
"""Sets the compute resource configuration for the job.
:param value: The compute resource configuration for the job.
:type value: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkResourceConfiguration]]
"""
if isinstance(value, dict):
value = SparkResourceConfiguration(**value)
self._resources = value
@property
def identity(
self,
) -> Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]:
"""The identity that the Spark job will use while running on compute.
:return: The identity that the Spark job will use while running on compute.
:rtype: Optional[Union[~azure.ai.ml.ManagedIdentityConfiguration, ~azure.ai.ml.AmlTokenConfiguration,
~azure.ai.ml.UserIdentityConfiguration]]
"""
return self._identity
@identity.setter
def identity(
self,
value: Optional[
Union[Dict[str, str], ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
],
) -> None:
"""Sets the identity that the Spark job will use while running on compute.
:param value: The identity that the Spark job will use while running on compute.
:type value: Optional[Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration,
~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration]]
"""
if isinstance(value, dict):
            identity_schema = UnionField(
                [
                    NestedField(ManagedIdentitySchema, unknown=INCLUDE),
                    NestedField(AMLTokenIdentitySchema, unknown=INCLUDE),
                    NestedField(UserIdentitySchema, unknown=INCLUDE),
                ]
            )
            value = identity_schema._deserialize(value=value, attr=None, data=None)
self._identity = value
def _to_dict(self) -> Dict:
res: dict = SparkJobSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)
return res
    def filter_conf_fields(self) -> Dict[str, str]:
        """Return the conf entries that are not promoted Spark configuration fields (i.e., keys that are not
        listed in ~azure.ai.ml._schema.job.parameterized_spark.CONF_KEY_MAP).

        :return: A dictionary of the conf entries that are not Spark configuration fields.
        :rtype: dict[str, str]
        """
if self.conf is None:
return {}
data_conf = {}
for conf_key, conf_val in self.conf.items():
            if conf_key not in CONF_KEY_MAP:
data_conf[conf_key] = conf_val
return data_conf
def _to_rest_object(self) -> JobBase:
self._validate()
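        # Merge the promoted driver/executor attributes back into the Spark conf
        # dictionary alongside any passthrough conf entries.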
conf = {
**(self.filter_conf_fields()),
"spark.driver.cores": self.driver_cores,
"spark.driver.memory": self.driver_memory,
"spark.executor.cores": self.executor_cores,
"spark.executor.memory": self.executor_memory,
}
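        # Dynamic allocation settings are only emitted when explicitly enabled.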
if self.dynamic_allocation_enabled in ["True", "true", True]:
conf["spark.dynamicAllocation.enabled"] = True
conf["spark.dynamicAllocation.minExecutors"] = self.dynamic_allocation_min_executors
conf["spark.dynamicAllocation.maxExecutors"] = self.dynamic_allocation_max_executors
if self.executor_instances is not None:
conf["spark.executor.instances"] = self.executor_instances
properties = RestSparkJob(
experiment_name=self.experiment_name,
display_name=self.display_name,
description=self.description,
tags=self.tags,
code_id=self.code,
entry=self.entry._to_rest_object() if self.entry is not None and not isinstance(self.entry, dict) else None,
py_files=self.py_files,
jars=self.jars,
files=self.files,
archives=self.archives,
identity=(
self.identity._to_job_rest_object() if self.identity and not isinstance(self.identity, dict) else None
),
conf=conf,
properties=self.properties_sparkJob,
environment_id=self.environment,
inputs=to_rest_dataset_literal_inputs(self.inputs, job_type=self.type),
outputs=to_rest_data_outputs(self.outputs),
args=self.args,
compute_id=self.compute,
resources=(
self.resources._to_rest_object() if self.resources and not isinstance(self.resources, Dict) else None
),
)
result = JobBase(properties=properties)
result.name = self.name
return result
@classmethod
def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "SparkJob":
loaded_data = load_from_dict(SparkJobSchema, data, context, additional_message, **kwargs)
return SparkJob(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
@classmethod
def _load_from_rest(cls, obj: JobBase) -> "SparkJob":
rest_spark_job: RestSparkJob = obj.properties
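        # Take a shallow copy of the REST conf (defaulting to an empty dict) so the
        # promoted fields below can be read while keeping the job's conf independent
        # of the REST payload.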
rest_spark_conf = copy.copy(rest_spark_job.conf) or {}
spark_job = SparkJob(
name=obj.name,
entry=SparkJobEntry._from_rest_object(rest_spark_job.entry),
experiment_name=rest_spark_job.experiment_name,
id=obj.id,
display_name=rest_spark_job.display_name,
description=rest_spark_job.description,
tags=rest_spark_job.tags,
properties=rest_spark_job.properties,
services=rest_spark_job.services,
status=rest_spark_job.status,
creation_context=obj.system_data,
code=rest_spark_job.code_id,
compute=rest_spark_job.compute_id,
environment=rest_spark_job.environment_id,
identity=(
_BaseJobIdentityConfiguration._from_rest_object(rest_spark_job.identity)
if rest_spark_job.identity
else None
),
args=rest_spark_job.args,
conf=rest_spark_conf,
driver_cores=rest_spark_conf.get(
SparkConfKey.DRIVER_CORES, None
), # copy fields from conf into the promote attribute in spark
driver_memory=rest_spark_conf.get(SparkConfKey.DRIVER_MEMORY, None),
executor_cores=rest_spark_conf.get(SparkConfKey.EXECUTOR_CORES, None),
executor_memory=rest_spark_conf.get(SparkConfKey.EXECUTOR_MEMORY, None),
executor_instances=rest_spark_conf.get(SparkConfKey.EXECUTOR_INSTANCES, None),
dynamic_allocation_enabled=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None),
dynamic_allocation_min_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None),
dynamic_allocation_max_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None),
resources=SparkResourceConfiguration._from_rest_object(rest_spark_job.resources),
inputs=from_rest_inputs_to_dataset_literal(rest_spark_job.inputs),
outputs=from_rest_data_outputs(rest_spark_job.outputs),
)
return spark_job
def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> "SparkComponent":
"""Translate a spark job to component.
:param context: Context of spark job YAML file.
:type context: dict
:return: Translated spark component.
:rtype: SparkComponent
"""
from azure.ai.ml.entities import SparkComponent
pipeline_job_dict = kwargs.get("pipeline_job_dict", {})
context = context or {BASE_PATH_CONTEXT_KEY: Path("./")}
# Create anonymous spark component with default version as 1
return SparkComponent(
tags=self.tags,
is_anonymous=True,
base_path=context[BASE_PATH_CONTEXT_KEY],
description=self.description,
code=self.code,
entry=self.entry,
py_files=self.py_files,
jars=self.jars,
files=self.files,
archives=self.archives,
driver_cores=self.driver_cores,
driver_memory=self.driver_memory,
executor_cores=self.executor_cores,
executor_memory=self.executor_memory,
executor_instances=self.executor_instances,
dynamic_allocation_enabled=self.dynamic_allocation_enabled,
dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
conf=self.conf,
properties=self.properties_sparkJob,
environment=self.environment,
inputs=self._to_inputs(inputs=self.inputs, pipeline_job_dict=pipeline_job_dict),
outputs=self._to_outputs(outputs=self.outputs, pipeline_job_dict=pipeline_job_dict),
args=self.args,
)
def _to_node(self, context: Optional[Dict] = None, **kwargs: Any) -> "Spark":
"""Translate a spark job to a pipeline node.
:param context: Context of spark job YAML file.
:type context: dict
:return: Translated spark component.
:rtype: Spark
"""
from azure.ai.ml.entities._builders import Spark
component = self._to_component(context, **kwargs)
return Spark(
display_name=self.display_name,
description=self.description,
tags=self.tags,
# code, entry, py_files, jars, files, archives, environment and args are static and not allowed to be
# overwritten. And we will always get them from component.
component=component,
identity=self.identity,
driver_cores=self.driver_cores,
driver_memory=self.driver_memory,
executor_cores=self.executor_cores,
executor_memory=self.executor_memory,
executor_instances=self.executor_instances,
dynamic_allocation_enabled=self.dynamic_allocation_enabled,
dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
conf=self.conf,
inputs=self.inputs, # type: ignore[arg-type]
outputs=self.outputs, # type: ignore[arg-type]
compute=self.compute,
resources=self.resources,
properties=self.properties_sparkJob,
)
def _validate(self) -> None:
# TODO: make spark job schema validatable?
if self.resources and not isinstance(self.resources, Dict):
self.resources._validate()
_validate_compute_or_resources(self.compute, self.resources)
_validate_input_output_mode(self.inputs, self.outputs)
_validate_spark_configurations(self)
self._validate_entry()
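        # args may reference inputs by name, so validate them against the declared inputs.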
if self.args:
validate_inputs_for_args(self.args, self.inputs)