Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/__init__.py                    17
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/automl_node.py                148
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/component_job.py              554
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/condition_node.py              48
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/control_flow_job.py           147
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_command_job.py        31
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_component.py         297
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py   55
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_import_job.py         25
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job.py                76
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job_io.py             51
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_parallel_job.py       40
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_spark_job.py          29
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/settings.py                    42
14 files changed, 1560 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/__init__.py
new file mode 100644
index 00000000..a19931cd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/__init__.py
@@ -0,0 +1,17 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=unused-import
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
+
+from .component_job import (
+    CommandSchema,
+    ImportSchema,
+    ParallelSchema,
+    SparkSchema,
+    DataTransferCopySchema,
+    DataTransferImportSchema,
+    DataTransferExportSchema,
+)
+from .pipeline_job import PipelineJobSchema
+from .settings import PipelineJobSettingsSchema
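A minimal usage sketch (not part of this change, assuming azure-ai-ml is installed): the names re-exported above can be imported straight from the pipeline schema package.

    from azure.ai.ml._schema.pipeline import CommandSchema, PipelineJobSchema, PipelineJobSettingsSchema

    # Schema classes re-exported by this __init__; instantiate with a marshmallow context to use them.
    print(CommandSchema, PipelineJobSchema, PipelineJobSettingsSchema)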
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/automl_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/automl_node.py
new file mode 100644
index 00000000..4b815db7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/automl_node.py
@@ -0,0 +1,148 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument,protected-access
+from typing import List
+
+from marshmallow import fields, post_dump, post_load, pre_dump
+
+from azure.ai.ml._schema._utils.data_binding_expression import support_data_binding_expression_for_fields
+from azure.ai.ml._schema.automl import AutoMLClassificationSchema, AutoMLForecastingSchema, AutoMLRegressionSchema
+from azure.ai.ml._schema.automl.image_vertical.image_classification import (
+    ImageClassificationMultilabelSchema,
+    ImageClassificationSchema,
+)
+from azure.ai.ml._schema.automl.image_vertical.image_object_detection import (
+    ImageInstanceSegmentationSchema,
+    ImageObjectDetectionSchema,
+)
+from azure.ai.ml._schema.automl.nlp_vertical.text_classification import TextClassificationSchema
+from azure.ai.ml._schema.automl.nlp_vertical.text_classification_multilabel import TextClassificationMultilabelSchema
+from azure.ai.ml._schema.automl.nlp_vertical.text_ner import TextNerSchema
+from azure.ai.ml._schema.core.fields import ComputeField, NestedField, UnionField
+from azure.ai.ml._schema.core.schema import PathAwareSchema
+from azure.ai.ml._schema.job.input_output_entry import MLTableInputSchema, OutputSchema
+from azure.ai.ml._schema.pipeline.pipeline_job_io import OutputBindingStr
+
+
+class AutoMLNodeMixin(PathAwareSchema):
+    """Inherit this mixin to change automl job schemas to automl node schema.
+
+    eg: Compute is required for automl job but not required for automl node in pipeline.
+    Note: Inherit this before BaseJobSchema to make sure optional takes affect.
+    """
+
+    def __init__(self, **kwargs):
+        super(AutoMLNodeMixin, self).__init__(**kwargs)
+        # update field objects and add data binding support, won't bind task & type as data binding
+        support_data_binding_expression_for_fields(self, attrs_to_skip=["task_type", "type"])
+
+    compute = ComputeField(required=False)
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True),
+    )
+
+    @pre_dump
+    def resolve_outputs(self, job: "AutoMLJob", **kwargs):
+        # Resolve the object's outputs and return a resolved copy
+        import copy
+
+        result = copy.copy(job)
+        result._outputs = job._build_outputs()
+        return result
+
+    @post_dump(pass_original=True)
+    # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+    def resolve_nested_data(self, job_dict: dict, job: "AutoMLJob", **kwargs):
+        """Resolve nested data into flatten format."""
+        from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+
+        if not isinstance(job, AutoMLJob):
+            return job_dict
+        # change output to rest output dicts
+        job_dict["outputs"] = job._to_rest_outputs()
+        return job_dict
+
+    @post_load
+    def make(self, data, **kwargs):
+        data["task"] = data.pop("task_type")
+        return data
+
+
+class AutoMLClassificationNodeSchema(AutoMLNodeMixin, AutoMLClassificationSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    test_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class AutoMLRegressionNodeSchema(AutoMLNodeMixin, AutoMLRegressionSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    test_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class AutoMLForecastingNodeSchema(AutoMLNodeMixin, AutoMLForecastingSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    test_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class AutoMLTextClassificationNode(AutoMLNodeMixin, TextClassificationSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class AutoMLTextClassificationMultilabelNode(AutoMLNodeMixin, TextClassificationMultilabelSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class AutoMLTextNerNode(AutoMLNodeMixin, TextNerSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class ImageClassificationMulticlassNodeSchema(AutoMLNodeMixin, ImageClassificationSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class ImageClassificationMultilabelNodeSchema(AutoMLNodeMixin, ImageClassificationMultilabelSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class ImageObjectDetectionNodeSchema(AutoMLNodeMixin, ImageObjectDetectionSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+class ImageInstanceSegmentationNodeSchema(AutoMLNodeMixin, ImageInstanceSegmentationSchema):
+    training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+    validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)])
+
+
+def AutoMLNodeSchema(**kwargs) -> List[fields.Field]:
+    """Get the list of all nested schema for all AutoML nodes.
+
+    :return: The list of fields
+    :rtype: List[fields.Field]
+    """
+    return [
+        # region: automl node schemas
+        NestedField(AutoMLClassificationNodeSchema, **kwargs),
+        NestedField(AutoMLRegressionNodeSchema, **kwargs),
+        NestedField(AutoMLForecastingNodeSchema, **kwargs),
+        # Vision
+        NestedField(ImageClassificationMulticlassNodeSchema, **kwargs),
+        NestedField(ImageClassificationMultilabelNodeSchema, **kwargs),
+        NestedField(ImageObjectDetectionNodeSchema, **kwargs),
+        NestedField(ImageInstanceSegmentationNodeSchema, **kwargs),
+        # NLP
+        NestedField(AutoMLTextClassificationNode, **kwargs),
+        NestedField(AutoMLTextClassificationMultilabelNode, **kwargs),
+        NestedField(AutoMLTextNerNode, **kwargs),
+        # endregion
+    ]
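A hedged sketch (not part of the diff) of how the AutoMLNodeSchema helper is typically consumed: it returns a list of NestedField candidates that can be plugged into a type-sensitive union, which is exactly how PipelineJobsField uses it later in this change.

    from marshmallow import INCLUDE
    from azure.ai.ml._schema.pipeline.automl_node import AutoMLNodeSchema

    # One NestedField per AutoML node schema: classification, regression, forecasting,
    # four vision schemas and three NLP schemas in this version.
    automl_candidates = AutoMLNodeSchema(unknown=INCLUDE)
    print(len(automl_candidates))  # 10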
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/component_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/component_job.py
new file mode 100644
index 00000000..8f179479
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/component_job.py
@@ -0,0 +1,554 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+
+from marshmallow import INCLUDE, ValidationError, fields, post_dump, post_load, pre_dump, validates
+
+from ..._schema.component import (
+    AnonymousCommandComponentSchema,
+    AnonymousDataTransferCopyComponentSchema,
+    AnonymousImportComponentSchema,
+    AnonymousParallelComponentSchema,
+    AnonymousSparkComponentSchema,
+    ComponentFileRefField,
+    ComponentYamlRefField,
+    DataTransferCopyComponentFileRefField,
+    ImportComponentFileRefField,
+    ParallelComponentFileRefField,
+    SparkComponentFileRefField,
+)
+from ..._utils.utils import is_data_binding_expression
+from ...constants._common import AzureMLResourceType
+from ...constants._component import DataTransferTaskType, NodeType
+from ...entities._inputs_outputs import Input
+from ...entities._job.pipeline._attr_dict import _AttrDict
+from ...exceptions import ValidationException
+from .._sweep.parameterized_sweep import ParameterizedSweepSchema
+from .._utils.data_binding_expression import support_data_binding_expression_for_fields
+from ..component.flow import FlowComponentSchema
+from ..core.fields import (
+    ArmVersionedStr,
+    ComputeField,
+    EnvironmentField,
+    NestedField,
+    RegistryStr,
+    StringTransformedEnum,
+    TypeSensitiveUnionField,
+    UnionField,
+)
+from ..core.schema import PathAwareSchema
+from ..job import ParameterizedCommandSchema, ParameterizedParallelSchema, ParameterizedSparkSchema
+from ..job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
+from ..job.input_output_entry import DatabaseSchema, FileSystemSchema, OutputSchema
+from ..job.input_output_fields_provider import InputsField
+from ..job.job_limits import CommandJobLimitsSchema
+from ..job.parameterized_spark import SparkEntryClassSchema, SparkEntryFileSchema
+from ..job.services import (
+    JobServiceSchema,
+    JupyterLabJobServiceSchema,
+    SshJobServiceSchema,
+    TensorBoardJobServiceSchema,
+    VsCodeJobServiceSchema,
+)
+from ..pipeline.pipeline_job_io import OutputBindingStr
+from ..spark_resource_configuration import SparkResourceConfigurationForNodeSchema
+
+module_logger = logging.getLogger(__name__)
+
+
+# Inherit PathAwareSchema to support relative paths & default partial load (allow None value if not specified)
+class BaseNodeSchema(PathAwareSchema):
+    """Base schema for all node schemas."""
+
+    unknown = INCLUDE
+
+    inputs = InputsField(support_databinding=True)
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField([OutputBindingStr, NestedField(OutputSchema)], allow_none=True),
+    )
+    properties = fields.Dict(keys=fields.Str(), values=fields.Str(allow_none=True))
+    comment = fields.Str()
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Data binding expressions are not supported inside the component field, and the validation
+        # error message would be very long when component is an object (it would include
+        # str(component)), so add component to the skip list. The same applies to trial in Sweep.
+        support_data_binding_expression_for_fields(self, ["type", "component", "trial", "inputs"])
+
+    @post_dump(pass_original=True)
+    # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+    def add_user_setting_attr_dict(self, data, original_data, **kwargs):  # pylint: disable=unused-argument
+        """Support serializing unknown fields for pipeline node."""
+        if isinstance(original_data, _AttrDict):
+            user_setting_attr_dict = original_data._get_attrs()
+            # TODO: dump _AttrDict values to serializable data like dict instead of original object
+            # skip fields that are already serialized
+            for key, value in user_setting_attr_dict.items():
+                if key not in data:
+                    data[key] = value
+        return data
+
+    # An alternative would be to make the schema property load_only, but sub-schemas like CommandSchema usually
+    # also inherit from other schema classes that have a schema property, so a post_dump here is more efficient.
+    @post_dump()
+    def remove_meaningless_key_for_node(
+        self,
+        data,
+        **kwargs,  # pylint: disable=unused-argument
+    ):
+        data.pop("$schema", None)
+        return data
+
+
+def _delete_type_for_binding(io):
+    for key in io:
+        if isinstance(io[key], Input) and io[key].path and is_data_binding_expression(io[key].path):
+            io[key].type = None
+
+
+def _resolve_inputs(result, original_job):
+    result._inputs = original_job._build_inputs()
+    # delete type for literal binding input
+    _delete_type_for_binding(result._inputs)
+
+
+def _resolve_outputs(result, original_job):
+    result._outputs = original_job._build_outputs()
+    # delete type for literal binding output
+    _delete_type_for_binding(result._outputs)
+
+
+def _resolve_inputs_outputs(job):
+    # Try to resolve the object's inputs & outputs and return a resolved copy
+    import copy
+
+    result = copy.copy(job)
+    _resolve_inputs(result, job)
+    _resolve_outputs(result, job)
+
+    return result
+
+
+class CommandSchema(BaseNodeSchema, ParameterizedCommandSchema):
+    """Schema for Command."""
+
+    # pylint: disable=unused-argument
+    component = TypeSensitiveUnionField(
+        {
+            NodeType.COMMAND: [
+                # inline component or component file reference starting with FILE prefix
+                NestedField(AnonymousCommandComponentSchema, unknown=INCLUDE),
+                # component file reference
+                ComponentFileRefField(),
+            ],
+        },
+        plain_union_fields=[
+            # for registry type assets
+            RegistryStr(),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+    # code is directly linked to component.code, so no need to validate or dump it
+    code = fields.Str(allow_none=True, load_only=True)
+    type = StringTransformedEnum(allowed_values=[NodeType.COMMAND])
+    compute = ComputeField()
+    # do not promote it as CommandComponent has no field named 'limits'
+    limits = NestedField(CommandJobLimitsSchema)
+    # Change required fields to optional
+    command = fields.Str(
+        metadata={
+            "description": "The command run and the parameters passed. \
+            This string may contain place holders of inputs in {}. "
+        },
+        load_only=True,
+    )
+    environment = EnvironmentField()
+    services = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(
+            [
+                NestedField(SshJobServiceSchema),
+                NestedField(JupyterLabJobServiceSchema),
+                NestedField(TensorBoardJobServiceSchema),
+                NestedField(VsCodeJobServiceSchema),
+                # JobServiceSchema should be the last in the list.
+                # To support types not set by users like Custom, Tracking, Studio.
+                NestedField(JobServiceSchema),
+            ],
+            is_strict=True,
+        ),
+    )
+    identity = UnionField(
+        [
+            NestedField(ManagedIdentitySchema),
+            NestedField(AMLTokenIdentitySchema),
+            NestedField(UserIdentitySchema),
+        ]
+    )
+
+    @post_load
+    def make(self, data, **kwargs) -> "Command":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.command_func import command
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+        try:
+            command_node = command(**data)
+        except ValidationException as e:
+            # Initialization may raise a ValidationException (e.g. from command._validate_io); re-raise it
+            # as a marshmallow ValidationError so it won't break SchemaValidatable._schema_validate
+            raise ValidationError(e.message) from e
+        return command_node
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
+
+
+class SweepSchema(BaseNodeSchema, ParameterizedSweepSchema):
+    """Schema for Sweep."""
+
+    # pylint: disable=unused-argument
+    type = StringTransformedEnum(allowed_values=[NodeType.SWEEP])
+    compute = ComputeField()
+    trial = TypeSensitiveUnionField(
+        {
+            NodeType.SWEEP: [
+                # inline component or component file reference starting with FILE prefix
+                NestedField(AnonymousCommandComponentSchema, unknown=INCLUDE),
+                # component file reference
+                ComponentFileRefField(),
+            ],
+        },
+        plain_union_fields=[
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+
+    @post_load
+    def make(self, data, **kwargs) -> "Sweep":
+        from azure.ai.ml.entities._builders import Sweep, parse_inputs_outputs
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+        return Sweep(**data)
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
+
+
+class ParallelSchema(BaseNodeSchema, ParameterizedParallelSchema):
+    """
+    Schema for Parallel.
+    """
+
+    # pylint: disable=unused-argument
+    compute = ComputeField()
+    component = TypeSensitiveUnionField(
+        {
+            NodeType.PARALLEL: [
+                # inline component or component file reference starting with FILE prefix
+                NestedField(AnonymousParallelComponentSchema, unknown=INCLUDE),
+                # component file reference
+                ParallelComponentFileRefField(),
+            ],
+            NodeType.FLOW_PARALLEL: [
+                NestedField(FlowComponentSchema, unknown=INCLUDE, dump_only=True),
+                ComponentYamlRefField(),
+            ],
+        },
+        plain_union_fields=[
+            # for registry type assets
+            RegistryStr(),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+    identity = UnionField(
+        [
+            NestedField(ManagedIdentitySchema),
+            NestedField(AMLTokenIdentitySchema),
+            NestedField(UserIdentitySchema),
+        ]
+    )
+    type = StringTransformedEnum(allowed_values=[NodeType.PARALLEL])
+
+    @post_load
+    def make(self, data, **kwargs) -> "Parallel":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.parallel_func import parallel_run_function
+
+        data = parse_inputs_outputs(data)
+        parallel_node = parallel_run_function(**data)
+        return parallel_node
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
+
+
+class ImportSchema(BaseNodeSchema):
+    """
+    Schema for Import.
+    """
+
+    # pylint: disable=unused-argument
+    component = TypeSensitiveUnionField(
+        {
+            NodeType.IMPORT: [
+                # inline component or component file reference starting with FILE prefix
+                NestedField(AnonymousImportComponentSchema, unknown=INCLUDE),
+                # component file reference
+                ImportComponentFileRefField(),
+            ],
+        },
+        plain_union_fields=[
+            # for registry type assets
+            RegistryStr(),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+    type = StringTransformedEnum(allowed_values=[NodeType.IMPORT])
+
+    @post_load
+    def make(self, data, **kwargs) -> "Import":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.import_func import import_job
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+        import_node = import_job(**data)
+        return import_node
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
+
+
+class SparkSchema(BaseNodeSchema, ParameterizedSparkSchema):
+    """
+    Schema for Spark.
+    """
+
+    # pylint: disable=unused-argument
+    component = TypeSensitiveUnionField(
+        {
+            NodeType.SPARK: [
+                # inline component or component file reference starting with FILE prefix
+                NestedField(AnonymousSparkComponentSchema, unknown=INCLUDE),
+                # component file reference
+                SparkComponentFileRefField(),
+            ],
+        },
+        plain_union_fields=[
+            # for registry type assets
+            RegistryStr(),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+    type = StringTransformedEnum(allowed_values=[NodeType.SPARK])
+    compute = ComputeField()
+    resources = NestedField(SparkResourceConfigurationForNodeSchema)
+    entry = UnionField(
+        [NestedField(SparkEntryFileSchema), NestedField(SparkEntryClassSchema)],
+        metadata={"description": "Entry."},
+    )
+    py_files = fields.List(fields.Str())
+    jars = fields.List(fields.Str())
+    files = fields.List(fields.Str())
+    archives = fields.List(fields.Str())
+    identity = UnionField(
+        [
+            NestedField(ManagedIdentitySchema),
+            NestedField(AMLTokenIdentitySchema),
+            NestedField(UserIdentitySchema),
+        ]
+    )
+
+    # code is directly linked to component.code, so no need to validate or dump it
+    code = fields.Str(allow_none=True, load_only=True)
+
+    @post_load
+    def make(self, data, **kwargs) -> "Spark":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.spark_func import spark
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+        try:
+            spark_node = spark(**data)
+        except ValidationException as e:
+            # Initialization may raise a ValidationException (e.g. from command._validate_io); re-raise it
+            # as a marshmallow ValidationError so it won't break SchemaValidatable._schema_validate
+            raise ValidationError(e.message) from e
+        return spark_node
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
+
+
+class DataTransferCopySchema(BaseNodeSchema):
+    """
+    Schema for DataTransferCopy.
+    """
+
+    # pylint: disable=unused-argument
+    component = TypeSensitiveUnionField(
+        {
+            NodeType.DATA_TRANSFER: [
+                # inline component or component file reference starting with FILE prefix
+                NestedField(AnonymousDataTransferCopyComponentSchema, unknown=INCLUDE),
+                # component file reference
+                DataTransferCopyComponentFileRefField(),
+            ],
+        },
+        plain_union_fields=[
+            # for registry type assets
+            RegistryStr(),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+    task = StringTransformedEnum(allowed_values=[DataTransferTaskType.COPY_DATA], required=True)
+    type = StringTransformedEnum(allowed_values=[NodeType.DATA_TRANSFER], required=True)
+    compute = ComputeField()
+
+    @post_load
+    def make(self, data, **kwargs) -> "DataTransferCopy":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.data_transfer_func import copy_data
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+        try:
+            data_transfer_node = copy_data(**data)
+        except ValidationException as e:
+            # Initialization may raise a ValidationException (e.g. from data_transfer._validate_io); re-raise it
+            # as a marshmallow ValidationError so it won't break SchemaValidatable._schema_validate
+            raise ValidationError(e.message) from e
+        return data_transfer_node
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
+
+
+class DataTransferImportSchema(BaseNodeSchema):
+    # pylint: disable=unused-argument
+    component = UnionField(
+        [
+            # for registry type assets
+            RegistryStr(),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+    task = StringTransformedEnum(allowed_values=[DataTransferTaskType.IMPORT_DATA], required=True)
+    type = StringTransformedEnum(allowed_values=[NodeType.DATA_TRANSFER], required=True)
+    compute = ComputeField()
+    source = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False)
+    outputs = fields.Dict(
+        keys=fields.Str(), values=UnionField([OutputBindingStr, NestedField(OutputSchema)]), allow_none=False
+    )
+
+    @validates("inputs")
+    def inputs_key(self, value):
+        raise ValidationError(f"inputs field is not a valid filed in task type " f"{DataTransferTaskType.IMPORT_DATA}.")
+
+    @validates("outputs")
+    def outputs_key(self, value):
+        if len(value) != 1 or list(value.keys())[0] != "sink":
+            raise ValidationError(
+                f"outputs field only support one output called sink in task type "
+                f"{DataTransferTaskType.IMPORT_DATA}."
+            )
+
+    @post_load
+    def make(self, data, **kwargs) -> "DataTransferImport":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.data_transfer_func import import_data
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+        try:
+            data_transfer_node = import_data(**data)
+        except ValidationException as e:
+            # Initialization may raise a ValidationException (e.g. from data_transfer._validate_io); re-raise it
+            # as a marshmallow ValidationError so it won't break SchemaValidatable._schema_validate
+            raise ValidationError(e.message) from e
+        return data_transfer_node
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
+
+
+class DataTransferExportSchema(BaseNodeSchema):
+    # pylint: disable=unused-argument
+    component = UnionField(
+        [
+            # for registry type assets
+            RegistryStr(),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+        ],
+        required=True,
+    )
+    task = StringTransformedEnum(allowed_values=[DataTransferTaskType.EXPORT_DATA])
+    type = StringTransformedEnum(allowed_values=[NodeType.DATA_TRANSFER])
+    compute = ComputeField()
+    inputs = InputsField(support_databinding=True, allow_none=False)
+    sink = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False)
+
+    @validates("inputs")
+    def inputs_key(self, value):
+        if len(value) != 1 or list(value.keys())[0] != "source":
+            raise ValidationError(
+                f"inputs field only support one input called source in task type "
+                f"{DataTransferTaskType.EXPORT_DATA}."
+            )
+
+    @validates("outputs")
+    def outputs_key(self, value):
+        raise ValidationError(
+            f"outputs field is not a valid filed in task type " f"{DataTransferTaskType.EXPORT_DATA}."
+        )
+
+    @post_load
+    def make(self, data, **kwargs) -> "DataTransferExport":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.data_transfer_func import export_data
+
+        # parse inputs/outputs
+        data = parse_inputs_outputs(data)
+        try:
+            data_transfer_node = export_data(**data)
+        except ValidationException as e:
+            # Initialization may raise a ValidationException (e.g. from data_transfer._validate_io); re-raise it
+            # as a marshmallow ValidationError so it won't break SchemaValidatable._schema_validate
+            raise ValidationError(e.message) from e
+        return data_transfer_node
+
+    @pre_dump
+    def resolve_inputs_outputs(self, job, **kwargs):
+        return _resolve_inputs_outputs(job)
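A hedged sketch (illustrative values, not from the diff) of driving one of these node schemas directly. The context key mirrors PathAwareSchema usage elsewhere in this change; whether the load succeeds end-to-end depends on the surrounding marshmallow context, so treat it as a sketch rather than a guaranteed call sequence.

    from azure.ai.ml._schema.pipeline.component_job import CommandSchema
    from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY

    schema = CommandSchema(context={BASE_PATH_CONTEXT_KEY: "."})
    node_dict = {
        "type": "command",                     # must match NodeType.COMMAND
        "component": "azureml:train_model:1",  # ArmVersionedStr-style reference (hypothetical component)
        "compute": "azureml:cpu-cluster",      # hypothetical compute target
        "inputs": {"learning_rate": 0.01},
    }
    command_node = schema.load(node_dict)      # post_load builds a Command node via command(**data)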
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/condition_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/condition_node.py
new file mode 100644
index 00000000..a1d2901c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/condition_node.py
@@ -0,0 +1,48 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from marshmallow import fields, post_dump, ValidationError
+
+from azure.ai.ml._schema import StringTransformedEnum
+from azure.ai.ml._schema.core.fields import DataBindingStr, NodeBindingStr, UnionField
+from azure.ai.ml._schema.pipeline.control_flow_job import ControlFlowSchema
+from azure.ai.ml.constants._component import ControlFlowType
+
+
+# ConditionNodeSchema does not inherit from BaseNodeSchema since it doesn't have inputs/outputs like other nodes.
+class ConditionNodeSchema(ControlFlowSchema):
+    type = StringTransformedEnum(allowed_values=[ControlFlowType.IF_ELSE])
+    condition = UnionField([DataBindingStr(), fields.Bool()])
+    true_block = UnionField([NodeBindingStr(), fields.List(NodeBindingStr())])
+    false_block = UnionField([NodeBindingStr(), fields.List(NodeBindingStr())])
+
+    @post_dump
+    def simplify_blocks(self, data, **kwargs):  # pylint: disable=unused-argument
+        # Simplify true_block and false_block to a single node if there is only one node in the list.
+        # This makes sure the request to the backend won't change once list true/false blocks are supported.
+        block_keys = ["true_block", "false_block"]
+        for block in block_keys:
+            if isinstance(data.get(block), list) and len(data.get(block)) == 1:
+                data[block] = data.get(block)[0]
+
+        # validate blocks intersection
+        def _normalize_blocks(key):
+            blocks = data.get(key, [])
+            if blocks:
+                if not isinstance(blocks, list):
+                    blocks = [blocks]
+            else:
+                blocks = []
+            return blocks
+
+        true_block = _normalize_blocks("true_block")
+        false_block = _normalize_blocks("false_block")
+
+        if not true_block and not false_block:
+            raise ValidationError("True block and false block cannot be empty at the same time.")
+
+        intersection = set(true_block).intersection(set(false_block))
+        if intersection:
+            raise ValidationError(f"True block and false block cannot contain same nodes: {intersection}")
+
+        return data
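A small, self-contained illustration (assumed values, not from the diff) of what the simplify_blocks post-dump does: a single-element true_block/false_block list is collapsed to a plain binding string before the dict is sent to the backend.

    dumped = {
        "type": "if_else",  # illustrative value for ControlFlowType.IF_ELSE
        "condition": "${{parent.jobs.check.outputs.result}}",
        "true_block": ["${{parent.jobs.node_a}}"],
        "false_block": ["${{parent.jobs.node_b}}"],
    }
    # Collapse single-element block lists, mirroring the post_dump above.
    for key in ("true_block", "false_block"):
        if isinstance(dumped.get(key), list) and len(dumped[key]) == 1:
            dumped[key] = dumped[key][0]
    assert dumped["true_block"] == "${{parent.jobs.node_a}}"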
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/control_flow_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/control_flow_job.py
new file mode 100644
index 00000000..3d1e3e4a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/control_flow_job.py
@@ -0,0 +1,147 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import copy
+import json
+
+from marshmallow import INCLUDE, fields, pre_dump, pre_load
+
+from azure.ai.ml._schema.core.fields import DataBindingStr, NestedField, StringTransformedEnum, UnionField
+from azure.ai.ml._schema.core.schema import PathAwareSchema
+from azure.ai.ml.constants._component import ControlFlowType
+
+from ..job.input_output_entry import OutputSchema
+from ..job.input_output_fields_provider import InputsField
+from ..job.job_limits import DoWhileLimitsSchema
+from .component_job import _resolve_outputs
+from .pipeline_job_io import OutputBindingStr
+
+# pylint: disable=protected-access
+
+
+class ControlFlowSchema(PathAwareSchema):
+    unknown = INCLUDE
+
+
+class BaseLoopSchema(ControlFlowSchema):
+    unknown = INCLUDE
+    body = DataBindingStr()
+
+    @pre_dump
+    def convert_control_flow_body_to_binding_str(self, data, **kwargs):  # pylint: disable= unused-argument
+        result = copy.copy(data)
+        # Update body object to data_binding_str
+        result._body = data._get_body_binding_str()
+        return result
+
+
+class DoWhileSchema(BaseLoopSchema):
+    # pylint: disable=unused-argument
+    type = StringTransformedEnum(allowed_values=[ControlFlowType.DO_WHILE])
+    condition = UnionField(
+        [
+            DataBindingStr(),
+            fields.Str(),
+        ]
+    )
+    mapping = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(
+            [
+                fields.List(fields.Str()),
+                fields.Str(),
+            ]
+        ),
+        required=True,
+    )
+    limits = NestedField(DoWhileLimitsSchema, required=True)
+
+    @pre_dump
+    def resolve_inputs_outputs(self, data, **kwargs):
+        # Try to resolve the object's mapping and condition and return a resolved copy
+        result = copy.copy(data)
+        mapping = {}
+        for k, v in result.mapping.items():
+            v = v if isinstance(v, list) else [v]
+            mapping[k] = [item._port_name for item in v]
+        result._mapping = mapping
+
+        try:
+            result._condition = result._condition._port_name
+        except AttributeError:
+            result._condition = result._condition
+
+        return result
+
+    @pre_dump
+    def convert_control_flow_body_to_binding_str(self, data, **kwargs):
+        return super(DoWhileSchema, self).convert_control_flow_body_to_binding_str(data, **kwargs)
+
+
+class ParallelForSchema(BaseLoopSchema):
+    type = StringTransformedEnum(allowed_values=[ControlFlowType.PARALLEL_FOR])
+    items = UnionField(
+        [
+            fields.Dict(keys=fields.Str(), values=InputsField()),
+            fields.List(InputsField()),
+            # Put str last to make sure items of other types won't become strings when dumped.
+            # TODO: only support binding here
+            fields.Str(),
+        ],
+        required=True,
+    )
+    max_concurrency = fields.Int()
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField([OutputBindingStr, NestedField(OutputSchema)], allow_none=True),
+    )
+
+    @pre_load
+    def load_items(self, data, **kwargs):  # pylint: disable= unused-argument
+        # Load items from JSON so the assets inside can be converted to REST format.
+        try:
+            items = data["items"]
+            if isinstance(items, str):
+                items = json.loads(items)
+            data["items"] = items
+        except Exception:  # pylint: disable=W0718
+            pass
+        return data
+
+    @pre_dump
+    def convert_control_flow_body_to_binding_str(self, data, **kwargs):
+        return super(ParallelForSchema, self).convert_control_flow_body_to_binding_str(data, **kwargs)
+
+    @pre_dump
+    def resolve_outputs(self, job, **kwargs):  # pylint: disable=unused-argument
+        result = copy.copy(job)
+        _resolve_outputs(result, job)
+        return result
+
+    @pre_dump
+    def serialize_items(self, data, **kwargs):  # pylint: disable= unused-argument
+        # Serialize items to a JSON string so they are not removed by _dump_for_validation.
+        from azure.ai.ml.entities._job.pipeline._io import InputOutputBase
+
+        def _binding_handler(obj):
+            if isinstance(obj, InputOutputBase):
+                return str(obj)
+            return repr(obj)
+
+        result = copy.copy(data)
+        if isinstance(result.items, (dict, list)):
+            # use str to serialize input/output builder
+            result._items = json.dumps(result.items, default=_binding_handler)
+        return result
+
+
+class FLScatterGatherSchema(ControlFlowSchema):
+    # TODO determine serialization, or if this is actually needed
+
+    # @pre_dump
+    def serialize_items(self, data, **kwargs):
+        pass
+
+    # @pre_dump
+    def resolve_outputs(self, job, **kwargs):
+        pass
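A standalone sketch of the serialization trick used by ParallelForSchema.serialize_items: values that are not JSON-native (here a stand-in for InputOutputBase) are routed through a default handler so the items mapping survives as a JSON string.

    import json

    class FakeBinding:
        """Stand-in for InputOutputBase; the real handler calls str() on binding objects."""
        def __str__(self):
            return "${{parent.jobs.producer.outputs.data}}"

    def _binding_handler(obj):
        # Bindings serialize via str(); anything else falls back to repr().
        if isinstance(obj, FakeBinding):
            return str(obj)
        return repr(obj)

    items = {"iteration_0": {"input_data": FakeBinding()}}
    print(json.dumps(items, default=_binding_handler))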
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_command_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_command_job.py
new file mode 100644
index 00000000..c2b96f85
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_command_job.py
@@ -0,0 +1,31 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+from typing import Any
+
+from marshmallow import fields, post_load
+
+from azure.ai.ml._schema.core.fields import ComputeField, EnvironmentField, NestedField, UnionField
+from azure.ai.ml._schema.job.command_job import CommandJobSchema
+from azure.ai.ml._schema.job.input_output_entry import OutputSchema
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineCommandJobSchema(CommandJobSchema):
+    compute = ComputeField()
+    environment = EnvironmentField()
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField([NestedField(OutputSchema), fields.Str()], allow_none=True),
+    )
+
+    @post_load
+    def make(self, data: Any, **kwargs: Any):
+        from azure.ai.ml.entities import CommandJob
+
+        return CommandJob(**data)
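A hedged example (illustrative values only) of the two output shapes PipelineCommandJobSchema accepts for each key: a nested output definition or a plain binding string.

    outputs = {
        "model_dir": {"type": "uri_folder", "mode": "rw_mount"},  # nested OutputSchema-style definition (illustrative)
        "metrics": "${{parent.outputs.pipeline_metrics}}",        # plain string binding
    }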
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_component.py
new file mode 100644
index 00000000..05096e99
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_component.py
@@ -0,0 +1,297 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+from copy import deepcopy
+
+import yaml
+from marshmallow import INCLUDE, fields, post_load, pre_dump
+
+from azure.ai.ml._schema._utils.utils import _resolve_group_inputs_for_component
+from azure.ai.ml._schema.assets.asset import AnonymousAssetSchema
+from azure.ai.ml._schema.component.component import ComponentSchema
+from azure.ai.ml._schema.component.input_output import OutputPortSchema, PrimitiveOutputSchema
+from azure.ai.ml._schema.core.fields import (
+    ArmVersionedStr,
+    FileRefField,
+    NestedField,
+    PipelineNodeNameStr,
+    RegistryStr,
+    StringTransformedEnum,
+    TypeSensitiveUnionField,
+    UnionField,
+)
+from azure.ai.ml._schema.pipeline.automl_node import AutoMLNodeSchema
+from azure.ai.ml._schema.pipeline.component_job import (
+    BaseNodeSchema,
+    CommandSchema,
+    DataTransferCopySchema,
+    DataTransferExportSchema,
+    DataTransferImportSchema,
+    ImportSchema,
+    ParallelSchema,
+    SparkSchema,
+    SweepSchema,
+    _resolve_inputs_outputs,
+)
+from azure.ai.ml._schema.pipeline.condition_node import ConditionNodeSchema
+from azure.ai.ml._schema.pipeline.control_flow_job import DoWhileSchema, ParallelForSchema
+from azure.ai.ml._schema.pipeline.pipeline_command_job import PipelineCommandJobSchema
+from azure.ai.ml._schema.pipeline.pipeline_datatransfer_job import (
+    PipelineDataTransferCopyJobSchema,
+    PipelineDataTransferExportJobSchema,
+    PipelineDataTransferImportJobSchema,
+)
+from azure.ai.ml._schema.pipeline.pipeline_import_job import PipelineImportJobSchema
+from azure.ai.ml._schema.pipeline.pipeline_parallel_job import PipelineParallelJobSchema
+from azure.ai.ml._schema.pipeline.pipeline_spark_job import PipelineSparkJobSchema
+from azure.ai.ml._utils.utils import is_private_preview_enabled
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AzureMLResourceType
+from azure.ai.ml.constants._component import (
+    CONTROL_FLOW_TYPES,
+    ComponentSource,
+    ControlFlowType,
+    DataTransferTaskType,
+    NodeType,
+)
+
+
+class NodeNameStr(PipelineNodeNameStr):
+    def _get_field_name(self) -> str:
+        return "Pipeline node"
+
+
+def PipelineJobsField():
+    pipeline_enable_job_type = {
+        NodeType.COMMAND: [
+            NestedField(CommandSchema, unknown=INCLUDE),
+            NestedField(PipelineCommandJobSchema),
+        ],
+        NodeType.IMPORT: [
+            NestedField(ImportSchema, unknown=INCLUDE),
+            NestedField(PipelineImportJobSchema),
+        ],
+        NodeType.SWEEP: [NestedField(SweepSchema, unknown=INCLUDE)],
+        NodeType.PARALLEL: [
+            # ParallelSchema supports parallel pipeline YAML with "component"
+            NestedField(ParallelSchema, unknown=INCLUDE),
+            NestedField(PipelineParallelJobSchema, unknown=INCLUDE),
+        ],
+        NodeType.PIPELINE: [NestedField("PipelineSchema", unknown=INCLUDE)],
+        NodeType.AUTOML: AutoMLNodeSchema(unknown=INCLUDE),
+        NodeType.SPARK: [
+            NestedField(SparkSchema, unknown=INCLUDE),
+            NestedField(PipelineSparkJobSchema),
+        ],
+    }
+
+    # Note: the private node types are only available when the private preview flag is enabled before the
+    # pipeline job schema class is initialized.
+    if is_private_preview_enabled():
+        pipeline_enable_job_type[ControlFlowType.DO_WHILE] = [NestedField(DoWhileSchema, unknown=INCLUDE)]
+        pipeline_enable_job_type[ControlFlowType.IF_ELSE] = [NestedField(ConditionNodeSchema, unknown=INCLUDE)]
+        pipeline_enable_job_type[ControlFlowType.PARALLEL_FOR] = [NestedField(ParallelForSchema, unknown=INCLUDE)]
+
+    # TODO: put the data_transfer logic last to avoid error message conflicts; item opened to track:
+    #  https://msdata.visualstudio.com/Vienna/_workitems/edit/2244262/
+    pipeline_enable_job_type[NodeType.DATA_TRANSFER] = [
+        TypeSensitiveUnionField(
+            {
+                DataTransferTaskType.COPY_DATA: [
+                    NestedField(DataTransferCopySchema, unknown=INCLUDE),
+                    NestedField(PipelineDataTransferCopyJobSchema),
+                ],
+                DataTransferTaskType.IMPORT_DATA: [
+                    NestedField(DataTransferImportSchema, unknown=INCLUDE),
+                    NestedField(PipelineDataTransferImportJobSchema),
+                ],
+                DataTransferTaskType.EXPORT_DATA: [
+                    NestedField(DataTransferExportSchema, unknown=INCLUDE),
+                    NestedField(PipelineDataTransferExportJobSchema),
+                ],
+            },
+            type_field_name="task",
+            unknown=INCLUDE,
+        )
+    ]
+
+    pipeline_job_field = fields.Dict(
+        keys=NodeNameStr(),
+        values=TypeSensitiveUnionField(pipeline_enable_job_type),
+    )
+    return pipeline_job_field
+
+
+# pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+def _post_load_pipeline_jobs(context, data: dict) -> dict:
+    """Silently convert Job in pipeline jobs to node."""
+    from azure.ai.ml.entities._builders import parse_inputs_outputs
+    from azure.ai.ml.entities._builders.condition_node import ConditionNode
+    from azure.ai.ml.entities._builders.do_while import DoWhile
+    from azure.ai.ml.entities._builders.parallel_for import ParallelFor
+    from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+    from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+
+    # parse inputs/outputs
+    data = parse_inputs_outputs(data)
+    # convert JobNode to Component here
+    jobs = data.get("jobs", {})
+
+    for key, job_instance in jobs.items():
+        if isinstance(job_instance, dict):
+            # convert AutoML job dict to instance
+            if job_instance.get("type") == NodeType.AUTOML:
+                job_instance = AutoMLJob._create_instance_from_schema_dict(
+                    loaded_data=job_instance,
+                )
+            elif job_instance.get("type") in CONTROL_FLOW_TYPES:
+                # Set source to yaml job for control flow node.
+                job_instance["_source"] = ComponentSource.YAML_JOB
+
+                job_type = job_instance.get("type")
+                if job_type == ControlFlowType.IF_ELSE:
+                    # Convert to if-else node.
+                    job_instance = ConditionNode._create_instance_from_schema_dict(loaded_data=job_instance)
+                elif job_instance.get("type") == ControlFlowType.DO_WHILE:
+                    # Convert to do-while node.
+                    job_instance = DoWhile._create_instance_from_schema_dict(
+                        pipeline_jobs=jobs, loaded_data=job_instance
+                    )
+                elif job_instance.get("type") == ControlFlowType.PARALLEL_FOR:
+                    # Convert to parallel-for node.
+                    job_instance = ParallelFor._create_instance_from_schema_dict(
+                        pipeline_jobs=jobs, loaded_data=job_instance
+                    )
+            jobs[key] = job_instance
+
+    for key, job_instance in jobs.items():
+        # Translate job to node if it is translatable and overrides _to_node.
+        if isinstance(job_instance, ComponentTranslatableMixin) and "_to_node" in type(job_instance).__dict__:
+            # set source as YAML
+            job_instance = job_instance._to_node(
+                context=context,
+                pipeline_job_dict=data,
+            )
+            if job_instance.type == NodeType.DATA_TRANSFER and job_instance.task != DataTransferTaskType.COPY_DATA:
+                job_instance._source = ComponentSource.BUILTIN
+            else:
+                job_instance.component._source = ComponentSource.YAML_JOB
+                job_instance._source = job_instance.component._source
+            jobs[key] = job_instance
+        # update job instance name to key
+        job_instance.name = key
+    return data
+
+
+class PipelineComponentSchema(ComponentSchema):
+    type = StringTransformedEnum(allowed_values=[NodeType.PIPELINE])
+    jobs = PipelineJobsField()
+
+    # primitive output is only supported for command component & pipeline component
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(
+            [
+                NestedField(PrimitiveOutputSchema, unknown=INCLUDE),
+                NestedField(OutputPortSchema),
+            ]
+        ),
+    )
+
+    @post_load
+    def make(self, data, **kwargs):  # pylint: disable=unused-argument
+        return _post_load_pipeline_jobs(self.context, data)
+
+
+class RestPipelineComponentSchema(PipelineComponentSchema):
+    """When component load from rest, won't validate on name since there might
+    be existing component with invalid name."""
+
+    name = fields.Str(required=True)
+
+
+class _AnonymousPipelineComponentSchema(AnonymousAssetSchema, PipelineComponentSchema):
+    """Anonymous pipeline component schema.
+
+    Note that inline definition of an anonymous pipeline component is not
+    supported directly. Inheritance follows the order AnonymousAssetSchema,
+    PipelineComponentSchema because name and version need to be dump_only
+    (marshmallow collects fields following the method resolution order).
+    """
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.entities._component.pipeline_component import PipelineComponent
+
+        # Pipeline jobs post-processing is required before init of the pipeline component: it converts control
+        # node dicts to entities.
+        # However, @post_load invocation order is not guaranteed, so we call it explicitly here.
+        _post_load_pipeline_jobs(self.context, data)
+
+        return PipelineComponent(
+            base_path=self.context[BASE_PATH_CONTEXT_KEY],
+            **data,
+        )
+
+
+class PipelineComponentFileRefField(FileRefField):
+    # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+    def _serialize(self, value, attr, obj, **kwargs):
+        """FileRefField does not support serialize.
+
+        Call AnonymousPipelineComponent schema to serialize. This
+        function is overwrite because we need Pipeline can be dumped.
+        """
+        # Update base_path to parent path of component file.
+        component_schema_context = deepcopy(self.context)
+        value = _resolve_group_inputs_for_component(value)
+        return _AnonymousPipelineComponentSchema(context=component_schema_context)._serialize(value, **kwargs)
+
+    def _deserialize(self, value, attr, data, **kwargs):
+        # Get component info from component yaml file.
+        data = super()._deserialize(value, attr, data, **kwargs)
+        component_dict = yaml.safe_load(data)
+        source_path = self.context[BASE_PATH_CONTEXT_KEY] / value
+
+        # Update base_path to parent path of component file.
+        component_schema_context = deepcopy(self.context)
+        component_schema_context[BASE_PATH_CONTEXT_KEY] = source_path.parent
+        component = _AnonymousPipelineComponentSchema(context=component_schema_context).load(
+            component_dict, unknown=INCLUDE
+        )
+        component._source_path = source_path
+        component._source = ComponentSource.YAML_COMPONENT
+        return component
+
+
+# Note: PipelineSchema is defined here instead of in component_job.py to
+# resolve circular imports and support recursive schemas.
+class PipelineSchema(BaseNodeSchema):
+    # pylint: disable=unused-argument
+    # do not support inline define a pipeline node
+    component = UnionField(
+        [
+            # for registry type assets
+            RegistryStr(azureml_type=AzureMLResourceType.COMPONENT),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+            # component file reference
+            PipelineComponentFileRefField(),
+        ],
+        required=True,
+    )
+    type = StringTransformedEnum(allowed_values=[NodeType.PIPELINE])
+
+    @post_load
+    def make(self, data, **kwargs) -> "Pipeline":
+        from azure.ai.ml.entities._builders import parse_inputs_outputs
+        from azure.ai.ml.entities._builders.pipeline import Pipeline
+
+        data = parse_inputs_outputs(data)
+        return Pipeline(**data)
+
+    @pre_dump
+    def resolve_inputs_outputs(self, data, **kwargs):
+        return _resolve_inputs_outputs(data)
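A hedged end-to-end sketch (not part of the diff): the usual entry point that exercises PipelineComponentSchema and _post_load_pipeline_jobs is the public load_component helper; "pipeline_component.yml" is a hypothetical on-disk pipeline component definition.

    from azure.ai.ml import load_component

    # Loading routes through PipelineComponentSchema; _post_load_pipeline_jobs converts
    # every entry under `jobs` into its corresponding node entity.
    pipeline_component = load_component("pipeline_component.yml")
    print(type(pipeline_component).__name__)  # expected: PipelineComponent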
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py
new file mode 100644
index 00000000..a63e687d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py
@@ -0,0 +1,55 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+from typing import Any
+
+from marshmallow import fields, post_load
+
+from azure.ai.ml._schema.core.fields import NestedField, UnionField
+from azure.ai.ml._schema.job.input_output_entry import OutputSchema
+from azure.ai.ml._schema.pipeline.pipeline_job_io import OutputBindingStr
+from azure.ai.ml._schema.job.data_transfer_job import (
+    DataTransferCopyJobSchema,
+    DataTransferImportJobSchema,
+    DataTransferExportJobSchema,
+)
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineDataTransferCopyJobSchema(DataTransferCopyJobSchema):
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True),
+    )
+
+    @post_load
+    def make(self, data: Any, **kwargs: Any):
+        from azure.ai.ml.entities._job.data_transfer.data_transfer_job import DataTransferCopyJob
+
+        return DataTransferCopyJob(**data)
+
+
+class PipelineDataTransferImportJobSchema(DataTransferImportJobSchema):
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True),
+    )
+
+    @post_load
+    def make(self, data: Any, **kwargs: Any):
+        from azure.ai.ml.entities._job.data_transfer.data_transfer_job import DataTransferImportJob
+
+        return DataTransferImportJob(**data)
+
+
+class PipelineDataTransferExportJobSchema(DataTransferExportJobSchema):
+    @post_load
+    def make(self, data: Any, **kwargs: Any):
+        from azure.ai.ml.entities._job.data_transfer.data_transfer_job import DataTransferExportJob
+
+        return DataTransferExportJob(**data)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_import_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_import_job.py
new file mode 100644
index 00000000..ae338597
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_import_job.py
@@ -0,0 +1,25 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+from typing import Any
+
+from marshmallow import post_load
+
+from azure.ai.ml._schema.job.import_job import ImportJobSchema
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineImportJobSchema(ImportJobSchema):
+    class Meta:
+        exclude = ["compute"]  # compute property not applicable to import job
+
+    @post_load
+    def make(self, data: Any, **kwargs: Any):
+        from azure.ai.ml.entities._job.import_job import ImportJob
+
+        return ImportJob(**data)
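
For context, Meta.exclude here is plain marshmallow behavior: the subclass keeps every field inherited from ImportJobSchema except the ones listed. A self-contained sketch with made-up schemas:

from marshmallow import Schema, fields

class BaseSchema(Schema):
    name = fields.Str()
    compute = fields.Str()

class NoComputeSchema(BaseSchema):
    class Meta:
        exclude = ["compute"]  # drop the inherited field, as PipelineImportJobSchema does

print(NoComputeSchema().load({"name": "job1"}))  # {'name': 'job1'}
print("compute" in NoComputeSchema().fields)     # False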
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job.py
new file mode 100644
index 00000000..46daeb92
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job.py
@@ -0,0 +1,76 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+
+from marshmallow import INCLUDE, ValidationError, post_load, pre_dump, pre_load
+
+from azure.ai.ml._schema.core.fields import (
+    ArmVersionedStr,
+    ComputeField,
+    NestedField,
+    RegistryStr,
+    StringTransformedEnum,
+    UnionField,
+)
+from azure.ai.ml._schema.job import BaseJobSchema
+from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField
+from azure.ai.ml._schema.pipeline.component_job import _resolve_inputs_outputs
+from azure.ai.ml._schema.pipeline.pipeline_component import (
+    PipelineComponentFileRefField,
+    PipelineJobsField,
+    _post_load_pipeline_jobs,
+)
+from azure.ai.ml._schema.pipeline.settings import PipelineJobSettingsSchema
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._common import AzureMLResourceType
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineJobSchema(BaseJobSchema):
+    type = StringTransformedEnum(allowed_values=[JobType.PIPELINE])
+    compute = ComputeField()
+    settings = NestedField(PipelineJobSettingsSchema, unknown=INCLUDE)
+    # Support data binding in inputs since we support macros like ${{name}}
+    inputs = InputsField(support_databinding=True)
+    outputs = OutputsField()
+    jobs = PipelineJobsField()
+    component = UnionField(
+        [
+            # for registry type assets
+            RegistryStr(azureml_type=AzureMLResourceType.COMPONENT),
+            # existing component
+            ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True),
+            # component file reference
+            PipelineComponentFileRefField(),
+        ],
+    )
+
+    @pre_dump()
+    def backup_jobs_and_remove_component(self, job, **kwargs):
+        # pylint: disable=protected-access
+        job_copy = _resolve_inputs_outputs(job)
+        if not isinstance(job_copy.component, str):
+            # If component is a pipeline component object,
+            # copy its jobs onto the job and drop the component reference.
+            if not job_copy._jobs:
+                job_copy._jobs = job_copy.component.jobs
+            job_copy.component = None
+        return job_copy
+
+    @pre_load()
+    def check_exclusive_fields(self, data: dict, **kwargs) -> dict:
+        error_msg = "'jobs' and 'component' are mutually exclusive fields in pipeline job."
+        # When loading from YAML, data["component"] must be a local path (str).
+        # Otherwise, data["component"] may be a PipelineComponent, in which case data["jobs"] is expected to exist.
+        if isinstance(data.get("component"), str) and data.get("jobs"):
+            raise ValidationError(error_msg)
+        return data
+
+    @post_load
+    def make(self, data: dict, **kwargs) -> dict:
+        return _post_load_pipeline_jobs(self.context, data)
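
A small illustration (made-up paths and job names) of the mutual-exclusion rule enforced by check_exclusive_fields above:

# Accepted: inline jobs, no top-level component reference.
inline_pipeline = {
    "type": "pipeline",
    "jobs": {"train": {"type": "command", "component": "./train.yml"}},
}

# Accepted: the whole pipeline comes from a component file, so no inline jobs.
ref_pipeline = {
    "type": "pipeline",
    "component": "./pipeline_component.yml",
}

# Rejected at pre_load: a string component plus inline jobs triggers
# ValidationError("'jobs' and 'component' are mutually exclusive fields in pipeline job.")
bad_pipeline = {**ref_pipeline, "jobs": inline_pipeline["jobs"]}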
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job_io.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job_io.py
new file mode 100644
index 00000000..3fb6a7b7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job_io.py
@@ -0,0 +1,51 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import logging
+import re
+
+from marshmallow import ValidationError, fields
+
+from azure.ai.ml.constants._component import ComponentJobConstants
+
+module_logger = logging.getLogger(__name__)
+
+
+class OutputBindingStr(fields.Field):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def _jsonschema_type_mapping(self):
+        schema = {"type": "string", "pattern": ComponentJobConstants.OUTPUT_PATTERN}
+        if self.name is not None:
+            schema["title"] = self.name
+        if self.dump_only:
+            schema["readonly"] = True
+        return schema
+
+    def _serialize(self, value, attr, obj, **kwargs):
+        if isinstance(value, str) and re.match(ComponentJobConstants.OUTPUT_PATTERN, value):
+            return value
+        # _to_job_output in io.py returns an Output object; this branch checks
+        # whether the original value was a simple binding string or an Output.
+        if (
+            isinstance(value.path, str)
+            and re.match(ComponentJobConstants.OUTPUT_PATTERN, value.path)
+            and value.mode is None
+        ):
+            return value.path
+        raise ValidationError(f"Invalid output binding string '{value}' passed")
+
+    def _deserialize(self, value, attr, data, **kwargs):
+        if (
+            isinstance(value, dict)
+            and "path" in value
+            and "mode" not in value
+            and "name" not in value
+            and "version" not in value
+        ):
+            value = value["path"]
+        if isinstance(value, str) and re.match(ComponentJobConstants.OUTPUT_PATTERN, value):
+            return value
+        raise ValidationError(f"Invalid output binding string '{value}' passed")
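
A sketch of what OutputBindingStr accepts, assuming ComponentJobConstants.OUTPUT_PATTERN matches the ${{parent.outputs.<name>}} binding syntax (the pattern itself is defined elsewhere):

# Deserialization: a bare binding string, or a dict carrying only "path",
# collapses to the binding string; anything else is rejected with ValidationError.
ok_1 = "${{parent.outputs.cleaned_data}}"
ok_2 = {"path": "${{parent.outputs.cleaned_data}}"}  # -> "${{parent.outputs.cleaned_data}}"
bad = {"path": "${{parent.outputs.cleaned_data}}", "mode": "rw_mount"}  # rejected: extra key

# Serialization accepts the same binding string, or an Output whose .path is a
# binding string and whose .mode is None.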
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_parallel_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_parallel_job.py
new file mode 100644
index 00000000..3b30fb66
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_parallel_job.py
@@ -0,0 +1,40 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+from typing import Any
+
+from marshmallow import post_load
+
+from azure.ai.ml._schema.core.fields import ComputeField, EnvironmentField, StringTransformedEnum
+from azure.ai.ml._schema.job import ParameterizedParallelSchema
+from azure.ai.ml._schema.pipeline.component_job import BaseNodeSchema
+
+from ...constants._component import NodeType
+
+module_logger = logging.getLogger(__name__)
+
+
+# A parallel job inherits parallel attributes from ParameterizedParallelSchema and node functionality from BaseNodeSchema.
+class PipelineParallelJobSchema(BaseNodeSchema, ParameterizedParallelSchema):
+    """Schema for ParallelJob in PipelineJob/PipelineComponent."""
+
+    type = StringTransformedEnum(allowed_values=NodeType.PARALLEL)
+    compute = ComputeField()
+    environment = EnvironmentField()
+
+    @post_load
+    def make(self, data: Any, **kwargs: Any):
+        """Construct a ParallelJob from deserialized data.
+
+        :param data: The deserialized data.
+        :type data: dict[str, Any]
+        :return: A ParallelJob.
+        :rtype: azure.ai.ml.entities._job.parallel.ParallelJob
+        """
+        from azure.ai.ml.entities._job.parallel.parallel_job import ParallelJob
+
+        return ParallelJob(**data)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_spark_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_spark_job.py
new file mode 100644
index 00000000..69d58255
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_spark_job.py
@@ -0,0 +1,29 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+from typing import Any
+
+from marshmallow import fields, post_load
+
+from azure.ai.ml._schema.core.fields import NestedField, UnionField
+from azure.ai.ml._schema.job.input_output_entry import OutputSchema
+from azure.ai.ml._schema.job.spark_job import SparkJobSchema
+
+module_logger = logging.getLogger(__name__)
+
+
+class PipelineSparkJobSchema(SparkJobSchema):
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField([NestedField(OutputSchema), fields.Str()], allow_none=True),
+    )
+
+    @post_load
+    def make(self, data: Any, **kwargs: Any):
+        from azure.ai.ml.entities._job.spark_job import SparkJob
+
+        return SparkJob(**data)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/settings.py
new file mode 100644
index 00000000..1e5227b0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/settings.py
@@ -0,0 +1,42 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+from marshmallow import INCLUDE, Schema, fields, post_dump, post_load
+
+from azure.ai.ml._schema.core.fields import ArmStr, StringTransformedEnum, UnionField
+from azure.ai.ml._schema.pipeline.pipeline_component import NodeNameStr
+from azure.ai.ml._utils.utils import is_private_preview_enabled
+from azure.ai.ml.constants._common import AzureMLResourceType, SERVERLESS_COMPUTE
+
+
+class PipelineJobSettingsSchema(Schema):
+    class Meta:
+        unknown = INCLUDE
+
+    default_datastore = ArmStr(azureml_type=AzureMLResourceType.DATASTORE)
+    default_compute = UnionField(
+        [
+            StringTransformedEnum(allowed_values=[SERVERLESS_COMPUTE]),
+            ArmStr(azureml_type=AzureMLResourceType.COMPUTE),
+        ]
+    )
+    continue_on_step_failure = fields.Bool()
+    force_rerun = fields.Bool()
+
+    # on_init/on_finalize are kept behind the private preview flag to hide them from the public spec
+    if is_private_preview_enabled():
+        on_init = NodeNameStr()
+        on_finalize = NodeNameStr()
+
+    @post_load
+    def make(self, data, **kwargs) -> "PipelineJobSettings":
+        from azure.ai.ml.entities import PipelineJobSettings
+
+        return PipelineJobSettings(**data)
+
+    @post_dump
+    def remove_none(self, data, **kwargs):
+        return {key: value for key, value in data.items() if value is not None}
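
For orientation, a hedged example of the settings block this schema parses (field names come from the schema above; the datastore/compute values are illustrative assumptions) and of what the remove_none post_dump hook does:

# Illustrative settings payload for a pipeline job.
settings_as_dict = {
    "default_datastore": "azureml:workspaceblobstore",
    "default_compute": "serverless",  # or a compute reference such as "azureml:cpu-cluster"
    "continue_on_step_failure": True,
    "force_rerun": False,
}

# On dump, remove_none strips keys whose value is None so they do not appear
# in the serialized payload:
dumped = {"default_compute": "serverless", "default_datastore": None}
cleaned = {k: v for k, v in dumped.items() if v is not None}  # {"default_compute": "serverless"}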