# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access, too-many-instance-attributes

import copy
import logging
import re
from enum import Enum
from os import PathLike, path
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from marshmallow import INCLUDE, Schema

from ..._restclient.v2023_04_01_preview.models import JobBase as JobBaseData
from ..._restclient.v2023_04_01_preview.models import SparkJob as RestSparkJob
from ..._schema import NestedField, PathAwareSchema, UnionField
from ..._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
from ..._schema.job.parameterized_spark import CONF_KEY_MAP
from ..._schema.job.spark_job import SparkJobSchema
from ..._utils.utils import is_url
from ...constants._common import (
    ARM_ID_PREFIX,
    BASE_PATH_CONTEXT_KEY,
    REGISTRY_URI_FORMAT,
    SPARK_ENVIRONMENT_WARNING_MESSAGE,
)
from ...constants._component import NodeType
from ...constants._job.job import SparkConfKey
from ...entities._assets import Environment
from ...entities._component.component import Component
from ...entities._component.spark_component import SparkComponent
from ...entities._credentials import (
    AmlTokenConfiguration,
    ManagedIdentityConfiguration,
    UserIdentityConfiguration,
    _BaseJobIdentityConfiguration,
)
from ...entities._inputs_outputs import Input, Output
from ...entities._job._input_output_helpers import (
    from_rest_data_outputs,
    from_rest_inputs_to_dataset_literal,
    validate_inputs_for_args,
)
from ...entities._job.spark_job import SparkJob
from ...entities._job.spark_job_entry import SparkJobEntryType
from ...entities._job.spark_resource_configuration import SparkResourceConfiguration
from ...entities._validation import MutableValidationResult
from ...exceptions import ErrorCategory, ErrorTarget, ValidationException
from .._job.pipeline._io import NodeOutput
from .._job.spark_helpers import (
    _validate_compute_or_resources,
    _validate_input_output_mode,
    _validate_spark_configurations,
)
from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin
from .._util import convert_ordered_dict_to_dict, get_rest_dict_for_node_attrs, load_from_dict, validate_attribute_type
from .base_node import BaseNode

module_logger = logging.getLogger(__name__)


class Spark(BaseNode, SparkJobEntryMixin):
    """Base class for spark node, used for spark component version consumption.

    You should not instantiate this class directly. Instead, you should
    create it from the builder function: spark.

    :param component: The ID or instance of the Spark component or job to be run during the step.
    :type component: Union[str, ~azure.ai.ml.entities.SparkComponent]
    :param identity: The identity that the Spark job will use while running on compute.
    :type identity: Union[Dict[str, str], ~azure.ai.ml.entities.ManagedIdentityConfiguration,
        ~azure.ai.ml.entities.AmlTokenConfiguration, ~azure.ai.ml.entities.UserIdentityConfiguration]
    :param driver_cores: The number of cores to use for the driver process, only in cluster mode.
    :type driver_cores: int
    :param driver_memory: The amount of memory to use for the driver process, formatted as a string with a size unit
        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
    :type driver_memory: str
    :param executor_cores: The number of cores to use on each executor.
    :type executor_cores: int
    :param executor_memory: The amount of memory to use per executor process, formatted as a string with a size unit
        suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
    :type executor_memory: str
    :param executor_instances: The initial number of executors.
    :type executor_instances: int
    :param dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
        executors registered with this application up and down based on the workload.
    :type dynamic_allocation_enabled: bool
    :param dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation
        is enabled.
    :type dynamic_allocation_min_executors: int
    :param dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation
        is enabled.
    :type dynamic_allocation_max_executors: int
    :param conf: A dictionary of pre-defined Spark configuration key-value pairs.
    :type conf: Dict[str, str]
    :param inputs: A mapping of input names to input data sources used in the job.
    :type inputs: Dict[str, Union[str, bool, int, float, Enum,
        ~azure.ai.ml.entities._job.pipeline._io.NodeOutput, ~azure.ai.ml.Input]]
    :param outputs: A mapping of output names to output data sources used in the job.
    :type outputs: Dict[str, Union[str, ~azure.ai.ml.Output]]
    :param args: The arguments for the job.
    :type args: str
    :param compute: The compute resource the job runs on.
    :type compute: str
    :param resources: The compute resource configuration for the job.
    :type resources: Union[Dict, ~azure.ai.ml.entities.SparkResourceConfiguration]
    :param entry: The file or class entry point.
    :type entry: Dict[str, str]
    :param py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps.
    :type py_files: List[str]
    :param jars: The list of .jar files to include on the driver and executor classpaths.
    :type jars: List[str]
    :param files: The list of files to be placed in the working directory of each executor.
    :type files: List[str]
    :param archives: The list of archives to be extracted into the working directory of each executor.
    :type archives: List[str]
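
    .. note::
        A minimal usage sketch of the ``spark`` builder; the paths, memory sizes, instance type, and runtime
        version below are illustrative assumptions, not defaults::

            from azure.ai.ml import Input, spark

            spark_node = spark(
                code="./src",  # hypothetical local folder containing the entry script
                entry={"file": "main.py"},
                driver_cores=1,
                driver_memory="2g",
                executor_cores=2,
                executor_memory="2g",
                executor_instances=2,
                resources={"instance_type": "standard_e4s_v3", "runtime_version": "3.3"},
                inputs={"input_data": Input(type="uri_folder", path="./data", mode="direct")},
                args="--input_data ${{inputs.input_data}}",
            )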
    """

    def __init__(
        self,
        *,
        component: Union[str, SparkComponent],
        identity: Optional[
            Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
        ] = None,
        driver_cores: Optional[Union[int, str]] = None,
        driver_memory: Optional[str] = None,
        executor_cores: Optional[Union[int, str]] = None,
        executor_memory: Optional[str] = None,
        executor_instances: Optional[Union[int, str]] = None,
        dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
        dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
        dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
        conf: Optional[Dict[str, str]] = None,
        inputs: Optional[
            Dict[
                str,
                Union[
                    NodeOutput,
                    Input,
                    str,
                    bool,
                    int,
                    float,
                    Enum,
                    "Input",
                ],
            ]
        ] = None,
        outputs: Optional[Dict[str, Union[str, Output]]] = None,
        compute: Optional[str] = None,
        resources: Optional[Union[Dict, SparkResourceConfiguration]] = None,
        entry: Union[Dict[str, str], SparkJobEntry, None] = None,
        py_files: Optional[List[str]] = None,
        jars: Optional[List[str]] = None,
        files: Optional[List[str]] = None,
        archives: Optional[List[str]] = None,
        args: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # validate that init params are of valid types
        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
        kwargs.pop("type", None)

        BaseNode.__init__(
            self, type=NodeType.SPARK, inputs=inputs, outputs=outputs, component=component, compute=compute, **kwargs
        )

        # init mark for _AttrDict
        self._init = True
        SparkJobEntryMixin.__init__(self, entry=entry)
        self.conf = conf
        self.driver_cores = driver_cores
        self.driver_memory = driver_memory
        self.executor_cores = executor_cores
        self.executor_memory = executor_memory
        self.executor_instances = executor_instances
        self.dynamic_allocation_enabled = dynamic_allocation_enabled
        self.dynamic_allocation_min_executors = dynamic_allocation_min_executors
        self.dynamic_allocation_max_executors = dynamic_allocation_max_executors

        is_spark_component = isinstance(component, SparkComponent)
        if is_spark_component:
            # conf is a dict, so copy the component's conf here; otherwise setting conf on the node would also
            # mutate the component's conf
            _component = cast(SparkComponent, component)
            self.conf = self.conf or copy.copy(_component.conf)
            self.driver_cores = self.driver_cores or _component.driver_cores
            self.driver_memory = self.driver_memory or _component.driver_memory
            self.executor_cores = self.executor_cores or _component.executor_cores
            self.executor_memory = self.executor_memory or _component.executor_memory
            self.executor_instances = self.executor_instances or _component.executor_instances
            self.dynamic_allocation_enabled = self.dynamic_allocation_enabled or _component.dynamic_allocation_enabled
            self.dynamic_allocation_min_executors = (
                self.dynamic_allocation_min_executors or _component.dynamic_allocation_min_executors
            )
            self.dynamic_allocation_max_executors = (
                self.dynamic_allocation_max_executors or _component.dynamic_allocation_max_executors
            )
        if self.executor_instances is None and str(self.dynamic_allocation_enabled).lower() == "true":
            self.executor_instances = self.dynamic_allocation_min_executors
        # When creating a standalone job or a pipeline job, the following fields always take their values from the
        # component (or default to None), because we do not pass those fields to Spark. But in the following cases we
        # expect to get the correct values from Spark._from_rest_object(), so these fields are taken from their
        # respective keyword arguments instead:
        # 1. When we call regenerated_spark_node = Spark._from_rest_object(spark_node._to_rest_object()) in a local
        #    test, we expect regenerated_spark_node and spark_node to be identical.
        # 2. When we get a created remote job through Job._from_rest_object(result) in a job operation where the
        #    component is an arm_id, we expect the values returned by the service.
        # 3. When we load a remote job, the component is now an arm_id, so we need to get the entry from the node
        #    level returned by the service.
        self.entry = _component.entry if is_spark_component else entry
        self.py_files = _component.py_files if is_spark_component else py_files
        self.jars = _component.jars if is_spark_component else jars
        self.files = _component.files if is_spark_component else files
        self.archives = _component.archives if is_spark_component else archives
        self.args = _component.args if is_spark_component else args
        self.environment: Any = _component.environment if is_spark_component else None

        self.resources = resources
        self.identity = identity
        self._swept = False
        self._init = False

    @classmethod
    def _get_supported_outputs_types(cls) -> Tuple:
        return str, Output

    @property
    def component(self) -> Union[str, SparkComponent]:
        """The ID or instance of the Spark component or job to be run during the step.

        :rtype: Union[str, ~azure.ai.ml.entities.SparkComponent]
        """
        res: Union[str, SparkComponent] = self._component
        return res

    @property
    def resources(self) -> Optional[Union[Dict, SparkResourceConfiguration]]:
        """The compute resource configuration for the job.

        :rtype: ~azure.ai.ml.entities.SparkResourceConfiguration
        """
        return self._resources  # type: ignore

    @resources.setter
    def resources(self, value: Optional[Union[Dict, SparkResourceConfiguration]]) -> None:
        """Sets the compute resource configuration for the job.

        :param value: The compute resource configuration for the job.
        :type value: Union[Dict[str, str], ~azure.ai.ml.entities.SparkResourceConfiguration]
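
        A plain dict is converted to :class:`~azure.ai.ml.entities.SparkResourceConfiguration` on assignment.
        A minimal sketch (the instance type and runtime version shown are illustrative assumptions)::

            spark_node.resources = {"instance_type": "standard_e8s_v3", "runtime_version": "3.3"}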
        """
        if isinstance(value, dict):
            value = SparkResourceConfiguration(**value)
        self._resources = value

    @property
    def identity(
        self,
    ) -> Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]:
        """The identity that the Spark job will use while running on compute.

        :rtype: Union[~azure.ai.ml.entities.ManagedIdentityConfiguration, ~azure.ai.ml.entities.AmlTokenConfiguration,
            ~azure.ai.ml.entities.UserIdentityConfiguration]
        """
        # If no identity is provided via CLI/SDK input: for jobs running on Synapse compute, managed identity is the
        # default; for clusterless (serverless Spark) jobs, user identity is the default. Otherwise, the
        # user-provided identity is used.
        if self._identity is None:
            if self.compute is not None:
                return ManagedIdentityConfiguration()
            if self.resources is not None:
                return UserIdentityConfiguration()
        return self._identity

    @identity.setter
    def identity(
        self,
        value: Union[Dict[str, str], ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration],
    ) -> None:
        """Sets the identity that the Spark job will use while running on compute.

        :param value: The identity that the Spark job will use while running on compute.
        :type value: Union[Dict[str, str], ~azure.ai.ml.entities.ManagedIdentityConfiguration,
            ~azure.ai.ml.entities.AmlTokenConfiguration, ~azure.ai.ml.entities.UserIdentityConfiguration]
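
        A plain dict is deserialized through the corresponding identity schema. A hypothetical sketch
        (the ``type`` value shown is an assumption following the job identity YAML schema)::

            spark_node.identity = {"type": "user_identity"}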
        """
        if isinstance(value, dict):
            identity_schema = UnionField(
                [
                    NestedField(ManagedIdentitySchema, unknown=INCLUDE),
                    NestedField(AMLTokenIdentitySchema, unknown=INCLUDE),
                    NestedField(UserIdentitySchema, unknown=INCLUDE),
                ]
            )
            value = identity_schema._deserialize(value=value, attr=None, data=None)
        self._identity = value

    @property
    def code(self) -> Optional[Union[str, PathLike]]:
        """The local or remote path pointing at source code.

        :rtype: Union[str, PathLike]
        """
        if isinstance(self.component, Component):
            _code: Optional[Union[str, PathLike]] = self.component.code
            return _code
        return None

    @code.setter
    def code(self, value: str) -> None:
        """Sets the source code to be used for the job.

        :param value: The local or remote path pointing at source code.
        :type value: Union[str, PathLike]
        """
        if isinstance(self.component, Component):
            self.component.code = value
        else:
            msg = "Can't set code property for a registered component {}"
            raise ValidationException(
                message=msg.format(self.component),
                no_personal_data_message=msg.format(self.component),
                target=ErrorTarget.SPARK_JOB,
                error_category=ErrorCategory.USER_ERROR,
            )

    @classmethod
    def _from_rest_object_to_init_params(cls, obj: dict) -> Dict:
        obj = super()._from_rest_object_to_init_params(obj)

        if "resources" in obj and obj["resources"]:
            obj["resources"] = SparkResourceConfiguration._from_rest_object(obj["resources"])

        if "identity" in obj and obj["identity"]:
            obj["identity"] = _BaseJobIdentityConfiguration._from_rest_object(obj["identity"])

        if "entry" in obj and obj["entry"]:
            obj["entry"] = SparkJobEntry._from_rest_object(obj["entry"])
        if "conf" in obj and obj["conf"]:
            # promote known conf settings to their corresponding top-level fields
            for field_name, _ in CONF_KEY_MAP.items():
                value = obj["conf"].get(field_name, None)
                if value is not None:
                    obj[field_name] = value

        return obj

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Spark":
        from .spark_func import spark

        loaded_data = load_from_dict(SparkJobSchema, data, context, additional_message, **kwargs)
        spark_job: Spark = spark(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)

        return spark_job

    @classmethod
    def _load_from_rest_job(cls, obj: JobBaseData) -> "Spark":
        from .spark_func import spark

        rest_spark_job: RestSparkJob = obj.properties
        rest_spark_conf = copy.copy(rest_spark_job.conf) or {}

        spark_job: Spark = spark(
            name=obj.name,
            id=obj.id,
            entry=SparkJobEntry._from_rest_object(rest_spark_job.entry),
            display_name=rest_spark_job.display_name,
            description=rest_spark_job.description,
            tags=rest_spark_job.tags,
            properties=rest_spark_job.properties,
            experiment_name=rest_spark_job.experiment_name,
            services=rest_spark_job.services,
            status=rest_spark_job.status,
            creation_context=obj.system_data,
            code=rest_spark_job.code_id,
            compute=rest_spark_job.compute_id,
            environment=rest_spark_job.environment_id,
            identity=(
                _BaseJobIdentityConfiguration._from_rest_object(rest_spark_job.identity)
                if rest_spark_job.identity
                else None
            ),
            args=rest_spark_job.args,
            conf=rest_spark_conf,
            driver_cores=rest_spark_conf.get(
                SparkConfKey.DRIVER_CORES, None
            ),  # copy fields from conf into the promoted attributes on the Spark node
            driver_memory=rest_spark_conf.get(SparkConfKey.DRIVER_MEMORY, None),
            executor_cores=rest_spark_conf.get(SparkConfKey.EXECUTOR_CORES, None),
            executor_memory=rest_spark_conf.get(SparkConfKey.EXECUTOR_MEMORY, None),
            executor_instances=rest_spark_conf.get(SparkConfKey.EXECUTOR_INSTANCES, None),
            dynamic_allocation_enabled=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None),
            dynamic_allocation_min_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None),
            dynamic_allocation_max_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None),
            resources=SparkResourceConfiguration._from_rest_object(rest_spark_job.resources),
            inputs=from_rest_inputs_to_dataset_literal(rest_spark_job.inputs),
            outputs=from_rest_data_outputs(rest_spark_job.outputs),
        )
        return spark_job

    @classmethod
    def _attr_type_map(cls) -> dict:
        return {
            # hack: allow using InternalSparkComponent as component
            # "component": (str, SparkComponent),
            "environment": (str, Environment),
            "resources": (dict, SparkResourceConfiguration),
            "code": (str, PathLike),
        }

    @property
    def _skip_required_compute_missing_validation(self) -> bool:
        return self.resources is not None

    def _to_job(self) -> SparkJob:
        if isinstance(self.component, SparkComponent):
            return SparkJob(
                experiment_name=self.experiment_name,
                name=self.name,
                display_name=self.display_name,
                description=self.description,
                tags=self.tags,
                code=self.component.code,
                entry=self.entry,
                py_files=self.py_files,
                jars=self.jars,
                files=self.files,
                archives=self.archives,
                identity=self.identity,
                driver_cores=self.driver_cores,
                driver_memory=self.driver_memory,
                executor_cores=self.executor_cores,
                executor_memory=self.executor_memory,
                executor_instances=self.executor_instances,
                dynamic_allocation_enabled=self.dynamic_allocation_enabled,
                dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
                dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
                conf=self.conf,
                environment=self.environment,
                status=self.status,
                inputs=self._job_inputs,
                outputs=self._job_outputs,
                services=self.services,
                args=self.args,
                compute=self.compute,
                resources=self.resources,
            )

        return SparkJob(
            experiment_name=self.experiment_name,
            name=self.name,
            display_name=self.display_name,
            description=self.description,
            tags=self.tags,
            code=self.component,
            entry=self.entry,
            py_files=self.py_files,
            jars=self.jars,
            files=self.files,
            archives=self.archives,
            identity=self.identity,
            driver_cores=self.driver_cores,
            driver_memory=self.driver_memory,
            executor_cores=self.executor_cores,
            executor_memory=self.executor_memory,
            executor_instances=self.executor_instances,
            dynamic_allocation_enabled=self.dynamic_allocation_enabled,
            dynamic_allocation_min_executors=self.dynamic_allocation_min_executors,
            dynamic_allocation_max_executors=self.dynamic_allocation_max_executors,
            conf=self.conf,
            environment=self.environment,
            status=self.status,
            inputs=self._job_inputs,
            outputs=self._job_outputs,
            services=self.services,
            args=self.args,
            compute=self.compute,
            resources=self.resources,
        )

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        from azure.ai.ml._schema.pipeline import SparkSchema

        return SparkSchema(context=context)

    @classmethod
    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
        return [
            "type",
            "resources",
            "py_files",
            "jars",
            "files",
            "archives",
            "identity",
            "conf",
            "args",
        ]

    def _to_rest_object(self, **kwargs: Any) -> dict:
        rest_obj: dict = super()._to_rest_object(**kwargs)
        rest_obj.update(
            convert_ordered_dict_to_dict(
                {
                    "componentId": self._get_component_id(),
                    "identity": get_rest_dict_for_node_attrs(self.identity),
                    "resources": get_rest_dict_for_node_attrs(self.resources),
                    "entry": get_rest_dict_for_node_attrs(self.entry),
                }
            )
        )
        return rest_obj

    def _build_inputs(self) -> dict:
        inputs = super(Spark, self)._build_inputs()
        built_inputs = {}
        # Validate and remove non-specified inputs
        for key, value in inputs.items():
            if value is not None:
                built_inputs[key] = value
        return built_inputs

    def _customized_validate(self) -> MutableValidationResult:
        result = super()._customized_validate()
        if (
            isinstance(self.component, SparkComponent)
            and isinstance(self.component._environment, Environment)
            and self.component._environment.image is not None
        ):
            result.append_warning(
                yaml_path="environment.image",
                message=SPARK_ENVIRONMENT_WARNING_MESSAGE,
            )
        result.merge_with(self._validate_entry_exist())
        result.merge_with(self._validate_fields())
        return result

    def _validate_entry_exist(self) -> MutableValidationResult:
        is_remote_code = isinstance(self.code, str) and (
            self.code.startswith("git+")
            or self.code.startswith(REGISTRY_URI_FORMAT)
            or self.code.startswith(ARM_ID_PREFIX)
            or is_url(self.code)
            or bool(self.CODE_ID_RE_PATTERN.match(self.code))
        )
        validation_result = self._create_empty_validation_result()
        # validate that the component entry exists, to ensure the code path is correct,
        # especially when code is the default value
        if self.code is None or is_remote_code or not isinstance(self.entry, SparkJobEntry):
            # skip validation when code is None or not a local path, or when self.entry is not a SparkJobEntry object
            pass
        else:
            if not path.isabs(self.code):
                _component: SparkComponent = self.component  # type: ignore
                code_path = Path(_component.base_path) / self.code
                if code_path.exists():
                    code_path = code_path.resolve().absolute()
                else:
                    validation_result.append_error(
                        message=f"Code path {code_path} doesn't exist.", yaml_path="component.code"
                    )
                entry_path = code_path / self.entry.entry
            else:
                entry_path = Path(self.code) / self.entry.entry

            if (
                isinstance(self.entry, SparkJobEntry)
                and self.entry.entry_type == SparkJobEntryType.SPARK_JOB_FILE_ENTRY
            ):
                if not entry_path.exists():
                    validation_result.append_error(
                        message=f"Entry {entry_path} doesn't exist.", yaml_path="component.entry"
                    )
        return validation_result

    def _validate_fields(self) -> MutableValidationResult:
        validation_result = self._create_empty_validation_result()
        try:
            _validate_compute_or_resources(self.compute, self.resources)
        except ValidationException as e:
            validation_result.append_error(message=str(e), yaml_path="resources")
            validation_result.append_error(message=str(e), yaml_path="compute")

        try:
            _validate_input_output_mode(self.inputs, self.outputs)
        except ValidationException as e:
            msg = str(e)
            m = re.match(r"(Input|Output) '(\w+)'", msg)
            if m:
                io_type, io_name = m.groups()
                if io_type == "Input":
                    validation_result.append_error(message=msg, yaml_path=f"inputs.{io_name}")
                else:
                    validation_result.append_error(message=msg, yaml_path=f"outputs.{io_name}")

        try:
            _validate_spark_configurations(self)
        except ValidationException as e:
            validation_result.append_error(message=str(e), yaml_path="conf")

        try:
            self._validate_entry()
        except ValidationException as e:
            validation_result.append_error(message=str(e), yaml_path="entry")

        if self.args:
            try:
                validate_inputs_for_args(self.args, self.inputs)
            except ValidationException as e:
                validation_result.append_error(message=str(e), yaml_path="args")
        return validation_result

    # pylint: disable-next=docstring-missing-param
    def __call__(self, *args: Any, **kwargs: Any) -> "Spark":
        """Call Spark as a function will return a new instance each time.

        :return: A Spark object
        :rtype: Spark
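
        A hypothetical sketch, assuming ``spark_node`` was built from a ``SparkComponent`` instance
        (names are illustrative)::

            copied_node = spark_node()            # new Spark node backed by the same component
            copied_node.compute = "my-compute"    # settings on the copy can then diverge from the original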
        """
        if isinstance(self._component, Component):
            # call this to validate inputs
            node: Spark = self._component(*args, **kwargs)
            # merge inputs
            for name, original_input in self.inputs.items():
                if name not in kwargs:
                    # use setattr here to make sure owner of input won't change
                    setattr(node.inputs, name, original_input._data)
                    node._job_inputs[name] = original_input._data
            # merge outputs
            for name, original_output in self.outputs.items():
                # use setattr here to make sure owner of output won't change
                if not isinstance(original_output, str):
                    setattr(node.outputs, name, original_output._data)
            self._refine_optional_inputs_with_no_value(node, kwargs)
            node._name = self.name
            node.compute = self.compute
            node.environment = copy.deepcopy(self.environment)
            node.resources = copy.deepcopy(self.resources)
            return node

        msg = "Spark can be called as a function only when referenced component is {}, currently got {}."
        raise ValidationException(
            message=msg.format(type(Component), self._component),
            no_personal_data_message=msg.format(type(Component), "self._component"),
            target=ErrorTarget.SPARK_JOB,
        )