Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_internal')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/__init__.py  53
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/__init__.py  3
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/command.py  37
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/component.py  232
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/environment.py  21
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/input_output.py  113
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/node.py  75
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_setup.py  100
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/__init__.py  7
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/_yaml_utils.py  58
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/dsl/__init__.py  7
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py  46
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py  31
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py  193
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py  195
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py  35
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py  203
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py  370
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py  157
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py  338
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py  114
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py  29
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py  194
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py  137
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py  47
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py  131
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py  192
27 files changed, 3118 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/__init__.py
new file mode 100644
index 00000000..14fbaf87
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/__init__.py
@@ -0,0 +1,53 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from ._setup import enable_internal_components_in_pipeline
+from .entities import (
+    Ae365exepool,
+    AISuperComputerConfiguration,
+    AISuperComputerScalePolicy,
+    AISuperComputerStorageReferenceConfiguration,
+    Command,
+    DataTransfer,
+    Distributed,
+    HDInsight,
+    Hemera,
+    InternalInput,
+    ITPConfiguration,
+    ITPInteractiveConfiguration,
+    ITPPriorityConfiguration,
+    ITPResourceConfiguration,
+    ITPRetrySettings,
+    Parallel,
+    Pipeline,
+    Scope,
+    Starlite,
+    TargetSelector,
+)
+
+# enable internal components if the user has imported this module directly
+enable_internal_components_in_pipeline()
+
+__all__ = [
+    "TargetSelector",
+    "ITPInteractiveConfiguration",
+    "ITPPriorityConfiguration",
+    "ITPResourceConfiguration",
+    "ITPRetrySettings",
+    "ITPConfiguration",
+    "AISuperComputerConfiguration",
+    "AISuperComputerScalePolicy",
+    "AISuperComputerStorageReferenceConfiguration",
+    "Command",
+    "Distributed",
+    "Parallel",
+    "Scope",
+    "DataTransfer",
+    "Ae365exepool",
+    "HDInsight",
+    "Starlite",
+    "Pipeline",
+    "Hemera",
+    "InternalInput",
+]
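
The comment above notes that merely importing this module registers the internal (1P) component types. A minimal, hedged usage sketch (the YAML path, component inputs, and run-setting values below are placeholders, not taken from this diff):

    from azure.ai.ml import load_component
    from azure.ai.ml import _internal  # noqa: F401 -- the import runs enable_internal_components_in_pipeline()
    from azure.ai.ml.dsl import pipeline

    # "./scope_component.yaml" stands for a local internal (scope) component spec.
    scope_func = load_component(source="./scope_component.yaml")

    @pipeline()
    def sample_pipeline():
        node = scope_func()  # component inputs omitted; they depend on the spec
        node.adla_account_name = "my-adla-account"  # Scope-specific run setting, placeholder value
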
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/__init__.py
new file mode 100644
index 00000000..d540fd20
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/__init__.py
@@ -0,0 +1,3 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/command.py
new file mode 100644
index 00000000..2dddf02b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/command.py
@@ -0,0 +1,37 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from marshmallow import fields
+
+from ..._schema import NestedField
+from ..._schema.core.fields import DumpableEnumField, EnvironmentField
+from ..._schema.job import ParameterizedCommandSchema, ParameterizedParallelSchema
+from ..._schema.job.job_limits import CommandJobLimitsSchema
+from .._schema.node import InternalBaseNodeSchema, NodeType
+
+
+class CommandSchema(InternalBaseNodeSchema, ParameterizedCommandSchema):
+    class Meta:
+        exclude = ["code", "distribution"]  # internal command doesn't have code & distribution
+
+    environment = EnvironmentField()
+    type = DumpableEnumField(allowed_values=[NodeType.COMMAND])
+    limits = NestedField(CommandJobLimitsSchema)
+
+
+class DistributedSchema(CommandSchema):
+    class Meta:
+        exclude = ["code"]  # need to enable distribution compared to CommandSchema
+
+    type = DumpableEnumField(allowed_values=[NodeType.DISTRIBUTED])
+
+
+class ParallelSchema(InternalBaseNodeSchema, ParameterizedParallelSchema):
+    class Meta:
+        # partition_keys can still be used with unknown warning, but need to do dump before setting
+        exclude = ["task", "input_data", "mini_batch_error_threshold", "partition_keys"]
+
+    type = DumpableEnumField(allowed_values=[NodeType.PARALLEL])
+    compute = fields.Str()
+    environment = fields.Str()
+    limits = NestedField(CommandJobLimitsSchema)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/component.py
new file mode 100644
index 00000000..11d4bb56
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/component.py
@@ -0,0 +1,232 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os.path
+
+import pydash
+from marshmallow import EXCLUDE, INCLUDE, fields, post_dump, pre_load
+
+from ..._schema import NestedField, StringTransformedEnum, UnionField
+from ..._schema.component.component import ComponentSchema
+from ..._schema.core.fields import ArmVersionedStr, CodeField, EnvironmentField, RegistryStr
+from ..._schema.job.parameterized_spark import SparkEntryClassSchema, SparkEntryFileSchema
+from ..._utils._arm_id_utils import parse_name_label
+from ..._utils.utils import get_valid_dot_keys_with_wildcard
+from ...constants._common import (
+    LABELLED_RESOURCE_NAME,
+    SOURCE_PATH_CONTEXT_KEY,
+    AzureMLResourceType,
+    DefaultOpenEncoding,
+)
+from ...constants._component import NodeType as PublicNodeType
+from .._utils import yaml_safe_load_with_base_resolver
+from .environment import InternalEnvironmentSchema
+from .input_output import (
+    InternalEnumParameterSchema,
+    InternalInputPortSchema,
+    InternalOutputPortSchema,
+    InternalParameterSchema,
+    InternalPrimitiveOutputSchema,
+    InternalSparkParameterSchema,
+)
+
+
+class NodeType:
+    COMMAND = "CommandComponent"
+    DATA_TRANSFER = "DataTransferComponent"
+    DISTRIBUTED = "DistributedComponent"
+    HDI = "HDInsightComponent"
+    SCOPE_V2 = "scope"
+    HDI_V2 = "hdinsight"
+    HEMERA_V2 = "hemera"
+    STARLITE_V2 = "starlite"
+    AE365EXEPOOL_V2 = "ae365exepool"
+    AETHER_BRIDGE_V2 = "aetherbridge"
+    PARALLEL = "ParallelComponent"
+    SCOPE = "ScopeComponent"
+    STARLITE = "StarliteComponent"
+    SWEEP = "SweepComponent"
+    PIPELINE = "PipelineComponent"
+    HEMERA = "HemeraComponent"
+    AE365EXEPOOL = "AE365ExePoolComponent"
+    IPP = "IntellectualPropertyProtectedComponent"
+    # the internal spark component has a type value conflict with the public spark component
+    # this enum is used to identify its create_function in factories
+    SPARK = "DummySpark"
+    AETHER_BRIDGE = "AetherBridgeComponent"
+
+    @classmethod
+    def all_values(cls):
+        all_values = []
+        for key, value in vars(cls).items():
+            if not key.startswith("_") and isinstance(value, str):
+                all_values.append(value)
+        return all_values
+
+
+class InternalComponentSchema(ComponentSchema):
+    class Meta:
+        unknown = INCLUDE
+
+    # override name as 1p components allow . in name, which is not allowed in v2 components
+    name = fields.Str()
+
+    # override to allow empty properties
+    tags = fields.Dict(keys=fields.Str())
+
+    # override inputs & outputs to support 1P inputs & outputs, may need to do strict validation later
+    # no need to check io type match since server will do that
+    inputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(
+            [
+                NestedField(InternalParameterSchema),
+                NestedField(InternalEnumParameterSchema),
+                NestedField(InternalInputPortSchema),
+            ]
+        ),
+    )
+    # support primitive output for all internal components for now
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(
+            [
+                NestedField(InternalPrimitiveOutputSchema, unknown=EXCLUDE),
+                NestedField(InternalOutputPortSchema, unknown=EXCLUDE),
+            ]
+        ),
+    )
+
+    # type field is required for registration
+    type = StringTransformedEnum(
+        allowed_values=NodeType.all_values(),
+        casing_transform=lambda x: parse_name_label(x)[0],
+        pass_original=True,
+    )
+
+    # need to resolve as it can be a local field
+    code = CodeField()
+
+    environment = UnionField(
+        [
+            RegistryStr(azureml_type=AzureMLResourceType.ENVIRONMENT),
+            ArmVersionedStr(azureml_type=AzureMLResourceType.ENVIRONMENT),
+            NestedField(InternalEnvironmentSchema),
+        ]
+    )
+
+    def get_skip_fields(self):
+        return ["properties"]
+
+    def _serialize(self, obj, *, many: bool = False):
+        if many and obj is not None:
+            return super(InternalComponentSchema, self)._serialize(obj, many=many)
+        ret = super(InternalComponentSchema, self)._serialize(obj)
+        for attr_name in obj.__dict__.keys():
+            if (
+                not attr_name.startswith("_")
+                and attr_name not in self.get_skip_fields()
+                and attr_name not in self.dump_fields
+            ):
+                ret[attr_name] = self.get_attribute(obj, attr_name, None)
+        return ret
+
+    # override param_override to ensure that param override happens after reloading the yaml
+    @pre_load
+    def add_param_overrides(self, data, **kwargs):
+        source_path = self.context.pop(SOURCE_PATH_CONTEXT_KEY, None)
+        if isinstance(data, dict) and source_path and os.path.isfile(source_path):
+
+            def should_node_overwritten(_root, _parts):
+                parts = _parts.copy()
+                parts.pop()
+                parts.append("type")
+                _input_type = pydash.get(_root, parts, None)
+                return isinstance(_input_type, str) and _input_type.lower() not in ["boolean"]
+
+            # do override here
+            with open(source_path, "r", encoding=DefaultOpenEncoding.READ) as f:
+                origin_data = yaml_safe_load_with_base_resolver(f)
+                for dot_key_wildcard, condition_func in [
+                    ("version", None),
+                    ("inputs.*.default", should_node_overwritten),
+                    ("inputs.*.enum", should_node_overwritten),
+                ]:
+                    for dot_key in get_valid_dot_keys_with_wildcard(
+                        origin_data, dot_key_wildcard, validate_func=condition_func
+                    ):
+                        pydash.set_(data, dot_key, pydash.get(origin_data, dot_key))
+        return super().add_param_overrides(data, **kwargs)
+
+    @post_dump(pass_original=True)
+    def simplify_input_output_port(self, data, original, **kwargs):  # pylint:disable=unused-argument
+        # remove None in input & output
+        for io_ports in [data["inputs"], data["outputs"]]:
+            for port_name, port_definition in io_ports.items():
+                io_ports[port_name] = dict(filter(lambda item: item[1] is not None, port_definition.items()))
+
+        # hack: to match current serialization expectations
+        for port_name, port_definition in data["inputs"].items():
+            if "mode" in port_definition:
+                del port_definition["mode"]
+
+        return data
+
+    @post_dump(pass_original=True)
+    def add_back_type_label(self, data, original, **kwargs):  # pylint:disable=unused-argument
+        type_label = original._type_label  # pylint:disable=protected-access
+        if type_label:
+            data["type"] = LABELLED_RESOURCE_NAME.format(data["type"], type_label)
+        return data
+
+
+class InternalSparkComponentSchema(InternalComponentSchema):
+    # type field is required for registration
+    type = StringTransformedEnum(
+        allowed_values=PublicNodeType.SPARK,
+        casing_transform=lambda x: parse_name_label(x)[0].lower(),
+        pass_original=True,
+    )
+
+    # override inputs:
+    # https://componentsdk.azurewebsites.net/components/spark_component.html#differences-with-other-component-types
+    inputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(
+            [
+                NestedField(InternalSparkParameterSchema),
+                NestedField(InternalInputPortSchema),
+            ]
+        ),
+    )
+
+    environment = EnvironmentField(
+        extra_fields=[NestedField(InternalEnvironmentSchema)],
+        allow_none=True,
+    )
+
+    jars = UnionField(
+        [
+            fields.List(fields.Str()),
+            fields.Str(),
+        ],
+    )
+    py_files = UnionField(
+        [
+            fields.List(fields.Str()),
+            fields.Str(),
+        ],
+        data_key="pyFiles",
+        attribute="py_files",
+    )
+
+    entry = UnionField(
+        [NestedField(SparkEntryFileSchema), NestedField(SparkEntryClassSchema)],
+        required=True,
+        metadata={"description": "Entry."},
+    )
+
+    files = fields.List(fields.Str(required=True))
+    archives = fields.List(fields.Str(required=True))
+    conf = fields.Dict(keys=fields.Str(), values=fields.Raw())
+    args = fields.Str(metadata={"description": "Command Line arguments."})
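
A simplified sketch of the dot-key override performed in add_param_overrides above: selected keys (e.g. "version", "inputs.*.default") are re-read from the source YAML, where the base resolver keeps every scalar a string, and copied over the parsed data. The wildcard expansion is omitted here and the sample data is illustrative only:

    import pydash

    # origin_data stands for the YAML re-loaded with the base resolver (scalars stay strings);
    # data stands for the dict that marshmallow is about to load.
    origin_data = {"version": "0.10", "inputs": {"lr": {"type": "float", "default": "0.10"}}}
    data = {"version": 0.1, "inputs": {"lr": {"type": "float", "default": 0.1}}}

    for dot_key in ["version", "inputs.lr.default"]:
        pydash.set_(data, dot_key, pydash.get(origin_data, dot_key))

    # data now carries the original string forms: {'version': '0.10', 'inputs': {'lr': {..., 'default': '0.10'}}}
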
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/environment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/environment.py
new file mode 100644
index 00000000..f7c20228
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/environment.py
@@ -0,0 +1,21 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import fields
+
+from ..._schema import PathAwareSchema
+from ..._schema.core.fields import DumpableEnumField, VersionField
+
+
+class InternalEnvironmentSchema(PathAwareSchema):
+    docker = fields.Dict()
+    conda = fields.Dict()
+    os = DumpableEnumField(
+        # add an enum instead of using a string transformer here to avoid changing the value
+        allowed_values=["Linux", "Windows", "linux", "windows"],
+        required=False,
+    )
+    name = fields.Str()
+    version = VersionField()
+    python = fields.Dict()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/input_output.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/input_output.py
new file mode 100644
index 00000000..b1fe2188
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/input_output.py
@@ -0,0 +1,113 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import fields, post_dump, post_load
+
+from ..._schema import PatchedSchemaMeta, StringTransformedEnum, UnionField
+from ..._schema.component.input_output import InputPortSchema, ParameterSchema
+from ..._schema.core.fields import DumpableEnumField, PrimitiveValueField
+
+SUPPORTED_INTERNAL_PARAM_TYPES = [
+    "integer",
+    "Integer",
+    "boolean",
+    "Boolean",
+    "string",
+    "String",
+    "float",
+    "Float",
+    "double",
+    "Double",
+]
+
+
+SUPPORTED_INTERNAL_SPARK_PARAM_TYPES = [
+    "integer",
+    "Integer",
+    "boolean",
+    "Boolean",
+    "string",
+    "String",
+    "double",
+    "Double",
+    # remove float and add number
+    "number",
+]
+
+
+class InternalInputPortSchema(InputPortSchema):
+    # skip client-side validation for type enum & support list
+    type = UnionField(
+        [
+            fields.Str(),
+            fields.List(fields.Str()),
+        ],
+        required=True,
+        data_key="type",
+    )
+    is_resource = fields.Bool()
+    datastore_mode = fields.Str()
+
+    @post_dump(pass_original=True)
+    def resolve_list_type(self, data, original_data, **kwargs):  # pylint: disable=unused-argument
+        if isinstance(original_data.type, list):
+            data["type"] = original_data.type
+        return data
+
+
+class InternalOutputPortSchema(metaclass=PatchedSchemaMeta):
+    # skip client-side validation for type enum
+    type = fields.Str(
+        required=True,
+        data_key="type",
+    )
+    description = fields.Str()
+    is_link_mode = fields.Bool()
+    datastore_mode = fields.Str()
+
+
+class InternalPrimitiveOutputSchema(metaclass=PatchedSchemaMeta):
+    type = DumpableEnumField(
+        allowed_values=SUPPORTED_INTERNAL_PARAM_TYPES,
+        required=True,
+    )
+    description = fields.Str()
+
+
+class InternalParameterSchema(ParameterSchema):
+    type = DumpableEnumField(
+        allowed_values=SUPPORTED_INTERNAL_PARAM_TYPES,
+        required=True,
+        data_key="type",
+    )
+
+
+class InternalSparkParameterSchema(ParameterSchema):
+    type = DumpableEnumField(
+        allowed_values=SUPPORTED_INTERNAL_SPARK_PARAM_TYPES,
+        required=True,
+        data_key="type",
+    )
+
+
+class InternalEnumParameterSchema(ParameterSchema):
+    type = StringTransformedEnum(
+        allowed_values=["enum"],
+        required=True,
+        data_key="type",
+    )
+    default = PrimitiveValueField()
+    enum = fields.List(
+        PrimitiveValueField(),
+        required=True,
+    )
+
+    @post_dump
+    @post_load
+    def enum_value_to_string(self, data, **kwargs):  # pylint: disable=unused-argument
+        if "enum" in data:
+            data["enum"] = list(map(str, data["enum"]))
+        if "default" in data and data["default"] is not None:
+            data["default"] = str(data["default"])
+        return data
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/node.py
new file mode 100644
index 00000000..6dbadcd3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_schema/node.py
@@ -0,0 +1,75 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import INCLUDE, fields, post_load, pre_dump
+
+from ..._schema import ArmVersionedStr, NestedField, RegistryStr, UnionField
+from ..._schema.core.fields import DumpableEnumField
+from ..._schema.pipeline.component_job import BaseNodeSchema, _resolve_inputs_outputs
+from ...constants._common import AzureMLResourceType
+from .component import InternalComponentSchema, NodeType
+
+
+class InternalBaseNodeSchema(BaseNodeSchema):
+    class Meta:
+        unknown = INCLUDE
+
+    component = UnionField(
+        [
+            # for registry type assets
+            RegistryStr(azureml_type=AzureMLResourceType.ENVIRONMENT),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+            # inline component or component file reference starting with FILE prefix
+            NestedField(InternalComponentSchema, unknown=INCLUDE),
+        ],
+        required=True,
+    )
+    type = DumpableEnumField(
+        allowed_values=NodeType.all_values(),
+    )
+
+    @post_load
+    def make(self, data, **kwargs):  # pylint: disable=unused-argument
+        from ...entities._builders import parse_inputs_outputs
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+
+        # dict to node object
+        from ...entities._job.pipeline._load_component import pipeline_node_factory
+
+        return pipeline_node_factory.load_from_dict(data=data)
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):  # pylint: disable=unused-argument
+        return _resolve_inputs_outputs(job)
+
+
+class ScopeSchema(InternalBaseNodeSchema):
+    type = DumpableEnumField(allowed_values=[NodeType.SCOPE])
+    adla_account_name = fields.Str(required=True)
+    scope_param = fields.Str()
+    custom_job_name_suffix = fields.Str()
+    priority = fields.Int()
+    auto_token = fields.Int()
+    tokens = fields.Int()
+    vcp = fields.Float()
+
+
+class HDInsightSchema(InternalBaseNodeSchema):
+    type = DumpableEnumField(allowed_values=[NodeType.HDI])
+
+    compute_name = fields.Str()
+    queue = fields.Str()
+    driver_memory = fields.Str()
+    driver_cores = fields.Int()
+    executor_memory = fields.Str()
+    executor_cores = fields.Int()
+    number_executors = fields.Int()
+    conf = UnionField(
+        # dictionary or json string
+        union_fields=[fields.Dict(keys=fields.Str()), fields.Str()],
+    )
+    hdinsight_spark_job_name = fields.Str()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_setup.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_setup.py
new file mode 100644
index 00000000..2baf3237
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_setup.py
@@ -0,0 +1,100 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import NoReturn
+
+# pylint: disable=protected-access
+from marshmallow import INCLUDE
+
+from .._schema import NestedField
+from ..entities._builders.control_flow_node import LoopNode
+from ..entities._component.component_factory import component_factory
+from ..entities._job.pipeline._load_component import pipeline_node_factory
+from ._schema.command import CommandSchema, DistributedSchema, ParallelSchema
+from ._schema.component import NodeType
+from ._schema.node import HDInsightSchema, InternalBaseNodeSchema, ScopeSchema
+from .entities import (
+    Command,
+    DataTransfer,
+    Distributed,
+    HDInsight,
+    Hemera,
+    InternalBaseNode,
+    InternalComponent,
+    Parallel,
+    Pipeline,
+    Scope,
+    Starlite,
+)
+from .entities.spark import InternalSparkComponent
+
+_registered = False
+
+
+def _set_registered(value: bool):
+    global _registered  # pylint: disable=global-statement
+    _registered = value
+
+
+def _enable_internal_components():
+    create_schema_func = InternalComponent._create_schema_for_validation
+    for _type in NodeType.all_values():
+        component_factory.register_type(
+            _type=_type,
+            create_instance_func=lambda: InternalComponent.__new__(InternalComponent),
+            create_schema_func=create_schema_func,
+        )
+    component_factory.register_type(
+        _type=NodeType.SPARK,
+        create_instance_func=lambda: InternalSparkComponent.__new__(InternalSparkComponent),
+        create_schema_func=InternalSparkComponent._create_schema_for_validation,
+    )
+
+
+def _register_node(_type, node_cls, schema_cls):
+    pipeline_node_factory.register_type(
+        _type=_type,
+        create_instance_func=lambda: node_cls.__new__(node_cls),
+        load_from_rest_object_func=node_cls._from_rest_object,
+        nested_schema=NestedField(schema_cls, unknown=INCLUDE),
+    )
+
+
+def enable_internal_components_in_pipeline(*, force=False) -> NoReturn:
+    """Enable internal components in pipeline.
+
+    :keyword force: Whether to force re-enable internal components. Defaults to False.
+    :type force: bool
+    :return: No return value.
+    :rtype: None
+    """
+    if _registered and not force:
+        return  # already registered
+
+    _enable_internal_components()
+    for _type in NodeType.all_values():
+        # if we do not register a node class for all node types, the only difference will be the type of the created node
+        # instance (Ae365exepool => InternalBaseNode). Not sure if this is acceptable.
+        _register_node(_type, InternalBaseNode, InternalBaseNodeSchema)
+
+    # redo the registration for those with specific runsettings
+    _register_node(NodeType.DATA_TRANSFER, DataTransfer, InternalBaseNodeSchema)
+    _register_node(NodeType.COMMAND, Command, CommandSchema)
+    _register_node(NodeType.DISTRIBUTED, Distributed, DistributedSchema)
+    _register_node(NodeType.PARALLEL, Parallel, ParallelSchema)
+    _register_node(NodeType.HEMERA, Hemera, InternalBaseNodeSchema)
+    _register_node(NodeType.STARLITE, Starlite, InternalBaseNodeSchema)
+    _register_node(NodeType.SCOPE, Scope, ScopeSchema)
+    _register_node(NodeType.HDI, HDInsight, HDInsightSchema)
+
+    # register v2 style 1p only components
+    _register_node(NodeType.HEMERA_V2, Hemera, InternalBaseNodeSchema)
+    _register_node(NodeType.STARLITE_V2, Starlite, InternalBaseNodeSchema)
+    _register_node(NodeType.SCOPE_V2, Scope, ScopeSchema)
+    _register_node(NodeType.HDI_V2, HDInsight, HDInsightSchema)
+    # Ae365exepool and AetherBridge have been registered to InternalBaseNode
+
+    # allow using internal nodes in do-while loop
+    LoopNode._extra_body_types = (Command, Pipeline)
+
+    _set_registered(True)
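
Importing azure.ai.ml._internal (see __init__.py above) already calls this function once; a small sketch of forcing re-registration, which only matters if the node/component factories were reset (e.g. in tests):

    from azure.ai.ml._internal import enable_internal_components_in_pipeline

    enable_internal_components_in_pipeline(force=True)  # re-runs node and component type registration
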
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/__init__.py
new file mode 100644
index 00000000..13d8db80
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/__init__.py
@@ -0,0 +1,7 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from ._yaml_utils import yaml_safe_load_with_base_resolver
+
+__all__ = ["yaml_safe_load_with_base_resolver"]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/_yaml_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/_yaml_utils.py
new file mode 100644
index 00000000..fe3fa05e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/_utils/_yaml_utils.py
@@ -0,0 +1,58 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import typing
+
+import strictyaml
+
+
+class _SafeLoaderWithBaseLoader(strictyaml.ruamel.SafeLoader):
+    """This is a SafeLoader with base resolver instead of version default resolver.
+
+    Differences between BaseResolver and VersionedResolver:
+    1) BaseResolver won't try to resolve node values. For example, "yes" and "no" will be resolved to "true"(bool)
+    and "false"(bool) by VersionedResolver, but won't be resolved by BaseResolver.
+    2) VersionedResolver delays loading the pattern matching rules so that the yaml version can be passed in on loading.
+
+    Given SafeLoader inherits from VersionedResolver, we can't directly remove VersionedResolver
+    from the inheritance list. Instead, we overwrite the add_version_implicit_resolver method to make
+    _SafeLoaderWithBaseLoader._version_implicit_resolver empty. Then the resolver will act like a BaseResolver.
+    """
+
+    def fetch_comment(self, comment):
+        pass
+
+    def add_version_implicit_resolver(self, version, tag, regexp, first):
+        """Overwrite the method to make the resolver act like a base resolver instead of version default resolver.
+
+        :param version: version of yaml, like (1, 1)(yaml 1.1) and (1, 2)(yaml 1.2)
+        :type version: VersionType
+        :param tag: a tag indicating the type of the resolved node, e.g., tag:yaml.org,2002:bool.
+        :type tag: Any
+        :param regexp: the regular expression to match the node to be resolved
+        :type regexp: Any
+        :param first: a list of first characters to match
+        :type first: Any
+        """
+        self._version_implicit_resolver.setdefault(version, {})
+
+
+def yaml_safe_load_with_base_resolver(stream: typing.IO):
+    """Load yaml string with base resolver instead of version default resolver.
+
+    For example:
+    1) "yes" and "no" will be loaded as "yes"(string) and "no"(string) instead of "true"(bool) and "false"(bool);
+    2) "0.10" will be loaded as "0.10"(string) instead of "0.1"(float).
+    3) "2019-01-01" will be loaded as "2019-01-01"(string) instead of "2019-01-01T00:00:00Z"(datetime).
+    4) "1" will be loaded as "1"(string) instead of "1"(int).
+    5) "1.0" will be loaded as "1.0"(string) instead of "1.0"(float).
+    6) "~" will be loaded as "~"(string) instead of "None"(NoneType).
+
+    Please refer to strictyaml.ruamel.resolver.implicit_resolvers for more details.
+
+    :param stream: A readable stream
+    :type stream: typing.IO
+    :return: The return value of strictyaml.ruamel.load
+    :rtype: Any
+    """
+    return strictyaml.ruamel.load(stream, Loader=_SafeLoaderWithBaseLoader)
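
A short sketch of the behaviour described in the docstring above, loading from an in-memory stream (assumes strictyaml is installed, as this module already imports it):

    import io

    from azure.ai.ml._internal._utils import yaml_safe_load_with_base_resolver

    stream = io.StringIO("enabled: yes\nversion: 0.10\ncount: 1\n")
    data = yaml_safe_load_with_base_resolver(stream)
    # With the base resolver every scalar stays a string:
    # {'enabled': 'yes', 'version': '0.10', 'count': '1'}
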
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/dsl/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/dsl/__init__.py
new file mode 100644
index 00000000..ef7c42d9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/dsl/__init__.py
@@ -0,0 +1,7 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from ...dsl._settings import set_pipeline_settings
+
+__all__ = ["set_pipeline_settings"]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py
new file mode 100644
index 00000000..47296a80
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/__init__.py
@@ -0,0 +1,46 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from ._input_outputs import InternalInput
+from .command import Command, Distributed
+from .component import InternalComponent
+from .node import Ae365exepool, AetherBridge, DataTransfer, HDInsight, Hemera, InternalBaseNode, Pipeline, Starlite
+from .parallel import Parallel
+from .runsettings import (
+    AISuperComputerConfiguration,
+    AISuperComputerScalePolicy,
+    AISuperComputerStorageReferenceConfiguration,
+    ITPConfiguration,
+    ITPInteractiveConfiguration,
+    ITPPriorityConfiguration,
+    ITPResourceConfiguration,
+    ITPRetrySettings,
+    TargetSelector,
+)
+from .scope import Scope
+
+__all__ = [
+    "InternalBaseNode",
+    "DataTransfer",
+    "Ae365exepool",
+    "AetherBridge",
+    "HDInsight",
+    "Parallel",
+    "Starlite",
+    "Pipeline",
+    "Hemera",
+    "Command",
+    "Distributed",
+    "Scope",
+    "InternalComponent",
+    "TargetSelector",
+    "ITPInteractiveConfiguration",
+    "ITPPriorityConfiguration",
+    "ITPResourceConfiguration",
+    "ITPRetrySettings",
+    "ITPConfiguration",
+    "AISuperComputerConfiguration",
+    "AISuperComputerScalePolicy",
+    "AISuperComputerStorageReferenceConfiguration",
+    "InternalInput",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py
new file mode 100644
index 00000000..71a8c0a4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_additional_includes.py
@@ -0,0 +1,31 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from pathlib import Path
+from typing import Optional
+
+from ...constants._common import AzureDevopsArtifactsType
+from ...entities._component._additional_includes import AdditionalIncludes
+
+
+class InternalAdditionalIncludes(AdditionalIncludes):
+    """This class is kept for compatibility with mldesigner."""
+
+    @property
+    def includes(self):
+        if self._is_artifact_includes:
+            return self._get_resolved_additional_include_configs()
+        return self.origin_configs
+
+    @property
+    def code_path(self) -> Optional[Path]:
+        return self.resolved_code_path
+
+    @property
+    def _is_artifact_includes(self):
+        return any(
+            map(
+                lambda x: isinstance(x, dict) and x.get("type", None) == AzureDevopsArtifactsType.ARTIFACT,
+                self.origin_configs,
+            )
+        )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py
new file mode 100644
index 00000000..f9d0a970
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_input_outputs.py
@@ -0,0 +1,193 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Dict, Optional, Union, overload
+
+from ... import Input, Output
+from ..._utils.utils import get_all_enum_values_iter
+from ...constants import AssetTypes
+from ...constants._common import InputTypes
+from ...constants._component import ComponentParameterTypes, IOConstants
+from .._schema.input_output import SUPPORTED_INTERNAL_PARAM_TYPES
+
+_INPUT_TYPE_ENUM = "enum"
+_INPUT_TYPE_ENUM_CAP = "Enum"
+_INPUT_TYPE_FLOAT = "float"
+_INPUT_TYPE_FLOAT_CAP = "Float"
+
+
+class InternalInput(Input):
+    """Internal input class for internal components only. Comparing to the public Input class, this class has additional
+    primitive input types:
+
+    - String
+    - Integer
+    - Float, float
+    - Boolean
+    - Enum, enum (new)
+    """
+
+    def __init__(self, *, datastore_mode=None, is_resource=None, **kwargs):
+        self.datastore_mode = datastore_mode
+        self.is_resource = is_resource
+        super().__init__(**kwargs)
+
+    @property
+    def _allowed_types(self):
+        if self._lower_type == _INPUT_TYPE_ENUM:
+            return str
+        if self._lower_type == _INPUT_TYPE_FLOAT:
+            return float
+        return IOConstants.PRIMITIVE_STR_2_TYPE.get(self._lower_type, None)
+
+    @property
+    def _lower_type(self) -> Optional[str]:
+        if isinstance(self.type, str):
+            return self.type.lower()
+        if self._multiple_types:
+            return None
+        return self.type.__name__.lower()
+
+    @property
+    def _is_primitive_type(self):
+        if self._lower_type in [_INPUT_TYPE_ENUM, _INPUT_TYPE_FLOAT]:
+            return True
+        if self._lower_type in IOConstants.PRIMITIVE_STR_2_TYPE:
+            return True
+        return super()._is_primitive_type
+
+    def _simple_parse(self, value, _type=None):
+        # simple parse is used to parse min, max & optional only, so we don't need to handle enum
+        if _type is None:
+            _type = self._lower_type
+        if _type == _INPUT_TYPE_FLOAT:
+            _type = ComponentParameterTypes.NUMBER
+        return super()._simple_parse(value, _type)
+
+    def _to_rest_object(self) -> Dict:
+        rest_object = super()._to_rest_object()
+        if self._lower_type == _INPUT_TYPE_ENUM:
+            rest_object["type"] = _INPUT_TYPE_ENUM_CAP
+        if self._lower_type == _INPUT_TYPE_FLOAT:
+            rest_object["type"] = _INPUT_TYPE_FLOAT_CAP
+        return rest_object
+
+    @classmethod
+    def _map_from_rest_type(cls, _type):
+        mapping_dict = {
+            _INPUT_TYPE_ENUM_CAP: _INPUT_TYPE_ENUM,
+            _INPUT_TYPE_FLOAT_CAP: _INPUT_TYPE_FLOAT,
+        }
+        if isinstance(_type, str) and _type in mapping_dict:
+            return mapping_dict[_type]
+        return super()._map_from_rest_type(_type)
+
+    @classmethod
+    def _from_rest_object(cls, obj: Dict) -> "InternalInput":
+        obj["type"] = cls._map_from_rest_type(obj["type"])
+        return InternalInput(**obj)
+
+    def _get_python_builtin_type_str(self) -> str:
+        if self._lower_type in [_INPUT_TYPE_ENUM, _INPUT_TYPE_FLOAT]:
+            return self._lower_type
+        if self._is_primitive_type:
+            return IOConstants.PRIMITIVE_STR_2_TYPE[self._lower_type].__name__  # type: ignore[index]
+            # TODO: Bug 2881900
+        return super()._get_python_builtin_type_str()
+
+    @overload
+    @classmethod
+    def _from_base(cls, _input: None) -> None:  # type: ignore[misc]
+        ...
+
+    @overload
+    @classmethod
+    def _from_base(cls, _input: Union[Input, Dict]) -> "InternalInput": ...
+
+    @classmethod
+    def _from_base(cls, _input: Optional[Union[Input, Dict]]) -> Optional["InternalInput"]:
+        """Cast from Input or Dict to InternalInput.
+
+        This does not guarantee that a new object is created.
+
+        :param _input: The base input
+        :type _input: Union[Input, Dict]
+        :return:
+          * None if _input is None
+          * InternalInput
+        :rtype: Optional["InternalInput"]
+        """
+        if _input is None:
+            return None
+        if isinstance(_input, InternalInput):
+            return _input
+        if isinstance(_input, Input):
+            # do force cast directly as there is no new field added in InternalInput
+            # need to change the logic if new field is added
+            _input.__class__ = InternalInput
+            return _input  # type: ignore[return-value]
+        return InternalInput(**_input)
+
+
+def _map_v1_io_type(output_type: str) -> str:
+    """Map v1 IO type to v2.
+
+    :param output_type: The v1 IO type
+    :type output_type: str
+    :return: The v2 IO type name
+    :rtype: str
+    """
+
+    # TODO: put it in a common place
+    def _map_primitive_type(_type: str) -> str:
+        """Convert double and float to number type.
+
+        :param _type: A primitive v1 IO type
+        :type _type: str
+        :return:
+          * InputTypes.NUMBER if _type is "double" or "float"
+          * The provided type otherwise
+        :rtype: str
+        """
+        _type = _type.lower()
+        if _type in ["double", "float"]:
+            return InputTypes.NUMBER
+        return _type
+
+    if output_type in list(get_all_enum_values_iter(AssetTypes)):
+        return output_type
+    if output_type in SUPPORTED_INTERNAL_PARAM_TYPES:
+        return _map_primitive_type(output_type)
+    if output_type in ["AnyFile"]:
+        return AssetTypes.URI_FILE
+    # Handle AnyDirectory and the other types.
+    return AssetTypes.URI_FOLDER
+
+
+class InternalOutput(Output):
+    def __init__(self, *, datastore_mode=None, is_link_mode=None, **kwargs):
+        self.datastore_mode = datastore_mode
+        self.is_link_mode = is_link_mode
+        super().__init__(**kwargs)
+
+    @classmethod
+    def _from_base(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]:
+        if _output is None:
+            return None
+        if isinstance(_output, InternalOutput):
+            return _output
+        if isinstance(_output, Output):
+            # do force cast directly as there is no new field added in InternalOutput
+            # need to change the logic if new field is added
+            _output.__class__ = InternalOutput
+            return _output  # type: ignore[return-value]
+        return InternalOutput(**_output)
+
+    def map_pipeline_output_type(self) -> str:
+        """Map output type to pipeline output type.
+
+        :return: The pipeline output type
+        :rtype: str
+        """
+        # TODO: call this for node output
+        return _map_v1_io_type(self.type)
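
A hedged sketch of the extra primitive types described in the InternalInput docstring above; the parameter names and default values are placeholders:

    from azure.ai.ml._internal.entities import InternalInput

    learning_rate = InternalInput(type="Float", default=0.01)
    verbose = InternalInput(type="Boolean", default=False)

    print(learning_rate._is_primitive_type)         # True -- "Float" is treated as a primitive type
    print(learning_rate._to_rest_object()["type"])  # "Float" -- the capitalised form is kept on the wire
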
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py
new file mode 100644
index 00000000..3a27b562
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/_merkle_tree.py
@@ -0,0 +1,195 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=pointless-string-statement
+
+import hashlib
+import json
+import os
+from collections import deque
+from datetime import datetime
+from os import listdir
+from os.path import isfile, join
+
+HASH_FILE_CHUNK_SIZE = 65536
+HASH_ALGORITHM = "sha512"
+
+""" Copied from ml-components
+Create a merkle tree for the given directory path
+The directory would typically represent a project directory"""
+
+
+def create_merkletree(file_or_folder_path, exclude_function):
+    root = DirTreeNode("", "Directory", datetime.fromtimestamp(os.path.getmtime(file_or_folder_path)).isoformat())
+    if os.path.isdir(file_or_folder_path):
+        folder_path = file_or_folder_path
+        _create_merkletree_helper(folder_path, root, exclude_function)
+    else:
+        file_path = file_or_folder_path
+        file_node = DirTreeNode(file_path, "File", datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat())
+        hexdigest_hash, bytehash = _get_hash(os.path.normpath(file_path), file_path, "File")
+        if hexdigest_hash and bytehash:
+            file_node.add_hash(hexdigest_hash, bytehash)
+            root.add_child(file_node)
+
+    _populate_hashes(root)
+    return root
+
+
+""" Populate hashes for directory nodes
+by hashing the hashes of child nodes under them"""
+
+
+def _populate_hashes(rootNode):
+    if rootNode.is_file():
+        return rootNode.bytehash
+    h = hashlib.new(HASH_ALGORITHM)
+    for child in rootNode.children:
+        if child.is_file():
+            h.update(child.bytehash)
+        else:
+            h.update(_populate_hashes(child))
+    rootNode.bytehash = h.digest()
+    rootNode.hexdigest_hash = h.hexdigest()
+    return h.digest()
+
+
+""" Create a merkle tree for the given directory path
+    :param projectDir: Directory for which to create a tree.
+    :param rootNode: Root node.
+    Walks the directory and creates a dirTree """
+
+
+def _create_merkletree_helper(projectDir, rootNode, exclude_function):
+    for f in sorted(listdir(projectDir)):
+        path = os.path.normpath(join(projectDir, f))
+        if not exclude_function(path):
+            if isfile(join(projectDir, f)):
+                newNode = DirTreeNode(f, "File", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
+                hexdigest_hash, bytehash = _get_hash(path, f, "File")
+                if hexdigest_hash and bytehash:
+                    newNode.add_hash(hexdigest_hash, bytehash)
+                    rootNode.add_child(newNode)
+            else:
+                newNode = DirTreeNode(f, "Directory", datetime.fromtimestamp(os.path.getmtime(path)).isoformat())
+                rootNode.add_child(newNode)
+                _create_merkletree_helper(path, newNode, exclude_function)
+
+
+def _get_hash(filePath, name, file_type):
+    h = hashlib.new(HASH_ALGORITHM)
+    if not os.access(filePath, os.R_OK):
+        print(filePath, os.R_OK)
+        print("Cannot access file, so excluded from snapshot: {}".format(filePath))
+        return (None, None)
+    with open(filePath, "rb") as f:
+        while True:
+            data = f.read(HASH_FILE_CHUNK_SIZE)
+            if not data:
+                break
+            h.update(data)
+    h.update(name.encode("utf-8"))
+    h.update(file_type.encode("utf-8"))
+    return (h.hexdigest(), h.digest())
+
+
+""" We compute both hexdigest and digest for hashes.
+digest (bytes) is used so that we can compute the bytehash of a parent directory based on bytehash of its children
+hexdigest is used so that we can serialize the tree using json"""
+
+
+class DirTreeNode(object):
+    def __init__(self, name=None, file_type=None, timestamp=None, hexdigest_hash=None, bytehash=None):
+        self.file_type = file_type
+        self.name = name
+        self.timestamp = timestamp
+        self.children = []
+        self.hexdigest_hash = hexdigest_hash
+        self.bytehash = bytehash
+
+    def load_children_from_dict(self, node_dict):
+        if len(node_dict.items()) == 0:
+            return None
+        self.name = node_dict["name"]
+        self.file_type = node_dict["type"]
+        self.hexdigest_hash = node_dict["hash"]
+        self.timestamp = node_dict["timestamp"]
+        for _, child in node_dict["children"].items():
+            node = DirTreeNode()
+            node.load_children_from_dict(child)
+            self.add_child(node)
+        return self
+
+    def load_children_from_json(self, node_dict):
+        self.name = node_dict["name"]
+        self.file_type = node_dict["type"]
+        self.hexdigest_hash = node_dict["hash"]
+        self.timestamp = node_dict["timestamp"]
+        for child in node_dict["children"]:
+            node = DirTreeNode()
+            node.load_children_from_json(child)
+            self.add_child(node)
+        return self
+
+    def load_object_from_dict(self, node_dict):
+        self.load_children_from_dict(node_dict)
+
+    def load_root_object_from_json_string(self, jsondata):
+        node_dict = json.loads(jsondata)
+        self.load_children_from_json(node_dict)
+
+    def add_hash(self, hexdigest_hash, bytehash):
+        self.hexdigest_hash = hexdigest_hash
+        self.bytehash = bytehash
+
+    def add_child(self, node):
+        self.children.append(node)
+
+    def is_file(self):
+        return self.file_type == "File"
+
+    """ Only for debugging purposes"""
+
+    def print_tree(self):
+        queue = deque()
+        print("Name: " + self.name)
+        print("Type: " + self.file_type)
+        for child in self.children:
+            print("    " + child.name)
+            queue.append(child)
+        for i in queue:
+            i.print_tree()
+
+
+""" Serialize merkle tree.
+Serialize all fields except digest (bytes)
+"""
+
+
+class DirTreeJsonEncoder(json.JSONEncoder):
+    def default(self, o):
+        if not isinstance(o, DirTreeNode):
+            return super(DirTreeJsonEncoder, self).default(o)
+        _dict = o.__dict__
+        _dict.pop("bytehash", None)
+        _dict["type"] = _dict.pop("file_type")
+        _dict["hash"] = _dict.pop("hexdigest_hash")
+
+        return _dict
+
+
+class DirTreeJsonEncoderV2(json.JSONEncoder):
+    def default(self, o):
+        if not isinstance(o, DirTreeNode):
+            return super(DirTreeJsonEncoderV2, self).default(o)
+        _dict = o.__dict__
+        _dict.pop("bytehash", None)
+        if "file_type" in _dict:
+            _dict["type"] = _dict.pop("file_type")
+        if "hexdigest_hash" in _dict:
+            _dict["hash"] = _dict.pop("hexdigest_hash")
+        if isinstance(_dict["children"], list):
+            _dict["children"] = {x.name: x for x in _dict["children"]}
+
+        return _dict
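
A minimal sketch of building and serializing the tree implemented above for a local folder; "./src" is a placeholder path and the exclude function here excludes nothing:

    import json

    from azure.ai.ml._internal.entities._merkle_tree import DirTreeJsonEncoder, create_merkletree

    root = create_merkletree("./src", lambda path: False)
    print(json.dumps(root, cls=DirTreeJsonEncoder, indent=2))  # hexdigest hashes only; raw byte hashes are dropped
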
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py
new file mode 100644
index 00000000..d4437663
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/code.py
@@ -0,0 +1,35 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Optional
+
+from ...entities._assets import Code
+
+
+class InternalCode(Code):
+    @property
+    def _upload_hash(self) -> Optional[str]:
+        # This property will be used to identify the uploaded content when trying to
+        # upload to datastore. The tracebacks will be as below:
+        #   Traceback (most recent call last):
+        #     _artifact_utilities._check_and_upload_path
+        #     _artifact_utilities._upload_to_datastore
+        #     _artifact_utilities.upload_artifact
+        #     _blob_storage_helper.upload
+        # where asset id will be calculated based on the upload hash.
+
+        if self._is_anonymous is True:
+            # Name of an anonymous internal code is the same as its snapshot id
+            # in ml-component, use it as the upload hash to avoid duplicate hash
+            # calculation with _asset_utils.get_object_hash.
+            return self.name
+
+        return getattr(super(InternalCode, self), "_upload_hash")
+
+    def __setattr__(self, key, value):
+        if key == "name" and hasattr(self, key) and self._is_anonymous is True and value != self.name:
+            raise AttributeError(
+                "InternalCode name are calculated based on its content and cannot "
+                "be changed: current name is {} and new value is {}".format(self.name, value)
+            )
+        super().__setattr__(key, value)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py
new file mode 100644
index 00000000..9ed732ec
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/command.py
@@ -0,0 +1,203 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+from typing import Dict, List, Optional, Union
+
+from marshmallow import INCLUDE, Schema
+
+from ... import MpiDistribution, PyTorchDistribution, RayDistribution, TensorFlowDistribution
+from ..._schema import PathAwareSchema
+from ..._schema.core.fields import DistributionField
+from ...entities import CommandJobLimits, JobResourceConfiguration
+from ...entities._util import get_rest_dict_for_node_attrs
+from .._schema.component import NodeType
+from ..entities.component import InternalComponent
+from ..entities.node import InternalBaseNode
+
+
+class Command(InternalBaseNode):
+    """Node of internal command components in pipeline with specific run settings.
+
+    Different from azure.ai.ml.entities.Command, type of this class is CommandComponent.
+    """
+
+    def __init__(self, **kwargs):
+        node_type = kwargs.pop("type", None) or NodeType.COMMAND
+        super(Command, self).__init__(type=node_type, **kwargs)
+        self._init = True
+        self._resources = kwargs.pop("resources", JobResourceConfiguration())
+        self._compute = kwargs.pop("compute", None)
+        self._environment = kwargs.pop("environment", None)
+        self._environment_variables = kwargs.pop("environment_variables", None)
+        self._limits = kwargs.pop("limits", CommandJobLimits())
+        self._init = False
+
+    @property
+    def compute(self) -> Optional[str]:
+        """Get the compute definition for the command.
+
+        :return: The compute definition
+        :rtype: Optional[str]
+        """
+        return self._compute
+
+    @compute.setter
+    def compute(self, value: str) -> None:
+        """Set the compute definition for the command.
+
+        :param value: The new compute definition
+        :type value: str
+        """
+        self._compute = value
+
+    @property
+    def environment(self) -> Optional[str]:
+        """Get the environment definition for the command.
+
+        :return: The environment definition
+        :rtype: Optional[str]
+        """
+        return self._environment
+
+    @environment.setter
+    def environment(self, value: str) -> None:
+        """Set the environment definition for the command.
+
+        :param value: The new environment definition
+        :type value: str
+        """
+        self._environment = value
+
+    @property
+    def environment_variables(self) -> Optional[Dict[str, str]]:
+        """Get the environment variables for the command.
+
+        :return: The environment variables
+        :rtype: Optional[Dict[str, str]]
+        """
+        return self._environment_variables
+
+    @environment_variables.setter
+    def environment_variables(self, value: Dict[str, str]) -> None:
+        """Set the environment variables for the command.
+
+        :param value: The new environment variables
+        :type value: Dict[str, str]
+        """
+        self._environment_variables = value
+
+    @property
+    def limits(self) -> CommandJobLimits:
+        return self._limits
+
+    @limits.setter
+    def limits(self, value: CommandJobLimits):
+        self._limits = value
+
+    @property
+    def resources(self) -> JobResourceConfiguration:
+        """Compute Resource configuration for the component.
+
+        :return: The resource configuration
+        :rtype: JobResourceConfiguration
+        """
+        return self._resources
+
+    @resources.setter
+    def resources(self, value: JobResourceConfiguration):
+        self._resources = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["environment", "limits", "resources", "environment_variables"]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.command import CommandSchema
+
+        return CommandSchema(context=context)
+
+    def _to_rest_object(self, **kwargs) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        rest_obj.update(
+            {
+                "limits": get_rest_dict_for_node_attrs(self.limits, clear_empty_value=True),
+                "resources": get_rest_dict_for_node_attrs(self.resources, clear_empty_value=True),
+            }
+        )
+        return rest_obj
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj):
+        obj = InternalBaseNode._from_rest_object_to_init_params(obj)
+
+        if "resources" in obj and obj["resources"]:
+            obj["resources"] = JobResourceConfiguration._from_rest_object(obj["resources"])
+
+        # handle limits
+        if "limits" in obj and obj["limits"]:
+            obj["limits"] = CommandJobLimits._from_rest_object(obj["limits"])
+        return obj
+
+
+class Distributed(Command):
+    def __init__(self, **kwargs):
+        super(Distributed, self).__init__(**kwargs)
+        self._distribution = kwargs.pop("distribution", None)
+        self._type = NodeType.DISTRIBUTED
+        if self._distribution is None:
+            # hack: distribution.type is required to set distribution, which is defined in launcher.type
+            if (
+                isinstance(self.component, InternalComponent)
+                and self.component.launcher
+                and "type" in self.component.launcher
+            ):
+                self.distribution = {"type": self.component.launcher["type"]}
+            else:
+                raise ValueError(
+                    "launcher.type must be specified in definition of DistributedComponent but got {}".format(
+                        self.component
+                    )
+                )
+
+    @property
+    def distribution(
+        self,
+    ) -> Union[PyTorchDistribution, MpiDistribution, TensorFlowDistribution, RayDistribution]:
+        """The distribution config of component, e.g. distribution={'type': 'mpi'}.
+
+        :return: The distribution config
+        :rtype: Union[PyTorchDistribution, MpiDistribution, TensorFlowDistribution, RayDistribution]
+        """
+        return self._distribution
+
+    @distribution.setter
+    def distribution(
+        self,
+        value: Union[Dict, PyTorchDistribution, TensorFlowDistribution, MpiDistribution, RayDistribution],
+    ):
+        if isinstance(value, dict):
+            dist_schema = DistributionField(unknown=INCLUDE)
+            value = dist_schema._deserialize(value=value, attr=None, data=None)
+        self._distribution = value
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.command import DistributedSchema
+
+        return DistributedSchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return Command._picked_fields_from_dict_to_rest_object() + ["distribution"]
+
+    def _to_rest_object(self, **kwargs) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        distribution = self.distribution._to_rest_object() if self.distribution else None  # pylint: disable=no-member
+        rest_obj.update(
+            {
+                "distribution": get_rest_dict_for_node_attrs(distribution),
+            }
+        )
+        return rest_obj
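A minimal usage sketch for the run settings above, assuming `train_component` is an already-loaded internal distributed component and `some_dataset` is an existing input; both names are illustrative and not part of this module:

from azure.ai.ml.entities import CommandJobLimits, JobResourceConfiguration, MpiDistribution

# Hypothetical node built from a loaded internal distributed component.
train_node = train_component(training_data=some_dataset)

# A plain dict assigned to `distribution` is deserialized into a typed object
# (here an MpiDistribution) by the setter shown above.
train_node.distribution = {"type": "mpi", "process_count_per_instance": 4}
assert isinstance(train_node.distribution, MpiDistribution)

# Limits and resources are forwarded to the REST payload by _to_rest_object.
train_node.limits = CommandJobLimits(timeout=3600)
train_node.resources = JobResourceConfiguration(instance_count=2)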
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
new file mode 100644
index 00000000..e54c1906
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/component.py
@@ -0,0 +1,370 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use id/type as argument name
+import os
+from contextlib import contextmanager
+from os import PathLike
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Union
+from uuid import UUID
+
+import yaml  # type: ignore[import]
+from marshmallow import Schema
+
+from ... import Input, Output
+from ..._restclient.v2022_10_01.models import ComponentVersion, ComponentVersionProperties
+from ..._schema import PathAwareSchema
+from ..._utils._arm_id_utils import parse_name_label
+from ..._utils._asset_utils import IgnoreFile
+from ...constants._common import DefaultOpenEncoding
+from ...entities import Component
+from ...entities._assets import Code
+from ...entities._component._additional_includes import AdditionalIncludes, AdditionalIncludesMixin
+from ...entities._component.code import ComponentIgnoreFile
+from ...entities._job.distribution import DistributionConfiguration
+from ...entities._system_data import SystemData
+from ...entities._util import convert_ordered_dict_to_dict
+from ...entities._validation import MutableValidationResult
+from .._schema.component import InternalComponentSchema
+from ._input_outputs import InternalInput, InternalOutput
+from ._merkle_tree import create_merkletree
+from .code import InternalCode
+from .environment import InternalEnvironment
+from .node import InternalBaseNode
+
+_ADDITIONAL_INCLUDES_CONFIG_KEY = "additional_includes"
+_ADDITIONAL_INCLUDES_SUFFIX = ".additional_includes"
+
+
+class InternalComponent(Component, AdditionalIncludesMixin):
+    # pylint: disable=too-many-instance-attributes, too-many-locals
+    """Base class for internal component version, used to define an internal component. Recommended to create instance
+    with component_factory.
+
+    :param name: Name of the resource.
+    :type name: str
+    :param version: Version of the resource.
+    :type version: str
+    :param id: Global ID of the resource, i.e. the Azure Resource Manager ID.
+    :type id: str
+    :param type: Type of the command; the supported value is 'command'.
+    :type type: str
+    :param description: Description of the resource.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict
+    :param properties: Internal use only.
+    :type properties: dict
+    :param display_name: Display name of the component.
+    :type display_name: str
+    :param is_deterministic: Whether the component is deterministic.
+    :type is_deterministic: bool
+    :param inputs: Inputs of the component.
+    :type inputs: dict
+    :param outputs: Outputs of the component.
+    :type outputs: dict
+    :param yaml_str: The yaml string of the component.
+    :type yaml_str: str
+    :param _schema: Schema of the component.
+    :type _schema: str
+    :param creation_context: Creation metadata of the component.
+    :type creation_context: ~azure.ai.ml.entities.SystemData
+    """
+
+    def __init__(
+        self,
+        *,
+        _schema: Optional[str] = None,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        display_name: Optional[str] = None,
+        type: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        is_deterministic: Optional[bool] = None,
+        successful_return_code: Optional[str] = None,
+        inputs: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        code: Optional[Union[str, os.PathLike]] = None,
+        environment: Optional[Dict] = None,
+        environment_variables: Optional[Dict] = None,
+        command: Optional[str] = None,
+        id: Optional[str] = None,
+        properties: Optional[Dict] = None,
+        yaml_str: Optional[str] = None,
+        creation_context: Optional[SystemData] = None,
+        scope: Optional[Dict] = None,
+        hemera: Optional[Dict] = None,
+        hdinsight: Optional[Dict] = None,
+        parallel: Optional[Dict] = None,
+        starlite: Optional[Dict] = None,
+        ae365exepool: Optional[Dict] = None,
+        launcher: Optional[Dict] = None,
+        datatransfer: Optional[Dict] = None,
+        aether: Optional[Dict] = None,
+        **kwargs,
+    ):
+        _type, self._type_label = parse_name_label(type)
+        super().__init__(
+            name=name,
+            version=version,
+            id=id,
+            type=_type,
+            description=description,
+            tags=tags,
+            properties=properties,
+            display_name=display_name,
+            is_deterministic=is_deterministic,  # type: ignore[arg-type]
+            inputs=inputs,
+            outputs=outputs,
+            yaml_str=yaml_str,
+            _schema=_schema,
+            creation_context=creation_context,
+            **kwargs,
+        )
+        # Store original yaml
+        self._yaml_str = yaml_str
+        self._other_parameter = kwargs
+
+        self.successful_return_code = successful_return_code
+        self.code = code
+        self.environment = InternalEnvironment(**environment) if isinstance(environment, dict) else environment
+        self.environment_variables = environment_variables
+        # TODO: remove these to keep it a general component class
+        self.command = command
+        self.scope = scope
+        self.hemera = hemera
+        self.hdinsight = hdinsight
+        self.parallel = parallel
+        self.starlite = starlite
+        self.ae365exepool = ae365exepool
+        self.launcher = launcher
+        self.datatransfer = datatransfer
+        self.aether = aether
+
+    @classmethod
+    def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool):
+        component_io = {}
+        for name, port in io_dict.items():
+            if is_input:
+                component_io[name] = InternalInput._from_base(port)
+            else:
+                component_io[name] = InternalOutput._from_base(port)
+        return component_io
+
+    # region AdditionalIncludesMixin
+
+    @classmethod
+    def _read_additional_include_configs(cls, yaml_path: Path) -> List[str]:
+        """Read additional include configs from the additional includes file.
+        The name of the file is the same as the component spec file, with a suffix of ".additional_includes".
+        It can be either a yaml file or a text file:
+        1. If it is a yaml file, the yaml format of additional_includes looks like the following:
+        ```
+        additional_includes:
+         - your/local/path
+         - type: artifact
+           organization: devops_organization
+           project: devops_project
+           feed: artifacts_feed_name
+           name: universal_package_name
+           version: package_version
+           scope: scope_type
+        ```
+        2. If it is a text file, each line is a path to include. Note that artifact config is not supported
+        in this format.
+
+        :param yaml_path: The yaml path
+        :type yaml_path: Path
+        :return: The list of additional includes
+        :rtype: List[str]
+        """
+        additional_includes_config_path = yaml_path.with_suffix(_ADDITIONAL_INCLUDES_SUFFIX)
+        if additional_includes_config_path.is_file():
+            with open(additional_includes_config_path, encoding=DefaultOpenEncoding.READ) as f:
+                file_content = f.read()
+                try:
+                    configs = yaml.safe_load(file_content)
+                    if isinstance(configs, dict):
+                        return configs.get(_ADDITIONAL_INCLUDES_CONFIG_KEY, [])
+                except Exception:  # pylint: disable=W0718
+                    # TODO: check if we should catch yaml.YamlError instead here
+                    pass
+                return [line.strip() for line in file_content.splitlines(keepends=False) if len(line.strip()) > 0]
+        return []
+
+    @classmethod
+    def _get_additional_includes_field_name(cls) -> str:
+        # additional includes for internal components are configured by a file, which is not a field in the yaml,
+        # so return '*' as the diagnostics yaml path and override _get_all_additional_includes_configs.
+        return "*"
+
+    def _get_all_additional_includes_configs(self) -> List:
+        # internal components must have a source path
+        return self._read_additional_include_configs(Path(self._source_path))  # type: ignore[arg-type]
+        # TODO: Bug 2881943
+
+    def _get_base_path_for_code(self) -> Path:
+        # internal components must have a source path
+        return Path(self._source_path).parent  # type: ignore[arg-type]
+        # TODO: Bug 2881943
+
+    def _get_origin_code_value(self) -> Union[str, PathLike, None]:
+        return super()._get_origin_code_value() or "."
+
+    # endregion
+
+    def _to_ordered_dict_for_yaml_dump(self) -> Dict:
+        """Dump the component content into a sorted yaml string.
+
+        :return: The ordered dict
+        :rtype: Dict
+        """
+
+        obj = super()._to_ordered_dict_for_yaml_dump()
+        # a dict dumped based on the schema will convert code to an absolute path, while we want to keep its original value
+        if "code" in obj:
+            if not self.code:
+                del obj["code"]
+            else:
+                obj["code"] = self.code
+        return obj
+
+    @property
+    def _additional_includes(self) -> AdditionalIncludes:
+        """This property is kept for compatibility with old mldesigner sdk.
+
+        :return: The additional includes
+        :rtype: AdditionalIncludes
+        """
+        obj = self._generate_additional_includes_obj()
+        from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes
+
+        obj.__class__ = InternalAdditionalIncludes
+        return obj
+
+    # region SchemaValidatableMixin
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        return InternalComponentSchema(context=context)
+
+    def _customized_validate(self) -> MutableValidationResult:
+        validation_result = super(InternalComponent, self)._customized_validate()
+        skip_path_validation = not self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(
+            validation_result
+        )
+        # resolving additional includes & updating self._base_path can be dangerous,
+        # so we just skip path validation if additional includes are provided.
+        # note that there will still be a client-side error on job submission (after code is resolved)
+        # if paths in the environment are invalid
+        if isinstance(self.environment, InternalEnvironment):
+            validation_result.merge_with(
+                self.environment.validate(
+                    self._base_path,
+                    skip_path_validation=skip_path_validation,
+                ),
+                field_name="environment",
+            )
+        return validation_result
+
+    # endregion
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj: ComponentVersion) -> Dict:
+        # put it here as distribution is shared by some components, e.g. command
+        distribution = obj.properties.component_spec.pop("distribution", None)
+        init_kwargs = super()._from_rest_object_to_init_params(obj)
+        if distribution:
+            init_kwargs["distribution"] = DistributionConfiguration._from_rest_object(distribution)
+        return init_kwargs
+
+    def _to_rest_object(self) -> ComponentVersion:
+        component: Union[Dict[Any, Any], List[Any]] = convert_ordered_dict_to_dict(self._to_dict())
+        component["_source"] = self._source  # type: ignore[call-overload]
+        # TODO: 2883063
+
+        properties = ComponentVersionProperties(
+            component_spec=component,
+            description=self.description,
+            is_anonymous=self._is_anonymous,
+            properties=self.properties,
+            tags=self.tags,
+        )
+        result = ComponentVersion(properties=properties)
+        result.name = self.name
+        return result
+
+    @classmethod
+    def _get_snapshot_id(
+        cls,
+        code_path: Union[str, PathLike],
+        ignore_file: IgnoreFile,
+    ) -> str:
+        """Get the snapshot id of a component with specific working directory in ml-components. Use this as the name of
+        code asset to reuse steps in a pipeline job from ml-components runs.
+
+        :param code_path: The path of the working directory.
+        :type code_path: str
+        :param ignore_file: The ignore file of the snapshot.
+        :type ignore_file: IgnoreFile
+        :return: The snapshot id of a component in ml-components with code_path as its working directory.
+        :rtype: str
+        """
+        curr_root = create_merkletree(code_path, ignore_file.is_file_excluded)
+        snapshot_id = str(UUID(curr_root.hexdigest_hash[::4]))
+        return snapshot_id
+
+    @contextmanager  # type: ignore[arg-type]
+    def _try_build_local_code(self) -> Iterable[Code]:
+        """Build final code when origin code is a local code.
+        Will merge code path with additional includes into a temp folder if additional includes is specified.
+        For internal components, file dependencies in environment will be resolved based on the final code.
+
+        :return: The code instance
+        :rtype: Iterable[Code]
+        """
+
+        tmp_code_dir: Path
+        # origin code value of internal component will never be None. check _get_origin_code_value for details
+        with self._generate_additional_includes_obj().merge_local_code_and_additional_includes() as tmp_code_dir:
+            # use absolute path in case temp folder & work dir are in different drive
+            tmp_code_dir = tmp_code_dir.absolute()
+
+            # file dependency in code will be read during internal environment resolution
+            # for example, docker file of the environment may be in additional includes;
+            # and it will be read then insert to the environment object during resolution.
+            # so we need to resolve environment based on the temporary code path
+            if isinstance(self.environment, InternalEnvironment):
+                self.environment.resolve(base_path=tmp_code_dir)
+
+            # additional includes config file itself should be ignored
+            rebased_ignore_file = ComponentIgnoreFile(
+                tmp_code_dir,
+                additional_includes_file_name=Path(self._source_path)
+                .with_suffix(_ADDITIONAL_INCLUDES_SUFFIX)
+                .name,  # type: ignore[arg-type]
+                # TODO: Bug 2881943
+            )
+
+            # Use the snapshot id in ml-components as code name to enable anonymous
+            # component reuse from ml-component runs.
+            # calculate snapshot id here instead of inside InternalCode to ensure that
+            # snapshot id is calculated based on the built code path
+            yield InternalCode(
+                name=self._get_snapshot_id(
+                    # use absolute path in case temp folder & work dir are in different drive
+                    tmp_code_dir,
+                    # this ignore-file should be rebased to the built code path
+                    rebased_ignore_file,
+                ),
+                version="1",
+                base_path=self._base_path,
+                path=tmp_code_dir,
+                is_anonymous=True,
+                ignore_file=rebased_ignore_file,
+            )
+
+    def __call__(self, *args, **kwargs) -> InternalBaseNode:
+        return super(InternalComponent, self).__call__(*args, **kwargs)
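A hedged sketch of the snapshot-id helper above: it hashes the folder contents with a merkle tree and formats the digest as a UUID, so two folders with identical contents map to the same code-asset name. The `./src` path is illustrative, and constructing `IgnoreFile()` with no arguments (i.e. no exclusions) is an assumption:

from azure.ai.ml._utils._asset_utils import IgnoreFile
from azure.ai.ml._internal.entities.component import InternalComponent

# Deterministic snapshot id over the folder contents; identical folders yield the same id.
snapshot_id = InternalComponent._get_snapshot_id("./src", IgnoreFile())
print(snapshot_id)  # a UUID-formatted string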
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py
new file mode 100644
index 00000000..673afeac
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/environment.py
@@ -0,0 +1,157 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from os import PathLike
+from pathlib import Path
+from typing import Dict, Optional, Union
+
+from ..._utils.utils import load_yaml
+from ...constants._common import FILE_PREFIX, DefaultOpenEncoding
+from ...entities._validation import MutableValidationResult, ValidationResultBuilder
+
+
+class InternalEnvironment:
+    # conda section
+    CONDA_DEPENDENCIES = "conda_dependencies"
+    CONDA_DEPENDENCIES_FILE = "conda_dependencies_file"
+    PIP_REQUIREMENTS_FILE = "pip_requirements_file"
+    DEFAULT_PYTHON_VERSION = "3.8.5"
+    # docker section
+    BUILD = "build"
+    DOCKERFILE = "dockerfile"
+
+    def __init__(
+        self,
+        docker: Optional[Dict] = None,
+        conda: Optional[Dict] = None,
+        os: Optional[str] = None,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        python: Optional[Dict] = None,
+    ):
+        self.docker = docker
+        self.conda = conda
+        self.os = os if os else "Linux"
+        self.name = name
+        self.version = version
+        self.python = python
+        self._docker_file_resolved = False
+
+    @staticmethod
+    def _parse_file_path(value: str) -> str:
+        return value[len(FILE_PREFIX) :] if value.startswith(FILE_PREFIX) else value
+
+    def _validate_conda_section(
+        self, base_path: Union[str, PathLike], skip_path_validation: bool
+    ) -> MutableValidationResult:
+        validation_result = ValidationResultBuilder.success()
+        if not self.conda:
+            return validation_result
+        dependencies_field_names = {self.CONDA_DEPENDENCIES, self.CONDA_DEPENDENCIES_FILE, self.PIP_REQUIREMENTS_FILE}
+        if len(set(self.conda) & dependencies_field_names) > 1:
+            validation_result.append_warning(
+                yaml_path="conda",
+                message="Duplicated declaration of dependencies, will honor in the order "
+                "conda_dependencies, conda_dependencies_file and pip_requirements_file.",
+            )
+        if self.conda.get(self.CONDA_DEPENDENCIES_FILE):
+            conda_dependencies_file = self.conda[self.CONDA_DEPENDENCIES_FILE]
+            if not skip_path_validation and not (Path(base_path) / conda_dependencies_file).is_file():
+                validation_result.append_error(
+                    yaml_path=f"conda.{self.CONDA_DEPENDENCIES_FILE}",
+                    message=f"Cannot find conda dependencies file: {conda_dependencies_file!r}",
+                )
+        if self.conda.get(self.PIP_REQUIREMENTS_FILE):
+            pip_requirements_file = self.conda[self.PIP_REQUIREMENTS_FILE]
+            if not skip_path_validation and not (Path(base_path) / pip_requirements_file).is_file():
+                validation_result.append_error(
+                    yaml_path=f"conda.{self.PIP_REQUIREMENTS_FILE}",
+                    message=f"Cannot find pip requirements file: {pip_requirements_file!r}",
+                )
+        return validation_result
+
+    def _validate_docker_section(
+        self, base_path: Union[str, PathLike], skip_path_validation: bool
+    ) -> MutableValidationResult:
+        validation_result = ValidationResultBuilder.success()
+        if not self.docker:
+            return validation_result
+        if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE):
+            return validation_result
+        dockerfile_file = self.docker[self.BUILD][self.DOCKERFILE]
+        dockerfile_file = self._parse_file_path(dockerfile_file)
+        if (
+            not self._docker_file_resolved
+            and not skip_path_validation
+            and not (Path(base_path) / dockerfile_file).is_file()
+        ):
+            validation_result.append_error(
+                yaml_path=f"docker.{self.BUILD}.{self.DOCKERFILE}",
+                message=f"Dockerfile not exists: {dockerfile_file}",
+            )
+        return validation_result
+
+    def validate(self, base_path: Union[str, PathLike], skip_path_validation: bool = False) -> MutableValidationResult:
+        """Validate the environment section.
+
+        This is a public method but won't be exposed to users, given that InternalEnvironment is an internal class.
+
+        :param base_path: The base path
+        :type base_path: Union[str, PathLike]
+        :param skip_path_validation: Whether to skip path validation. Defaults to False
+        :type skip_path_validation: bool
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        validation_result = ValidationResultBuilder.success()
+        if self.os is not None and self.os not in {"Linux", "Windows", "linux", "windows"}:
+            validation_result.append_error(
+                yaml_path="os",
+                message=f"Only support 'Linux' and 'Windows', but got {self.os!r}",
+            )
+        validation_result.merge_with(self._validate_conda_section(base_path, skip_path_validation))
+        validation_result.merge_with(self._validate_docker_section(base_path, skip_path_validation))
+        return validation_result
+
+    def _resolve_conda_section(self, base_path: Union[str, PathLike]) -> None:
+        if not self.conda:
+            return
+        if self.conda.get(self.CONDA_DEPENDENCIES_FILE):
+            conda_dependencies_file = self.conda.pop(self.CONDA_DEPENDENCIES_FILE)
+            self.conda[self.CONDA_DEPENDENCIES] = load_yaml(Path(base_path) / conda_dependencies_file)
+            return
+        if self.conda.get(self.PIP_REQUIREMENTS_FILE):
+            pip_requirements_file = self.conda.pop(self.PIP_REQUIREMENTS_FILE)
+            with open(Path(base_path) / pip_requirements_file, encoding=DefaultOpenEncoding.READ) as f:
+                pip_requirements = f.read().splitlines()
+                self.conda = {
+                    self.CONDA_DEPENDENCIES: {
+                        "name": "project_environment",
+                        "dependencies": [
+                            f"python={self.DEFAULT_PYTHON_VERSION}",
+                            {
+                                "pip": pip_requirements,
+                            },
+                        ],
+                    }
+                }
+            return
+
+    def _resolve_docker_section(self, base_path: Union[str, PathLike]) -> None:
+        if not self.docker:
+            return
+        if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE):
+            return
+        dockerfile_file = self.docker[self.BUILD][self.DOCKERFILE]
+        if not dockerfile_file.startswith(FILE_PREFIX):
+            return
+        dockerfile_file = self._parse_file_path(dockerfile_file)
+        with open(Path(base_path) / dockerfile_file, "r", encoding=DefaultOpenEncoding.READ) as f:
+            self.docker[self.BUILD][self.DOCKERFILE] = f.read()
+            self._docker_file_resolved = True
+        return
+
+    def resolve(self, base_path: Union[str, PathLike]) -> None:
+        self._resolve_conda_section(base_path)
+        self._resolve_docker_section(base_path)
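A small sketch of how `resolve` rewrites the conda section when only a pip requirements file is declared; the file name is illustrative and must exist under the base path:

from azure.ai.ml._internal.entities.environment import InternalEnvironment

env = InternalEnvironment(conda={"pip_requirements_file": "requirements.txt"})
env.resolve(base_path=".")  # reads ./requirements.txt

# After resolution the conda section holds an inline specification, roughly:
# {"conda_dependencies": {"name": "project_environment",
#                         "dependencies": ["python=3.8.5", {"pip": [...requirement lines...]}]}}
print(env.conda)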
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
new file mode 100644
index 00000000..89fc032c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/node.py
@@ -0,0 +1,338 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access
+
+from enum import Enum
+from typing import Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from ... import Input, Output
+from ..._schema import PathAwareSchema
+from ...constants import JobType
+from ...entities import Component, Job
+from ...entities._builders import BaseNode
+from ...entities._job.pipeline._io import NodeInput, NodeOutput, PipelineInput
+from ...entities._util import convert_ordered_dict_to_dict
+from .._schema.component import NodeType
+
+
+class InternalBaseNode(BaseNode):
+    """Base class for node of internal components in pipeline. Can be instantiated directly.
+
+    :param type: Type of pipeline node
+    :type type: str
+    :param component: Id or instance of the component version to be run for the step
+    :type component: Union[Component, str]
+    :param inputs: Inputs to the node.
+    :type inputs: Dict[str, Union[Input, str, bool, int, float, Enum, dict]]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, Output, dict]]
+    :param properties: The job property dictionary.
+    :type properties: dict[str, str]
+    :param compute: Compute definition containing the compute information for the step
+    :type compute: str
+    """
+
+    def __init__(
+        self,
+        *,
+        type: str = JobType.COMPONENT,  # pylint: disable=redefined-builtin
+        component: Union[Component, str],
+        inputs: Optional[
+            Dict[
+                str,
+                Union[
+                    PipelineInput,
+                    NodeOutput,
+                    Input,
+                    str,
+                    bool,
+                    int,
+                    float,
+                    Enum,
+                    "Input",
+                ],
+            ]
+        ] = None,
+        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
+        properties: Optional[Dict] = None,
+        compute: Optional[str] = None,
+        **kwargs,
+    ):
+        kwargs.pop("type", None)
+        BaseNode.__init__(
+            self,
+            type=type,
+            component=component,  # type: ignore[arg-type]
+            # TODO: Bug 2881892
+            inputs=inputs,
+            outputs=outputs,
+            compute=compute,
+            properties=properties,
+            **kwargs,
+        )
+
+    @property
+    def _skip_required_compute_missing_validation(self) -> bool:
+        return True
+
+    def _to_node(self, context: Optional[Dict] = None, **kwargs) -> BaseNode:
+        return self
+
+    def _to_component(self, context: Optional[Dict] = None, **kwargs) -> Component:
+        return self.component
+
+    def _to_job(self) -> Job:
+        raise RuntimeError("Internal components doesn't support to job")
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "Job":
+        raise RuntimeError("Internal components doesn't support load from dict")
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.node import InternalBaseNodeSchema
+
+        return InternalBaseNodeSchema(context=context)
+
+    @property
+    def component(self) -> Component:
+        return self._component
+
+    def _to_rest_inputs(self) -> Dict[str, Dict]:
+        rest_dataset_literal_inputs = super(InternalBaseNode, self)._to_rest_inputs()
+        for input_name, input_value in self.inputs.items():
+            # hack: remove unfilled inputs from the rest object instead of keeping a default input of {"job_input_type": "literal"}
+            # note that this hack is not always effective as _data will be set to Input() when visiting input_value.type
+            if (
+                isinstance(input_value, NodeInput)
+                and input_value._data is None
+                and input_name in rest_dataset_literal_inputs
+            ):
+                del rest_dataset_literal_inputs[input_name]
+        return rest_dataset_literal_inputs
+
+    def _to_rest_object(self, **kwargs) -> dict:
+        base_dict = super(InternalBaseNode, self)._to_rest_object(**kwargs)
+        for key in ["name", "display_name", "tags"]:
+            if key in base_dict:
+                del base_dict[key]
+        for key in ["computeId"]:
+            if key in base_dict and base_dict[key] is None:
+                del base_dict[key]
+
+        base_dict.update(
+            convert_ordered_dict_to_dict(
+                {
+                    "componentId": self._get_component_id(),
+                    "type": self.type,
+                }
+            )
+        )
+        return base_dict
+
+
+class DataTransfer(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(DataTransfer, self).__init__(type=NodeType.DATA_TRANSFER, **kwargs)
+
+
+class HDInsight(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(HDInsight, self).__init__(type=NodeType.HDI, **kwargs)
+        self._init = True
+        self._compute_name: str = kwargs.pop("compute_name", None)
+        self._queue: str = kwargs.pop("queue", None)
+        self._driver_memory: str = kwargs.pop("driver_memory", None)
+        self._driver_cores: int = kwargs.pop("driver_cores", None)
+        self._executor_memory: str = kwargs.pop("executor_memory", None)
+        self._executor_cores: int = kwargs.pop("executor_cores", None)
+        self._number_executors: int = kwargs.pop("number_executors", None)
+        self._conf: Union[dict, str] = kwargs.pop("conf", None)
+        self._hdinsight_spark_job_name: str = kwargs.pop("hdinsight_spark_job_name", None)
+        self._init = False
+
+    @property
+    def compute_name(self) -> str:
+        """Name of the compute to be used.
+
+        :return: Compute name
+        :rtype: str
+        """
+        return self._compute_name
+
+    @compute_name.setter
+    def compute_name(self, value: str):
+        self._compute_name = value
+
+    @property
+    def queue(self) -> str:
+        """The name of the YARN queue to which submitted.
+
+        :return: YARN queue name
+        :rtype: str
+        """
+        return self._queue
+
+    @queue.setter
+    def queue(self, value: str):
+        self._queue = value
+
+    @property
+    def driver_memory(self) -> str:
+        """Amount of memory to use for the driver process.
+
+        It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte,
+        megabyte, gigabyte, terabyte and petabyte respectively. Example values are 10k, 10m and 10g.
+
+        :return: Amount of memory to use for the driver process
+        :rtype: str
+        """
+        return self._driver_memory
+
+    @driver_memory.setter
+    def driver_memory(self, value: str):
+        self._driver_memory = value
+
+    @property
+    def driver_cores(self) -> int:
+        """Number of cores to use for the driver process.
+
+        :return: Number of cores to use for the driver process.
+        :rtype: int
+        """
+        return self._driver_cores
+
+    @driver_cores.setter
+    def driver_cores(self, value: int):
+        self._driver_cores = value
+
+    @property
+    def executor_memory(self) -> str:
+        """Amount of memory to use per executor process.
+
+        It's the same format as JVM memory strings. Use lower-case suffixes, e.g. k, m, g, t, and p, for kilobyte,
+        megabyte, gigabyte, terabyte and petabyte respectively. Example values are 10k, 10m and 10g.
+
+        :return: The executor memory
+        :rtype: str
+        """
+        return self._executor_memory
+
+    @executor_memory.setter
+    def executor_memory(self, value: str):
+        self._executor_memory = value
+
+    @property
+    def executor_cores(self) -> int:
+        """Number of cores to use for each executor.
+
+        :return: The number of cores to use for each executor
+        :rtype: int
+        """
+        return self._executor_cores
+
+    @executor_cores.setter
+    def executor_cores(self, value: int):
+        self._executor_cores = value
+
+    @property
+    def number_executors(self) -> int:
+        """Number of executors to launch for this session.
+
+        :return: The number of executors to launch
+        :rtype: int
+        """
+        return self._number_executors
+
+    @number_executors.setter
+    def number_executors(self, value: int):
+        self._number_executors = value
+
+    @property
+    def conf(self) -> Union[dict, str]:
+        """Spark configuration properties.
+
+        :return: The spark configuration properties.
+        :rtype: Union[dict, str]
+        """
+        return self._conf
+
+    @conf.setter
+    def conf(self, value: Union[dict, str]):
+        self._conf = value
+
+    @property
+    def hdinsight_spark_job_name(self) -> str:
+        """
+
+        :return: The name of this session
+        :rtype: str
+        """
+        return self._hdinsight_spark_job_name
+
+    @hdinsight_spark_job_name.setter
+    def hdinsight_spark_job_name(self, value: str):
+        self._hdinsight_spark_job_name = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return [
+            "compute_name",
+            "queue",
+            "driver_cores",
+            "executor_memory",
+            "conf",
+            "hdinsight_spark_job_name",
+            "driver_memory",
+            "executor_cores",
+            "number_executors",
+        ]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.node import HDInsightSchema
+
+        return HDInsightSchema(context=context)
+
+
+class Starlite(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Starlite, self).__init__(type=NodeType.STARLITE, **kwargs)
+
+
+class Pipeline(InternalBaseNode):
+    # this is only for using registered pipeline component
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Pipeline, self).__init__(type=NodeType.PIPELINE, **kwargs)
+
+
+class Hemera(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Hemera, self).__init__(type=NodeType.HEMERA, **kwargs)
+
+
+class Ae365exepool(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Ae365exepool, self).__init__(type=NodeType.AE365EXEPOOL, **kwargs)
+
+
+class Sweep(InternalBaseNode):
+    # this is not in our scope
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Sweep, self).__init__(type=NodeType.SWEEP, **kwargs)
+
+
+class AetherBridge(InternalBaseNode):
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(AetherBridge, self).__init__(type=NodeType.AETHER_BRIDGE, **kwargs)
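A sketch of configuring the HDInsight run settings documented above. `hdi_component` and `some_input` are assumed to be an already-loaded internal HDInsight component and an existing input; all names and values are illustrative:

hdi_node = hdi_component(input_path=some_input)
hdi_node.compute_name = "my-hdi-cluster"
hdi_node.queue = "default"
hdi_node.driver_memory = "4g"   # JVM-style memory string, e.g. 512m, 4g
hdi_node.executor_cores = 2
hdi_node.number_executors = 4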
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
new file mode 100644
index 00000000..86fa8939
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/parallel.py
@@ -0,0 +1,114 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...entities import BatchRetrySettings
+from .._schema.component import NodeType
+from ..entities import Command
+
+
+class Parallel(Command):
+    """Node of scope components in pipeline with specific run settings."""
+
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Parallel, self).__init__(type=NodeType.PARALLEL, **kwargs)
+        self._init = True
+        self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
+        self._error_threshold = kwargs.pop("error_threshold", None)
+        self._mini_batch_size = kwargs.pop("mini_batch_size", None)
+        self._partition_keys = kwargs.pop("partition_keys", None)
+        self._logging_level = kwargs.pop("logging_level", None)
+        self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
+        self._init = False
+
+    @property
+    def max_concurrency_per_instance(self) -> int:
+        """The max parallellism that each compute instance has.
+
+        :return: The max concurrence per compute instance
+        :rtype: int
+        """
+        return self._max_concurrency_per_instance
+
+    @max_concurrency_per_instance.setter
+    def max_concurrency_per_instance(self, value: int):
+        self._max_concurrency_per_instance = value
+
+    @property
+    def error_threshold(self) -> int:
+        """The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored
+        during processing.
+
+        If the error count goes above this value, the job will be aborted. The error threshold applies to the entire
+        input rather than to the individual mini-batches sent to the run() method. The range is [-1, int.max]; -1
+        indicates ignoring all failures during processing.
+
+        :return: The error threshold
+        :rtype: int
+        """
+        return self._error_threshold
+
+    @error_threshold.setter
+    def error_threshold(self, value: int):
+        self._error_threshold = value
+
+    @property
+    def mini_batch_size(self) -> int:
+        """The number of records to be sent to run() method for each mini-batch.
+
+        :return: The batch size
+        :rtype: int
+        """
+        return self._mini_batch_size
+
+    @mini_batch_size.setter
+    def mini_batch_size(self, value: int):
+        self._mini_batch_size = value
+
+    @property
+    def logging_level(self) -> str:
+        """A string of the logging level name.
+
+        :return: The logging level
+        :rtype: str
+        """
+        return self._logging_level
+
+    @logging_level.setter
+    def logging_level(self, value: str):
+        self._logging_level = value
+
+    @property
+    def retry_settings(self) -> BatchRetrySettings:
+        """Parallel job run failed retry.
+
+        :return: The retry settings
+        :rtype: BatchRetrySettings
+        """
+        return self._retry_settings
+
+    @retry_settings.setter
+    def retry_settings(self, value: BatchRetrySettings):
+        self._retry_settings = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return Command._picked_fields_from_dict_to_rest_object() + [
+            "max_concurrency_per_instance",
+            "error_threshold",
+            "logging_level",
+            "retry_settings",
+            "mini_batch_size",
+        ]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.command import ParallelSchema
+
+        return ParallelSchema(context=context)
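A hedged sketch of the parallel run settings above, assuming `parallel_component` is an already-loaded internal parallel component and `some_dataset` is an existing input (both illustrative):

from azure.ai.ml.entities import BatchRetrySettings

parallel_node = parallel_component(input_data=some_dataset)
parallel_node.max_concurrency_per_instance = 2
parallel_node.mini_batch_size = 100
parallel_node.error_threshold = -1  # -1 ignores all record/file failures
parallel_node.retry_settings = BatchRetrySettings(max_retries=3, timeout=60)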
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py
new file mode 100644
index 00000000..76c94c17
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/__init__.py
@@ -0,0 +1,29 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from .ai_super_computer_configuration import (
+    AISuperComputerConfiguration,
+    AISuperComputerScalePolicy,
+    AISuperComputerStorageReferenceConfiguration,
+)
+from .itp_configuration import (
+    ITPConfiguration,
+    ITPInteractiveConfiguration,
+    ITPPriorityConfiguration,
+    ITPResourceConfiguration,
+    ITPRetrySettings,
+)
+from .target_selector import TargetSelector
+
+__all__ = [
+    "ITPInteractiveConfiguration",
+    "ITPPriorityConfiguration",
+    "ITPResourceConfiguration",
+    "ITPRetrySettings",
+    "ITPConfiguration",
+    "TargetSelector",
+    "AISuperComputerConfiguration",
+    "AISuperComputerScalePolicy",
+    "AISuperComputerStorageReferenceConfiguration",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py
new file mode 100644
index 00000000..89f338ca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/ai_super_computer_configuration.py
@@ -0,0 +1,194 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Any, Dict, List, Optional
+
+from ....entities._job.job_resource_configuration import BaseProperty
+
+
+class PascalCaseProperty(BaseProperty):
+    _KEY_MAPPING: Dict[str, Any] = {}
+
+    def items(self):
+        result = []
+        for key, value in super().items():
+            if key.lower() in self._KEY_MAPPING:
+                key = self._KEY_MAPPING[key.lower()]
+            result.append((key, value))
+        return result
+
+
+class AISuperComputerStorageReferenceConfiguration(PascalCaseProperty):  # pylint: disable=name-too-long
+    _KEY_MAPPING = {
+        "container_name": "ContainerName",
+        "relative_path": "RelativePath",
+    }
+
+    def __init__(
+        self,
+        container_name: str,
+        relative_path: str,
+        **kwargs,
+    ):
+        """
+        :param container_name: The name of the ai-super-computer storage container.
+        :type container_name: str
+        :param relative_path: The path on the ai-super-computer storage container.
+        :type relative_path: str
+        """
+        super().__init__(**kwargs)
+        self.container_name = container_name
+        self.relative_path = relative_path
+
+
+class AISuperComputerScalePolicy(PascalCaseProperty):
+    _KEY_MAPPING = {
+        "auto_scale_instance_type_count_set": "AutoScaleInstanceTypeCountSet",
+        "auto_scale_interval_in_sec": "AutoScaleIntervalInSec",
+        "max_instance_type_count": "MaxInstanceTypeCount",
+        "min_instance_type_count": "MinInstanceTypeCount",
+    }
+
+    def __init__(
+        self,
+        auto_scale_instance_type_count_set: Optional[List[int]] = None,
+        auto_scale_interval_in_sec: Optional[int] = None,
+        max_instance_type_count: Optional[int] = None,
+        min_instance_type_count: Optional[int] = None,
+        **kwargs,
+    ):
+        """
+        :param auto_scale_instance_type_count_set: The list of instance type counts available
+        for elastically scaling the job. Assume currentInstanceTypeCount = 4 and
+        autoScaleInstanceTypeCountSet = [2,4,8], the job will automatically scale down as 8->4->2
+        when less capacity is available, and scale up as 2->4->8 when more capacity is available.
+        The value should be a list of integers in ascending order.
+        :type auto_scale_instance_type_count_set: List[int]
+        :param auto_scale_interval_in_sec: The minimum interval in seconds between job autoscaling.
+        It is recommended to set autoScaleIntervalInSec longer than the checkpoint interval,
+        to make sure at least one checkpoint is saved before the job is auto-scaled.
+        :type auto_scale_interval_in_sec: int
+        :param max_instance_type_count: The maximum instance type count.
+        :type max_instance_type_count: int
+        :param min_instance_type_count: The minimum instance type count.
+        :type min_instance_type_count: int
+        """
+        super().__init__(**kwargs)
+        self.auto_scale_instance_type_count_set = auto_scale_instance_type_count_set
+        self.auto_scale_interval_in_sec = auto_scale_interval_in_sec
+        self.max_instance_type_count = max_instance_type_count
+        self.min_instance_type_count = min_instance_type_count
+
+
+class AISuperComputerConfiguration(PascalCaseProperty):  # pylint: disable=too-many-instance-attributes
+    """A class to manage AI Super Computer Configuration."""
+
+    _KEY_MAPPING = {
+        "instance_type": "InstanceType",
+        "instance_types": "InstanceTypes",
+        "image_version": "ImageVersion",
+        "location": "Location",
+        "locations": "Locations",
+        "ai_super_computer_storage_data": "AISuperComputerStorageData",
+        "interactive": "Interactive",
+        "scale_policy": "ScalePolicy",
+        "virtual_cluster_arm_id": "VirtualClusterArmId",
+        "tensorboard_log_directory": "TensorboardLogDirectory",
+        "ssh_public_key": "SSHPublicKey",
+        "ssh_public_keys": "SSHPublicKeys",
+        "enable_azml_int": "EnableAzmlInt",
+        "priority": "Priority",
+        "sla_tier": "SLATier",
+        "suspend_on_idle_time_hours": "SuspendOnIdleTimeHours",
+        "user_alias": "UserAlias",
+    }
+
+    def __init__(
+        self,
+        instance_type: Optional[str] = None,
+        instance_types: Optional[List[str]] = None,
+        image_version: Optional[str] = None,
+        location: Optional[str] = None,
+        locations: Optional[List[str]] = None,
+        ai_super_computer_storage_data: Optional[Dict[str, AISuperComputerStorageReferenceConfiguration]] = None,
+        interactive: Optional[bool] = None,
+        scale_policy: Optional[AISuperComputerScalePolicy] = None,
+        virtual_cluster_arm_id: Optional[str] = None,
+        tensorboard_log_directory: Optional[str] = None,
+        ssh_public_key: Optional[str] = None,
+        ssh_public_keys: Optional[List[str]] = None,
+        enable_azml_int: Optional[bool] = None,
+        priority: Optional[str] = None,
+        sla_tier: Optional[str] = None,
+        suspend_on_idle_time_hours: Optional[int] = None,
+        user_alias: Optional[str] = None,
+        **kwargs,
+    ):
+        """
+        :param instance_type: The class of compute to be used. The list of instance types is
+        available in https://singularitydocs.azurewebsites.net/docs/overview/instance_types/
+        :type instance_type: str
+        :param instance_types: The class of compute to be used. The list of instance types is
+        available in https://singularitydocs.azurewebsites.net/docs/overview/instance_types/
+        :type instance_types: List[str]
+        :param image_version: The image to use in ai-super-computer.  Currently only a limited set of predefined
+        images are supported.
+        :type image_version: str
+        :param location: The location (region) where the job will run. The workspace region is used
+        if neither location nor locations is specified.
+        :type location: str
+        :param locations: The location (region) where the job will run. The workspace region is used
+        if neither location nor locations is specified.
+        :type locations: List[str]
+        :param ai_super_computer_storage_data: All of the AI SuperComputer storage data sources to
+        be made available to the run based on the configurations.
+        :type ai_super_computer_storage_data: Dict[str, AISuperComputerStorageReferenceConfiguration]
+        :param interactive: Specifies whether the job should be interactive. Interactive jobs will
+        start the requested nodes, but not run a command.
+        :type interactive: bool
+        :param scale_policy: The elasticity options for a job. By leveraging elastic training,
+        the job will automatically scale up when there is extra capacity available,
+        and automatically scale down when resources are gradually called back.
+        :type scale_policy: AISuperComputerScalePolicy
+        :param virtual_cluster_arm_id: The ARM Resource Id for the Virtual Cluster to submit the
+        job to.
+        :type virtual_cluster_arm_id: str
+        :param tensorboard_log_directory: The directory where the Tensorboard logs will be written.
+        :type tensorboard_log_directory: str
+        :param ssh_public_key: The SSH Public Key to use when enabling SSH access to the job.
+        If not specified, username/password auth will be enabled.
+        :type ssh_public_key: str
+        :param ssh_public_keys: The SSH Public Key to use when enabling SSH access to the job.
+        If not specified, username/password auth will be enabled.
+        :type ssh_public_keys: List[str]
+        :param enable_azml_int: Specifies whether the job should include the azml_int utility
+        :type enable_azml_int: bool
+        :param priority: The priority of the job. The default value is Medium.
+        :type priority: str
+        :param sla_tier: The SLA tier of the job. The default value is Standard.
+        :type sla_tier: str
+        :param suspend_on_idle_time_hours: Minimum idle time before run gets automatically suspended
+        (in hours).
+        :type suspend_on_idle_time_hours: int
+        :param user_alias: User alias, used for naming mount paths.
+        :type user_alias: str
+        """
+        super().__init__(**kwargs)
+        self.instance_type = instance_type
+        self.instance_types = instance_types
+        self.image_version = image_version
+        self.location = location
+        self.locations = locations
+        self.ai_super_computer_storage_data = ai_super_computer_storage_data
+        self.interactive = interactive
+        self.scale_policy = scale_policy
+        self.virtual_cluster_arm_id = virtual_cluster_arm_id
+        self.tensorboard_log_directory = tensorboard_log_directory
+        self.ssh_public_key = ssh_public_key
+        self.ssh_public_keys = ssh_public_keys
+        self.enable_azml_int = enable_azml_int
+        self.priority = priority
+        self.sla_tier = sla_tier
+        self.suspend_on_idle_time_hours = suspend_on_idle_time_hours
+        self.user_alias = user_alias
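A construction sketch for the configuration above; the instance type and numeric values are illustrative only. Keys are emitted in PascalCase via the `_KEY_MAPPING` tables when the object is serialized:

from azure.ai.ml._internal.entities.runsettings import (
    AISuperComputerConfiguration,
    AISuperComputerScalePolicy,
)

aisc_config = AISuperComputerConfiguration(
    instance_type="ND40rs_v2",  # illustrative instance type
    priority="Medium",
    sla_tier="Standard",
    scale_policy=AISuperComputerScalePolicy(
        auto_scale_instance_type_count_set=[2, 4, 8],
        auto_scale_interval_in_sec=1800,
    ),
)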
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py
new file mode 100644
index 00000000..8868b33b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/itp_configuration.py
@@ -0,0 +1,137 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import List, Optional
+
+from ....entities._job.job_resource_configuration import BaseProperty
+
+
+class ITPResourceConfiguration(BaseProperty):
+    """ITP resource configuration."""
+
+    def __init__(
+        self,
+        gpu_count: Optional[int] = None,
+        cpu_count: Optional[int] = None,
+        memory_request_in_gb: Optional[int] = None,
+        **kwargs
+    ):
+        """
+        :param gpu_count: GPU count defines how many GPU cores a single-node GPU job will use.
+        Default value is 1.
+        :type gpu_count: int
+        :param cpu_count: CPU count defines how many CPU cores a single-node CPU job will use.
+        Default value is 1.
+        :type cpu_count: int
+        :param memory_request_in_gb: Memory request defines how many GB of memory a single-node job
+        will request. Default value is 0, which means it will be calculated automatically for the user.
+        :type memory_request_in_gb: int
+        """
+        super().__init__(**kwargs)
+        self.gpu_count = gpu_count
+        self.cpu_count = cpu_count
+        self.memory_request_in_gb = memory_request_in_gb
+
+
+class ITPPriorityConfiguration(BaseProperty):
+    """ITP priority configuration."""
+
+    def __init__(
+        self,
+        job_priority: Optional[int] = None,
+        is_preemptible: Optional[bool] = None,
+        node_count_set: Optional[List[int]] = None,
+        scale_interval: Optional[int] = None,
+        **kwargs
+    ):
+        """
+        :param job_priority: The priority of a job. Default value is 200. Users can set it to
+        100~200. Any value larger than 200 or less than 100 will be treated as 200.
+        :type job_priority: int
+        :param is_preemptible: Whether to preempt extra compute resources beyond the VC quota.
+        Default value is false.
+        :type is_preemptible: bool
+        :param node_count_set: Node count set determines how the compute auto-scales nodes. The value
+        should be a list of integers in ascending order, and is only available when IsPreemptible is
+        true.
+        :type node_count_set: List[int]
+        :param scale_interval: Scale interval in min.
+        :type scale_interval: int
+        """
+        super().__init__(**kwargs)
+        self.job_priority = job_priority
+        self.is_preemptible = is_preemptible
+        self.node_count_set = node_count_set
+        self.scale_interval = scale_interval
+
+
+class ITPInteractiveConfiguration(BaseProperty):
+    """ITP interactive configuration."""
+
+    def __init__(
+        self,
+        is_ssh_enabled: Optional[bool] = None,
+        ssh_public_key: Optional[str] = None,
+        is_i_python_enabled: Optional[bool] = None,
+        is_tensor_board_enabled: Optional[bool] = None,
+        interactive_port: Optional[int] = None,
+        **kwargs
+    ):
+        """
+        :param is_ssh_enabled: Whether to enable SSH for interactive development.
+        Default value is false.
+        :type is_ssh_enabled: bool
+        :param ssh_public_key: SSH public key.
+        :type ssh_public_key: str
+        :param is_i_python_enabled: Whether IPython is enabled.
+        :type is_i_python_enabled: bool
+        :param is_tensor_board_enabled: Whether to enable TensorBoard. Default value is false.
+        :type is_tensor_board_enabled: bool
+        :param interactive_port: Allows the user to specify a different interactive port.
+        Available values range from 40000 to 49999.
+        :type interactive_port: int
+        """
+        super().__init__(**kwargs)
+        self.is_ssh_enabled = is_ssh_enabled
+        self.ssh_public_key = ssh_public_key
+        self.is_i_python_enabled = is_i_python_enabled
+        self.is_tensor_board_enabled = is_tensor_board_enabled
+        self.interactive_port = interactive_port
+
+
+class ITPRetrySettings(BaseProperty):
+    def __init__(self, max_retry_count=None, **kwargs):
+        super().__init__(**kwargs)
+        self.max_retry_count = max_retry_count
+
+
+class ITPConfiguration(BaseProperty):
+    """ITP configuration."""
+
+    def __init__(
+        self,
+        resource_configuration: Optional[ITPResourceConfiguration] = None,
+        priority_configuration: Optional[ITPPriorityConfiguration] = None,
+        interactive_configuration: Optional[ITPInteractiveConfiguration] = None,
+        retry: Optional[ITPRetrySettings] = None,
+        **kwargs
+    ):
+        """
+        :param resource_configuration: Resource requirement for the compute.
+
+        :type resource_configuration: ITPResourceConfiguration
+        :param priority_configuration: Priority requirement for the compute.
+
+        :type priority_configuration: ITPPriorityConfiguration
+        :param interactive_configuration: Interactive configuration when trying to access the
+        compute.
+        :type interactive_configuration: ITPInteractiveConfiguration
+        """
+        self.resource_configuration = resource_configuration or ITPResourceConfiguration()
+        self.priority_configuration = priority_configuration or ITPPriorityConfiguration()
+        self.interactive_configuration = interactive_configuration or ITPInteractiveConfiguration()
+        self.retry = retry or ITPRetrySettings()
+        super().__init__(**kwargs)
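As a rough usage sketch (not part of the diff above): the ITP run-settings classes compose by building the sub-configurations first and wrapping them in ITPConfiguration. The import path mirrors the file location shown here, and all values are illustrative.

    from azure.ai.ml._internal.entities.runsettings.itp_configuration import (
        ITPConfiguration,
        ITPInteractiveConfiguration,
        ITPPriorityConfiguration,
        ITPResourceConfiguration,
    )

    # Compose sub-configurations first, then wrap them in ITPConfiguration.
    itp = ITPConfiguration(
        resource_configuration=ITPResourceConfiguration(gpu_count=4, memory_request_in_gb=64),
        priority_configuration=ITPPriorityConfiguration(job_priority=150, is_preemptible=True),
        interactive_configuration=ITPInteractiveConfiguration(is_ssh_enabled=True),
    )

Per the constructor above, any sub-configuration left out defaults to an empty instance rather than None.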
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py
new file mode 100644
index 00000000..92f72db7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/runsettings/target_selector.py
@@ -0,0 +1,47 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Optional
+
+from ....entities._job.job_resource_configuration import BaseProperty
+
+
+class TargetSelector(BaseProperty):
+    """Compute target selector."""
+
+    def __init__(
+        self,
+        compute_type: str,
+        instance_types: Optional[List[str]] = None,
+        regions: Optional[List[str]] = None,
+        my_resource_only: Optional[bool] = None,
+        allow_spot_vm: Optional[bool] = None,
+        **kwargs,
+    ):
+        """
+        :param compute_type: Compute type that the target selector can route the job to.
+        Example values: AmlCompute, AmlK8s.
+        :type compute_type: str
+        :param instance_types: List of instance types that the job could use. If no instance_types
+        are specified, all sizes are allowed. Note that instance_types here only contains VM SKUs.
+        Example value: ["STANDARD_D2_V2", "ND24rs_v3"]. Note that this field is case sensitive.
+        :type instance_types: List[str]
+        :param regions: List of regions to which the job may be submitted.
+        If no regions are specified, all regions are allowed. Example value: ["eastus"].
+        Currently it only works for ITP.
+        :type regions: List[str]
+        :param my_resource_only: Flag to control whether the job should be sent to the cluster
+        owned by the user. If False, the target selector may send the job to a shared cluster.
+        Currently it only works for ITP.
+        :type my_resource_only: bool
+        :param allow_spot_vm: Flag to allow the target selector service to send the job to
+        low-priority VMs. Currently it only works for ITP.
+        :type allow_spot_vm: bool
+        """
+        super().__init__(**kwargs)
+        self.compute_type = compute_type
+        self.instance_types = instance_types
+        self.regions = regions
+        self.my_resource_only = my_resource_only
+        self.allow_spot_vm = allow_spot_vm
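A minimal sketch of constructing a TargetSelector, assuming the module is importable from the file path above; the SKU and region values are illustrative, and per the docstring instance_types is case sensitive.

    from azure.ai.ml._internal.entities.runsettings.target_selector import TargetSelector

    # Route the job to AmlCompute, restricted to one (case-sensitive) VM SKU and one region.
    target = TargetSelector(
        compute_type="AmlCompute",
        instance_types=["STANDARD_D2_V2"],
        regions=["eastus"],
        allow_spot_vm=True,
    )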
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py
new file mode 100644
index 00000000..9965d69f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/scope.py
@@ -0,0 +1,131 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import List, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from .._schema.component import NodeType
+from ..entities.node import InternalBaseNode
+
+
+class Scope(InternalBaseNode):
+    """Node of scope components in pipeline with specific run settings."""
+
+    def __init__(self, **kwargs):
+        kwargs.pop("type", None)
+        super(Scope, self).__init__(type=NodeType.SCOPE, **kwargs)
+        self._init = True
+        self._adla_account_name = kwargs.pop("adla_account_name", None)
+        self._scope_param = kwargs.pop("scope_param", None)
+        self._custom_job_name_suffix = kwargs.pop("custom_job_name_suffix", None)
+        self._priority = kwargs.pop("priority", None)
+        self._auto_token = kwargs.pop("auto_token", None)
+        self._tokens = kwargs.pop("tokens", None)
+        self._vcp = kwargs.pop("vcp", None)
+        self._init = False
+
+    @property
+    def adla_account_name(self) -> str:
+        """The ADLA account name to use for the scope job.
+
+        :return: ADLA account name
+        :rtype: str
+        """
+        return self._adla_account_name
+
+    @adla_account_name.setter
+    def adla_account_name(self, value: str):
+        self._adla_account_name = value
+
+    @property
+    def scope_param(self) -> str:
+        """nebula command used when submit the scope job.
+
+        :return: The nebula command
+        :rtype: str
+        """
+        return self._scope_param
+
+    @scope_param.setter
+    def scope_param(self, value: str):
+        self._scope_param = value
+
+    @property
+    def custom_job_name_suffix(self) -> str:
+        """Optional string to append to scope job name.
+
+        :return: The custom suffix
+        :rtype: str
+        """
+        return self._custom_job_name_suffix
+
+    @custom_job_name_suffix.setter
+    def custom_job_name_suffix(self, value: str):
+        self._custom_job_name_suffix = value
+
+    @property
+    def priority(self) -> int:
+        """scope job priority.
+
+        If set priority in scope_param, will override this setting.
+
+        :return: The job priority
+        :rtype: int
+        """
+        return self._priority
+
+    @priority.setter
+    def priority(self, value: int):
+        self._priority = value
+
+    @property
+    def auto_token(self) -> int:
+        """A predictor for estimating the peak resource usage of scope job.
+
+        :return: auto token
+        :rtype: int
+        """
+        return self._auto_token
+
+    @auto_token.setter
+    def auto_token(self, value: int):
+        self._auto_token = value
+
+    @property
+    def tokens(self) -> int:
+        """Standard token allocation.
+
+        :return: The token allocation
+        :rtype: int
+        """
+        return self._tokens
+
+    @tokens.setter
+    def tokens(self, value: int):
+        self._tokens = value
+
+    @property
+    def vcp(self) -> float:
+        """Standard VC percent allocation; should be a float between 0 and 1.
+
+        :return: The VC allocation
+        :rtype: float
+        """
+        return self._vcp
+
+    @vcp.setter
+    def vcp(self, value: float):
+        self._vcp = value
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["custom_job_name_suffix", "scope_param", "adla_account_name", "priority", "auto_token", "tokens", "vcp"]
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        from .._schema.node import ScopeSchema
+
+        return ScopeSchema(context=context)
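A hedged sketch of how these run settings might be set on a Scope node, assuming internal component types have already been enabled for the pipeline SDK; the YAML path, the omitted component inputs, and the property values are placeholders, not part of the diff.

    from azure.ai.ml import load_component

    # Load an internal scope component from YAML (path and inputs are placeholders),
    # call it to obtain a Scope node, then set its run settings via the properties above.
    scope_func = load_component(source="./scope_component.yaml")
    scope_node = scope_func()  # typically called inside an @pipeline-decorated function

    scope_node.adla_account_name = "my-adla-account"  # placeholder ADLA account
    scope_node.priority = 500                         # overridden if priority is set in scope_param
    scope_node.custom_job_name_suffix = "nightly"
    scope_node.vcp = 0.5                              # VC percent allocation, between 0 and 1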
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py
new file mode 100644
index 00000000..345fa5f2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_internal/entities/spark.py
@@ -0,0 +1,192 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Dict, List, Optional, Union
+
+from marshmallow import Schema
+
+from ..._schema import PathAwareSchema
+from ...constants._job.job import RestSparkConfKey
+from ...entities import Environment, SparkJobEntry
+from ...entities._job.parameterized_spark import DUMMY_IMAGE, ParameterizedSpark
+from ...entities._job.spark_job_entry_mixin import SparkJobEntryMixin
+from .._schema.component import InternalSparkComponentSchema
+from ..entities import InternalComponent
+from .environment import InternalEnvironment
+
+
+class InternalSparkComponent(
+    InternalComponent, ParameterizedSpark, SparkJobEntryMixin
+):  # pylint: disable=too-many-instance-attributes, too-many-ancestors
+    """Internal Spark Component
+    This class is used to handle internal spark component.
+    It can be loaded from internal spark component yaml or from rest object of an internal spark component.
+    But after loaded, its structure will be the same as spark component.
+    """
+
+    def __init__(
+        self,
+        entry: Union[Dict[str, str], SparkJobEntry, None] = None,
+        py_files: Optional[List[str]] = None,
+        jars: Optional[List[str]] = None,
+        files: Optional[List[str]] = None,
+        archives: Optional[List[str]] = None,
+        driver_cores: Optional[int] = None,
+        driver_memory: Optional[str] = None,
+        executor_cores: Optional[int] = None,
+        executor_memory: Optional[str] = None,
+        executor_instances: Optional[int] = None,
+        dynamic_allocation_enabled: Optional[bool] = None,
+        dynamic_allocation_min_executors: Optional[int] = None,
+        dynamic_allocation_max_executors: Optional[int] = None,
+        conf: Optional[Dict[str, str]] = None,
+        args: Optional[str] = None,
+        **kwargs,
+    ):
+        SparkJobEntryMixin.__init__(self, entry=entry, **kwargs)
+        # environment.setter has been overridden in ParameterizedSpark, so we need to pop it out here
+        environment = kwargs.pop("environment", None)
+        InternalComponent.__init__(self, **kwargs)
+        # Pop it to avoid passing multiple values for code in ParameterizedSpark.__init__
+        code = kwargs.pop("code", None)
+        ParameterizedSpark.__init__(
+            self,
+            code=self.base_path,
+            entry=entry,
+            py_files=py_files,
+            jars=jars,
+            files=files,
+            archives=archives,
+            conf=conf,
+            environment=environment,
+            args=args,
+            **kwargs,
+        )
+        self.code = code
+        # For a pipeline Spark job, we also allow the user to set driver_cores, driver_memory and so on via conf.
+        # If the root-level fields are not set by the user, we promote the conf settings to root level to facilitate
+        # subsequent verification. This usually happens when to_component(SparkJob) or the builder function spark()
+        # is used as a node in the pipeline SDK.
+        conf = conf or {}
+        self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)
+        self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None)
+        self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None)
+        self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None)
+        self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None)
+        self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get(
+            RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None
+        )
+        self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get(
+            RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None
+        )
+        self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get(
+            RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None
+        )
+
+        self.conf = conf
+        self.args = args
+
+    @classmethod
+    def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
+        return InternalSparkComponentSchema(context=context)
+
+    @property  # type: ignore[override]
+    def environment(self) -> Optional[Union[Environment, str]]:
+        """Get the environment of the component.
+
+        :return: The environment of the component.
+        :rtype: Optional[Union[Environment, str]]
+        """
+        if isinstance(self._environment, Environment) and self._environment.image is None:
+            return Environment(conda_file=self._environment.conda_file, image=DUMMY_IMAGE)
+        return self._environment
+
+    @environment.setter
+    def environment(self, value):
+        """Set the environment of the component.
+
+        :param value: The environment of the component.
+        :type value: Union[str, Environment, dict]
+        :return: No return
+        :rtype: None
+        """
+        if value is None or isinstance(value, (str, Environment)):
+            self._environment = value
+        elif isinstance(value, dict):
+            internal_environment = InternalEnvironment(**value)
+            internal_environment.resolve(self.base_path)
+            self._environment = Environment(
+                name=internal_environment.name,
+                version=internal_environment.version,
+            )
+            if internal_environment.conda:
+                self._environment.conda_file = {
+                    "dependencies": internal_environment.conda[InternalEnvironment.CONDA_DEPENDENCIES]
+                }
+            if internal_environment.docker:
+                self._environment.image = internal_environment.docker["image"]
+            # we assume that a loaded internal Spark component won't be used to create another internal Spark
+            # component, so the environment construction here can be simplified
+        else:
+            raise ValueError(f"Unsupported environment type: {type(value)}")
+
+    @property
+    def jars(self) -> Optional[List[str]]:
+        """Get the jars of the component.
+
+        :return: The jars of the component.
+        :rtype: Optional[List[str]]
+        """
+        return self._jars
+
+    @jars.setter
+    def jars(self, value: Union[str, List[str]]):
+        """Set the jars of the component.
+
+        :param value: The jars of the component.
+        :type value: Union[str, List[str]]
+        :return: No return
+        :rtype: None
+        """
+        if isinstance(value, str):
+            value = [value]
+        self._jars = value
+
+    @property
+    def py_files(self) -> Optional[List[str]]:
+        """Get the py_files of the component.
+
+        :return: The py_files of the component.
+        :rtype: Optional[List[str]]
+        """
+        return self._py_files
+
+    @py_files.setter
+    def py_files(self, value):
+        """Set the py_files of the component.
+
+        :param value: The py_files of the component.
+        :type value: Union[str, List[str]]
+        :return: No return
+        :rtype: None
+        """
+        if isinstance(value, str):
+            value = [value]
+        self._py_files = value
+
+    def _to_dict(self) -> Dict:
+        result = super()._to_dict()
+        return result
+
+    def _to_rest_object(self):
+        result = super()._to_rest_object()
+        if "pyFiles" in result.properties.component_spec:
+            result.properties.component_spec["py_files"] = result.properties.component_spec.pop("pyFiles")
+        return result
+
+    @classmethod
+    def _from_rest_object_to_init_params(cls, obj) -> Dict:
+        if "py_files" in obj.properties.component_spec:
+            obj.properties.component_spec["pyFiles"] = obj.properties.component_spec.pop("py_files")
+        result = super()._from_rest_object_to_init_params(obj)
+        return result
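The conf-to-root-level promotion in __init__ above can be illustrated in isolation. This sketch assumes the standard Spark conf key names (e.g. "spark.driver.cores") that RestSparkConfKey is expected to map to, which is an assumption rather than something shown in this diff.

    def promote(explicit_value, conf, key):
        # Mirrors the pattern above: an explicitly passed root-level value wins,
        # otherwise fall back to the matching entry in conf (note: the constructor
        # uses `or`, so falsy explicit values also fall back to conf).
        return explicit_value or conf.get(key, None)

    conf = {"spark.driver.cores": 2, "spark.executor.memory": "4g"}  # assumed conf key names
    print(promote(None, conf, "spark.driver.cores"))     # 2  -> promoted from conf
    print(promote("8g", conf, "spark.executor.memory"))  # 8g -> explicit value wins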