author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py  338
1 file changed, 338 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
new file mode 100644
index 00000000..89fc032c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
@@ -0,0 +1,338 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+from enum import Enum
+from typing import Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from ... import Input, Output
+from ..._schema import PathAwareSchema
+from ...constants import JobType
+from ...entities import Component, Job
+from ...entities._builders import BaseNode
+from ...entities._job.pipeline._io import NodeInput, NodeOutput, PipelineInput
+from ...entities._util import convert_ordered_dict_to_dict
+from .._schema.component import NodeType
+
+
+class InternalBaseNode(BaseNode):
+    """Base class for node of internal components in pipeline. Can be instantiated directly.
+
+    :param type: Type of pipeline node
+    :type type: str
+    :param component: Id or instance of the component version to be run for the step
+    :type component: Union[Component, str]
+    :param inputs: Inputs to the node.
+    :type inputs: Dict[str, Union[Input, str, bool, int, float, Enum, dict]]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, Output, dict]]
+    :param properties: The job property dictionary.
+    :type properties: dict[str, str]
+    :param compute: Compute definition containing the compute information for the step
+    :type compute: str
+    """
+
+    def __init__(
+        self,
+        *,
+        type: str = JobType.COMPONENT,  # pylint: disable=redefined-builtin
+        component: Union[Component, str],
+        inputs: Optional[
+            Dict[
+                str,
+                Union[
+                    PipelineInput,
+                    NodeOutput,
+                    Input,
+                    str,
+                    bool,
+                    int,
+                    float,
+                    Enum,
+                ],
+            ]
+        ] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        properties: Optional[Dict] = None,
+        compute: Optional[str] = None,
+        **kwargs,
+    ):
+        kwargs.pop("type", None)
+        BaseNode.__init__(
+            self,
+            type=type,
+            component=component,  # type: ignore[arg-type]
+            # TODO: Bug 2881892
+            inputs=inputs,
+            outputs=outputs,
+            compute=compute,
+            properties=properties,
+            **kwargs,
+        )
+
+    @property
+    def _skip_required_compute_missing_validation(self) -> bool:
+        return True
+
+    def _to_node(self, context: Optional[Dict] = None, **kwargs) -> BaseNode:
+        return self
+
+    def _to_component(self, context: Optional[Dict] = None, **kwargs) -> Component:
+        return self.component
+
+    def _to_job(self) -> Job:
+        raise RuntimeError("Internal components do not support conversion to a job")
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "Job":
+        raise RuntimeError("Internal components do not support loading from a dict")
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.node import InternalBaseNodeSchema
+
+        return InternalBaseNodeSchema(context=context)
+
+    @property
+    def component(self) -> Component:
+        return self._component
+
+    def _to_rest_inputs(self) -> Dict[str, Dict]:
+        rest_dataset_literal_inputs = super(InternalBaseNode, self)._to_rest_inputs()
+        for input_name, input_value in self.inputs.items():
+            # hack: remove unfilled inputs from the rest object instead of leaving a default
+            # input of {"job_input_type": "literal"}. Note that this hack is not always
+            # effective, as _data will be set to Input() when visiting input_value.type.
+            if (
+                isinstance(input_value, NodeInput)
+                and input_value._data is None
+                and input_name in rest_dataset_literal_inputs
+            ):
+                del rest_dataset_literal_inputs[input_name]
+        return rest_dataset_literal_inputs
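+    # Illustrative effect (shape only; the input name is hypothetical): an unbound
+    # input that would otherwise serialize as
+    #     {"unbound_param": {"job_input_type": "literal"}}
+    # is dropped from the returned dict, while bound inputs pass through unchanged.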
+
+    def _to_rest_object(self, **kwargs) -> dict:
+        base_dict = super(InternalBaseNode, self)._to_rest_object(**kwargs)
+        for key in ["name", "display_name", "tags"]:
+            if key in base_dict:
+                del base_dict[key]
+        for key in ["computeId"]:
+            if key in base_dict and base_dict[key] is None:
+                del base_dict[key]
+
+        base_dict.update(
+            convert_ordered_dict_to_dict(
+                {
+                    "componentId": self._get_component_id(),
+                    "type": self.type,
+                }
+            )
+        )
+        return base_dict
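+
+    # Resulting REST object (illustrative shape only; real payloads also carry the
+    # serialized inputs/outputs and other base-node fields):
+    #
+    #     {
+    #         "componentId": "<versioned component id>",
+    #         "type": "<internal node type>",
+    #         ...
+    #     }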
+
+
+class DataTransfer(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(DataTransfer, self).__init__(type=NodeType.DATA_TRANSFER, **kwargs)
+
+
+class HDInsight(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(HDInsight, self).__init__(type=NodeType.HDI, **kwargs)
+        self._init = True
+        self._compute_name: Optional[str] = kwargs.pop("compute_name", None)
+        self._queue: Optional[str] = kwargs.pop("queue", None)
+        self._driver_memory: Optional[str] = kwargs.pop("driver_memory", None)
+        self._driver_cores: Optional[int] = kwargs.pop("driver_cores", None)
+        self._executor_memory: Optional[str] = kwargs.pop("executor_memory", None)
+        self._executor_cores: Optional[int] = kwargs.pop("executor_cores", None)
+        self._number_executors: Optional[int] = kwargs.pop("number_executors", None)
+        self._conf: Optional[Union[dict, str]] = kwargs.pop("conf", None)
+        self._hdinsight_spark_job_name: Optional[str] = kwargs.pop("hdinsight_spark_job_name", None)
+        self._init = False
+
+    @property
+    def compute_name(self) -> str:
+        """Name of the compute to be used.
+
+        :return: Compute name
+        :rtype: str
+        """
+        return self._compute_name
+
+    @compute_name.setter
+    def compute_name(self, value: str):
+        self._compute_name = value
+
+    @property
+    def queue(self) -> str:
+        """The name of the YARN queue to which submitted.
+
+        :return: YARN queue name
+        :rtype: str
+        """
+        return self._queue
+
+    @queue.setter
+    def queue(self, value: str):
+        self._queue = value
+
+    @property
+    def driver_memory(self) -> str:
+        """Amount of memory to use for the driver process.
+
+        It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte,
+        megabyte, gigabyte, terabyte and petabyte respectively. Example values are 10k, 10m and 10g.
+
+        :return: Amount of memory to use for the driver process
+        :rtype: str
+        """
+        return self._driver_memory
+
+    @driver_memory.setter
+    def driver_memory(self, value: str):
+        self._driver_memory = value
+
+    @property
+    def driver_cores(self) -> int:
+        """Number of cores to use for the driver process.
+
+        :return: Number of cores to use for the driver process.
+        :rtype: int
+        """
+        return self._driver_cores
+
+    @driver_cores.setter
+    def driver_cores(self, value: int):
+        self._driver_cores = value
+
+    @property
+    def executor_memory(self) -> str:
+        """Amount of memory to use per executor process.
+
+        It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte,
+        megabyte, gigabyte, terabyte and petabyte respectively. Example values are 10k, 10m and 10g.
+
+        :return: The executor memory
+        :rtype: str
+        """
+        return self._executor_memory
+
+    @executor_memory.setter
+    def executor_memory(self, value: str):
+        self._executor_memory = value
+
+    @property
+    def executor_cores(self) -> int:
+        """Number of cores to use for each executor.
+
+        :return: The number of cores to use for each executor
+        :rtype: int
+        """
+        return self._executor_cores
+
+    @executor_cores.setter
+    def executor_cores(self, value: int):
+        self._executor_cores = value
+
+    @property
+    def number_executors(self) -> int:
+        """Number of executors to launch for this session.
+
+        :return: The number of executors to launch
+        :rtype: int
+        """
+        return self._number_executors
+
+    @number_executors.setter
+    def number_executors(self, value: int):
+        self._number_executors = value
+
+    @property
+    def conf(self) -> Union[dict, str]:
+        """Spark configuration properties.
+
+        :return: The Spark configuration properties
+        :rtype: Union[dict, str]
+        """
+        return self._conf
+
+    @conf.setter
+    def conf(self, value: Union[dict, str]):
+        self._conf = value
+
+    @property
+    def hdinsight_spark_job_name(self) -> str:
+        """
+
+        :return: The name of this session
+        :rtype: str
+        """
+        return self._hdinsight_spark_job_name
+
+    @hdinsight_spark_job_name.setter
+    def hdinsight_spark_job_name(self, value: str):
+        self._hdinsight_spark_job_name = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return [
+            "compute_name",
+            "queue",
+            "driver_cores",
+            "executor_memory",
+            "conf",
+            "hdinsight_spark_job_name",
+            "driver_memory",
+            "executor_cores",
+            "number_executors",
+        ]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.node import HDInsightSchema
+
+        return HDInsightSchema(context=context)
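+
+    # Illustrative configuration sketch (values hypothetical; `hdi_node` is assumed
+    # to be an HDInsight node built from a loaded internal component):
+    #
+    #     hdi_node.queue = "default"
+    #     hdi_node.driver_memory = "4g"      # JVM memory string: k/m/g/t/p suffixes
+    #     hdi_node.driver_cores = 2
+    #     hdi_node.executor_memory = "8g"
+    #     hdi_node.executor_cores = 4
+    #     hdi_node.number_executors = 10
+    #     hdi_node.conf = {"spark.yarn.maxAppAttempts": 1}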
+
+
+class Starlite(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Starlite, self).__init__(type=NodeType.STARLITE, **kwargs)
+
+
+class Pipeline(InternalBaseNode):
+    # this node type is only for using a registered pipeline component
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Pipeline, self).__init__(type=NodeType.PIPELINE, **kwargs)
+
+
+class Hemera(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Hemera, self).__init__(type=NodeType.HEMERA, **kwargs)
+
+
+class Ae365exepool(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Ae365exepool, self).__init__(type=NodeType.AE365EXEPOOL, **kwargs)
+
+
+class Sweep(InternalBaseNode):
+    # this is not in our scope
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Sweep, self).__init__(type=NodeType.SWEEP, **kwargs)
+
+
+class AetherBridge(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(AetherBridge, self).__init__(type=NodeType.AETHER_BRIDGE, **kwargs)