# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access
from enum import Enum
from typing import Dict, List, Optional, Union

from marshmallow import Schema

from ... import Input, Output
from ..._schema import PathAwareSchema
from ...constants import JobType
from ...entities import Component, Job
from ...entities._builders import BaseNode
from ...entities._job.pipeline._io import NodeInput, NodeOutput, PipelineInput
from ...entities._util import convert_ordered_dict_to_dict
from .._schema.component import NodeType


class InternalBaseNode(BaseNode):
    """Base class for nodes of internal components in a pipeline. Can be instantiated directly.

    :param type: Type of the pipeline node.
    :type type: str
    :param component: Id or instance of the component version to be run for the step.
    :type component: Union[Component, str]
    :param inputs: Inputs to the node.
    :type inputs: Dict[str, Union[Input, str, bool, int, float, Enum, dict]]
    :param outputs: Mapping of output data bindings used in the job.
    :type outputs: Dict[str, Union[str, Output, dict]]
    :param properties: The job property dictionary.
    :type properties: dict[str, str]
    :param compute: Compute definition containing the compute information for the step.
    :type compute: str
    """

    def __init__(
        self,
        *,
        type: str = JobType.COMPONENT,  # pylint: disable=redefined-builtin
        component: Union[Component, str],
        inputs: Optional[
            Dict[
                str,
                Union[
                    PipelineInput,
                    NodeOutput,
                    Input,
                    str,
                    bool,
                    int,
                    float,
                    Enum,
                ],
            ]
        ] = None,
        outputs: Optional[Dict[str, Union[str, Output]]] = None,
        properties: Optional[Dict] = None,
        compute: Optional[str] = None,
        **kwargs,
    ):
        kwargs.pop("type", None)
        BaseNode.__init__(
            self,
            type=type,
            component=component,  # type: ignore[arg-type] # TODO: Bug 2881892
            inputs=inputs,
            outputs=outputs,
            compute=compute,
            properties=properties,
            **kwargs,
        )

    @property
    def _skip_required_compute_missing_validation(self) -> bool:
        return True

    def _to_node(self, context: Optional[Dict] = None, **kwargs) -> BaseNode:
        return self

    def _to_component(self, context: Optional[Dict] = None, **kwargs) -> Component:
        return self.component

    def _to_job(self) -> Job:
        raise RuntimeError("Internal components don't support conversion to a job.")

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "Job":
        raise RuntimeError("Internal components don't support loading from a dict.")

    @classmethod
    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
        from .._schema.node import InternalBaseNodeSchema

        return InternalBaseNodeSchema(context=context)

    @property
    def component(self) -> Component:
        return self._component

    def _to_rest_inputs(self) -> Dict[str, Dict]:
        rest_dataset_literal_inputs = super(InternalBaseNode, self)._to_rest_inputs()
        for input_name, input_value in self.inputs.items():
            # hack: remove unfilled inputs from the rest object instead of keeping a default
            # input of {"job_input_type": "literal"}; note that this hack is not always
            # effective, as _data will be set to Input() when input_value.type is visited
            if (
                isinstance(input_value, NodeInput)
                and input_value._data is None
                and input_name in rest_dataset_literal_inputs
            ):
                del rest_dataset_literal_inputs[input_name]
        return rest_dataset_literal_inputs

    def _to_rest_object(self, **kwargs) -> dict:
        base_dict = super(InternalBaseNode, self)._to_rest_object(**kwargs)
        for key in ["name", "display_name", "tags"]:
            if key in base_dict:
                del base_dict[key]
        # drop computeId only when it is present but unset
        if "computeId" in base_dict and base_dict["computeId"] is None:
            del base_dict["computeId"]
        base_dict.update(
            convert_ordered_dict_to_dict(
                {
                    "componentId": self._get_component_id(),
                    "type": self.type,
                }
            )
        )
        return base_dict
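

# A minimal usage sketch (illustrative, not part of the module): internal component
# nodes are normally created by calling a loaded component function inside a
# pipeline rather than by constructing InternalBaseNode directly. The YAML path,
# input name, and compute target below are placeholders, not real values.
#
#     from azure.ai.ml import Input, load_component
#     from azure.ai.ml.dsl import pipeline
#
#     component_func = load_component(source="./spec.yaml")  # hypothetical spec path
#
#     @pipeline()
#     def sample_pipeline():
#         # calling the loaded component function yields an InternalBaseNode subclass
#         node = component_func(input_data=Input(path="./data"))  # hypothetical input name
#         node.compute = "cpu-cluster"  # hypothetical compute target

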
class DataTransfer(InternalBaseNode):
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(DataTransfer, self).__init__(type=NodeType.DATA_TRANSFER, **kwargs)


class HDInsight(InternalBaseNode):
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(HDInsight, self).__init__(type=NodeType.HDI, **kwargs)
        self._init = True
        self._compute_name: Optional[str] = kwargs.pop("compute_name", None)
        self._queue: Optional[str] = kwargs.pop("queue", None)
        self._driver_memory: Optional[str] = kwargs.pop("driver_memory", None)
        self._driver_cores: Optional[int] = kwargs.pop("driver_cores", None)
        self._executor_memory: Optional[str] = kwargs.pop("executor_memory", None)
        self._executor_cores: Optional[int] = kwargs.pop("executor_cores", None)
        self._number_executors: Optional[int] = kwargs.pop("number_executors", None)
        self._conf: Optional[Union[dict, str]] = kwargs.pop("conf", None)
        self._hdinsight_spark_job_name: Optional[str] = kwargs.pop("hdinsight_spark_job_name", None)
        self._init = False

    @property
    def compute_name(self) -> Optional[str]:
        """Name of the compute to be used.

        :return: Compute name.
        :rtype: Optional[str]
        """
        return self._compute_name

    @compute_name.setter
    def compute_name(self, value: str):
        self._compute_name = value

    @property
    def queue(self) -> Optional[str]:
        """The name of the YARN queue to which the job is submitted.

        :return: YARN queue name.
        :rtype: Optional[str]
        """
        return self._queue

    @queue.setter
    def queue(self, value: str):
        self._queue = value

    @property
    def driver_memory(self) -> Optional[str]:
        """Amount of memory to use for the driver process.

        It is in the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p,
        for kilobyte, megabyte, gigabyte, terabyte, and petabyte respectively. Example values are 10k, 10m and 10g.

        :return: Amount of memory to use for the driver process.
        :rtype: Optional[str]
        """
        return self._driver_memory

    @driver_memory.setter
    def driver_memory(self, value: str):
        self._driver_memory = value

    @property
    def driver_cores(self) -> Optional[int]:
        """Number of cores to use for the driver process.

        :return: Number of cores to use for the driver process.
        :rtype: Optional[int]
        """
        return self._driver_cores

    @driver_cores.setter
    def driver_cores(self, value: int):
        self._driver_cores = value

    @property
    def executor_memory(self) -> Optional[str]:
        """Amount of memory to use per executor process.

        It is in the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p,
        for kilobyte, megabyte, gigabyte, terabyte, and petabyte respectively. Example values are 10k, 10m and 10g.

        :return: The executor memory.
        :rtype: Optional[str]
        """
        return self._executor_memory

    @executor_memory.setter
    def executor_memory(self, value: str):
        self._executor_memory = value

    @property
    def executor_cores(self) -> Optional[int]:
        """Number of cores to use for each executor.

        :return: The number of cores to use for each executor.
        :rtype: Optional[int]
        """
        return self._executor_cores

    @executor_cores.setter
    def executor_cores(self, value: int):
        self._executor_cores = value

    @property
    def number_executors(self) -> Optional[int]:
        """Number of executors to launch for this session.

        :return: The number of executors to launch.
        :rtype: Optional[int]
        """
        return self._number_executors

    @number_executors.setter
    def number_executors(self, value: int):
        self._number_executors = value

    @property
    def conf(self) -> Optional[Union[dict, str]]:
        """Spark configuration properties.

        :return: The Spark configuration properties.
        :rtype: Optional[Union[dict, str]]
        """
        return self._conf

    @conf.setter
    def conf(self, value: Union[dict, str]):
        self._conf = value

    @property
    def hdinsight_spark_job_name(self) -> Optional[str]:
        """The name of this Spark session.

        :return: The name of this session.
        :rtype: Optional[str]
        """
        return self._hdinsight_spark_job_name

    @hdinsight_spark_job_name.setter
    def hdinsight_spark_job_name(self, value: str):
        self._hdinsight_spark_job_name = value

    @classmethod
    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
        return [
            "compute_name",
            "queue",
            "driver_cores",
            "executor_memory",
            "conf",
            "hdinsight_spark_job_name",
            "driver_memory",
            "executor_cores",
            "number_executors",
        ]

    @classmethod
    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
        from .._schema.node import HDInsightSchema

        return HDInsightSchema(context=context)
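

# A minimal configuration sketch for the HDInsight node above (illustrative only;
# `hdi_node` stands for a node produced from a loaded internal HDInsight component,
# and every literal value below is a placeholder):
#
#     hdi_node.compute_name = "my-hdi-compute"
#     hdi_node.queue = "default"
#     hdi_node.driver_memory = "2g"   # JVM memory string with a lower-case suffix
#     hdi_node.driver_cores = 2
#     hdi_node.executor_memory = "4g"
#     hdi_node.executor_cores = 2
#     hdi_node.number_executors = 4
#     hdi_node.conf = {"spark.yarn.maxAppAttempts": 1}
#     hdi_node.hdinsight_spark_job_name = "sample-spark-job"

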
class Starlite(InternalBaseNode):
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(Starlite, self).__init__(type=NodeType.STARLITE, **kwargs)


class Pipeline(InternalBaseNode):
    # this is only for using registered pipeline components
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(Pipeline, self).__init__(type=NodeType.PIPELINE, **kwargs)


class Hemera(InternalBaseNode):
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(Hemera, self).__init__(type=NodeType.HEMERA, **kwargs)


class Ae365exepool(InternalBaseNode):
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(Ae365exepool, self).__init__(type=NodeType.AE365EXEPOOL, **kwargs)


class Sweep(InternalBaseNode):
    # this is not in our scope
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(Sweep, self).__init__(type=NodeType.SWEEP, **kwargs)


class AetherBridge(InternalBaseNode):
    def __init__(self, **kwargs):
        kwargs.pop("type", None)
        super(AetherBridge, self).__init__(type=NodeType.AETHER_BRIDGE, **kwargs)
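

# Extension sketch: additional internal node types follow the same pattern as the
# classes above: pop any explicit "type" kwarg, then delegate to InternalBaseNode
# with the node type constant. `MY_NODE_TYPE` is a hypothetical NodeType member,
# shown only to document the pattern:
#
#     class MyInternalNode(InternalBaseNode):
#         def __init__(self, **kwargs):
#             kwargs.pop("type", None)
#             super(MyInternalNode, self).__init__(type=NodeType.MY_NODE_TYPE, **kwargs)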