author     S. Solomon Darnell   2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell   2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch, HEAD, master)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)

    two versions of R2R are here
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py                                    |  46
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py                        |  31
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py                              | 193
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py                                | 195
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py                                        |  35
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py                                     | 203
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py                                   | 370
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py                                 | 157
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py                                        | 338
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py                                    | 114
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py                        |  29
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py | 194
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py               | 137
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py                 |  47
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py                                       | 131
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py                                       | 192
16 files changed, 2412 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py
new file mode 100644
index 00000000..47296a80
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py
@@ -0,0 +1,46 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from ._input_outputs import InternalInput
+from .command import Command, Distributed
+from .component import InternalComponent
+from .node import Ae365exepool, AetherBridge, DataTransfer, HDInsight, Hemera, InternalBaseNode, Pipeline, Starlite
+from .parallel import Parallel
+from .runsettings import (
+ AISuperComputerConfiguration,
+ AISuperComputerScalePolicy,
+ AISuperComputerStorageReferenceConfiguration,
+ ITPConfiguration,
+ ITPInteractiveConfiguration,
+ ITPPriorityConfiguration,
+ ITPResourceConfiguration,
+ ITPRetrySettings,
+ TargetSelector,
+)
+from .scope import Scope
+
+__all__ = [
+ "InternalBaseNode",
+ "DataTransfer",
+ "Ae365exepool",
+ "AetherBridge",
+ "HDInsight",
+ "Parallel",
+ "Starlite",
+ "Pipeline",
+ "Hemera",
+ "Command",
+ "Distributed",
+ "Scope",
+ "InternalComponent",
+ "TargetSelector",
+ "ITPInteractiveConfiguration",
+ "ITPPriorityConfiguration",
+ "ITPResourceConfiguration",
+ "ITPRetrySettings",
+ "ITPConfiguration",
+ "AISuperComputerConfiguration",
+ "AISuperComputerScalePolicy",
+ "AISuperComputerStorageReferenceConfiguration",
+ "InternalInput",
+]
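
The re-exports above are the public surface of this internal entities package. A minimal import sketch, assuming an environment where this azure-ai-ml build is installed:

# Sketch only: import a few of the entities listed in __all__ above.
from azure.ai.ml._internal.entities import Command, Distributed, InternalComponent, Scope

for cls in (Command, Distributed, InternalComponent, Scope):
    print(cls.__name__)
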
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py
new file mode 100644
index 00000000..71a8c0a4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py
@@ -0,0 +1,31 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from pathlib import Path
+from typing import Optional
+
+from ...constants._common import AzureDevopsArtifactsType
+from ...entities._component._additional_includes import AdditionalIncludes
+
+
+class InternalAdditionalIncludes(AdditionalIncludes):
+ """This class is kept for compatibility with mldesigner."""
+
+ @property
+ def includes(self):
+ if self._is_artifact_includes:
+ return self._get_resolved_additional_include_configs()
+ return self.origin_configs
+
+ @property
+ def code_path(self) -> Optional[Path]:
+ return self.resolved_code_path
+
+ @property
+ def _is_artifact_includes(self):
+ return any(
+ map(
+ lambda x: isinstance(x, dict) and x.get("type", None) == AzureDevopsArtifactsType.ARTIFACT,
+ self.origin_configs,
+ )
+ )
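
The _is_artifact_includes property above treats a config entry as an Azure DevOps artifact only when it is a dict whose type matches AzureDevopsArtifactsType.ARTIFACT. A standalone sketch of that check; the sample entries are illustrative, not taken from a real spec:

from azure.ai.ml.constants._common import AzureDevopsArtifactsType

def is_artifact_includes(origin_configs) -> bool:
    # Mirrors InternalAdditionalIncludes._is_artifact_includes: a string entry is a
    # local path, while a dict entry with a matching "type" describes a DevOps artifact.
    return any(
        isinstance(cfg, dict) and cfg.get("type") == AzureDevopsArtifactsType.ARTIFACT
        for cfg in origin_configs
    )

print(is_artifact_includes(["../shared/utils"]))                            # False
print(is_artifact_includes([{"type": AzureDevopsArtifactsType.ARTIFACT}]))  # True
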
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py
new file mode 100644
index 00000000..f9d0a970
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py
@@ -0,0 +1,193 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Dict, Optional, Union, overload
+
+from ... import Input, Output
+from ..._utils.utils import get_all_enum_values_iter
+from ...constants import AssetTypes
+from ...constants._common import InputTypes
+from ...constants._component import ComponentParameterTypes, IOConstants
+from .._schema.input_output import SUPPORTED_INTERNAL_PARAM_TYPES
+
+_INPUT_TYPE_ENUM = "enum"
+_INPUT_TYPE_ENUM_CAP = "Enum"
+_INPUT_TYPE_FLOAT = "float"
+_INPUT_TYPE_FLOAT_CAP = "Float"
+
+
+class InternalInput(Input):
+ """Internal input class for internal components only. Comparing to the public Input class, this class has additional
+ primitive input types:
+
+ - String
+ - Integer
+ - Float, float
+ - Boolean
+ - Enum, enum (new)
+ """
+
+ def __init__(self, *, datastore_mode=None, is_resource=None, **kwargs):
+ self.datastore_mode = datastore_mode
+ self.is_resource = is_resource
+ super().__init__(**kwargs)
+
+ @property
+ def _allowed_types(self):
+ if self._lower_type == _INPUT_TYPE_ENUM:
+ return str
+ if self._lower_type == _INPUT_TYPE_FLOAT:
+ return float
+ return IOConstants.PRIMITIVE_STR_2_TYPE.get(self._lower_type, None)
+
+ @property
+ def _lower_type(self) -> Optional[str]:
+ if isinstance(self.type, str):
+ return self.type.lower()
+ if self._multiple_types:
+ return None
+ return self.type.__name__.lower()
+
+ @property
+ def _is_primitive_type(self):
+ if self._lower_type in [_INPUT_TYPE_ENUM, _INPUT_TYPE_FLOAT]:
+ return True
+ if self._lower_type in IOConstants.PRIMITIVE_STR_2_TYPE:
+ return True
+ return super()._is_primitive_type
+
+ def _simple_parse(self, value, _type=None):
+ # simple parse is used to parse min, max & optional only, so we don't need to handle enum
+ if _type is None:
+ _type = self._lower_type
+ if _type == _INPUT_TYPE_FLOAT:
+ _type = ComponentParameterTypes.NUMBER
+ return super()._simple_parse(value, _type)
+
+ def _to_rest_object(self) -> Dict:
+ rest_object = super()._to_rest_object()
+ if self._lower_type == _INPUT_TYPE_ENUM:
+ rest_object["type"] = _INPUT_TYPE_ENUM_CAP
+ if self._lower_type == _INPUT_TYPE_FLOAT:
+ rest_object["type"] = _INPUT_TYPE_FLOAT_CAP
+ return rest_object
+
+ @classmethod
+ def _map_from_rest_type(cls, _type):
+ mapping_dict = {
+ _INPUT_TYPE_ENUM_CAP: _INPUT_TYPE_ENUM,
+ _INPUT_TYPE_FLOAT_CAP: _INPUT_TYPE_FLOAT,
+ }
+ if isinstance(_type, str) and _type in mapping_dict:
+ return mapping_dict[_type]
+ return super()._map_from_rest_type(_type)
+
+ @classmethod
+ def _from_rest_object(cls, obj: Dict) -> "InternalInput":
+ obj["type"] = cls._map_from_rest_type(obj["type"])
+ return InternalInput(**obj)
+
+ def _get_python_builtin_type_str(self) -> str:
+ if self._lower_type in [_INPUT_TYPE_ENUM, _INPUT_TYPE_FLOAT]:
+ return self._lower_type
+ if self._is_primitive_type:
+ return IOConstants.PRIMITIVE_STR_2_TYPE[self._lower_type].__name__ # type: ignore[index]
+ # TODO: Bug 2881900
+ return super()._get_python_builtin_type_str()
+
+ @overload
+ @classmethod
+ def _from_base(cls, _input: None) -> None: # type: ignore[misc]
+ ...
+
+ @overload
+ @classmethod
+ def _from_base(cls, _input: Union[Input, Dict]) -> "InternalInput": ...
+
+ @classmethod
+ def _from_base(cls, _input: Optional[Union[Input, Dict]]) -> Optional["InternalInput"]:
+ """Cast from Input or Dict to InternalInput.
+
+ Do not guarantee to create a new object.
+
+ :param _input: The base input
+ :type _input: Union[Input, Dict]
+ :return:
+ * None if _input is None
+ * InternalInput
+ :rtype: Optional["InternalInput"]
+ """
+ if _input is None:
+ return None
+ if isinstance(_input, InternalInput):
+ return _input
+ if isinstance(_input, Input):
+ # do force cast directly as there is no new field added in InternalInput
+ # need to change the logic if new field is added
+ _input.__class__ = InternalInput
+ return _input # type: ignore[return-value]
+ return InternalInput(**_input)
+
+
+def _map_v1_io_type(output_type: str) -> str:
+ """Map v1 IO type to v2.
+
+ :param output_type: The v1 IO type
+ :type output_type: str
+ :return: The v2 IO type name
+ :rtype: str
+ """
+
+ # TODO: put it in a common place
+ def _map_primitive_type(_type: str) -> str:
+ """Convert double and float to number type.
+
+ :param _type: A primitive v1 IO type
+ :type _type: str
+ :return:
+ * InputTypes.NUMBER if _type is "double" or "float"
+ * The provided type otherwise
+ :rtype: str
+ """
+ _type = _type.lower()
+ if _type in ["double", "float"]:
+ return InputTypes.NUMBER
+ return _type
+
+ if output_type in list(get_all_enum_values_iter(AssetTypes)):
+ return output_type
+ if output_type in SUPPORTED_INTERNAL_PARAM_TYPES:
+ return _map_primitive_type(output_type)
+ if output_type in ["AnyFile"]:
+ return AssetTypes.URI_FILE
+ # Handle AnyDirectory and the other types.
+ return AssetTypes.URI_FOLDER
+
+
+class InternalOutput(Output):
+ def __init__(self, *, datastore_mode=None, is_link_mode=None, **kwargs):
+ self.datastore_mode = datastore_mode
+ self.is_link_mode = is_link_mode
+ super().__init__(**kwargs)
+
+ @classmethod
+ def _from_base(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]:
+ if _output is None:
+ return None
+ if isinstance(_output, InternalOutput):
+ return _output
+ if isinstance(_output, Output):
+            # do force cast directly as there is no new field added in InternalOutput
+ # need to change the logic if new field is added
+ _output.__class__ = InternalOutput
+ return _output # type: ignore[return-value]
+ return InternalOutput(**_output)
+
+ def map_pipeline_output_type(self) -> str:
+ """Map output type to pipeline output type.
+
+ :return: The pipeline output type
+ :rtype: str
+ """
+ # TODO: call this for node output
+ return _map_v1_io_type(self.type)
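
A small sketch of the _from_base helper above, assuming this azure-ai-ml build is importable; it only illustrates the in-place force cast described in the comments:

from azure.ai.ml import Input
from azure.ai.ml._internal.entities._input_outputs import InternalInput

# _from_base force-casts an existing Input in place (no copy) and lets None pass through.
base = Input(type="uri_folder")
internal = InternalInput._from_base(base)
print(type(internal).__name__)          # InternalInput
print(internal is base)                 # True: same object, cast in place
print(InternalInput._from_base(None))   # None
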
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py
new file mode 100644
index 00000000..3a27b562
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py
@@ -0,0 +1,195 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=pointless-string-statement
+
+import hashlib
+import json
+import os
+from collections import deque
+from datetime import datetime
+from os import listdir
+from os.path import isfile, join
+
+HASH_FILE_CHUNK_SIZE = 65536
+HASH_ALGORITHM = "sha512"
+
+""" Copied from ml-components
+Create a merkle tree for the given directory path
+The directory would typically represent a project directory"""
+
+
+def create_merkletree(file_or_folder_path, exclude_function):
+ root = DirTreeNode("", "Directory", datetime.fromtimestamp(os.path.getmtime(file_or_folder_path)).isoformat())
+ if os.path.isdir(file_or_folder_path):
+ folder_path = file_or_folder_path
+ _create_merkletree_helper(folder_path, root, exclude_function)
+ else:
+ file_path = file_or_folder_path
+ file_node = DirTreeNode(file_path, "File", datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat())
+ hexdigest_hash, bytehash = _get_hash(os.path.normpath(file_path), file_path, "File")
+ if hexdigest_hash and bytehash:
+ file_node.add_hash(hexdigest_hash, bytehash)
+ root.add_child(file_node)
+
+ _populate_hashes(root)
+ return root
+
+
+""" Populate hashes for directory nodes
+by hashing the hashes of child nodes under them"""
+
+
+def _populate_hashes(rootNode):
+ if rootNode.is_file():
+ return rootNode.bytehash
+ h = hashlib.new(HASH_ALGORITHM)
+ for child in rootNode.children:
+ if child.is_file():
+ h.update(child.bytehash)
+ else:
+ h.update(_populate_hashes(child))
+ rootNode.bytehash = h.digest()
+ rootNode.hexdigest_hash = h.hexdigest()
+ return h.digest()
+
+
+""" Create a merkle tree for the given directory path
+ :param projectDir: Directory for which to create a tree.
+ :param rootNode: Root node .
+ Walks the directory and create a dirTree """
+
+
+def _create_merkletree_helper(projectDir, rootNode, exclude_function):
+ for f in sorted(listdir(projectDir)):
+ path = os.path.normpath(join(projectDir, f))
+ if not exclude_function(path):
+ if isfile(join(projectDir, f)):
+ newNode = DirTreeNode(f, "File", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
+ hexdigest_hash, bytehash = _get_hash(path, f, "File")
+ if hexdigest_hash and bytehash:
+ newNode.add_hash(hexdigest_hash, bytehash)
+ rootNode.add_child(newNode)
+ else:
+ newNode = DirTreeNode(f, "Directory", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
+ rootNode.add_child(newNode)
+ _create_merkletree_helper(path, newNode, exclude_function)
+
+
+def _get_hash(filePath, name, file_type):
+ h = hashlib.new(HASH_ALGORITHM)
+ if not os.access(filePath, os.R_OK):
+ print(filePath, os.R_OK)
+ print("Cannot access file, so excluded from snapshot: {}".format(filePath))
+ return (None, None)
+ with open(filePath, "rb") as f:
+ while True:
+ data = f.read(HASH_FILE_CHUNK_SIZE)
+ if not data:
+ break
+ h.update(data)
+ h.update(name.encode("utf-8"))
+ h.update(file_type.encode("utf-8"))
+ return (h.hexdigest(), h.digest())
+
+
+""" We compute both hexdigest and digest for hashes.
+digest (bytes) is used so that we can compute the bytehash of a parent directory based on bytehash of its children
+hexdigest is used so that we can serialize the tree using json"""
+
+
+class DirTreeNode(object):
+ def __init__(self, name=None, file_type=None, timestamp=None, hexdigest_hash=None, bytehash=None):
+ self.file_type = file_type
+ self.name = name
+ self.timestamp = timestamp
+ self.children = []
+ self.hexdigest_hash = hexdigest_hash
+ self.bytehash = bytehash
+
+ def load_children_from_dict(self, node_dict):
+ if len(node_dict.items()) == 0:
+ return None
+ self.name = node_dict["name"]
+ self.file_type = node_dict["type"]
+ self.hexdigest_hash = node_dict["hash"]
+ self.timestamp = node_dict["timestamp"]
+ for _, child in node_dict["children"].items():
+ node = DirTreeNode()
+ node.load_children_from_dict(child)
+ self.add_child(node)
+ return self
+
+ def load_children_from_json(self, node_dict):
+ self.name = node_dict["name"]
+ self.file_type = node_dict["type"]
+ self.hexdigest_hash = node_dict["hash"]
+ self.timestamp = node_dict["timestamp"]
+ for child in node_dict["children"]:
+ node = DirTreeNode()
+ node.load_children_from_json(child)
+ self.add_child(node)
+ return self
+
+ def load_object_from_dict(self, node_dict):
+ self.load_children_from_dict(node_dict)
+
+ def load_root_object_from_json_string(self, jsondata):
+ node_dict = json.loads(jsondata)
+ self.load_children_from_json(node_dict)
+
+ def add_hash(self, hexdigest_hash, bytehash):
+ self.hexdigest_hash = hexdigest_hash
+ self.bytehash = bytehash
+
+ def add_child(self, node):
+ self.children.append(node)
+
+ def is_file(self):
+ return self.file_type == "File"
+
+ """ Only for debugging purposes"""
+
+ def print_tree(self):
+ queue = deque()
+ print("Name: " + self.name)
+ print("Type: " + self.file_type)
+ for child in self.children:
+ print(" " + child.name)
+ queue.append(child)
+ for i in queue:
+ i.print_tree()
+
+
+""" Serialize merkle tree.
+Serialize all fields except digest (bytes)
+"""
+
+
+class DirTreeJsonEncoder(json.JSONEncoder):
+ def default(self, o):
+ if not isinstance(o, DirTreeNode):
+ return super(DirTreeJsonEncoder, self).default(o)
+ _dict = o.__dict__
+ _dict.pop("bytehash", None)
+ _dict["type"] = _dict.pop("file_type")
+ _dict["hash"] = _dict.pop("hexdigest_hash")
+
+ return _dict
+
+
+class DirTreeJsonEncoderV2(json.JSONEncoder):
+ def default(self, o):
+ if not isinstance(o, DirTreeNode):
+ return super(DirTreeJsonEncoderV2, self).default(o)
+ _dict = o.__dict__
+ _dict.pop("bytehash", None)
+ if "file_type" in _dict:
+ _dict["type"] = _dict.pop("file_type")
+ if "hexdigest_hash" in _dict:
+ _dict["hash"] = _dict.pop("hexdigest_hash")
+ if isinstance(_dict["children"], list):
+ _dict["children"] = {x.name: x for x in _dict["children"]}
+
+ return _dict
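
A usage sketch for the helpers above; the project directory is hypothetical and the exclude function here excludes nothing:

import json

from azure.ai.ml._internal.entities._merkle_tree import DirTreeJsonEncoder, create_merkletree

# Build the tree for a (hypothetical) local project folder, then serialize it.
# DirTreeJsonEncoder drops the raw bytehash and keeps name/type/hash/timestamp/children.
root = create_merkletree("./my_project", lambda path: False)
print(root.hexdigest_hash)
print(json.dumps(root, cls=DirTreeJsonEncoder)[:200])
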
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py
new file mode 100644
index 00000000..d4437663
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py
@@ -0,0 +1,35 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Optional
+
+from ...entities._assets import Code
+
+
+class InternalCode(Code):
+ @property
+ def _upload_hash(self) -> Optional[str]:
+ # This property will be used to identify the uploaded content when trying to
+ # upload to datastore. The tracebacks will be as below:
+ # Traceback (most recent call last):
+ # _artifact_utilities._check_and_upload_path
+ # _artifact_utilities._upload_to_datastore
+ # _artifact_utilities.upload_artifact
+ # _blob_storage_helper.upload
+ # where asset id will be calculated based on the upload hash.
+
+ if self._is_anonymous is True:
+ # Name of an anonymous internal code is the same as its snapshot id
+ # in ml-component, use it as the upload hash to avoid duplicate hash
+ # calculation with _asset_utils.get_object_hash.
+ return self.name
+
+ return getattr(super(InternalCode, self), "_upload_hash")
+
+ def __setattr__(self, key, value):
+ if key == "name" and hasattr(self, key) and self._is_anonymous is True and value != self.name:
+ raise AttributeError(
+ "InternalCode name are calculated based on its content and cannot "
+ "be changed: current name is {} and new value is {}".format(self.name, value)
+ )
+ super().__setattr__(key, value)
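
A short sketch of the rename guard above, using the same constructor arguments that component.py (later in this diff) passes to InternalCode; the name value and path are illustrative:

from azure.ai.ml._internal.entities.code import InternalCode

code = InternalCode(
    name="00000000-0000-4000-8000-000000000000",  # illustrative snapshot id
    version="1",
    path=".",
    is_anonymous=True,
)
print(code._upload_hash)   # for anonymous code this is the snapshot id itself, no extra hashing
try:
    code.name = "new-name"
except AttributeError as exc:
    print(exc)             # anonymous internal code cannot be renamed
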
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py
new file mode 100644
index 00000000..9ed732ec
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py
@@ -0,0 +1,203 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+from typing import Dict, List, Optional, Union
+
+from marshmallow import INCLUDE, Schema
+
+from ... import MpiDistribution, PyTorchDistribution, RayDistribution, TensorFlowDistribution
+from ..._schema import PathAwareSchema
+from ..._schema.core.fields import DistributionField
+from ...entities import CommandJobLimits, JobResourceConfiguration
+from ...entities._util import get_rest_dict_for_node_attrs
+from .._schema.component import NodeType
+from ..entities.component import InternalComponent
+from ..entities.node import InternalBaseNode
+
+
+class Command(InternalBaseNode):
+ """Node of internal command components in pipeline with specific run settings.
+
+    Unlike azure.ai.ml.entities.Command, the type of this class is CommandComponent.
+ """
+
+ def __init__(self, **kwargs):
+ node_type = kwargs.pop("type", None) or NodeType.COMMAND
+ super(Command, self).__init__(type=node_type, **kwargs)
+ self._init = True
+ self._resources = kwargs.pop("resources", JobResourceConfiguration())
+ self._compute = kwargs.pop("compute", None)
+ self._environment = kwargs.pop("environment", None)
+ self._environment_variables = kwargs.pop("environment_variables", None)
+ self._limits = kwargs.pop("limits", CommandJobLimits())
+ self._init = False
+
+ @property
+ def compute(self) -> Optional[str]:
+ """Get the compute definition for the command.
+
+ :return: The compute definition
+ :rtype: Optional[str]
+ """
+ return self._compute
+
+ @compute.setter
+ def compute(self, value: str) -> None:
+ """Set the compute definition for the command.
+
+ :param value: The new compute definition
+ :type value: str
+ """
+ self._compute = value
+
+ @property
+ def environment(self) -> Optional[str]:
+ """Get the environment definition for the command.
+
+ :return: The environment definition
+ :rtype: Optional[str]
+ """
+ return self._environment
+
+ @environment.setter
+ def environment(self, value: str) -> None:
+ """Set the environment definition for the command.
+
+ :param value: The new environment definition
+ :type value: str
+ """
+ self._environment = value
+
+ @property
+ def environment_variables(self) -> Optional[Dict[str, str]]:
+ """Get the environment variables for the command.
+
+ :return: The environment variables
+ :rtype: Optional[Dict[str, str]]
+ """
+ return self._environment_variables
+
+ @environment_variables.setter
+ def environment_variables(self, value: Dict[str, str]) -> None:
+ """Set the environment variables for the command.
+
+ :param value: The new environment variables
+ :type value: Dict[str, str]
+ """
+ self._environment_variables = value
+
+ @property
+ def limits(self) -> CommandJobLimits:
+ return self._limits
+
+ @limits.setter
+ def limits(self, value: CommandJobLimits):
+ self._limits = value
+
+ @property
+ def resources(self) -> JobResourceConfiguration:
+ """Compute Resource configuration for the component.
+
+ :return: The resource configuration
+ :rtype: JobResourceConfiguration
+ """
+ return self._resources
+
+ @resources.setter
+ def resources(self, value: JobResourceConfiguration):
+ self._resources = value
+
+ @classmethod
+ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+ return ["environment", "limits", "resources", "environment_variables"]
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ from .._schema.command import CommandSchema
+
+ return CommandSchema(context=context)
+
+ def _to_rest_object(self, **kwargs) -> dict:
+ rest_obj = super()._to_rest_object(**kwargs)
+ rest_obj.update(
+ {
+ "limits": get_rest_dict_for_node_attrs(self.limits, clear_empty_value=True),
+ "resources": get_rest_dict_for_node_attrs(self.resources, clear_empty_value=True),
+ }
+ )
+ return rest_obj
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj):
+ obj = InternalBaseNode._from_rest_object_to_init_params(obj)
+
+ if "resources" in obj and obj["resources"]:
+ obj["resources"] = JobResourceConfiguration._from_rest_object(obj["resources"])
+
+ # handle limits
+ if "limits" in obj and obj["limits"]:
+ obj["limits"] = CommandJobLimits._from_rest_object(obj["limits"])
+ return obj
+
+
+class Distributed(Command):
+ def __init__(self, **kwargs):
+ super(Distributed, self).__init__(**kwargs)
+ self._distribution = kwargs.pop("distribution", None)
+ self._type = NodeType.DISTRIBUTED
+ if self._distribution is None:
+ # hack: distribution.type is required to set distribution, which is defined in launcher.type
+ if (
+ isinstance(self.component, InternalComponent)
+ and self.component.launcher
+ and "type" in self.component.launcher
+ ):
+ self.distribution = {"type": self.component.launcher["type"]}
+ else:
+ raise ValueError(
+ "launcher.type must be specified in definition of DistributedComponent but got {}".format(
+ self.component
+ )
+ )
+
+ @property
+ def distribution(
+ self,
+ ) -> Union[PyTorchDistribution, MpiDistribution, TensorFlowDistribution, RayDistribution]:
+ """The distribution config of component, e.g. distribution={'type': 'mpi'}.
+
+ :return: The distribution config
+ :rtype: Union[PyTorchDistribution, MpiDistribution, TensorFlowDistribution, RayDistribution]
+ """
+ return self._distribution
+
+ @distribution.setter
+ def distribution(
+ self,
+ value: Union[Dict, PyTorchDistribution, TensorFlowDistribution, MpiDistribution, RayDistribution],
+ ):
+ if isinstance(value, dict):
+ dist_schema = DistributionField(unknown=INCLUDE)
+ value = dist_schema._deserialize(value=value, attr=None, data=None)
+ self._distribution = value
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ from .._schema.command import DistributedSchema
+
+ return DistributedSchema(context=context)
+
+ @classmethod
+ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+ return Command._picked_fields_from_dict_to_rest_object() + ["distribution"]
+
+ def _to_rest_object(self, **kwargs) -> dict:
+ rest_obj = super()._to_rest_object(**kwargs)
+ distribution = self.distribution._to_rest_object() if self.distribution else None # pylint: disable=no-member
+ rest_obj.update(
+ {
+ "distribution": get_rest_dict_for_node_attrs(distribution),
+ }
+ )
+ return rest_obj
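
The Distributed.distribution setter above converts a plain dict into a typed distribution object through DistributionField. A sketch of that conversion in isolation, assuming the field behaves as it is used above:

from marshmallow import INCLUDE

from azure.ai.ml._schema.core.fields import DistributionField

# Same deserialization call the Distributed.distribution setter performs for dict input.
dist = DistributionField(unknown=INCLUDE)._deserialize(
    value={"type": "mpi", "process_count_per_instance": 2}, attr=None, data=None
)
print(type(dist).__name__)  # expected to be an MpiDistribution-like object
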
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
new file mode 100644
index 00000000..e54c1906
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
@@ -0,0 +1,370 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use id/type as argument name
+import os
+from contextlib import contextmanager
+from os import PathLike
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Union
+from uuid import UUID
+
+import yaml # type: ignore[import]
+from marshmallow import Schema
+
+from ... import Input, Output
+from ..._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties
+from ..._schema import PathAwareSchema
+from ..._utils._arm_id_utils import parse_name_label
+from ..._utils._asset_utils import IgnoreFile
+from ...constants._common import DefaultOpenEncoding
+from ...entities import Component
+from ...entities._assets import Code
+from ...entities._component._additional_includes import AdditionalIncludes, AdditionalIncludesMixin
+from ...entities._component.code import ComponentIgnoreFile
+from ...entities._job.distribution import DistributionConfiguration
+from ...entities._system_data import SystemData
+from ...entities._util import convert_ordered_dict_to_dict
+from ...entities._validation import MutableValidationResult
+from .._schema.component import InternalComponentSchema
+from ._input_outputs import InternalInput, InternalOutput
+from ._merkle_tree import create_merkletree
+from .code import InternalCode
+from .environment import InternalEnvironment
+from .node import InternalBaseNode
+
+_ADDITIONAL_INCLUDES_CONFIG_KEY = "additional_includes"
+_ADDITIONAL_INCLUDES_SUFFIX = ".additional_includes"
+
+
+class InternalComponent(Component, AdditionalIncludesMixin):
+ # pylint: disable=too-many-instance-attributes, too-many-locals
+ """Base class for internal component version, used to define an internal component. Recommended to create instance
+ with component_factory.
+
+ :param name: Name of the resource.
+ :type name: str
+ :param version: Version of the resource.
+ :type version: str
+ :param id: Global id of the resource, Azure Resource Manager ID.
+ :type id: str
+ :param type: Type of the command, supported is 'command'.
+ :type type: str
+ :param description: Description of the resource.
+ :type description: str
+ :param tags: Tag dictionary. Tags can be added, removed, and updated.
+ :type tags: dict
+ :param properties: Internal use only.
+ :type properties: dict
+ :param display_name: Display name of the component.
+ :type display_name: str
+ :param is_deterministic: Whether the component is deterministic.
+ :type is_deterministic: bool
+ :param inputs: Inputs of the component.
+ :type inputs: dict
+ :param outputs: Outputs of the component.
+ :type outputs: dict
+ :param yaml_str: The yaml string of the component.
+ :type yaml_str: str
+ :param _schema: Schema of the component.
+ :type _schema: str
+ :param creation_context: Creation metadata of the component.
+ :type creation_context: ~azure.ai.ml.entities.SystemData
+ """
+
+ def __init__(
+ self,
+ *,
+ _schema: Optional[str] = None,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ display_name: Optional[str] = None,
+ type: Optional[str] = None,
+ description: Optional[str] = None,
+ tags: Optional[Dict] = None,
+ is_deterministic: Optional[bool] = None,
+ successful_return_code: Optional[str] = None,
+ inputs: Optional[Dict] = None,
+ outputs: Optional[Dict] = None,
+ code: Optional[Union[str, os.PathLike]] = None,
+ environment: Optional[Dict] = None,
+ environment_variables: Optional[Dict] = None,
+ command: Optional[str] = None,
+ id: Optional[str] = None,
+ properties: Optional[Dict] = None,
+ yaml_str: Optional[str] = None,
+ creation_context: Optional[SystemData] = None,
+ scope: Optional[Dict] = None,
+ hemera: Optional[Dict] = None,
+ hdinsight: Optional[Dict] = None,
+ parallel: Optional[Dict] = None,
+ starlite: Optional[Dict] = None,
+ ae365exepool: Optional[Dict] = None,
+ launcher: Optional[Dict] = None,
+ datatransfer: Optional[Dict] = None,
+ aether: Optional[Dict] = None,
+ **kwargs,
+ ):
+ _type, self._type_label = parse_name_label(type)
+ super().__init__(
+ name=name,
+ version=version,
+ id=id,
+ type=_type,
+ description=description,
+ tags=tags,
+ properties=properties,
+ display_name=display_name,
+ is_deterministic=is_deterministic, # type: ignore[arg-type]
+ inputs=inputs,
+ outputs=outputs,
+ yaml_str=yaml_str,
+ _schema=_schema,
+ creation_context=creation_context,
+ **kwargs,
+ )
+ # Store original yaml
+ self._yaml_str = yaml_str
+ self._other_parameter = kwargs
+
+ self.successful_return_code = successful_return_code
+ self.code = code
+ self.environment = InternalEnvironment(**environment) if isinstance(environment, dict) else environment
+ self.environment_variables = environment_variables
+ # TODO: remove these to keep it a general component class
+ self.command = command
+ self.scope = scope
+ self.hemera = hemera
+ self.hdinsight = hdinsight
+ self.parallel = parallel
+ self.starlite = starlite
+ self.ae365exepool = ae365exepool
+ self.launcher = launcher
+ self.datatransfer = datatransfer
+ self.aether = aether
+
+ @classmethod
+ def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool):
+ component_io = {}
+ for name, port in io_dict.items():
+ if is_input:
+ component_io[name] = InternalInput._from_base(port)
+ else:
+ component_io[name] = InternalOutput._from_base(port)
+ return component_io
+
+ # region AdditionalIncludesMixin
+
+ @classmethod
+ def _read_additional_include_configs(cls, yaml_path: Path) -> List[str]:
+ """Read additional include configs from the additional includes file.
+ The name of the file is the same as the component spec file, with a suffix of ".additional_includes".
+ It can be either a yaml file or a text file:
+ 1. If it is a yaml file, yaml format of additional_includes looks like below:
+ ```
+ additional_includes:
+ - your/local/path
+ - type: artifact
+ organization: devops_organization
+ project: devops_project
+ feed: artifacts_feed_name
+ name: universal_package_name
+ version: package_version
+ scope: scope_type
+ ```
+ 2. If it is a text file, each line is a path to include. Note that artifact config is not supported
+ in this format.
+
+ :param yaml_path: The yaml path
+ :type yaml_path: Path
+ :return: The list of additional includes
+ :rtype: List[str]
+ """
+ additional_includes_config_path = yaml_path.with_suffix(_ADDITIONAL_INCLUDES_SUFFIX)
+ if additional_includes_config_path.is_file():
+ with open(additional_includes_config_path, encoding=DefaultOpenEncoding.READ) as f:
+ file_content = f.read()
+ try:
+ configs = yaml.safe_load(file_content)
+ if isinstance(configs, dict):
+ return configs.get(_ADDITIONAL_INCLUDES_CONFIG_KEY, [])
+ except Exception: # pylint: disable=W0718
+ # TODO: check if we should catch yaml.YamlError instead here
+ pass
+ return [line.strip() for line in file_content.splitlines(keepends=False) if len(line.strip()) > 0]
+ return []
+
+ @classmethod
+ def _get_additional_includes_field_name(cls) -> str:
+ # additional includes for internal components are configured by a file, which is not a field in the yaml
+ # return '*' as diagnostics yaml paths and override _get_all_additional_includes_configs.
+ return "*"
+
+ def _get_all_additional_includes_configs(self) -> List:
+ # internal components must have a source path
+ return self._read_additional_include_configs(Path(self._source_path)) # type: ignore[arg-type]
+ # TODO: Bug 2881943
+
+ def _get_base_path_for_code(self) -> Path:
+ # internal components must have a source path
+ return Path(self._source_path).parent # type: ignore[arg-type]
+ # TODO: Bug 2881943
+
+ def _get_origin_code_value(self) -> Union[str, PathLike, None]:
+ return super()._get_origin_code_value() or "."
+
+ # endregion
+
+ def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+ """Dump the component content into a sorted yaml string.
+
+ :return: The ordered dict
+ :rtype: Dict
+ """
+
+ obj = super()._to_ordered_dict_for_yaml_dump()
+ # dict dumped base on schema will transfer code to an absolute path, while we want to keep its original value
+ if "code" in obj:
+ if not self.code:
+ del obj["code"]
+ else:
+ obj["code"] = self.code
+ return obj
+
+ @property
+ def _additional_includes(self) -> AdditionalIncludes:
+ """This property is kept for compatibility with old mldesigner sdk.
+
+ :return: The additional includes
+ :rtype: AdditionalIncludes
+ """
+ obj = self._generate_additional_includes_obj()
+ from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes
+
+ obj.__class__ = InternalAdditionalIncludes
+ return obj
+
+ # region SchemaValidatableMixin
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ return InternalComponentSchema(context=context)
+
+ def _customized_validate(self) -> MutableValidationResult:
+ validation_result = super(InternalComponent, self)._customized_validate()
+ skip_path_validation = not self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+ validation_result
+ )
+ # resolving additional includes & update self._base_path can be dangerous,
+ # so we just skip path validation if additional includes is provided.
+ # note that there will still be client-side error on job submission (after code is resolved)
+ # if paths in environment are invalid
+ if isinstance(self.environment, InternalEnvironment):
+ validation_result.merge_with(
+ self.environment.validate(
+ self._base_path,
+ skip_path_validation=skip_path_validation,
+ ),
+ field_name="environment",
+ )
+ return validation_result
+
+ # endregion
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+ # put it here as distribution is shared by some components, e.g. command
+ distribution = obj.properties.component_spec.pop("distribution", None)
+ init_kwargs = super()._from_rest_object_to_init_params(obj)
+ if distribution:
+ init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution)
+ return init_kwargs
+
+ def _to_rest_object(self) -> ComponentVersion:
+ component: Union[Dict[Any, Any], List[Any]] = convert_ordered_dict_to_dict(self._to_dict())
+ component["_source"] = self._source # type: ignore[call-overload]
+ # TODO: 2883063
+
+ properties = ComponentVersionProperties(
+ component_spec=component,
+ description=self.description,
+ is_anonymous=self._is_anonymous,
+ properties=self.properties,
+ tags=self.tags,
+ )
+ result = ComponentVersion(properties=properties)
+ result.name = self.name
+ return result
+
+ @classmethod
+ def _get_snapshot_id(
+ cls,
+ code_path: Union[str, PathLike],
+ ignore_file: IgnoreFile,
+ ) -> str:
+ """Get the snapshot id of a component with specific working directory in ml-components. Use this as the name of
+ code asset to reuse steps in a pipeline job from ml-components runs.
+
+ :param code_path: The path of the working directory.
+ :type code_path: str
+ :param ignore_file: The ignore file of the snapshot.
+ :type ignore_file: IgnoreFile
+ :return: The snapshot id of a component in ml-components with code_path as its working directory.
+ :rtype: str
+ """
+ curr_root = create_merkletree(code_path, ignore_file.is_file_excluded)
+ snapshot_id = str(UUID(curr_root.hexdigest_hash[::4]))
+ return snapshot_id
+
+ @contextmanager # type: ignore[arg-type]
+ def _try_build_local_code(self) -> Iterable[Code]:
+ """Build final code when origin code is a local code.
+        Will merge the code path with additional includes into a temp folder if additional includes are specified.
+ For internal components, file dependencies in environment will be resolved based on the final code.
+
+ :return: The code instance
+ :rtype: Iterable[Code]
+ """
+
+ tmp_code_dir: Path
+ # origin code value of internal component will never be None. check _get_origin_code_value for details
+ with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir:
+ # use absolute path in case temp folder & work dir are in different drive
+ tmp_code_dir = tmp_code_dir.absolute()
+
+ # file dependency in code will be read during internal environment resolution
+ # for example, docker file of the environment may be in additional includes;
+ # and it will be read then insert to the environment object during resolution.
+ # so we need to resolve environment based on the temporary code path
+ if isinstance(self.environment, InternalEnvironment):
+ self.environment.resolve(base_path=tmp_code_dir)
+
+ # additional includes config file itself should be ignored
+ rebased_ignore_file = ComponentIgnoreFile(
+ tmp_code_dir,
+ additional_includes_file_name=Path(self._source_path)
+ .with_suffix(_ADDITIONAL_INCLUDES_SUFFIX)
+ .name, # type: ignore[arg-type]
+ # TODO: Bug 2881943
+ )
+
+ # Use the snapshot id in ml-components as code name to enable anonymous
+ # component reuse from ml-component runs.
+ # calculate snapshot id here instead of inside InternalCode to ensure that
+ # snapshot id is calculated based on the built code path
+ yield InternalCode(
+ name=self._get_snapshot_id(
+ # use absolute path in case temp folder & work dir are in different drive
+ tmp_code_dir,
+ # this ignore-file should be rebased to the built code path
+ rebased_ignore_file,
+ ),
+ version="1",
+ base_path=self._base_path,
+ path=tmp_code_dir,
+ is_anonymous=True,
+ ignore_file=rebased_ignore_file,
+ )
+
+ def __call__(self, *args, **kwargs) -> InternalBaseNode:
+ return super(InternalComponent, self).__call__(*args, **kwargs)
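
For reference, _get_snapshot_id above keeps every 4th character of the merkle root's sha512 hexdigest (128 hex characters reduced to the 32 a UUID needs). A self-contained sketch of just that derivation:

import hashlib
from uuid import UUID

hexdigest = hashlib.new("sha512", b"example snapshot content").hexdigest()
print(len(hexdigest))        # 128 hex characters
print(len(hexdigest[::4]))   # 32 hex characters, exactly a UUID's worth
print(str(UUID(hexdigest[::4])))
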
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py
new file mode 100644
index 00000000..673afeac
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py
@@ -0,0 +1,157 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from os import PathLike
+from pathlib import Path
+from typing import Dict, Optional, Union
+
+from ..._utils.utils import load_yaml
+from ...constants._common import FILE_PREFIX, DefaultOpenEncoding
+from ...entities._validation import MutableValidationResult, ValidationResultBuilder
+
+
+class InternalEnvironment:
+ # conda section
+ CONDA_DEPENDENCIES = "conda_dependencies"
+ CONDA_DEPENDENCIES_FILE = "conda_dependencies_file"
+ PIP_REQUIREMENTS_FILE = "pip_requirements_file"
+ DEFAULT_PYTHON_VERSION = "3.8.5"
+ # docker section
+ BUILD = "build"
+ DOCKERFILE = "dockerfile"
+
+ def __init__(
+ self,
+ docker: Optional[Dict] = None,
+ conda: Optional[Dict] = None,
+ os: Optional[str] = None,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ python: Optional[Dict] = None,
+ ):
+ self.docker = docker
+ self.conda = conda
+ self.os = os if os else "Linux"
+ self.name = name
+ self.version = version
+ self.python = python
+ self._docker_file_resolved = False
+
+ @staticmethod
+ def _parse_file_path(value: str) -> str:
+ return value[len(FILE_PREFIX) :] if value.startswith(FILE_PREFIX) else value
+
+ def _validate_conda_section(
+ self, base_path: Union[str, PathLike], skip_path_validation: bool
+ ) -> MutableValidationResult:
+ validation_result = ValidationResultBuilder.success()
+ if not self.conda:
+ return validation_result
+ dependencies_field_names = {self.CONDA_DEPENDENCIES, self.CONDA_DEPENDENCIES_FILE, self.PIP_REQUIREMENTS_FILE}
+ if len(set(self.conda) & dependencies_field_names) > 1:
+ validation_result.append_warning(
+ yaml_path="conda",
+ message="Duplicated declaration of dependencies, will honor in the order "
+ "conda_dependencies, conda_dependencies_file and pip_requirements_file.",
+ )
+ if self.conda.get(self.CONDA_DEPENDENCIES_FILE):
+ conda_dependencies_file = self.conda[self.CONDA_DEPENDENCIES_FILE]
+ if not skip_path_validation and not (Path(base_path) / conda_dependencies_file).is_file():
+ validation_result.append_error(
+ yaml_path=f"conda.{self.CONDA_DEPENDENCIES_FILE}",
+ message=f"Cannot find conda dependencies file: {conda_dependencies_file!r}",
+ )
+ if self.conda.get(self.PIP_REQUIREMENTS_FILE):
+ pip_requirements_file = self.conda[self.PIP_REQUIREMENTS_FILE]
+ if not skip_path_validation and not (Path(base_path) / pip_requirements_file).is_file():
+ validation_result.append_error(
+ yaml_path=f"conda.{self.PIP_REQUIREMENTS_FILE}",
+ message=f"Cannot find pip requirements file: {pip_requirements_file!r}",
+ )
+ return validation_result
+
+ def _validate_docker_section(
+ self, base_path: Union[str, PathLike], skip_path_validation: bool
+ ) -> MutableValidationResult:
+ validation_result = ValidationResultBuilder.success()
+ if not self.docker:
+ return validation_result
+ if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE):
+ return validation_result
+ dockerfile_file = self.docker[self.BUILD][self.DOCKERFILE]
+ dockerfile_file = self._parse_file_path(dockerfile_file)
+ if (
+ not self._docker_file_resolved
+ and not skip_path_validation
+ and not (Path(base_path) / dockerfile_file).is_file()
+ ):
+ validation_result.append_error(
+ yaml_path=f"docker.{self.BUILD}.{self.DOCKERFILE}",
+ message=f"Dockerfile not exists: {dockerfile_file}",
+ )
+ return validation_result
+
+ def validate(self, base_path: Union[str, PathLike], skip_path_validation: bool = False) -> MutableValidationResult:
+ """Validate the environment section.
+
+ This is a public method but won't be exposed to user given InternalEnvironment is an internal class.
+
+ :param base_path: The base path
+ :type base_path: Union[str, PathLike]
+ :param skip_path_validation: Whether to skip path validation. Defaults to False
+ :type skip_path_validation: bool
+ :return: The validation result
+ :rtype: MutableValidationResult
+ """
+ validation_result = ValidationResultBuilder.success()
+ if self.os is not None and self.os not in {"Linux", "Windows", "linux", "windows"}:
+ validation_result.append_error(
+ yaml_path="os",
+ message=f"Only support 'Linux' and 'Windows', but got {self.os!r}",
+ )
+ validation_result.merge_with(self._validate_conda_section(base_path, skip_path_validation))
+ validation_result.merge_with(self._validate_docker_section(base_path, skip_path_validation))
+ return validation_result
+
+ def _resolve_conda_section(self, base_path: Union[str, PathLike]) -> None:
+ if not self.conda:
+ return
+ if self.conda.get(self.CONDA_DEPENDENCIES_FILE):
+ conda_dependencies_file = self.conda.pop(self.CONDA_DEPENDENCIES_FILE)
+ self.conda[self.CONDA_DEPENDENCIES] = load_yaml(Path(base_path) / conda_dependencies_file)
+ return
+ if self.conda.get(self.PIP_REQUIREMENTS_FILE):
+ pip_requirements_file = self.conda.pop(self.PIP_REQUIREMENTS_FILE)
+ with open(Path(base_path) / pip_requirements_file, encoding=DefaultOpenEncoding.READ) as f:
+ pip_requirements = f.read().splitlines()
+ self.conda = {
+ self.CONDA_DEPENDENCIES: {
+ "name": "project_environment",
+ "dependencies": [
+ f"python={self.DEFAULT_PYTHON_VERSION}",
+ {
+ "pip": pip_requirements,
+ },
+ ],
+ }
+ }
+ return
+
+ def _resolve_docker_section(self, base_path: Union[str, PathLike]) -> None:
+ if not self.docker:
+ return
+ if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE):
+ return
+ dockerfile_file = self.docker[self.BUILD][self.DOCKERFILE]
+ if not dockerfile_file.startswith(FILE_PREFIX):
+ return
+ dockerfile_file = self._parse_file_path(dockerfile_file)
+ with open(Path(base_path) / dockerfile_file, "r", encoding=DefaultOpenEncoding.READ) as f:
+ self.docker[self.BUILD][self.DOCKERFILE] = f.read()
+ self._docker_file_resolved = True
+ return
+
+ def resolve(self, base_path: Union[str, PathLike]) -> None:
+ self._resolve_conda_section(base_path)
+ self._resolve_docker_section(base_path)
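
A minimal sketch of how InternalEnvironment above is validated and resolved; the docker image, conda section, and requirements file name are illustrative:

from azure.ai.ml._internal.entities.environment import InternalEnvironment

env = InternalEnvironment(
    docker={"image": "example.azurecr.io/base:latest"},   # illustrative image
    conda={"pip_requirements_file": "requirements.txt"},  # illustrative file name
    os="Linux",
)
result = env.validate(base_path=".")   # appends an error if requirements.txt is missing
print(result.passed)
env.resolve(base_path=".")             # inlines requirements.txt into conda_dependencies
print(list(env.conda))
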
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
new file mode 100644
index 00000000..89fc032c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
@@ -0,0 +1,338 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+from enum import Enum
+from typing import Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from ... import Input, Output
+from ..._schema import PathAwareSchema
+from ...constants import JobType
+from ...entities import Component, Job
+from ...entities._builders import BaseNode
+from ...entities._job.pipeline._io import NodeInput, NodeOutput, PipelineInput
+from ...entities._util import convert_ordered_dict_to_dict
+from .._schema.component import NodeType
+
+
+class InternalBaseNode(BaseNode):
+ """Base class for node of internal components in pipeline. Can be instantiated directly.
+
+ :param type: Type of pipeline node
+ :type type: str
+ :param component: Id or instance of the component version to be run for the step
+ :type component: Union[Component, str]
+ :param inputs: Inputs to the node.
+ :type inputs: Dict[str, Union[Input, str, bool, int, float, Enum, dict]]
+ :param outputs: Mapping of output data bindings used in the job.
+ :type outputs: Dict[str, Union[str, Output, dict]]
+ :param properties: The job property dictionary.
+ :type properties: dict[str, str]
+ :param compute: Compute definition containing the compute information for the step
+ :type compute: str
+ """
+
+ def __init__(
+ self,
+ *,
+ type: str = JobType.COMPONENT, # pylint: disable=redefined-builtin
+ component: Union[Component, str],
+ inputs: Optional[
+ Dict[
+ str,
+ Union[
+ PipelineInput,
+ NodeOutput,
+ Input,
+ str,
+ bool,
+ int,
+ float,
+ Enum,
+ "Input",
+ ],
+ ]
+ ] = None,
+ outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
+ properties: Optional[Dict] = None,
+ compute: Optional[str] = None,
+ **kwargs,
+ ):
+ kwargs.pop("type", None)
+ BaseNode.__init__(
+ self,
+ type=type,
+ component=component, # type: ignore[arg-type]
+ # TODO: Bug 2881892
+ inputs=inputs,
+ outputs=outputs,
+ compute=compute,
+ properties=properties,
+ **kwargs,
+ )
+
+ @property
+ def _skip_required_compute_missing_validation(self) -> bool:
+ return True
+
+ def _to_node(self, context: Optional[Dict] = None, **kwargs) -> BaseNode:
+ return self
+
+ def _to_component(self, context: Optional[Dict] = None, **kwargs) -> Component:
+ return self.component
+
+ def _to_job(self) -> Job:
+ raise RuntimeError("Internal components doesn't support to job")
+
+ @classmethod
+ def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "Job":
+ raise RuntimeError("Internal components doesn't support load from dict")
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ from .._schema.node import InternalBaseNodeSchema
+
+ return InternalBaseNodeSchema(context=context)
+
+ @property
+ def component(self) -> Component:
+ return self._component
+
+ def _to_rest_inputs(self) -> Dict[str, Dict]:
+ rest_dataset_literal_inputs = super(InternalBaseNode, self)._to_rest_inputs()
+ for input_name, input_value in self.inputs.items():
+            # hack: remove unfilled input from rest object instead of leaving a default input of {"job_input_type": "literal"}
+ # note that this hack is not always effective as _data will be set to Input() when visiting input_value.type
+ if (
+ isinstance(input_value, NodeInput)
+ and input_value._data is None
+ and input_name in rest_dataset_literal_inputs
+ ):
+ del rest_dataset_literal_inputs[input_name]
+ return rest_dataset_literal_inputs
+
+ def _to_rest_object(self, **kwargs) -> dict:
+ base_dict = super(InternalBaseNode, self)._to_rest_object(**kwargs)
+ for key in ["name", "display_name", "tags"]:
+ if key in base_dict:
+ del base_dict[key]
+ for key in ["computeId"]:
+ if key in base_dict and base_dict[key] is None:
+ del base_dict[key]
+
+ base_dict.update(
+ convert_ordered_dict_to_dict(
+ {
+ "componentId": self._get_component_id(),
+ "type": self.type,
+ }
+ )
+ )
+ return base_dict
+
+
+class DataTransfer(InternalBaseNode):
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(DataTransfer, self).__init__(type=NodeType.DATA_TRANSFER, **kwargs)
+
+
+class HDInsight(InternalBaseNode):
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(HDInsight, self).__init__(type=NodeType.HDI, **kwargs)
+ self._init = True
+ self._compute_name: str = kwargs.pop("compute_name", None)
+ self._queue: str = kwargs.pop("queue", None)
+ self._driver_memory: str = kwargs.pop("driver_memory", None)
+ self._driver_cores: int = kwargs.pop("driver_cores", None)
+ self._executor_memory: str = kwargs.pop("executor_memory", None)
+ self._executor_cores: int = kwargs.pop("executor_cores", None)
+ self._number_executors: int = kwargs.pop("number_executors", None)
+ self._conf: Union[dict, str] = kwargs.pop("conf", None)
+ self._hdinsight_spark_job_name: str = kwargs.pop("hdinsight_spark_job_name", None)
+ self._init = False
+
+ @property
+ def compute_name(self) -> str:
+ """Name of the compute to be used.
+
+ :return: Compute name
+ :rtype: str
+ """
+ return self._compute_name
+
+ @compute_name.setter
+ def compute_name(self, value: str):
+ self._compute_name = value
+
+ @property
+ def queue(self) -> str:
+ """The name of the YARN queue to which submitted.
+
+ :return: YARN queue name
+ :rtype: str
+ """
+ return self._queue
+
+ @queue.setter
+ def queue(self, value: str):
+ self._queue = value
+
+ @property
+ def driver_memory(self) -> str:
+ """Amount of memory to use for the driver process.
+
+ It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte,
+        megabyte, gigabyte, terabyte and petabyte respectively. Example values are 10k, 10m and 10g.
+
+ :return: Amount of memory to use for the driver process
+ :rtype: str
+ """
+ return self._driver_memory
+
+ @driver_memory.setter
+ def driver_memory(self, value: str):
+ self._driver_memory = value
+
+ @property
+ def driver_cores(self) -> int:
+ """Number of cores to use for the driver process.
+
+ :return: Number of cores to use for the driver process.
+ :rtype: int
+ """
+ return self._driver_cores
+
+ @driver_cores.setter
+ def driver_cores(self, value: int):
+ self._driver_cores = value
+
+ @property
+ def executor_memory(self) -> str:
+ """Amount of memory to use per executor process.
+
+ It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte,
+        megabyte, gigabyte, terabyte and petabyte respectively. Example values are 10k, 10m and 10g.
+
+ :return: The executor memory
+ :rtype: str
+ """
+ return self._executor_memory
+
+ @executor_memory.setter
+ def executor_memory(self, value: str):
+ self._executor_memory = value
+
+ @property
+ def executor_cores(self) -> int:
+ """Number of cores to use for each executor.
+
+ :return: The number of cores to use for each executor
+ :rtype: int
+ """
+ return self._executor_cores
+
+ @executor_cores.setter
+ def executor_cores(self, value: int):
+ self._executor_cores = value
+
+ @property
+ def number_executors(self) -> int:
+ """Number of executors to launch for this session.
+
+ :return: The number of executors to launch
+ :rtype: int
+ """
+ return self._number_executors
+
+ @number_executors.setter
+ def number_executors(self, value: int):
+ self._number_executors = value
+
+ @property
+ def conf(self) -> Union[dict, str]:
+ """Spark configuration properties.
+
+ :return: The spark configuration properties.
+ :rtype: Union[dict, str]
+ """
+ return self._conf
+
+ @conf.setter
+ def conf(self, value: Union[dict, str]):
+ self._conf = value
+
+ @property
+ def hdinsight_spark_job_name(self) -> str:
+ """
+
+ :return: The name of this session
+ :rtype: str
+ """
+ return self._hdinsight_spark_job_name
+
+ @hdinsight_spark_job_name.setter
+ def hdinsight_spark_job_name(self, value: str):
+ self._hdinsight_spark_job_name = value
+
+ @classmethod
+ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+ return [
+ "compute_name",
+ "queue",
+ "driver_cores",
+ "executor_memory",
+ "conf",
+ "hdinsight_spark_job_name",
+ "driver_memory",
+ "executor_cores",
+ "number_executors",
+ ]
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ from .._schema.node import HDInsightSchema
+
+ return HDInsightSchema(context=context)
+
+
+class Starlite(InternalBaseNode):
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Starlite, self).__init__(type=NodeType.STARLITE, **kwargs)
+
+
+class Pipeline(InternalBaseNode):
+ # this is only for using registered pipeline component
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Pipeline, self).__init__(type=NodeType.PIPELINE, **kwargs)
+
+
+class Hemera(InternalBaseNode):
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Hemera, self).__init__(type=NodeType.HEMERA, **kwargs)
+
+
+class Ae365exepool(InternalBaseNode):
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Ae365exepool, self).__init__(type=NodeType.AE365EXEPOOL, **kwargs)
+
+
+class Sweep(InternalBaseNode):
+ # this is not in our scope
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Sweep, self).__init__(type=NodeType.SWEEP, **kwargs)
+
+
+class AetherBridge(InternalBaseNode):
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(AetherBridge, self).__init__(type=NodeType.AETHER_BRIDGE, **kwargs)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
new file mode 100644
index 00000000..86fa8939
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
@@ -0,0 +1,114 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...entities import BatchRetrySettings
+from .._schema.component import NodeType
+from ..entities import Command
+
+
+class Parallel(Command):
+ """Node of scope components in pipeline with specific run settings."""
+
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Parallel, self).__init__(type=NodeType.PARALLEL, **kwargs)
+ self._init = True
+ self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
+ self._error_threshold = kwargs.pop("error_threshold", None)
+ self._mini_batch_size = kwargs.pop("mini_batch_size", None)
+ self._partition_keys = kwargs.pop("partition_keys", None)
+ self._logging_level = kwargs.pop("logging_level", None)
+ self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
+ self._init = False
+
+ @property
+ def max_concurrency_per_instance(self) -> int:
+ """The max parallellism that each compute instance has.
+
+ :return: The max concurrence per compute instance
+ :rtype: int
+ """
+ return self._max_concurrency_per_instance
+
+ @max_concurrency_per_instance.setter
+ def max_concurrency_per_instance(self, value: int):
+ self._max_concurrency_per_instance = value
+
+ @property
+ def error_threshold(self) -> int:
+ """The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored
+ during processing.
+
+ If the error count goes above this value, the job will be aborted. The error threshold is for the entire input
+ rather than for the individual mini-batches sent to the run() method. The range is [-1, int.max]; -1 indicates
+ ignoring all failures during processing.
+
+ :return: The error threshold
+ :rtype: int
+ """
+ return self._error_threshold
+
+ @error_threshold.setter
+ def error_threshold(self, value: int):
+ self._error_threshold = value
+
+ @property
+ def mini_batch_size(self) -> int:
+ """The number of records to be sent to run() method for each mini-batch.
+
+ :return: The batch size
+ :rtype: int
+ """
+ return self._mini_batch_size
+
+ @mini_batch_size.setter
+ def mini_batch_size(self, value: int):
+ self._mini_batch_size = value
+
+ @property
+ def logging_level(self) -> str:
+ """A string of the logging level name.
+
+ :return: The logging level
+ :rtype: str
+ """
+ return self._logging_level
+
+ @logging_level.setter
+ def logging_level(self, value: str):
+ self._logging_level = value
+
+ @property
+ def retry_settings(self) -> BatchRetrySettings:
+ """Parallel job run failed retry.
+
+ :return: The retry settings
+ :rtype: BatchRetrySettings
+ """
+ return self._retry_settings
+
+ @retry_settings.setter
+ def retry_settings(self, value: BatchRetrySettings):
+ self._retry_settings = value
+
+ @classmethod
+ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+ return Command._picked_fields_from_dict_to_rest_object() + [
+ "max_concurrency_per_instance",
+ "error_threshold",
+ "logging_level",
+ "retry_settings",
+ "mini_batch_size",
+ ]
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ from .._schema.command import ParallelSchema
+
+ return ParallelSchema(context=context)
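+# A minimal, hypothetical sketch of tuning Parallel run settings on a node instance
+# (the node name `parallel_node` and all values below are illustrative):
+#
+#     from azure.ai.ml.entities import BatchRetrySettings
+#
+#     parallel_node.max_concurrency_per_instance = 2
+#     parallel_node.mini_batch_size = 100
+#     parallel_node.error_threshold = -1     # -1 ignores all failures
+#     parallel_node.logging_level = "DEBUG"
+#     parallel_node.retry_settings = BatchRetrySettings(max_retries=3, timeout=60)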
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py
new file mode 100644
index 00000000..76c94c17
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py
@@ -0,0 +1,29 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from .ai_super_computer_configuration import (
+ AISuperComputerConfiguration,
+ AISuperComputerScalePolicy,
+ AISuperComputerStorageReferenceConfiguration,
+)
+from .itp_configuration import (
+ ITPConfiguration,
+ ITPInteractiveConfiguration,
+ ITPPriorityConfiguration,
+ ITPResourceConfiguration,
+ ITPRetrySettings,
+)
+from .target_selector import TargetSelector
+
+__all__ = [
+ "ITPInteractiveConfiguration",
+ "ITPPriorityConfiguration",
+ "ITPResourceConfiguration",
+ "ITPRetrySettings",
+ "ITPConfiguration",
+ "TargetSelector",
+ "AISuperComputerConfiguration",
+ "AISuperComputerScalePolicy",
+ "AISuperComputerStorageReferenceConfiguration",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py
new file mode 100644
index 00000000..89f338ca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py
@@ -0,0 +1,194 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, List, Optional
+
+from ....entities._job.job_resource_configuration import BaseProperty
+
+
+class PascalCaseProperty(BaseProperty):
+ _KEY_MAPPING: Dict[str, Any] = {}
+
+ def items(self):
+ result = []
+ for key, value in super().items():
+ if key.lower() in self._KEY_MAPPING:
+ key = self._KEY_MAPPING[key.lower()]
+ result.append((key, value))
+ return result
+
+
+class AISuperComputerStorageReferenceConfiguration(PascalCaseProperty): # pylint: disable=name-too-long
+ _KEY_MAPPING = {
+ "container_name": "ContainerName",
+ "relative_path": "RelativePath",
+ }
+
+ def __init__(
+ self,
+ container_name: str,
+ relative_path: str,
+ **kwargs,
+ ):
+ """
+ :param container_name: The name of the ai-super-computer storage container.
+ :type container_name: str
+ :param relative_path: The path on the ai-super-computer storage container.
+ :type relative_path: str
+ """
+ super().__init__(**kwargs)
+ self.container_name = container_name
+ self.relative_path = relative_path
+
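+# A hypothetical illustration of how PascalCaseProperty._KEY_MAPPING is applied when the
+# properties are serialized (the output shape is approximate and depends on BaseProperty.items()):
+#
+#     ref = AISuperComputerStorageReferenceConfiguration(
+#         container_name="training-data",
+#         relative_path="datasets/cifar10",
+#     )
+#     dict(ref.items())  # -> {"ContainerName": "training-data", "RelativePath": "datasets/cifar10"}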
+
+class AISuperComputerScalePolicy(PascalCaseProperty):
+ _KEY_MAPPING = {
+ "auto_scale_instance_type_count_set": "AutoScaleInstanceTypeCountSet",
+ "auto_scale_interval_in_sec": "AutoScaleIntervalInSec",
+ "max_instance_type_count": "MaxInstanceTypeCount",
+ "min_instance_type_count": "MinInstanceTypeCount",
+ }
+
+ def __init__(
+ self,
+ auto_scale_instance_type_count_set: Optional[List[int]] = None,
+ auto_scale_interval_in_sec: Optional[int] = None,
+ max_instance_type_count: Optional[int] = None,
+ min_instance_type_count: Optional[int] = None,
+ **kwargs,
+ ):
+ """
+ :param auto_scale_instance_type_count_set: The list of instance type counts available
+ for elastically scaling the job. Assuming currentInstanceTypeCount = 4 and
+ autoScaleInstanceTypeCountSet = [2,4,8], the job will automatically scale down as 8->4->2
+ when less capacity is available, and scale up as 2->4->8 when more capacity is available.
+ The value should be a list of integers in ascending order.
+ :type auto_scale_instance_type_count_set: List[int]
+ :param auto_scale_interval_in_sec: The minimum interval in seconds between job autoscaling operations.
+ It is recommended to set autoScaleIntervalInSec longer than the checkpoint interval,
+ to make sure at least one checkpoint is saved before the job auto-scales.
+ :type auto_scale_interval_in_sec: int
+ :param max_instance_type_count: The maximum instance type count.
+ :type max_instance_type_count: int
+ :param min_instance_type_count: The minimum instance type count.
+ :type min_instance_type_count: int
+ """
+ super().__init__(**kwargs)
+ self.auto_scale_instance_type_count_set = auto_scale_instance_type_count_set
+ self.auto_scale_interval_in_sec = auto_scale_interval_in_sec
+ self.max_instance_type_count = max_instance_type_count
+ self.min_instance_type_count = min_instance_type_count
+
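+# A hypothetical sketch of an elastic scale policy (values are illustrative):
+#
+#     policy = AISuperComputerScalePolicy(
+#         auto_scale_instance_type_count_set=[2, 4, 8],  # ascending order
+#         auto_scale_interval_in_sec=1800,               # ideally longer than the checkpoint interval
+#         min_instance_type_count=2,
+#         max_instance_type_count=8,
+#     )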
+
+class AISuperComputerConfiguration(PascalCaseProperty): # pylint: disable=too-many-instance-attributes
+ """A class to manage AI Super Computer Configuration."""
+
+ _KEY_MAPPING = {
+ "instance_type": "InstanceType",
+ "instance_types": "InstanceTypes",
+ "image_version": "ImageVersion",
+ "location": "Location",
+ "locations": "Locations",
+ "ai_super_computer_storage_data": "AISuperComputerStorageData",
+ "interactive": "Interactive",
+ "scale_policy": "ScalePolicy",
+ "virtual_cluster_arm_id": "VirtualClusterArmId",
+ "tensorboard_log_directory": "TensorboardLogDirectory",
+ "ssh_public_key": "SSHPublicKey",
+ "ssh_public_keys": "SSHPublicKeys",
+ "enable_azml_int": "EnableAzmlInt",
+ "priority": "Priority",
+ "sla_tier": "SLATier",
+ "suspend_on_idle_time_hours": "SuspendOnIdleTimeHours",
+ "user_alias": "UserAlias",
+ }
+
+ def __init__(
+ self,
+ instance_type: Optional[str] = None,
+ instance_types: Optional[List[str]] = None,
+ image_version: Optional[str] = None,
+ location: Optional[str] = None,
+ locations: Optional[List[str]] = None,
+ ai_super_computer_storage_data: Optional[Dict[str, AISuperComputerStorageReferenceConfiguration]] = None,
+ interactive: Optional[bool] = None,
+ scale_policy: Optional[AISuperComputerScalePolicy] = None,
+ virtual_cluster_arm_id: Optional[str] = None,
+ tensorboard_log_directory: Optional[str] = None,
+ ssh_public_key: Optional[str] = None,
+ ssh_public_keys: Optional[List[str]] = None,
+ enable_azml_int: Optional[bool] = None,
+ priority: Optional[str] = None,
+ sla_tier: Optional[str] = None,
+ suspend_on_idle_time_hours: Optional[int] = None,
+ user_alias: Optional[str] = None,
+ **kwargs,
+ ):
+ """
+ :param instance_type: The class of compute to be used. The list of instance types is
+ available in https://singularitydocs.azurewebsites.net/docs/overview/instance_types/
+ :type instance_type: str
+ :param instance_types: The class of compute to be used. The list of instance types is
+ available in https://singularitydocs.azurewebsites.net/docs/overview/instance_types/
+ :type instance_types: List[str]
+ :param image_version: The image to use in ai-super-computer. Currently only a limited set of predefined
+ images are supported.
+ :type image_version: str
+ :param location: The location (region) where the job will run. The workspace region is used
+ if neither location nor locations is specified.
+ :type location: str
+ :param locations: The location (region) where the job will run. The workspace region is used
+ if neither location nor locations is specified.
+ :type locations: List[str]
+ :param ai_super_computer_storage_data: All of the AI SuperComputer storage data sources to
+ be made available to the run based on the configurations.
+ :type ai_super_computer_storage_data: Dict[str, AISuperComputerStorageReferenceConfiguration]
+ :param interactive: Specifies whether the job should be interactive. Interactive jobs will
+ start the requested nodes, but not run a command.
+ :type interactive: bool
+ :param scale_policy: The elasticity options for a job. By leveraging elastic training,
+ the job will automatically scale up when there is extra capacity available,
+ and automatically scale down when resources are gradually called back.
+ :type scale_policy: AISuperComputerScalePolicy
+ :param virtual_cluster_arm_id: The ARM Resource Id for the Virtual Cluster to submit the
+ job to.
+ :type virtual_cluster_arm_id: str
+ :param tensorboard_log_directory: The directory where the Tensorboard logs will be written.
+ :type tensorboard_log_directory: str
+ :param ssh_public_key: The SSH Public Key to use when enabling SSH access to the job.
+ If not specified, username/password auth will be enabled.
+ :type ssh_public_key: str
+ :param ssh_public_keys: The SSH Public Key to use when enabling SSH access to the job.
+ If not specified, username/password auth will be enabled.
+ :type ssh_public_keys: List[str]
+ :param enable_azml_int: Specifies whether the job should include the azml_int utility
+ :type enable_azml_int: bool
+ :param priority: The priority of the job. The default value is Medium.
+ :type priority: str
+ :param sla_tier: The SLA tier of the job. The default value is Standard.
+ :type sla_tier: str
+ :param suspend_on_idle_time_hours: Minimum idle time before run gets automatically suspended
+ (in hours).
+ :type suspend_on_idle_time_hours: int
+ :param user_alias: User alias, used for naming mount paths.
+ :type user_alias: str
+ """
+ super().__init__(**kwargs)
+ self.instance_type = instance_type
+ self.instance_types = instance_types
+ self.image_version = image_version
+ self.location = location
+ self.locations = locations
+ self.ai_super_computer_storage_data = ai_super_computer_storage_data
+ self.interactive = interactive
+ self.scale_policy = scale_policy
+ self.virtual_cluster_arm_id = virtual_cluster_arm_id
+ self.tensorboard_log_directory = tensorboard_log_directory
+ self.ssh_public_key = ssh_public_key
+ self.ssh_public_keys = ssh_public_keys
+ self.enable_azml_int = enable_azml_int
+ self.priority = priority
+ self.sla_tier = sla_tier
+ self.suspend_on_idle_time_hours = suspend_on_idle_time_hours
+ self.user_alias = user_alias
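+# A hypothetical sketch of a full configuration, reusing a scale policy such as the `policy`
+# sketched above (all names and values are illustrative):
+#
+#     aisc_config = AISuperComputerConfiguration(
+#         instance_type="ND40rs_v2",                        # illustrative instance type
+#         virtual_cluster_arm_id="<ARM id of the virtual cluster>",
+#         scale_policy=policy,
+#         interactive=False,
+#         priority="Medium",
+#         sla_tier="Standard",
+#         user_alias="alias",
+#     )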
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py
new file mode 100644
index 00000000..8868b33b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py
@@ -0,0 +1,137 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import List, Optional
+
+from ....entities._job.job_resource_configuration import BaseProperty
+
+
+class ITPResourceConfiguration(BaseProperty):
+ """ITP resource configuration."""
+
+ def __init__(
+ self,
+ gpu_count: Optional[int] = None,
+ cpu_count: Optional[int] = None,
+ memory_request_in_gb: Optional[int] = None,
+ **kwargs
+ ):
+ """
+ :param gpu_count: GPU count defines how many GPU cores a single-node GPU job will use.
+ Default value is 1.
+ :type gpu_count: int
+ :param cpu_count: CPU count defines how many CPU cores a single-node CPU job will use.
+ Default value is 1.
+ :type cpu_count: int
+ :param memory_request_in_gb: Memory request defines how many GB of memory a single-node job
+ will request. Default value is 0, which means it will be calculated automatically for the user.
+ :type memory_request_in_gb: int
+ """
+ super().__init__(**kwargs)
+ self.gpu_count = gpu_count
+ self.cpu_count = cpu_count
+ self.memory_request_in_gb = memory_request_in_gb
+
+
+class ITPPriorityConfiguration(BaseProperty):
+ """ITP priority configuration."""
+
+ def __init__(
+ self,
+ job_priority: Optional[int] = None,
+ is_preemptible: Optional[bool] = None,
+ node_count_set: Optional[List[int]] = None,
+ scale_interval: Optional[int] = None,
+ **kwargs
+ ):
+ """
+ :param job_priority: The priority of a job. Default value is 200. Users can set it to
+ 100~200. Any value larger than 200 or less than 100 will be treated as 200.
+ :type job_priority: int
+ :param is_preemptible: Whether to preempt extra compute resources beyond the VC quota.
+ Default value is false.
+ :type is_preemptible: bool
+ :param node_count_set: Node count set determines how the compute auto-scales nodes. The value
+ should be a list of integers in ascending order, and it is only available when is_preemptible is
+ true.
+ :type node_count_set: List[int]
+ :param scale_interval: Scale interval in min.
+ :type scale_interval: int
+ """
+ super().__init__(**kwargs)
+ self.job_priority = job_priority
+ self.is_preemptible = is_preemptible
+ self.node_count_set = node_count_set
+ self.scale_interval = scale_interval
+
+
+class ITPInteractiveConfiguration(BaseProperty):
+ """ITP interactive configuration."""
+
+ def __init__(
+ self,
+ is_ssh_enabled: Optional[bool] = None,
+ ssh_public_key: Optional[str] = None,
+ is_i_python_enabled: Optional[bool] = None,
+ is_tensor_board_enabled: Optional[bool] = None,
+ interactive_port: Optional[int] = None,
+ **kwargs
+ ):
+ """
+ :param is_ssh_enabled: Whether to enable SSH for interactive development.
+ Default value is false.
+ :type is_ssh_enabled: bool
+ :param ssh_public_key: SSH public key.
+ :type ssh_public_key: str
+ :param is_i_python_enabled: Whether IPython is enabled.
+ :type is_i_python_enabled: bool
+ :param is_tensor_board_enabled: Whether to enable TensorBoard. Default value is false.
+ :type is_tensor_board_enabled: bool
+ :param interactive_port: Allows the user to specify a different interactive port. Available
+ values range from 40000 to 49999.
+ :type interactive_port: int
+ """
+ super().__init__(**kwargs)
+ self.is_ssh_enabled = is_ssh_enabled
+ self.ssh_public_key = ssh_public_key
+ self.is_i_python_enabled = is_i_python_enabled
+ self.is_tensor_board_enabled = is_tensor_board_enabled
+ self.interactive_port = interactive_port
+
+
+class ITPRetrySettings(BaseProperty):
+ def __init__(self, max_retry_count=None, **kwargs):
+ super().__init__(**kwargs)
+ self.max_retry_count = max_retry_count
+
+
+class ITPConfiguration(BaseProperty):
+ """ITP configuration."""
+
+ def __init__(
+ self,
+ resource_configuration: Optional[ITPResourceConfiguration] = None,
+ priority_configuration: Optional[ITPPriorityConfiguration] = None,
+ interactive_configuration: Optional[ITPInteractiveConfiguration] = None,
+ retry: Optional[ITPRetrySettings] = None,
+ **kwargs
+ ):
+ """
+ :param resource_configuration: Resource requirement for the compute.
+ :type resource_configuration: ITPResourceConfiguration
+ :param priority_configuration: Priority requirement for the compute.
+ :type priority_configuration: ITPPriorityConfiguration
+ :param interactive_configuration: Interactive configuration when trying to access the
+ compute.
+ :type interactive_configuration: ITPInteractiveConfiguration
+ :param retry: Retry settings for the job.
+ :type retry: ITPRetrySettings
+ """
+ self.resource_configuration = resource_configuration or ITPResourceConfiguration()
+ self.priority_configuration = priority_configuration or ITPPriorityConfiguration()
+ self.interactive_configuration = interactive_configuration or ITPInteractiveConfiguration()
+ self.retry = retry or ITPRetrySettings()
+ super().__init__(**kwargs)
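+# A hypothetical sketch of composing an ITP configuration (all values are illustrative):
+#
+#     itp_config = ITPConfiguration(
+#         resource_configuration=ITPResourceConfiguration(gpu_count=1, memory_request_in_gb=0),
+#         priority_configuration=ITPPriorityConfiguration(
+#             job_priority=200, is_preemptible=True, node_count_set=[1, 2, 4], scale_interval=30
+#         ),
+#         interactive_configuration=ITPInteractiveConfiguration(is_ssh_enabled=True, ssh_public_key="<public key>"),
+#         retry=ITPRetrySettings(max_retry_count=3),
+#     )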
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py
new file mode 100644
index 00000000..92f72db7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py
@@ -0,0 +1,47 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Optional
+
+from ....entities._job.job_resource_configuration import BaseProperty
+
+
+class TargetSelector(BaseProperty):
+ """Compute target selector."""
+
+ def __init__(
+ self,
+ compute_type: str,
+ instance_types: Optional[List[str]] = None,
+ regions: Optional[List[str]] = None,
+ my_resource_only: Optional[bool] = None,
+ allow_spot_vm: Optional[bool] = None,
+ **kwargs,
+ ):
+ """
+ :param compute_type: Compute type that the target selector could route the job to.
+ Example values: AmlCompute, AmlK8s.
+ :type compute_type: str
+ :param instance_types: List of instance types that the job could use. If no instance_types are
+ specified, all sizes are allowed. Note that instance_types here only contain VM SKUs.
+ Example value: ["STANDARD_D2_V2", "ND24rs_v3"]. Note that this field is case-sensitive.
+ :type instance_types: List[str]
+ :param regions: List of regions to which the job may be submitted.
+ If no regions are specified, all regions are allowed. Example value: ["eastus"].
+ Currently it only works for ITP.
+ :type regions: List[str]
+ :param my_resource_only: Flag to control whether the job should be sent to the cluster
+ owned by the user. If False, the target selector may send the job to a shared cluster. Currently it
+ only works for ITP.
+ :type my_resource_only: bool
+ :param allow_spot_vm: Flag to enable the target selector service to send the job to a low-priority VM.
+ Currently it only works for ITP.
+ :type allow_spot_vm: bool
+ """
+ super().__init__(**kwargs)
+ self.compute_type = compute_type
+ self.instance_types = instance_types
+ self.regions = regions
+ self.my_resource_only = my_resource_only
+ self.allow_spot_vm = allow_spot_vm
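+# A hypothetical sketch of a target selector (values are illustrative; instance_types are
+# case-sensitive VM SKUs):
+#
+#     target_selector = TargetSelector(
+#         compute_type="AmlK8s",
+#         instance_types=["STANDARD_D2_V2"],
+#         regions=["eastus"],
+#         my_resource_only=False,
+#         allow_spot_vm=True,
+#     )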
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py
new file mode 100644
index 00000000..9965d69f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py
@@ -0,0 +1,131 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from .._schema.component import NodeType
+from ..entities.node import InternalBaseNode
+
+
+class Scope(InternalBaseNode):
+ """Node of scope components in pipeline with specific run settings."""
+
+ def __init__(self, **kwargs):
+ kwargs.pop("type", None)
+ super(Scope, self).__init__(type=NodeType.SCOPE, **kwargs)
+ self._init = True
+ self._adla_account_name = kwargs.pop("adla_account_name", None)
+ self._scope_param = kwargs.pop("scope_param", None)
+ self._custom_job_name_suffix = kwargs.pop("custom_job_name_suffix", None)
+ self._priority = kwargs.pop("priority", None)
+ self._auto_token = kwargs.pop("auto_token", None)
+ self._tokens = kwargs.pop("tokens", None)
+ self._vcp = kwargs.pop("vcp", None)
+ self._init = False
+
+ @property
+ def adla_account_name(self) -> str:
+ """The ADLA account name to use for the scope job.
+
+ :return: ADLA account name
+ :rtype: str
+ """
+ return self._adla_account_name
+
+ @adla_account_name.setter
+ def adla_account_name(self, value: str):
+ self._adla_account_name = value
+
+ @property
+ def scope_param(self) -> str:
+ """nebula command used when submit the scope job.
+
+ :return: The nebula command
+ :rtype: str
+ """
+ return self._scope_param
+
+ @scope_param.setter
+ def scope_param(self, value: str):
+ self._scope_param = value
+
+ @property
+ def custom_job_name_suffix(self) -> str:
+ """Optional string to append to scope job name.
+
+ :return: The custom suffix
+ :rtype: str
+ """
+ return self._custom_job_name_suffix
+
+ @custom_job_name_suffix.setter
+ def custom_job_name_suffix(self, value: str):
+ self._custom_job_name_suffix = value
+
+ @property
+ def priority(self) -> int:
+ """scope job priority.
+
+ If set priority in scope_param, will override this setting.
+
+ :return: The job priority
+ :rtype: int
+ """
+ return self._priority
+
+ @priority.setter
+ def priority(self, value: int):
+ self._priority = value
+
+ @property
+ def auto_token(self) -> int:
+ """A predictor for estimating the peak resource usage of scope job.
+
+ :return: auto token
+ :rtype: int
+ """
+ return self._auto_token
+
+ @auto_token.setter
+ def auto_token(self, value: int):
+ self._auto_token = value
+
+ @property
+ def tokens(self) -> int:
+ """Standard token allocation.
+
+ :return: The token allocation
+ :rtype: int
+ """
+ return self._tokens
+
+ @tokens.setter
+ def tokens(self, value: int):
+ self._tokens = value
+
+ @property
+ def vcp(self) -> float:
+ """Standard VC percent allocation; should be a float between 0 and 1.
+
+ :return: The VC allocation
+ :rtype: float
+ """
+ return self._vcp
+
+ @vcp.setter
+ def vcp(self, value: float):
+ self._vcp = value
+
+ @classmethod
+ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+ return ["custom_job_name_suffix", "scope_param", "adla_account_name", "priority", "auto_token", "tokens", "vcp"]
+
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ from .._schema.node import ScopeSchema
+
+ return ScopeSchema(context=context)
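+# A minimal, hypothetical sketch of tuning Scope run settings on a node instance
+# (the node name `scope_node` and all values below are illustrative):
+#
+#     scope_node.adla_account_name = "my-adla-account"
+#     scope_node.custom_job_name_suffix = "nightly"
+#     scope_node.priority = 1000
+#     scope_node.tokens = 50
+#     scope_node.vcp = 0.2      # float between 0 and 1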
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py
new file mode 100644
index 00000000..345fa5f2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py
@@ -0,0 +1,192 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...constants._job.job import RestSparkConfKey
+from ...entities import Environment, SparkJobEntry
+from ...entities._job.parameterized_spark import DUMMY_IMAGE, ParameterizedSpark
+from ...entities._job.spark_job_entry_mixin import SparkJobEntryMixin
+from .._schema.component import InternalSparkComponentSchema
+from ..entities import InternalComponent
+from .environment import InternalEnvironment
+
+
+class InternalSparkComponent(
+ InternalComponent, ParameterizedSpark, SparkJobEntryMixin
+): # pylint: disable=too-many-instance-attributes, too-many-ancestors
+ """Internal Spark Component
+ This class is used to handle internal spark component.
+ It can be loaded from internal spark component yaml or from rest object of an internal spark component.
+ But after loaded, its structure will be the same as spark component.
+ """
+
+ def __init__(
+ self,
+ entry: Union[Dict[str, str], SparkJobEntry, None] = None,
+ py_files: Optional[List[str]] = None,
+ jars: Optional[List[str]] = None,
+ files: Optional[List[str]] = None,
+ archives: Optional[List[str]] = None,
+ driver_cores: Optional[int] = None,
+ driver_memory: Optional[str] = None,
+ executor_cores: Optional[int] = None,
+ executor_memory: Optional[str] = None,
+ executor_instances: Optional[int] = None,
+ dynamic_allocation_enabled: Optional[bool] = None,
+ dynamic_allocation_min_executors: Optional[int] = None,
+ dynamic_allocation_max_executors: Optional[int] = None,
+ conf: Optional[Dict[str, str]] = None,
+ args: Optional[str] = None,
+ **kwargs,
+ ):
+ SparkJobEntryMixin.__init__(self, entry=entry, **kwargs)
+ # environment.setter has been overridden in ParameterizedSpark, so we need to pop it out here
+ environment = kwargs.pop("environment", None)
+ InternalComponent.__init__(self, **kwargs)
+ # Pop it to avoid passing multiple values for code in ParameterizedSpark.__init__
+ code = kwargs.pop("code", None)
+ ParameterizedSpark.__init__(
+ self,
+ code=self.base_path,
+ entry=entry,
+ py_files=py_files,
+ jars=jars,
+ files=files,
+ archives=archives,
+ conf=conf,
+ environment=environment,
+ args=args,
+ **kwargs,
+ )
+ self.code = code
+ # For a pipeline spark job, we also allow the user to set driver_cores, driver_memory and so on via conf.
+ # If the root-level fields are not set by the user, we promote the conf settings to root level to facilitate
+ # subsequent verification. This usually happens when using to_component(SparkJob) or the builder function
+ # spark() as a node in the pipeline SDK.
+ conf = conf or {}
+ self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)
+ self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None)
+ self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None)
+ self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None)
+ self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None)
+ self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None
+ )
+ self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None
+ )
+ self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get(
+ RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None
+ )
+
+ self.conf = conf
+ self.args = args
+
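+ # A hypothetical illustration of the conf promotion above, assuming RestSparkConfKey maps to the
+ # standard Spark configuration names (e.g. RestSparkConfKey.DRIVER_CORES == "spark.driver.cores"):
+ #
+ #     component = InternalSparkComponent(conf={"spark.driver.cores": 2, "spark.executor.memory": "2g"})
+ #     component.driver_cores     # -> 2, promoted from conf since no root-level value was given
+ #     component.executor_memory  # -> "2g"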
+ @classmethod
+ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+ return InternalSparkComponentSchema(context=context)
+
+ @property # type: ignore[override]
+ def environment(self) -> Optional[Union[Environment, str]]:
+ """Get the environment of the component.
+
+ :return: The environment of the component.
+ :rtype: Optional[Union[Environment, str]]
+ """
+ if isinstance(self._environment, Environment) and self._environment.image is None:
+ return Environment(conda_file=self._environment.conda_file, image=DUMMY_IMAGE)
+ return self._environment
+
+ @environment.setter
+ def environment(self, value):
+ """Set the environment of the component.
+
+ :param value: The environment of the component.
+ :type value: Union[str, Environment, dict]
+ :return: No return
+ :rtype: None
+ """
+ if value is None or isinstance(value, (str, Environment)):
+ self._environment = value
+ elif isinstance(value, dict):
+ internal_environment = InternalEnvironment(**value)
+ internal_environment.resolve(self.base_path)
+ self._environment = Environment(
+ name=internal_environment.name,
+ version=internal_environment.version,
+ )
+ if internal_environment.conda:
+ self._environment.conda_file = {
+ "dependencies": internal_environment.conda[InternalEnvironment.CONDA_DEPENDENCIES]
+ }
+ if internal_environment.docker:
+ self._environment.image = internal_environment.docker["image"]
+ # We assume that a loaded internal spark component won't be used to create another internal spark
+ # component, so the environment construction here can be simplified.
+ else:
+ raise ValueError(f"Unsupported environment type: {type(value)}")
+
+ @property
+ def jars(self) -> Optional[List[str]]:
+ """Get the jars of the component.
+
+ :return: The jars of the component.
+ :rtype: Optional[List[str]]
+ """
+ return self._jars
+
+ @jars.setter
+ def jars(self, value: Union[str, List[str]]):
+ """Set the jars of the component.
+
+ :param value: The jars of the component.
+ :type value: Union[str, List[str]]
+ :return: No return
+ :rtype: None
+ """
+ if isinstance(value, str):
+ value = [value]
+ self._jars = value
+
+ @property
+ def py_files(self) -> Optional[List[str]]:
+ """Get the py_files of the component.
+
+ :return: The py_files of the component.
+ :rtype: Optional[List[str]]
+ """
+ return self._py_files
+
+ @py_files.setter
+ def py_files(self, value):
+ """Set the py_files of the component.
+
+ :param value: The py_files of the component.
+ :type value: Union[str, List[str]]
+ :return: No return
+ :rtype: None
+ """
+ if isinstance(value, str):
+ value = [value]
+ self._py_files = value
+
+ def _to_dict(self) -> Dict:
+ result = super()._to_dict()
+ return result
+
+ def _to_rest_object(self):
+ result = super()._to_rest_object()
+ if "pyFiles" in result.properties.component_spec:
+ result.properties.component_spec["py_files"] = result.properties.component_spec.pop("pyFiles")
+ return result
+
+ @classmethod
+ def _from_rest_object_to_init_params(cls, obj) -> Dict:
+ if "py_files" in obj.properties.component_spec:
+ obj.properties.component_spec["pyFiles"] = obj.properties.component_spec.pop("py_files")
+ result = super()._from_rest_object_to_init_params(obj)
+ return result
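+ # A hypothetical illustration of the key renaming in the two hooks above: the REST payload uses
+ # "py_files" while the local component spec uses "pyFiles", so the key is swapped in opposite
+ # directions, e.g.
+ #     {"pyFiles": ["utils.zip"]}   --_to_rest_object-->                  {"py_files": ["utils.zip"]}
+ #     {"py_files": ["utils.zip"]}  --_from_rest_object_to_init_params--> {"pyFiles": ["utils.zip"]}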