| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py | 338 |
1 file changed, 338 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
new file mode 100644
index 00000000..89fc032c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
@@ -0,0 +1,338 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+from enum import Enum
+from typing import Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from ... import Input, Output
+from ..._schema import PathAwareSchema
+from ...constants import JobType
+from ...entities import Component, Job
+from ...entities._builders import BaseNode
+from ...entities._job.pipeline._io import NodeInput, NodeOutput, PipelineInput
+from ...entities._util import convert_ordered_dict_to_dict
+from .._schema.component import NodeType
+
+
+class InternalBaseNode(BaseNode):
+    """Base class for a node of internal components in a pipeline. Can be instantiated directly.
+
+    :param type: Type of the pipeline node.
+    :type type: str
+    :param component: Id or instance of the component version to be run for the step.
+    :type component: Union[Component, str]
+    :param inputs: Inputs to the node.
+    :type inputs: Dict[str, Union[Input, str, bool, int, float, Enum, dict]]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, Output, dict]]
+    :param properties: The job property dictionary.
+    :type properties: dict[str, str]
+    :param compute: Compute definition containing the compute information for the step.
+    :type compute: str
+    """
+
+    def __init__(
+        self,
+        *,
+        type: str = JobType.COMPONENT,  # pylint: disable=redefined-builtin
+        component: Union[Component, str],
+        inputs: Optional[
+            Dict[
+                str,
+                Union[
+                    PipelineInput,
+                    NodeOutput,
+                    Input,
+                    str,
+                    bool,
+                    int,
+                    float,
+                    Enum,
+                    "Input",
+                ],
+            ]
+        ] = None,
+        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
+        properties: Optional[Dict] = None,
+        compute: Optional[str] = None,
+        **kwargs,
+    ):
+        kwargs.pop("type", None)
+        BaseNode.__init__(
+            self,
+            type=type,
+            component=component,  # type: ignore[arg-type]
+            # TODO: Bug 2881892
+            inputs=inputs,
+            outputs=outputs,
+            compute=compute,
+            properties=properties,
+            **kwargs,
+        )
+
+    @property
+    def _skip_required_compute_missing_validation(self) -> bool:
+        return True
+
+    def _to_node(self, context: Optional[Dict] = None, **kwargs) -> BaseNode:
+        return self
+
+    def _to_component(self, context: Optional[Dict] = None, **kwargs) -> Component:
+        return self.component
+
+    def _to_job(self) -> Job:
+        raise RuntimeError("Internal components do not support conversion to a job")
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "Job":
+        raise RuntimeError("Internal components do not support loading from a dict")
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.node import InternalBaseNodeSchema
+
+        return InternalBaseNodeSchema(context=context)
+
+    @property
+    def component(self) -> Component:
+        return self._component
+
+    def _to_rest_inputs(self) -> Dict[str, Dict]:
+        rest_dataset_literal_inputs = super(InternalBaseNode, self)._to_rest_inputs()
+        for input_name, input_value in self.inputs.items():
+            # hack: remove unfilled inputs from the rest object instead of adding a default input of {"job_input_type": "literal"}
+            # note that this hack is not always effective, as _data will be set to Input() when visiting input_value.type
+            if (
+                isinstance(input_value, NodeInput)
+                and input_value._data is None
+                and input_name in rest_dataset_literal_inputs
+            ):
+                del rest_dataset_literal_inputs[input_name]
+        return rest_dataset_literal_inputs
+
+    def _to_rest_object(self, **kwargs) -> dict:
+        base_dict = super(InternalBaseNode, self)._to_rest_object(**kwargs)
+        for key in ["name", "display_name", "tags"]:
+            if key in base_dict:
+                del base_dict[key]
+        for key in ["computeId"]:
+            if key in base_dict and base_dict[key] is None:
+                del base_dict[key]
+
+        base_dict.update(
+            convert_ordered_dict_to_dict(
+                {
+                    "componentId": self._get_component_id(),
+                    "type": self.type,
+                }
+            )
+        )
+        return base_dict
+
+
+class DataTransfer(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(DataTransfer, self).__init__(type=NodeType.DATA_TRANSFER, **kwargs)
+
+
+class HDInsight(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(HDInsight, self).__init__(type=NodeType.HDI, **kwargs)
+        self._init = True
+        self._compute_name: str = kwargs.pop("compute_name", None)
+        self._queue: str = kwargs.pop("queue", None)
+        self._driver_memory: str = kwargs.pop("driver_memory", None)
+        self._driver_cores: int = kwargs.pop("driver_cores", None)
+        self._executor_memory: str = kwargs.pop("executor_memory", None)
+        self._executor_cores: int = kwargs.pop("executor_cores", None)
+        self._number_executors: int = kwargs.pop("number_executors", None)
+        self._conf: Union[dict, str] = kwargs.pop("conf", None)
+        self._hdinsight_spark_job_name: str = kwargs.pop("hdinsight_spark_job_name", None)
+        self._init = False
+
+    @property
+    def compute_name(self) -> str:
+        """Name of the compute to be used.
+
+        :return: Compute name
+        :rtype: str
+        """
+        return self._compute_name
+
+    @compute_name.setter
+    def compute_name(self, value: str):
+        self._compute_name = value
+
+    @property
+    def queue(self) -> str:
+        """The name of the YARN queue to which the job is submitted.
+
+        :return: YARN queue name
+        :rtype: str
+        """
+        return self._queue
+
+    @queue.setter
+    def queue(self, value: str):
+        self._queue = value
+
+    @property
+    def driver_memory(self) -> str:
+        """Amount of memory to use for the driver process.
+
+        It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobytes,
+        megabytes, gigabytes, terabytes, and petabytes respectively. Example values are 10k, 10m and 10g.
+
+        :return: Amount of memory to use for the driver process
+        :rtype: str
+        """
+        return self._driver_memory
+
+    @driver_memory.setter
+    def driver_memory(self, value: str):
+        self._driver_memory = value
+
+    @property
+    def driver_cores(self) -> int:
+        """Number of cores to use for the driver process.
+
+        :return: Number of cores to use for the driver process.
+        :rtype: int
+        """
+        return self._driver_cores
+
+    @driver_cores.setter
+    def driver_cores(self, value: int):
+        self._driver_cores = value
+
+    @property
+    def executor_memory(self) -> str:
+        """Amount of memory to use per executor process.
+
+        It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobytes,
+        megabytes, gigabytes, terabytes, and petabytes respectively. Example values are 10k, 10m and 10g.
+
+        :return: The executor memory
+        :rtype: str
+        """
+        return self._executor_memory
+
+    @executor_memory.setter
+    def executor_memory(self, value: str):
+        self._executor_memory = value
+
+    @property
+    def executor_cores(self) -> int:
+        """Number of cores to use for each executor.
+
+        :return: The number of cores to use for each executor
+        :rtype: int
+        """
+        return self._executor_cores
+
+    @executor_cores.setter
+    def executor_cores(self, value: int):
+        self._executor_cores = value
+
+    @property
+    def number_executors(self) -> int:
+        """Number of executors to launch for this session.
+
+        :return: The number of executors to launch
+        :rtype: int
+        """
+        return self._number_executors
+
+    @number_executors.setter
+    def number_executors(self, value: int):
+        self._number_executors = value
+
+    @property
+    def conf(self) -> Union[dict, str]:
+        """Spark configuration properties.
+
+        :return: The Spark configuration properties.
+        :rtype: Union[dict, str]
+        """
+        return self._conf
+
+    @conf.setter
+    def conf(self, value: Union[dict, str]):
+        self._conf = value
+
+    @property
+    def hdinsight_spark_job_name(self) -> str:
+        """The name of this HDInsight Spark session.
+
+        :return: The name of this session
+        :rtype: str
+        """
+        return self._hdinsight_spark_job_name
+
+    @hdinsight_spark_job_name.setter
+    def hdinsight_spark_job_name(self, value: str):
+        self._hdinsight_spark_job_name = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return [
+            "compute_name",
+            "queue",
+            "driver_cores",
+            "executor_memory",
+            "conf",
+            "hdinsight_spark_job_name",
+            "driver_memory",
+            "executor_cores",
+            "number_executors",
+        ]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.node import HDInsightSchema
+
+        return HDInsightSchema(context=context)
+
+
+class Starlite(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Starlite, self).__init__(type=NodeType.STARLITE, **kwargs)
+
+
+class Pipeline(InternalBaseNode):
+    # this is only for using a registered pipeline component
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Pipeline, self).__init__(type=NodeType.PIPELINE, **kwargs)
+
+
+class Hemera(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Hemera, self).__init__(type=NodeType.HEMERA, **kwargs)
+
+
+class Ae365exepool(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Ae365exepool, self).__init__(type=NodeType.AE365EXEPOOL, **kwargs)
+
+
+class Sweep(InternalBaseNode):
+    # this is not in our scope
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Sweep, self).__init__(type=NodeType.SWEEP, **kwargs)
+
+
+class AetherBridge(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(AetherBridge, self).__init__(type=NodeType.AETHER_BRIDGE, **kwargs)
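For context, here is a minimal sketch of how the `HDInsight` node added by this diff might be configured through its property setters. The import path follows this file's location; the component id, compute name, and all resource values are hypothetical placeholders, not an official usage example from the SDK.

```python
# Hypothetical sketch: configuring an HDInsight node via the setters defined above.
# All identifiers and values below are placeholder assumptions.
from azure.ai.ml._internal.entities.node import HDInsight

hdi_step = HDInsight(component="azureml:my_hdi_component:1")  # hypothetical component id

hdi_step.compute_name = "my-hdi-cluster"   # hypothetical HDInsight compute
hdi_step.queue = "default"                 # YARN queue the job is submitted to
hdi_step.driver_memory = "2g"              # JVM memory string with lower-case suffix
hdi_step.driver_cores = 2
hdi_step.executor_memory = "4g"
hdi_step.executor_cores = 4
hdi_step.number_executors = 8
hdi_step.conf = {"spark.yarn.maxAppAttempts": "1"}  # Spark configuration properties
hdi_step.hdinsight_spark_job_name = "example-session"
```

Note that these nine fields are exactly the ones `_picked_fields_from_dict_to_rest_object` lists, i.e. the HDInsight-specific settings that get serialized into the node's REST payload alongside the `componentId` and `type` added by `_to_rest_object`.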