# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from typing import Dict, List, Optional, Union
from marshmallow import Schema
from ..._schema import PathAwareSchema
from ...constants._job.job import RestSparkConfKey
from ...entities import Environment, SparkJobEntry
from ...entities._job.parameterized_spark import DUMMY_IMAGE, ParameterizedSpark
from ...entities._job.spark_job_entry_mixin import SparkJobEntryMixin
from .._schema.component import InternalSparkComponentSchema
from ..entities import InternalComponent
from .environment import InternalEnvironment


class InternalSparkComponent(
InternalComponent, ParameterizedSpark, SparkJobEntryMixin
): # pylint: disable=too-many-instance-attributes, too-many-ancestors
"""Internal Spark Component
This class is used to handle internal spark component.
It can be loaded from internal spark component yaml or from rest object of an internal spark component.
But after loaded, its structure will be the same as spark component.
"""

    def __init__(
self,
entry: Union[Dict[str, str], SparkJobEntry, None] = None,
py_files: Optional[List[str]] = None,
jars: Optional[List[str]] = None,
files: Optional[List[str]] = None,
archives: Optional[List[str]] = None,
driver_cores: Optional[int] = None,
driver_memory: Optional[str] = None,
executor_cores: Optional[int] = None,
executor_memory: Optional[str] = None,
executor_instances: Optional[int] = None,
dynamic_allocation_enabled: Optional[bool] = None,
dynamic_allocation_min_executors: Optional[int] = None,
dynamic_allocation_max_executors: Optional[int] = None,
conf: Optional[Dict[str, str]] = None,
args: Optional[str] = None,
**kwargs,
):
SparkJobEntryMixin.__init__(self, entry=entry, **kwargs)
# environment.setter has been overridden in ParameterizedSpark, so we need to pop it out here
environment = kwargs.pop("environment", None)
InternalComponent.__init__(self, **kwargs)
# Pop it to avoid passing multiple values for code in ParameterizedSpark.__init__
code = kwargs.pop("code", None)
ParameterizedSpark.__init__(
self,
code=self.base_path,
entry=entry,
py_files=py_files,
jars=jars,
files=files,
archives=archives,
conf=conf,
environment=environment,
args=args,
**kwargs,
)
self.code = code
        # For a pipeline spark job, we also allow the user to set driver_cores, driver_memory, etc. via conf.
        # If the root-level fields are not set by the user, we promote the conf settings to the root level to
        # facilitate subsequent validation. This usually happens when to_component(SparkJob) or the builder
        # function spark() is used as a node in the pipeline SDK.
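        # For example (illustrative only; the real key strings are defined on RestSparkConfKey,
        # e.g. the standard spark key "spark.driver.cores" for DRIVER_CORES):
        #     InternalSparkComponent(conf={"spark.driver.cores": "1"})   # promoted to self.driver_cores
        #     InternalSparkComponent(driver_cores=2, conf={"spark.driver.cores": "1"})  # explicit value wins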
conf = conf or {}
self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)
self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None)
self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None)
self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None)
self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None)
self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get(
RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None
)
self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get(
RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None
)
self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get(
RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None
)
self.conf = conf
self.args = args

    @classmethod
def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
return InternalSparkComponentSchema(context=context)

    @property # type: ignore[override]
def environment(self) -> Optional[Union[Environment, str]]:
"""Get the environment of the component.
:return: The environment of the component.
        :rtype: Optional[Union[Environment, str]]
"""
if isinstance(self._environment, Environment) and self._environment.image is None:
return Environment(conda_file=self._environment.conda_file, image=DUMMY_IMAGE)
return self._environment

    @environment.setter
def environment(self, value):
"""Set the environment of the component.
:param value: The environment of the component.
:type value: Union[str, Environment, dict]
:return: No return
:rtype: None
"""
if value is None or isinstance(value, (str, Environment)):
self._environment = value
elif isinstance(value, dict):
internal_environment = InternalEnvironment(**value)
internal_environment.resolve(self.base_path)
self._environment = Environment(
name=internal_environment.name,
version=internal_environment.version,
)
if internal_environment.conda:
self._environment.conda_file = {
"dependencies": internal_environment.conda[InternalEnvironment.CONDA_DEPENDENCIES]
}
if internal_environment.docker:
self._environment.image = internal_environment.docker["image"]
            # we assume a loaded internal spark component won't be used to create another
            # internal spark component, so the environment construction here can be simplified
else:
raise ValueError(f"Unsupported environment type: {type(value)}")

    @property
def jars(self) -> Optional[List[str]]:
"""Get the jars of the component.
:return: The jars of the component.
:rtype: Optional[List[str]]
"""
return self._jars

    @jars.setter
def jars(self, value: Union[str, List[str]]):
"""Set the jars of the component.
:param value: The jars of the component.
:type value: Union[str, List[str]]
:return: No return
:rtype: None
"""
if isinstance(value, str):
value = [value]
self._jars = value

    @property
def py_files(self) -> Optional[List[str]]:
"""Get the py_files of the component.
:return: The py_files of the component.
:rtype: Optional[List[str]]
"""
return self._py_files

    @py_files.setter
def py_files(self, value):
"""Set the py_files of the component.
:param value: The py_files of the component.
:type value: Union[str, List[str]]
:return: No return
:rtype: None
"""
if isinstance(value, str):
value = [value]
self._py_files = value

    def _to_dict(self) -> Dict:
result = super()._to_dict()
return result

    def _to_rest_object(self):
result = super()._to_rest_object()
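        # the component spec produced locally uses the camelCase key "pyFiles"; rename it to
        # the snake_case "py_files" used in the rest representation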
if "pyFiles" in result.properties.component_spec:
result.properties.component_spec["py_files"] = result.properties.component_spec.pop("pyFiles")
return result

    @classmethod
def _from_rest_object_to_init_params(cls, obj) -> Dict:
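        # reverse of _to_rest_object: rename "py_files" back to the camelCase "pyFiles" before
        # the base class builds init params from the rest object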
if "py_files" in obj.properties.component_spec:
obj.properties.component_spec["pyFiles"] = obj.properties.component_spec.pop("py_files")
result = super()._from_rest_object_to_init_params(obj)
return result