author    S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/__init__.py | 28
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/base_job.py | 69
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/command_job.py | 23
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/creation_context.py | 16
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/data_transfer_job.py | 60
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/distribution.py | 104
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/identity.py | 67
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/import_job.py | 54
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_entry.py | 256
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_fields_provider.py | 50
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_port.py | 29
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_limits.py | 45
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_output.py | 18
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parallel_job.py | 15
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_command.py | 41
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_parallel.py | 72
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_spark.py | 151
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/services.py | 100
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/spark_job.py | 28
19 files changed, 1226 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/__init__.py
new file mode 100644
index 00000000..11687396
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/__init__.py
@@ -0,0 +1,28 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)  # type: ignore
+
+from azure.ai.ml._schema.job.creation_context import CreationContextSchema
+
+from .base_job import BaseJobSchema
+from .command_job import CommandJobSchema
+from .import_job import ImportJobSchema
+from .parallel_job import ParallelJobSchema
+from .parameterized_command import ParameterizedCommandSchema
+from .parameterized_parallel import ParameterizedParallelSchema
+from .parameterized_spark import ParameterizedSparkSchema
+from .spark_job import SparkJobSchema
+
+__all__ = [
+    "BaseJobSchema",
+    "ParameterizedCommandSchema",
+    "ParameterizedParallelSchema",
+    "CommandJobSchema",
+    "ImportJobSchema",
+    "SparkJobSchema",
+    "ParallelJobSchema",
+    "CreationContextSchema",
+    "ParameterizedSparkSchema",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/base_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/base_job.py
new file mode 100644
index 00000000..852d3921
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/base_job.py
@@ -0,0 +1,69 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+
+from marshmallow import fields
+
+from azure.ai.ml._schema.core.fields import ArmStr, ComputeField, NestedField, UnionField
+from azure.ai.ml._schema.core.resource import ResourceSchema
+from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
+from azure.ai.ml.constants._common import AzureMLResourceType
+
+from .creation_context import CreationContextSchema
+from .services import (
+    JobServiceSchema,
+    SshJobServiceSchema,
+    VsCodeJobServiceSchema,
+    TensorBoardJobServiceSchema,
+    JupyterLabJobServiceSchema,
+)
+
+module_logger = logging.getLogger(__name__)
+
+
+class BaseJobSchema(ResourceSchema):
+    creation_context = NestedField(CreationContextSchema, dump_only=True)
+    services = fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(
+            [
+                NestedField(SshJobServiceSchema),
+                NestedField(TensorBoardJobServiceSchema),
+                NestedField(VsCodeJobServiceSchema),
+                NestedField(JupyterLabJobServiceSchema),
+                # JobServiceSchema must be last in the list so that service types
+                # not set by users (Custom, Tracking, Studio) still match.
+                NestedField(JobServiceSchema),
+            ],
+            is_strict=True,
+        ),
+    )
+    name = fields.Str()
+    id = ArmStr(azureml_type=AzureMLResourceType.JOB, dump_only=True, required=False)
+    display_name = fields.Str(required=False)
+    tags = fields.Dict(keys=fields.Str(), values=fields.Str(allow_none=True))
+    status = fields.Str(dump_only=True)
+    experiment_name = fields.Str()
+    properties = fields.Dict(keys=fields.Str(), values=fields.Str(allow_none=True))
+    description = fields.Str()
+    log_files = fields.Dict(
+        keys=fields.Str(),
+        values=fields.Str(),
+        dump_only=True,
+        metadata={
+            "description": (
+                "The list of log files associated with this run. This section is only populated "
+                "by the service and will be ignored if contained in a yaml sent to the service "
+                "(e.g. via `az ml job create` ...)"
+            )
+        },
+    )
+    compute = ComputeField(required=False)
+    identity = UnionField(
+        [
+            NestedField(ManagedIdentitySchema),
+            NestedField(AMLTokenIdentitySchema),
+            NestedField(UserIdentitySchema),
+        ]
+    )
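
A hedged sketch of the job envelope this schema loads. Field names come from the schema above; the compute reference format and the identity type string are assumptions, and dump-only fields (id, status, creation_context, log_files) are populated by the service and ignored on load:

    job_doc = {
        "name": "sample-job",
        "display_name": "Sample job",
        "experiment_name": "default-experiment",
        "description": "Minimal envelope accepted by BaseJobSchema.",
        "tags": {"team": "ml-platform"},
        "properties": {},
        "compute": "azureml:cpu-cluster",        # reference format assumed
        "identity": {"type": "user_identity"},   # type string assumed (camel_to_snake)
    }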
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/command_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/command_job.py
new file mode 100644
index 00000000..9cce7de7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/command_job.py
@@ -0,0 +1,23 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import fields
+
+from azure.ai.ml._schema.core.fields import NestedField, StringTransformedEnum
+from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField
+from azure.ai.ml.constants import JobType
+
+from .base_job import BaseJobSchema
+from .job_limits import CommandJobLimitsSchema
+from .parameterized_command import ParameterizedCommandSchema
+
+
+class CommandJobSchema(ParameterizedCommandSchema, BaseJobSchema):
+    type = StringTransformedEnum(allowed_values=JobType.COMMAND)
+    # do not promote it as CommandComponent has no field named 'limits'
+    limits = NestedField(CommandJobLimitsSchema)
+    parameters = fields.Dict(dump_only=True)
+    inputs = InputsField()
+    outputs = OutputsField()
+    parent_job_name = fields.Str()
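
A hedged sketch of a command-job mapping that CommandJobSchema is meant to load; the environment/compute reference formats and the ${{inputs.*}} binding syntax are assumptions, not confirmed by this diff:

    command_job = {
        "type": "command",
        "command": "python train.py --lr ${{inputs.lr}}",
        "environment": "azureml:my-env:1",       # reference format assumed
        "compute": "azureml:cpu-cluster",        # reference format assumed
        "limits": {"timeout": 3600},             # CommandJobLimitsSchema
        "inputs": {"lr": 0.01},
        "outputs": {"model": {"type": "mlflow_model"}},
    }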
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/creation_context.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/creation_context.py
new file mode 100644
index 00000000..79956e1c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/creation_context.py
@@ -0,0 +1,16 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import fields
+
+from azure.ai.ml._schema.core.schema import PatchedSchemaMeta
+
+
+class CreationContextSchema(metaclass=PatchedSchemaMeta):
+    created_at = fields.DateTime()
+    created_by = fields.Str()
+    created_by_type = fields.Str()
+    last_modified_at = fields.DateTime()
+    last_modified_by = fields.Str()
+    last_modified_by_type = fields.Str()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/data_transfer_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/data_transfer_job.py
new file mode 100644
index 00000000..6ea54df6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/data_transfer_job.py
@@ -0,0 +1,60 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import validates, ValidationError, fields
+from azure.ai.ml._schema.core.fields import NestedField
+from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField
+from azure.ai.ml._schema.job.input_output_entry import DatabaseSchema, FileSystemSchema, OutputSchema
+from azure.ai.ml.constants import JobType
+from azure.ai.ml.constants._component import DataTransferTaskType, DataCopyMode
+
+from ..core.fields import ComputeField, StringTransformedEnum, UnionField
+from .base_job import BaseJobSchema
+
+
+class DataTransferCopyJobSchema(BaseJobSchema):
+    type = StringTransformedEnum(required=True, allowed_values=JobType.DATA_TRANSFER)
+    task = StringTransformedEnum(allowed_values=[DataTransferTaskType.COPY_DATA], required=True)
+    data_copy_mode = StringTransformedEnum(
+        allowed_values=[DataCopyMode.MERGE_WITH_OVERWRITE, DataCopyMode.FAIL_IF_CONFLICT]
+    )
+    compute = ComputeField()
+    inputs = InputsField()
+    outputs = OutputsField()
+
+
+class DataTransferImportJobSchema(BaseJobSchema):
+    type = StringTransformedEnum(required=True, allowed_values=JobType.DATA_TRANSFER)
+    task = StringTransformedEnum(allowed_values=[DataTransferTaskType.IMPORT_DATA], required=True)
+    compute = ComputeField()
+    outputs = fields.Dict(
+        keys=fields.Str(),
+        values=NestedField(nested=OutputSchema, allow_none=False),
+        metadata={"description": "Outputs of a data transfer job."},
+    )
+    source = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False)
+
+    @validates("outputs")
+    def outputs_key(self, value):
+        if len(value) != 1 or list(value.keys())[0] != "sink":
+            raise ValidationError(
+                f"outputs field only support one output called sink in task type "
+                f"{DataTransferTaskType.IMPORT_DATA}."
+            )
+
+
+class DataTransferExportJobSchema(BaseJobSchema):
+    type = StringTransformedEnum(required=True, allowed_values=JobType.DATA_TRANSFER)
+    task = StringTransformedEnum(allowed_values=[DataTransferTaskType.EXPORT_DATA], required=True)
+    compute = ComputeField()
+    inputs = InputsField(allow_none=False)
+    sink = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False)
+
+    @validates("inputs")
+    def inputs_key(self, value):
+        if len(value) != 1 or list(value.keys())[0] != "source":
+            raise ValidationError(
+                f"inputs field only support one input called source in task type "
+                f"{DataTransferTaskType.EXPORT_DATA}."
+            )
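
The two validators above pin the dictionary keys: an import job must expose exactly one output named "sink", and an export job exactly one input named "source". A hedged sketch of mappings that satisfy them (the nested field values are illustrative):

    import_outputs = {"sink": {"type": "uri_folder", "path": "azureml://datastores/my_store/paths/out"}}
    export_inputs = {"source": {"type": "uri_file", "path": "azureml:my-data:1"}}
    # Anything else -- zero keys, two keys, or a differently named key -- raises ValidationError.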
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/distribution.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/distribution.py
new file mode 100644
index 00000000..475792a3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/distribution.py
@@ -0,0 +1,104 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+
+from marshmallow import ValidationError, fields, post_load, pre_dump
+
+from azure.ai.ml._schema.core.fields import StringTransformedEnum
+from azure.ai.ml.constants import DistributionType
+from azure.ai.ml._utils._experimental import experimental
+
+from ..core.schema import PatchedSchemaMeta
+
+module_logger = logging.getLogger(__name__)
+
+
+class MPIDistributionSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(required=True, allowed_values=DistributionType.MPI)
+    process_count_per_instance = fields.Int()
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml import MpiDistribution
+
+        data.pop("type", None)
+        return MpiDistribution(**data)
+
+    @pre_dump
+    def predump(self, data, **kwargs):
+        from azure.ai.ml import MpiDistribution
+
+        if not isinstance(data, MpiDistribution):
+            raise ValidationError("Cannot dump non-MpiDistribution object into MpiDistributionSchema")
+        return data
+
+
+class TensorFlowDistributionSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(required=True, allowed_values=DistributionType.TENSORFLOW)
+    parameter_server_count = fields.Int()
+    worker_count = fields.Int()
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml import TensorFlowDistribution
+
+        data.pop("type", None)
+        return TensorFlowDistribution(**data)
+
+    @pre_dump
+    def predump(self, data, **kwargs):
+        from azure.ai.ml import TensorFlowDistribution
+
+        if not isinstance(data, TensorFlowDistribution):
+            raise ValidationError("Cannot dump non-TensorFlowDistribution object into TensorFlowDistributionSchema")
+        return data
+
+
+class PyTorchDistributionSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(required=True, allowed_values=DistributionType.PYTORCH)
+    process_count_per_instance = fields.Int()
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml import PyTorchDistribution
+
+        data.pop("type", None)
+        return PyTorchDistribution(**data)
+
+    @pre_dump
+    def predump(self, data, **kwargs):
+        from azure.ai.ml import PyTorchDistribution
+
+        if not isinstance(data, PyTorchDistribution):
+            raise ValidationError("Cannot dump non-PyTorchDistribution object into PyTorchDistributionSchema")
+        return data
+
+
+@experimental
+class RayDistributionSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(required=True, allowed_values=DistributionType.RAY)
+    port = fields.Int()
+    address = fields.Str()
+    include_dashboard = fields.Bool()
+    dashboard_port = fields.Int()
+    head_node_additional_args = fields.Str()
+    worker_node_additional_args = fields.Str()
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml import RayDistribution
+
+        data.pop("type", None)
+        return RayDistribution(**data)
+
+    @pre_dump
+    def predump(self, data, **kwargs):
+        from azure.ai.ml import RayDistribution
+
+        if not isinstance(data, RayDistribution):
+            raise ValidationError("Cannot dump non-RayDistribution object into RayDistributionSchema")
+        return data
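
All four distribution schemas follow the same marshmallow pattern: post_load strips the type discriminator and builds the entity, while pre_dump refuses to serialize anything but the matching entity type. A self-contained sketch of that pattern with toy names (not SDK classes):

    from dataclasses import dataclass
    from marshmallow import Schema, ValidationError, fields, post_load, pre_dump

    @dataclass
    class ToyDistribution:
        process_count_per_instance: int

    class ToyDistributionSchema(Schema):
        type = fields.Str(required=True)
        process_count_per_instance = fields.Int()

        @post_load
        def make(self, data, **kwargs):
            data.pop("type", None)  # the discriminator is not an entity field
            return ToyDistribution(**data)

        @pre_dump
        def predump(self, data, **kwargs):
            if not isinstance(data, ToyDistribution):
                raise ValidationError("Cannot dump a non-ToyDistribution object")
            return data

    dist = ToyDistributionSchema().load({"type": "mpi", "process_count_per_instance": 2})
    assert isinstance(dist, ToyDistribution)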
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/identity.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/identity.py
new file mode 100644
index 00000000..2f2be676
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/identity.py
@@ -0,0 +1,67 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+
+from marshmallow import fields, post_load
+
+from azure.ai.ml._restclient.v2023_04_01_preview.models import (
+    ConnectionAuthType,
+    IdentityConfigurationType,
+)
+from azure.ai.ml._schema.core.fields import StringTransformedEnum
+from azure.ai.ml._utils.utils import camel_to_snake
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+)
+
+from ..core.schema import PatchedSchemaMeta
+
+module_logger = logging.getLogger(__name__)
+
+
+class ManagedIdentitySchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(
+        required=True,
+        allowed_values=[IdentityConfigurationType.MANAGED, ConnectionAuthType.MANAGED_IDENTITY],
+        casing_transform=camel_to_snake,
+    )
+    client_id = fields.Str()
+    object_id = fields.Str()
+    msi_resource_id = fields.Str()
+
+    @post_load
+    def make(self, data, **kwargs):
+        data.pop("type")
+        return ManagedIdentityConfiguration(**data)
+
+
+class AMLTokenIdentitySchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(
+        required=True,
+        allowed_values=IdentityConfigurationType.AML_TOKEN,
+        casing_transform=camel_to_snake,
+    )
+
+    @post_load
+    def make(self, data, **kwargs):
+        data.pop("type")
+        return AmlTokenConfiguration(**data)
+
+
+class UserIdentitySchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(
+        required=True,
+        allowed_values=IdentityConfigurationType.USER_IDENTITY,
+        casing_transform=camel_to_snake,
+    )
+
+    @post_load
+    def make(self, data, **kwargs):
+        data.pop("type")
+        return UserIdentityConfiguration(**data)
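
Each identity schema strips the type discriminator and returns the matching credential entity. A hedged sketch of the mappings they load; the type strings are assumptions based on camel_to_snake applied to the allowed values:

    managed = {"type": "managed_identity", "client_id": "00000000-0000-0000-0000-000000000000"}
    aml_token = {"type": "aml_token"}
    user = {"type": "user_identity"}
    # post_load yields ManagedIdentityConfiguration(client_id=...), AmlTokenConfiguration(),
    # and UserIdentityConfiguration() respectively.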
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/import_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/import_job.py
new file mode 100644
index 00000000..8f7c3908
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/import_job.py
@@ -0,0 +1,54 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+from marshmallow import fields, post_load
+
+from azure.ai.ml._schema.core.fields import NestedField, StringTransformedEnum, UnionField
+from azure.ai.ml._schema.core.schema_meta import PatchedSchemaMeta
+from azure.ai.ml._schema.job.input_output_entry import OutputSchema
+from azure.ai.ml.constants import ImportSourceType, JobType
+
+from .base_job import BaseJobSchema
+
+
+class DatabaseImportSourceSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(
+        allowed_values=[
+            ImportSourceType.AZURESQLDB,
+            ImportSourceType.AZURESYNAPSEANALYTICS,
+            ImportSourceType.SNOWFLAKE,
+        ],
+        required=True,
+    )
+    connection = fields.Str(required=True)
+    query = fields.Str(required=True)
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.entities._job.import_job import DatabaseImportSource
+
+        return DatabaseImportSource(**data)
+
+
+class FileImportSourceSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(allowed_values=[ImportSourceType.S3], required=True)
+    connection = fields.Str(required=True)
+    path = fields.Str(required=True)
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.entities._job.import_job import FileImportSource
+
+        return FileImportSource(**data)
+
+
+class ImportJobSchema(BaseJobSchema):
+    class Meta:
+        exclude = ["compute"]  # compute property not applicable to import job
+
+    type = StringTransformedEnum(allowed_values=JobType.IMPORT)
+    source = UnionField([NestedField(DatabaseImportSourceSchema), NestedField(FileImportSourceSchema)], required=True)
+    output = NestedField(OutputSchema, required=True)
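
A hedged sketch of the source union this schema accepts; the type strings and connection references are assumptions for illustration:

    database_source = {
        "type": "azuresqldb",                       # assumed spelling of ImportSourceType.AZURESQLDB
        "connection": "azureml:my_sql_connection",  # illustrative
        "query": "SELECT * FROM my_table",
    }
    file_source = {
        "type": "s3",                               # assumed spelling of ImportSourceType.S3
        "connection": "azureml:my_s3_connection",   # illustrative
        "path": "s3://my-bucket/prefix/",
    }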
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_entry.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_entry.py
new file mode 100644
index 00000000..1300ab07
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_entry.py
@@ -0,0 +1,256 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+
+from marshmallow import ValidationError, fields, post_load, pre_dump
+
+from azure.ai.ml._schema.core.fields import (
+    ArmVersionedStr,
+    StringTransformedEnum,
+    UnionField,
+    LocalPathField,
+    NestedField,
+    VersionField,
+)
+
+from azure.ai.ml._schema.core.schema import PatchedSchemaMeta, PathAwareSchema
+from azure.ai.ml.constants._common import (
+    AssetTypes,
+    AzureMLResourceType,
+    InputOutputModes,
+)
+from azure.ai.ml.constants._component import ExternalDataType
+
+module_logger = logging.getLogger(__name__)
+
+
+class InputSchema(metaclass=PatchedSchemaMeta):
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.entities._inputs_outputs import Input
+
+        return Input(**data)
+
+    @pre_dump
+    def check_dict(self, data, **kwargs):
+        from azure.ai.ml.entities._inputs_outputs import Input
+
+        if isinstance(data, Input):
+            return data
+        raise ValidationError("InputSchema needs type Input to dump")
+
+
+def generate_path_property(azureml_type):
+    return UnionField(
+        [
+            ArmVersionedStr(azureml_type=azureml_type),
+            fields.Str(metadata={"pattern": r"^(http(s)?):.*"}),
+            fields.Str(metadata={"pattern": r"^(wasb(s)?):.*"}),
+            LocalPathField(pattern=r"^file:.*"),
+            LocalPathField(
+                pattern=r"^(?!(azureml|http(s)?|wasb(s)?|file):).*",
+            ),
+        ],
+        is_strict=True,
+    )
+
+
+def generate_path_on_compute_property(azureml_type):
+    return UnionField(
+        [
+            LocalPathField(pattern=r"^file:.*"),
+        ],
+        is_strict=True,
+    )
+
+
+def generate_datastore_property():
+    metadata = {
+        "description": "Name of the datastore to upload local paths to.",
+        "arm_type": AzureMLResourceType.DATASTORE,
+    }
+    return fields.Str(metadata=metadata, required=False)
+
+
+class ModelInputSchema(InputSchema):
+    mode = StringTransformedEnum(
+        allowed_values=[
+            InputOutputModes.DOWNLOAD,
+            InputOutputModes.RO_MOUNT,
+            InputOutputModes.DIRECT,
+        ],
+        required=False,
+    )
+    type = StringTransformedEnum(
+        allowed_values=[
+            AssetTypes.CUSTOM_MODEL,
+            AssetTypes.MLFLOW_MODEL,
+            AssetTypes.TRITON_MODEL,
+        ]
+    )
+    path = generate_path_property(azureml_type=AzureMLResourceType.MODEL)
+    datastore = generate_datastore_property()
+
+
+class DataInputSchema(InputSchema):
+    mode = StringTransformedEnum(
+        allowed_values=[
+            InputOutputModes.DOWNLOAD,
+            InputOutputModes.RO_MOUNT,
+            InputOutputModes.DIRECT,
+        ],
+        required=False,
+    )
+    type = StringTransformedEnum(
+        allowed_values=[
+            AssetTypes.URI_FILE,
+            AssetTypes.URI_FOLDER,
+        ]
+    )
+    path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
+    path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
+    datastore = generate_datastore_property()
+
+
+class MLTableInputSchema(InputSchema):
+    mode = StringTransformedEnum(
+        allowed_values=[
+            InputOutputModes.DOWNLOAD,
+            InputOutputModes.RO_MOUNT,
+            InputOutputModes.EVAL_MOUNT,
+            InputOutputModes.EVAL_DOWNLOAD,
+            InputOutputModes.DIRECT,
+        ],
+        required=False,
+    )
+    type = StringTransformedEnum(allowed_values=[AssetTypes.MLTABLE])
+    path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
+    path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
+    datastore = generate_datastore_property()
+
+
+class InputLiteralValueSchema(metaclass=PatchedSchemaMeta):
+    value = UnionField([fields.Str(), fields.Bool(), fields.Int(), fields.Float()])
+
+    @post_load
+    def make(self, data, **kwargs):
+        return data["value"]
+
+    @pre_dump
+    def check_dict(self, data, **kwargs):
+        if hasattr(data, "value"):
+            return data
+        raise ValidationError("InputLiteralValue must have a field value")
+
+
+class OutputSchema(PathAwareSchema):
+    name = fields.Str()
+    version = VersionField()
+    mode = StringTransformedEnum(
+        allowed_values=[
+            InputOutputModes.MOUNT,
+            InputOutputModes.UPLOAD,
+            InputOutputModes.RW_MOUNT,
+            InputOutputModes.DIRECT,
+        ],
+        required=False,
+    )
+    type = StringTransformedEnum(
+        allowed_values=[
+            AssetTypes.URI_FILE,
+            AssetTypes.URI_FOLDER,
+            AssetTypes.CUSTOM_MODEL,
+            AssetTypes.MLFLOW_MODEL,
+            AssetTypes.MLTABLE,
+            AssetTypes.TRITON_MODEL,
+        ]
+    )
+    path = fields.Str()
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.entities._inputs_outputs import Output
+
+        return Output(**data)
+
+    @pre_dump
+    def check_dict(self, data, **kwargs):
+        from azure.ai.ml.entities._inputs_outputs import Output
+
+        if isinstance(data, Output):
+            return data
+        # Assists with union schema
+        raise ValidationError("OutputSchema needs type Output to dump")
+
+
+class StoredProcedureParamsSchema(metaclass=PatchedSchemaMeta):
+    name = fields.Str()
+    value = fields.Str()
+    type = fields.Str()
+
+    @pre_dump
+    def check_dict(self, data, **kwargs):
+        for key in self.dump_fields.keys():  # pylint: disable=no-member
+            if data.get(key, None) is None:
+                msg = "StoredProcedureParams must have a {!r} value."
+                raise ValidationError(msg.format(key))
+        return data
+
+
+class DatabaseSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(allowed_values=[ExternalDataType.DATABASE], required=True)
+    table_name = fields.Str()
+    query = fields.Str(
+        metadata={"description": "The sql query command."},
+    )
+    stored_procedure = fields.Str()
+    stored_procedure_params = fields.List(NestedField(StoredProcedureParamsSchema))
+
+    connection = fields.Str(required=True)
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.data_transfer import Database
+
+        data.pop("type", None)
+        return Database(**data)
+
+    @pre_dump
+    def check_dict(self, data, **kwargs):
+        from azure.ai.ml.data_transfer import Database
+
+        if isinstance(data, Database):
+            return data
+        msg = "DatabaseSchema needs type Database to dump, but got {!r}."
+        raise ValidationError(msg.format(type(data)))
+
+
+class FileSystemSchema(metaclass=PatchedSchemaMeta):
+    type = StringTransformedEnum(
+        allowed_values=[
+            ExternalDataType.FILE_SYSTEM,
+        ],
+    )
+    path = fields.Str()
+
+    connection = fields.Str(required=True)
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.data_transfer import FileSystem
+
+        data.pop("type", None)
+        return FileSystem(**data)
+
+    @pre_dump
+    def check_dict(self, data, **kwargs):
+        from azure.ai.ml.data_transfer import FileSystem
+
+        if isinstance(data, FileSystem):
+            return data
+        msg = "FileSystemSchema needs type FileSystem to dump, but got {!r}."
+        raise ValidationError(msg.format(type(data)))
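
generate_path_property above builds a strict union over several mutually exclusive path spellings. A hedged sketch of values intended to match exactly one member each (the azureml reference format is an assumption):

    paths = [
        "azureml:my-data:1",                                     # ArmVersionedStr (format assumed)
        "https://example.com/data.csv",                          # http(s) pattern
        "wasbs://container@account.blob.core.windows.net/path",  # wasb(s) pattern
        "file:./local/dir",                                      # explicit local path
        "./local/dir",                                           # catch-all local path
    ]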
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_fields_provider.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_fields_provider.py
new file mode 100644
index 00000000..7fb2e8e0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_fields_provider.py
@@ -0,0 +1,50 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import fields
+
+from azure.ai.ml._schema._utils.data_binding_expression import support_data_binding_expression_for_fields
+from azure.ai.ml._schema.core.fields import NestedField, PrimitiveValueField, UnionField
+from azure.ai.ml._schema.job.input_output_entry import (
+    DataInputSchema,
+    InputLiteralValueSchema,
+    MLTableInputSchema,
+    ModelInputSchema,
+    OutputSchema,
+)
+
+
+def InputsField(*, support_databinding: bool = False, **kwargs):
+    value_fields = [
+        NestedField(DataInputSchema),
+        NestedField(ModelInputSchema),
+        NestedField(MLTableInputSchema),
+        NestedField(InputLiteralValueSchema),
+        PrimitiveValueField(is_strict=False),
+        # This ordering of types for the values keyword is intentional. The ordering of types
+        # determines what order schema values are matched and cast in. Changing the current ordering can
+        # result in values being mis-cast such as 1.0 translating into True.
+    ]
+
+    # As is_strict is set to True, 1 and only 1 value field must be matched.
+    # root level data-binding expression has already been covered by PrimitiveValueField;
+    # If support_databinding is True, we should only add data-binding expression support for nested fields.
+    if support_databinding:
+        for field_obj in value_fields:
+            if isinstance(field_obj, NestedField):
+                support_data_binding_expression_for_fields(field_obj.schema)
+
+    return fields.Dict(
+        keys=fields.Str(),
+        values=UnionField(value_fields, metadata={"description": "Inputs to a job."}, is_strict=True, **kwargs),
+    )
+
+
+def OutputsField(**kwargs):
+    return fields.Dict(
+        keys=fields.Str(),
+        values=NestedField(nested=OutputSchema, allow_none=True),
+        metadata={"description": "Outputs of a job."},
+        **kwargs
+    )
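
Because the values union is strict and ordered, an inputs mapping can mix schema-shaped entries with bare literals, and the ordering above decides which branch claims each value. A hedged sketch (paths and types illustrative):

    inputs = {
        "training_data": {"type": "uri_folder", "path": "azureml:my-data:1"},  # DataInputSchema
        "base_model": {"type": "mlflow_model", "path": "azureml:my-model:1"},  # ModelInputSchema
        "learning_rate": 0.01,                                                 # PrimitiveValueField
        "use_gpu": True,                                                       # PrimitiveValueField
    }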
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_port.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_port.py
new file mode 100644
index 00000000..f37b2a16
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_port.py
@@ -0,0 +1,29 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+import logging
+
+from marshmallow import fields, post_load, validate
+
+from azure.ai.ml.entities import InputPort
+
+from ..core.schema import PatchedSchemaMeta
+
+module_logger = logging.getLogger(__name__)
+
+
+class InputPortSchema(metaclass=PatchedSchemaMeta):
+    type_string = fields.Str(
+        data_key="type",
+        validate=validate.OneOf(["path", "number", "null"]),
+        dump_default="null",
+    )
+    default = fields.Str()
+    optional = fields.Bool()
+
+    @post_load
+    def make(self, data, **kwargs):
+        return InputPort(**data)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_limits.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_limits.py
new file mode 100644
index 00000000..850e9b3d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_limits.py
@@ -0,0 +1,45 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument
+
+from marshmallow import fields, post_load, validate
+
+from azure.ai.ml._schema.core.schema_meta import PatchedSchemaMeta
+
+
+class CommandJobLimitsSchema(metaclass=PatchedSchemaMeta):
+    timeout = fields.Int()
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.entities import CommandJobLimits
+
+        return CommandJobLimits(**data)
+
+
+class SweepJobLimitsSchema(metaclass=PatchedSchemaMeta):
+    max_concurrent_trials = fields.Int(metadata={"description": "Sweep Job max concurrent trials."})
+    max_total_trials = fields.Int(
+        metadata={"description": "Sweep Job max total trials."},
+        required=True,
+    )
+    timeout = fields.Int(
+        metadata={"description": "The max run duration in Seconds, after which the job will be cancelled."}
+    )
+    trial_timeout = fields.Int(metadata={"description": "Sweep Job Trial timeout value."})
+
+    @post_load
+    def make(self, data, **kwargs):
+        from azure.ai.ml.sweep import SweepJobLimits
+
+        return SweepJobLimits(**data)
+
+
+class DoWhileLimitsSchema(metaclass=PatchedSchemaMeta):
+    max_iteration_count = fields.Int(
+        metadata={"description": "The max iteration for do_while loop."},
+        validate=validate.Range(min=1, max=1000),
+        required=True,
+    )
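
A hedged sketch of the limits mappings these schemas load; per the field descriptions, timeouts are in seconds, and max_total_trials is the only required sweep field:

    command_limits = {"timeout": 3600}
    sweep_limits = {
        "max_total_trials": 20,
        "max_concurrent_trials": 4,
        "timeout": 7200,
        "trial_timeout": 720,
    }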
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_output.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_output.py
new file mode 100644
index 00000000..80679119
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_output.py
@@ -0,0 +1,18 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import logging
+
+from marshmallow import fields
+
+from azure.ai.ml._schema.core.fields import ArmStr
+from azure.ai.ml._schema.core.schema import PatchedSchemaMeta
+from azure.ai.ml.constants._common import AzureMLResourceType
+
+module_logger = logging.getLogger(__name__)
+
+
+class JobOutputSchema(metaclass=PatchedSchemaMeta):
+    datastore_id = ArmStr(azureml_type=AzureMLResourceType.DATASTORE)
+    path = fields.Str()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parallel_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parallel_job.py
new file mode 100644
index 00000000..c539e407
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parallel_job.py
@@ -0,0 +1,15 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from azure.ai.ml._schema.core.fields import StringTransformedEnum
+from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField
+from azure.ai.ml.constants import JobType
+
+from .base_job import BaseJobSchema
+from .parameterized_parallel import ParameterizedParallelSchema
+
+
+class ParallelJobSchema(ParameterizedParallelSchema, BaseJobSchema):
+    type = StringTransformedEnum(allowed_values=JobType.PARALLEL)
+    inputs = InputsField()
+    outputs = OutputsField()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_command.py
new file mode 100644
index 00000000..1c011bc9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_command.py
@@ -0,0 +1,41 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import fields
+
+from azure.ai.ml._schema.core.fields import (
+    CodeField,
+    DistributionField,
+    EnvironmentField,
+    ExperimentalField,
+    NestedField,
+)
+from azure.ai.ml._schema.core.schema import PathAwareSchema
+from azure.ai.ml._schema.job.input_output_entry import InputLiteralValueSchema
+from azure.ai.ml._schema.job_resource_configuration import JobResourceConfigurationSchema
+from azure.ai.ml._schema.queue_settings import QueueSettingsSchema
+
+from ..core.fields import UnionField
+
+
+class ParameterizedCommandSchema(PathAwareSchema):
+    command = fields.Str(
+        metadata={
+            # pylint: disable=line-too-long
+            "description": "The command run and the parameters passed. This string may contain place holders of inputs in {}. "
+        },
+        required=True,
+    )
+    code = CodeField()
+    environment = EnvironmentField(required=True)
+    environment_variables = UnionField(
+        [
+            fields.Dict(keys=fields.Str(), values=fields.Str()),
+            # Used for binding environment variables
+            NestedField(InputLiteralValueSchema),
+        ]
+    )
+    resources = NestedField(JobResourceConfigurationSchema)
+    distribution = DistributionField()
+    queue_settings = ExperimentalField(NestedField(QueueSettingsSchema))
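
environment_variables is a union: either a plain string-to-string mapping or a literal-value binding via InputLiteralValueSchema. A hedged sketch of both shapes (the binding syntax is an assumption):

    env_plain = {"LOG_LEVEL": "debug", "USE_CACHE": "1"}
    env_bound = {"value": "${{inputs.log_level}}"}   # InputLiteralValueSchema shape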
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_parallel.py
new file mode 100644
index 00000000..bb5cd063
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_parallel.py
@@ -0,0 +1,72 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from marshmallow import INCLUDE, fields
+
+from azure.ai.ml._schema.component.parallel_task import ComponentParallelTaskSchema
+from azure.ai.ml._schema.component.retry_settings import RetrySettingsSchema
+from azure.ai.ml._schema.core.fields import DumpableEnumField, NestedField
+from azure.ai.ml._schema.core.schema import PathAwareSchema
+from azure.ai.ml._schema.job.input_output_entry import InputLiteralValueSchema
+from azure.ai.ml._schema.job_resource_configuration import JobResourceConfigurationSchema
+from azure.ai.ml.constants._common import LoggingLevel
+
+from ..core.fields import UnionField
+
+
+class ParameterizedParallelSchema(PathAwareSchema):
+    logging_level = DumpableEnumField(
+        allowed_values=[LoggingLevel.DEBUG, LoggingLevel.INFO, LoggingLevel.WARN],
+        dump_default=LoggingLevel.INFO,
+        metadata={
+            "description": (
+                "A string of the logging level name, which is defined in 'logging'. "
+                "Possible values are 'WARNING', 'INFO', and 'DEBUG'."
+            )
+        },
+    )
+    task = NestedField(ComponentParallelTaskSchema, unknown=INCLUDE)
+    mini_batch_size = fields.Str(
+        metadata={"description": "The batch size of current job."},
+    )
+    partition_keys = fields.List(
+        fields.Str(), metadata={"description": "The keys used to partition input data into mini-batches."}
+    )
+    input_data = fields.Str()
+    resources = NestedField(JobResourceConfigurationSchema)
+    retry_settings = NestedField(RetrySettingsSchema, unknown=INCLUDE)
+    max_concurrency_per_instance = fields.Integer(
+        dump_default=1,
+        metadata={"description": "The max parallellism that each compute instance has."},
+    )
+    error_threshold = fields.Integer(
+        dump_default=-1,
+        metadata={
+            "description": (
+                "The number of item processing failures should be ignored. "
+                "If the error_threshold is reached, the job terminates. "
+                "For a list of files as inputs, one item means one file reference. "
+                "This setting doesn't apply to command parallelization."
+            )
+        },
+    )
+    mini_batch_error_threshold = fields.Integer(
+        dump_default=-1,
+        metadata={
+            "description": (
+                "The number of mini batch processing failures should be ignored. "
+                "If the mini_batch_error_threshold is reached, the job terminates. "
+                "For a list of files as inputs, one item means one file reference. "
+                "This setting can be used by either command or python function parallelization. "
+                "Only one error_threshold setting can be used in one job."
+            )
+        },
+    )
+    environment_variables = UnionField(
+        [
+            fields.Dict(keys=fields.Str(), values=fields.Str()),
+            # Used for binding environment variables
+            NestedField(InputLiteralValueSchema),
+        ]
+    )
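
A hedged sketch of the parallel settings this schema parameterizes; mini_batch_size is a string, and the input binding syntax is an assumption:

    parallel_settings = {
        "logging_level": "INFO",
        "mini_batch_size": "10kb",
        "partition_keys": ["store_id"],
        "input_data": "${{inputs.input_data}}",
        "max_concurrency_per_instance": 2,
        "error_threshold": -1,
        "mini_batch_error_threshold": 5,
    }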
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_spark.py
new file mode 100644
index 00000000..49e9560a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_spark.py
@@ -0,0 +1,151 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=unused-argument
+
+import re
+from typing import Any, Dict, List
+
+from marshmallow import ValidationError, fields, post_dump, post_load, pre_dump, pre_load, validates
+
+from azure.ai.ml._schema.core.fields import CodeField, EnvironmentField, NestedField
+from azure.ai.ml._schema.core.schema import PathAwareSchema
+from azure.ai.ml._schema.core.schema_meta import PatchedSchemaMeta
+
+from ..core.fields import UnionField
+
+re_memory_pattern = re.compile("^\\d+[kKmMgGtTpP]$")
+
+
+class SparkEntryFileSchema(metaclass=PatchedSchemaMeta):
+    file = fields.Str(required=True)
+    # add spark_job_entry_type and make it dump-only to align with the model definition;
+    # this ensures we get the expected value when calling spark._from_rest_object()
+    spark_job_entry_type = fields.Str(dump_only=True)
+
+    @pre_dump
+    def to_dict(self, data, **kwargs):
+        return {"file": data.entry}
+
+
+class SparkEntryClassSchema(metaclass=PatchedSchemaMeta):
+    class_name = fields.Str(required=True)
+    # add spark_job_entry_type and make it dump-only to align with the model definition;
+    # this ensures we get the expected value when calling spark._from_rest_object()
+    spark_job_entry_type = fields.Str(dump_only=True)
+
+    @pre_dump
+    def to_dict(self, data, **kwargs):
+        return {"class_name": data.entry}
+
+
+CONF_KEY_MAP = {
+    "driver_cores": "spark.driver.cores",
+    "driver_memory": "spark.driver.memory",
+    "executor_cores": "spark.executor.cores",
+    "executor_memory": "spark.executor.memory",
+    "executor_instances": "spark.executor.instances",
+    "dynamic_allocation_enabled": "spark.dynamicAllocation.enabled",
+    "dynamic_allocation_min_executors": "spark.dynamicAllocation.minExecutors",
+    "dynamic_allocation_max_executors": "spark.dynamicAllocation.maxExecutors",
+}
+
+
+def no_duplicates(name: str, value: List):
+    if len(value) != len(set(value)):
+        raise ValidationError(f"{name} must not contain duplicate entries.")
+
+
+class ParameterizedSparkSchema(PathAwareSchema):
+    code = CodeField()
+    entry = UnionField(
+        [NestedField(SparkEntryFileSchema), NestedField(SparkEntryClassSchema)],
+        required=True,
+        metadata={"description": "Entry."},
+    )
+    py_files = fields.List(fields.Str(required=True))
+    jars = fields.List(fields.Str(required=True))
+    files = fields.List(fields.Str(required=True))
+    archives = fields.List(fields.Str(required=True))
+    conf = fields.Dict(keys=fields.Str(), values=fields.Raw())
+    properties = fields.Dict(keys=fields.Str(), values=fields.Raw())
+    environment = EnvironmentField(allow_none=True)
+    args = fields.Str(metadata={"description": "Command Line arguments."})
+
+    @validates("py_files")
+    def no_duplicate_py_files(self, value):
+        no_duplicates("py_files", value)
+
+    @validates("jars")
+    def no_duplicate_jars(self, value):
+        no_duplicates("jars", value)
+
+    @validates("files")
+    def no_duplicate_files(self, value):
+        no_duplicates("files", value)
+
+    @validates("archives")
+    def no_duplicate_archives(self, value):
+        no_duplicates("archives", value)
+
+    @pre_load
+    # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+    def map_conf_field_names(self, data, **kwargs):
+        """Map the field names in the conf dictionary.
+        This function must be called after YamlFileSchema.load_from_file.
+        Since marshmallow executes pre_load functions in alphabetical order (marshmallow\\schema.py:L166;
+        functions are checked in alphabetical order when injected into cls._hooks), we must make sure this
+        function's name sorts alphabetically after "load_from_file".
+        """
+        # TODO: it's dangerous to depend on an alphabetical order, we'd better move related logic out of Schema.
+        conf = data["conf"] if "conf" in data else None
+        if conf is not None:
+            for field_key, dict_key in CONF_KEY_MAP.items():
+                value = conf.get(dict_key, None)
+                if dict_key in conf and value is not None:
+                    del conf[dict_key]
+                    conf[field_key] = value
+            data["conf"] = conf
+        return data
+
+    @post_dump(pass_original=True)
+    def serialize_field_names(self, data: Dict[str, Any], original_data: Dict[str, Any], **kwargs):
+        conf = data["conf"] if "conf" in data else {}
+        if original_data.conf is not None and conf is not None:
+            for field_name, value in original_data.conf.items():
+                if field_name not in conf:
+                    if isinstance(value, str) and value.isdigit():
+                        value = int(value)
+                    conf[field_name] = value
+        if conf is not None:
+            for field_name, dict_name in CONF_KEY_MAP.items():
+                val = conf.get(field_name, None)
+                if field_name in conf and val is not None:
+                    if isinstance(val, str) and val.isdigit():
+                        val = int(val)
+                    del conf[field_name]
+                    conf[dict_name] = val
+            data["conf"] = conf
+        return data
+
+    @post_load
+    def demote_conf_fields(self, data, **kwargs):
+        conf = data["conf"] if "conf" in data else None
+        if conf is not None:
+            for field_name, _ in CONF_KEY_MAP.items():
+                value = conf.get(field_name, None)
+                if field_name in conf and value is not None:
+                    del conf[field_name]
+                    data[field_name] = value
+        return data
+
+    @pre_dump
+    def promote_conf_fields(self, data: object, **kwargs):
+        # copy fields from root object into the 'conf'
+        conf = data.conf or {}
+        for field_name, _ in CONF_KEY_MAP.items():
+            value = data.__getattribute__(field_name)
+            if value is not None:
+                conf[field_name] = value
+        data.__setattr__("conf", conf)
+        return data
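
The pre_load/post_dump/post_load/pre_dump hooks above shuttle well-known Spark settings between the dotted conf keys users write and the snake_case fields of the model. A self-contained sketch of the core renaming step, mirroring map_conf_field_names with a trimmed map:

    CONF_KEY_MAP = {
        "driver_cores": "spark.driver.cores",
        "driver_memory": "spark.driver.memory",
    }

    conf = {"spark.driver.cores": 2, "spark.driver.memory": "2g", "spark.custom.flag": "on"}
    for field_key, dict_key in CONF_KEY_MAP.items():
        if conf.get(dict_key) is not None:
            conf[field_key] = conf.pop(dict_key)   # rename dotted key to field name
    # conf == {"spark.custom.flag": "on", "driver_cores": 2, "driver_memory": "2g"}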
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/services.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/services.py
new file mode 100644
index 00000000..f6fed8c2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/services.py
@@ -0,0 +1,100 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import logging
+
+from marshmallow import fields, post_load
+
+from azure.ai.ml.entities._job.job_service import (
+    JobService,
+    SshJobService,
+    JupyterLabJobService,
+    VsCodeJobService,
+    TensorBoardJobService,
+)
+from azure.ai.ml.constants._job.job import JobServiceTypeNames
+from azure.ai.ml._schema.core.fields import StringTransformedEnum, UnionField
+
+from ..core.schema import PathAwareSchema
+
+module_logger = logging.getLogger(__name__)
+
+
+class JobServiceBaseSchema(PathAwareSchema):
+    port = fields.Int()
+    endpoint = fields.Str(dump_only=True)
+    status = fields.Str(dump_only=True)
+    nodes = fields.Str()
+    error_message = fields.Str(dump_only=True)
+    properties = fields.Dict()
+
+
+class JobServiceSchema(JobServiceBaseSchema):
+    """This is to support tansformation of job services passed as dict type and internal job services like Custom,
+    Tracking, Studio set by the system."""
+
+    type = UnionField(
+        [
+            StringTransformedEnum(
+                allowed_values=JobServiceTypeNames.NAMES_ALLOWED_FOR_PUBLIC,
+                pass_original=True,
+            ),
+            fields.Str(),
+        ]
+    )
+
+    @post_load
+    def make(self, data, **kwargs):  # pylint: disable=unused-argument
+        data.pop("type", None)
+        return JobService(**data)
+
+
+class TensorBoardJobServiceSchema(JobServiceBaseSchema):
+    type = StringTransformedEnum(
+        allowed_values=JobServiceTypeNames.EntityNames.TENSOR_BOARD,
+        pass_original=True,
+    )
+    log_dir = fields.Str()
+
+    @post_load
+    def make(self, data, **kwargs):  # pylint: disable=unused-argument
+        data.pop("type", None)
+        return TensorBoardJobService(**data)
+
+
+class SshJobServiceSchema(JobServiceBaseSchema):
+    type = StringTransformedEnum(
+        allowed_values=JobServiceTypeNames.EntityNames.SSH,
+        pass_original=True,
+    )
+    ssh_public_keys = fields.Str()
+
+    @post_load
+    def make(self, data, **kwargs):  # pylint: disable=unused-argument
+        data.pop("type", None)
+        return SshJobService(**data)
+
+
+class VsCodeJobServiceSchema(JobServiceBaseSchema):
+    type = StringTransformedEnum(
+        allowed_values=JobServiceTypeNames.EntityNames.VS_CODE,
+        pass_original=True,
+    )
+
+    @post_load
+    def make(self, data, **kwargs):  # pylint: disable=unused-argument
+        data.pop("type", None)
+        return VsCodeJobService(**data)
+
+
+class JupyterLabJobServiceSchema(JobServiceBaseSchema):
+    type = StringTransformedEnum(
+        allowed_values=JobServiceTypeNames.EntityNames.JUPYTER_LAB,
+        pass_original=True,
+    )
+
+    @post_load
+    def make(self, data, **kwargs):  # pylint: disable=unused-argument
+        data.pop("type", None)
+        return JupyterLabJobService(**data)
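
A hedged sketch of a services mapping the union in BaseJobSchema accepts; all type strings here are assumptions about how the allowed-value names serialize:

    services = {
        "my_ssh": {"type": "ssh", "ssh_public_keys": "ssh-rsa AAAA..."},  # SshJobServiceSchema
        "my_tboard": {"type": "tensor_board", "log_dir": "./logs"},       # TensorBoardJobServiceSchema
        "my_vscode": {"type": "vs_code"},                                 # VsCodeJobServiceSchema
    }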
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/spark_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/spark_job.py
new file mode 100644
index 00000000..f9363175
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/spark_job.py
@@ -0,0 +1,28 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from azure.ai.ml._schema.core.fields import NestedField
+from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema
+from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField
+from azure.ai.ml._schema.spark_resource_configuration import SparkResourceConfigurationSchema
+from azure.ai.ml.constants import JobType
+
+from ..core.fields import ComputeField, StringTransformedEnum, UnionField
+from .base_job import BaseJobSchema
+from .parameterized_spark import ParameterizedSparkSchema
+
+
+class SparkJobSchema(ParameterizedSparkSchema, BaseJobSchema):
+    type = StringTransformedEnum(required=True, allowed_values=JobType.SPARK)
+    compute = ComputeField()
+    inputs = InputsField()
+    outputs = OutputsField()
+    resources = NestedField(SparkResourceConfigurationSchema)
+    identity = UnionField(
+        [
+            NestedField(ManagedIdentitySchema),
+            NestedField(AMLTokenIdentitySchema),
+            NestedField(UserIdentitySchema),
+        ]
+    )
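
A hedged sketch of a spark-job mapping assembled from the schemas above; the resource field names and reference formats are assumptions:

    spark_job = {
        "type": "spark",
        "code": "./src",
        "entry": {"file": "main.py"},              # SparkEntryFileSchema
        "conf": {
            "spark.driver.cores": 1,
            "spark.driver.memory": "2g",
            "spark.executor.cores": 2,
            "spark.executor.memory": "2g",
            "spark.executor.instances": 2,
        },
        "inputs": {"data": {"type": "uri_file", "path": "azureml:my-data:1", "mode": "direct"}},
        "resources": {"instance_type": "standard_e4s_v3", "runtime_version": "3.3"},  # names assumed
        "identity": {"type": "managed_identity"},  # type string assumed
    }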