| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job')
19 files changed, 1226 insertions, 0 deletions
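The files added below define the marshmallow schemas that back YAML (de)serialization of AzureML job entities (command, spark, parallel, import, and data transfer jobs). As a quick orientation before the diff, here is a minimal sketch of how these schemas are typically exercised through the public SDK surface; the YAML contents and the local `job.yml` path are hypothetical illustrations, not part of this commit:

```python
# Minimal sketch (assumed public SDK usage, not part of this commit):
# load_job() routes a YAML file through the matching schema below --
# e.g. CommandJobSchema for a YAML carrying a `command:` key -- and
# returns the corresponding entity object rather than a plain dict.
from azure.ai.ml import load_job

# Hypothetical job.yml:
#   command: python train.py --lr ${{inputs.lr}}
#   environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest
#   compute: azureml:cpu-cluster
#   inputs:
#     lr: 0.01
job = load_job(source="job.yml")  # hypothetical local path
print(type(job).__name__)  # CommandJob
```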
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/__init__.py new file mode 100644 index 00000000..11687396 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/__init__.py @@ -0,0 +1,28 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore + +from azure.ai.ml._schema.job.creation_context import CreationContextSchema + +from .base_job import BaseJobSchema +from .command_job import CommandJobSchema +from .import_job import ImportJobSchema +from .parallel_job import ParallelJobSchema +from .parameterized_command import ParameterizedCommandSchema +from .parameterized_parallel import ParameterizedParallelSchema +from .parameterized_spark import ParameterizedSparkSchema +from .spark_job import SparkJobSchema + +__all__ = [ + "BaseJobSchema", + "ParameterizedCommandSchema", + "ParameterizedParallelSchema", + "CommandJobSchema", + "ImportJobSchema", + "SparkJobSchema", + "ParallelJobSchema", + "CreationContextSchema", + "ParameterizedSparkSchema", +] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/base_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/base_job.py new file mode 100644 index 00000000..852d3921 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/base_job.py @@ -0,0 +1,69 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import logging + +from marshmallow import fields + +from azure.ai.ml._schema.core.fields import ArmStr, ComputeField, NestedField, UnionField +from azure.ai.ml._schema.core.resource import ResourceSchema +from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema +from azure.ai.ml.constants._common import AzureMLResourceType + +from .creation_context import CreationContextSchema +from .services import ( + JobServiceSchema, + SshJobServiceSchema, + VsCodeJobServiceSchema, + TensorBoardJobServiceSchema, + JupyterLabJobServiceSchema, +) + +module_logger = logging.getLogger(__name__) + + +class BaseJobSchema(ResourceSchema): + creation_context = NestedField(CreationContextSchema, dump_only=True) + services = fields.Dict( + keys=fields.Str(), + values=UnionField( + [ + NestedField(SshJobServiceSchema), + NestedField(TensorBoardJobServiceSchema), + NestedField(VsCodeJobServiceSchema), + NestedField(JupyterLabJobServiceSchema), + # JobServiceSchema should be the last in the list, + # so that types not set by users (e.g. Custom, Tracking, Studio) are still supported. + NestedField(JobServiceSchema), + ], + is_strict=True, + ), + ) + name = fields.Str() + id = ArmStr(azureml_type=AzureMLResourceType.JOB, dump_only=True, required=False) + display_name = fields.Str(required=False) + tags = fields.Dict(keys=fields.Str(), values=fields.Str(allow_none=True)) + status = fields.Str(dump_only=True) + experiment_name = fields.Str() + properties = fields.Dict(keys=fields.Str(), values=fields.Str(allow_none=True)) + description = fields.Str() + log_files = fields.Dict( + keys=fields.Str(), + values=fields.Str(), + dump_only=True, + metadata={ + "description": ( + "The list of log files associated with this run. " "
This section is only populated " "by the service and will be ignored if contained in a yaml sent to the service " "(e.g. via `az ml job create` ...)" ) }, + ) + compute = ComputeField(required=False) + identity = UnionField( + [ + NestedField(ManagedIdentitySchema), + NestedField(AMLTokenIdentitySchema), + NestedField(UserIdentitySchema), + ] + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/command_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/command_job.py new file mode 100644 index 00000000..9cce7de7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/command_job.py @@ -0,0 +1,23 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from marshmallow import fields + +from azure.ai.ml._schema.core.fields import NestedField, StringTransformedEnum +from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField +from azure.ai.ml.constants import JobType + +from .base_job import BaseJobSchema +from .job_limits import CommandJobLimitsSchema +from .parameterized_command import ParameterizedCommandSchema + + +class CommandJobSchema(ParameterizedCommandSchema, BaseJobSchema): + type = StringTransformedEnum(allowed_values=JobType.COMMAND) + # do not promote it as CommandComponent has no field named 'limits' + limits = NestedField(CommandJobLimitsSchema) + parameters = fields.Dict(dump_only=True) + inputs = InputsField() + outputs = OutputsField() + parent_job_name = fields.Str() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/creation_context.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/creation_context.py new file mode 100644 index 00000000..79956e1c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/creation_context.py @@ -0,0 +1,16 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from marshmallow import fields + +from azure.ai.ml._schema.core.schema import PatchedSchemaMeta + + +class CreationContextSchema(metaclass=PatchedSchemaMeta): + created_at = fields.DateTime() + created_by = fields.Str() + created_by_type = fields.Str() + last_modified_at = fields.DateTime() + last_modified_by = fields.Str() + last_modified_by_type = fields.Str() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/data_transfer_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/data_transfer_job.py new file mode 100644 index 00000000..6ea54df6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/data_transfer_job.py @@ -0,0 +1,60 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- + +from marshmallow import validates, ValidationError, fields +from azure.ai.ml._schema.core.fields import NestedField +from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField +from azure.ai.ml._schema.job.input_output_entry import DatabaseSchema, FileSystemSchema, OutputSchema +from azure.ai.ml.constants import JobType +from azure.ai.ml.constants._component import DataTransferTaskType, DataCopyMode + +from ..core.fields import ComputeField, StringTransformedEnum, UnionField +from .base_job import BaseJobSchema + + +class DataTransferCopyJobSchema(BaseJobSchema): + type = StringTransformedEnum(required=True, allowed_values=JobType.DATA_TRANSFER) + task = StringTransformedEnum(allowed_values=[DataTransferTaskType.COPY_DATA], required=True) + data_copy_mode = StringTransformedEnum( + allowed_values=[DataCopyMode.MERGE_WITH_OVERWRITE, DataCopyMode.FAIL_IF_CONFLICT] + ) + compute = ComputeField() + inputs = InputsField() + outputs = OutputsField() + + +class DataTransferImportJobSchema(BaseJobSchema): + type = StringTransformedEnum(required=True, allowed_values=JobType.DATA_TRANSFER) + task = StringTransformedEnum(allowed_values=[DataTransferTaskType.IMPORT_DATA], required=True) + compute = ComputeField() + outputs = fields.Dict( + keys=fields.Str(), + values=NestedField(nested=OutputSchema, allow_none=False), + metadata={"description": "Outputs of a data transfer job."}, + ) + source = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False) + + @validates("outputs") + def outputs_key(self, value): + if len(value) != 1 or list(value.keys())[0] != "sink": + raise ValidationError( + f"outputs field only supports one output called sink in task type " + f"{DataTransferTaskType.IMPORT_DATA}." + ) + + +class DataTransferExportJobSchema(BaseJobSchema): + type = StringTransformedEnum(required=True, allowed_values=JobType.DATA_TRANSFER) + task = StringTransformedEnum(allowed_values=[DataTransferTaskType.EXPORT_DATA], required=True) + compute = ComputeField() + inputs = InputsField(allow_none=False) + sink = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False) + + @validates("inputs") + def inputs_key(self, value): + if len(value) != 1 or list(value.keys())[0] != "source": + raise ValidationError( + f"inputs field only supports one input called source in task type " + f"{DataTransferTaskType.EXPORT_DATA}." + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/distribution.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/distribution.py new file mode 100644 index 00000000..475792a3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/distribution.py @@ -0,0 +1,104 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging + +from marshmallow import ValidationError, fields, post_load, pre_dump + +from azure.ai.ml._schema.core.fields import StringTransformedEnum +from azure.ai.ml.constants import DistributionType +from azure.ai.ml._utils._experimental import experimental + +from ..core.schema import PatchedSchemaMeta + +module_logger = logging.getLogger(__name__) + + +class MPIDistributionSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum(required=True, allowed_values=DistributionType.MPI) + process_count_per_instance = fields.Int() + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml import MpiDistribution + + data.pop("type", None) + return MpiDistribution(**data) + + @pre_dump + def predump(self, data, **kwargs): + from azure.ai.ml import MpiDistribution + + if not isinstance(data, MpiDistribution): + raise ValidationError("Cannot dump non-MpiDistribution object into MpiDistributionSchema") + return data + + +class TensorFlowDistributionSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum(required=True, allowed_values=DistributionType.TENSORFLOW) + parameter_server_count = fields.Int() + worker_count = fields.Int() + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml import TensorFlowDistribution + + data.pop("type", None) + return TensorFlowDistribution(**data) + + @pre_dump + def predump(self, data, **kwargs): + from azure.ai.ml import TensorFlowDistribution + + if not isinstance(data, TensorFlowDistribution): + raise ValidationError("Cannot dump non-TensorFlowDistribution object into TensorFlowDistributionSchema") + return data + + +class PyTorchDistributionSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum(required=True, allowed_values=DistributionType.PYTORCH) + process_count_per_instance = fields.Int() + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml import PyTorchDistribution + + data.pop("type", None) + return PyTorchDistribution(**data) + + @pre_dump + def predump(self, data, **kwargs): + from azure.ai.ml import PyTorchDistribution + + if not isinstance(data, PyTorchDistribution): + raise ValidationError("Cannot dump non-PyTorchDistribution object into PyTorchDistributionSchema") + return data + + +@experimental +class RayDistributionSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum(required=True, allowed_values=DistributionType.RAY) + port = fields.Int() + address = fields.Str() + include_dashboard = fields.Bool() + dashboard_port = fields.Int() + head_node_additional_args = fields.Str() + worker_node_additional_args = fields.Str() + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml import RayDistribution + + data.pop("type", None) + return RayDistribution(**data) + + @pre_dump + def predump(self, data, **kwargs): + from azure.ai.ml import RayDistribution + + if not isinstance(data, RayDistribution): + raise ValidationError("Cannot dump non-RayDistribution object into RayDistributionSchema") + return data diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/identity.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/identity.py new file mode 100644 index 00000000..2f2be676 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/identity.py @@ -0,0 +1,67 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging + +from marshmallow import fields, post_load + +from azure.ai.ml._restclient.v2023_04_01_preview.models import ( + ConnectionAuthType, + IdentityConfigurationType, +) +from azure.ai.ml._schema.core.fields import StringTransformedEnum +from azure.ai.ml._utils.utils import camel_to_snake +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, +) + +from ..core.schema import PatchedSchemaMeta + +module_logger = logging.getLogger(__name__) + + +class ManagedIdentitySchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum( + required=True, + allowed_values=[IdentityConfigurationType.MANAGED, ConnectionAuthType.MANAGED_IDENTITY], + casing_transform=camel_to_snake, + ) + client_id = fields.Str() + object_id = fields.Str() + msi_resource_id = fields.Str() + + @post_load + def make(self, data, **kwargs): + data.pop("type") + return ManagedIdentityConfiguration(**data) + + +class AMLTokenIdentitySchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum( + required=True, + allowed_values=IdentityConfigurationType.AML_TOKEN, + casing_transform=camel_to_snake, + ) + + @post_load + def make(self, data, **kwargs): + data.pop("type") + return AmlTokenConfiguration(**data) + + +class UserIdentitySchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum( + required=True, + allowed_values=IdentityConfigurationType.USER_IDENTITY, + casing_transform=camel_to_snake, + ) + + @post_load + def make(self, data, **kwargs): + data.pop("type") + return UserIdentityConfiguration(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/import_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/import_job.py new file mode 100644 index 00000000..8f7c3908 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/import_job.py @@ -0,0 +1,54 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=unused-argument + +from marshmallow import fields, post_load + +from azure.ai.ml._schema.core.fields import NestedField, StringTransformedEnum, UnionField +from azure.ai.ml._schema.core.schema_meta import PatchedSchemaMeta +from azure.ai.ml._schema.job.input_output_entry import OutputSchema +from azure.ai.ml.constants import ImportSourceType, JobType + +from .base_job import BaseJobSchema + + +class DatabaseImportSourceSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum( + allowed_values=[ + ImportSourceType.AZURESQLDB, + ImportSourceType.AZURESYNAPSEANALYTICS, + ImportSourceType.SNOWFLAKE, + ], + required=True, + ) + connection = fields.Str(required=True) + query = fields.Str(required=True) + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.entities._job.import_job import DatabaseImportSource + + return DatabaseImportSource(**data) + + +class FileImportSourceSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum(allowed_values=[ImportSourceType.S3], required=True) + connection = fields.Str(required=True) + path = fields.Str(required=True) + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.entities._job.import_job import FileImportSource + + return FileImportSource(**data) + + +class ImportJobSchema(BaseJobSchema): + class Meta: + exclude = ["compute"] # compute property not applicable to import job + + type = StringTransformedEnum(allowed_values=JobType.IMPORT) + source = UnionField([NestedField(DatabaseImportSourceSchema), NestedField(FileImportSourceSchema)], required=True) + output = NestedField(OutputSchema, required=True) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_entry.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_entry.py new file mode 100644 index 00000000..1300ab07 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_entry.py @@ -0,0 +1,256 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging + +from marshmallow import ValidationError, fields, post_load, pre_dump + +from azure.ai.ml._schema.core.fields import ( + ArmVersionedStr, + StringTransformedEnum, + UnionField, + LocalPathField, + NestedField, + VersionField, +) + +from azure.ai.ml._schema.core.schema import PatchedSchemaMeta, PathAwareSchema +from azure.ai.ml.constants._common import ( + AssetTypes, + AzureMLResourceType, + InputOutputModes, +) +from azure.ai.ml.constants._component import ExternalDataType + +module_logger = logging.getLogger(__name__) + + +class InputSchema(metaclass=PatchedSchemaMeta): + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.entities._inputs_outputs import Input + + return Input(**data) + + @pre_dump + def check_dict(self, data, **kwargs): + from azure.ai.ml.entities._inputs_outputs import Input + + if isinstance(data, Input): + return data + raise ValidationError("InputSchema needs type Input to dump") + + +def generate_path_property(azureml_type): + return UnionField( + [ + ArmVersionedStr(azureml_type=azureml_type), + fields.Str(metadata={"pattern": r"^(http(s)?):.*"}), + fields.Str(metadata={"pattern": r"^(wasb(s)?):.*"}), + LocalPathField(pattern=r"^file:.*"), + LocalPathField( + pattern=r"^(?!(azureml|http(s)?|wasb(s)?|file):).*", + ), + ], + is_strict=True, + ) + + +def generate_path_on_compute_property(azureml_type): + return UnionField( + [ + LocalPathField(pattern=r"^file:.*"), + ], + is_strict=True, + ) + + +def generate_datastore_property(): + metadata = { + "description": "Name of the datastore to upload local paths to.", + "arm_type": AzureMLResourceType.DATASTORE, + } + return fields.Str(metadata=metadata, required=False) + + +class ModelInputSchema(InputSchema): + mode = StringTransformedEnum( + allowed_values=[ + InputOutputModes.DOWNLOAD, + InputOutputModes.RO_MOUNT, + InputOutputModes.DIRECT, + ], + required=False, + ) + type = StringTransformedEnum( + allowed_values=[ + AssetTypes.CUSTOM_MODEL, + AssetTypes.MLFLOW_MODEL, + AssetTypes.TRITON_MODEL, + ] + ) + path = generate_path_property(azureml_type=AzureMLResourceType.MODEL) + datastore = generate_datastore_property() + + +class DataInputSchema(InputSchema): + mode = StringTransformedEnum( + allowed_values=[ + InputOutputModes.DOWNLOAD, + InputOutputModes.RO_MOUNT, + InputOutputModes.DIRECT, + ], + required=False, + ) + type = StringTransformedEnum( + allowed_values=[ + AssetTypes.URI_FILE, + AssetTypes.URI_FOLDER, + ] + ) + path = generate_path_property(azureml_type=AzureMLResourceType.DATA) + path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA) + datastore = generate_datastore_property() + + +class MLTableInputSchema(InputSchema): + mode = StringTransformedEnum( + allowed_values=[ + InputOutputModes.DOWNLOAD, + InputOutputModes.RO_MOUNT, + InputOutputModes.EVAL_MOUNT, + InputOutputModes.EVAL_DOWNLOAD, + InputOutputModes.DIRECT, + ], + required=False, + ) + type = StringTransformedEnum(allowed_values=[AssetTypes.MLTABLE]) + path = generate_path_property(azureml_type=AzureMLResourceType.DATA) + path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA) + datastore = generate_datastore_property() + + +class InputLiteralValueSchema(metaclass=PatchedSchemaMeta): + value = UnionField([fields.Str(), fields.Bool(), fields.Int(), fields.Float()]) + + @post_load + def make(self, data, **kwargs): + return data["value"] + + 
@pre_dump + def check_dict(self, data, **kwargs): + if hasattr(data, "value"): + return data + raise ValidationError("InputLiteralValue must have a field value") + + +class OutputSchema(PathAwareSchema): + name = fields.Str() + version = VersionField() + mode = StringTransformedEnum( + allowed_values=[ + InputOutputModes.MOUNT, + InputOutputModes.UPLOAD, + InputOutputModes.RW_MOUNT, + InputOutputModes.DIRECT, + ], + required=False, + ) + type = StringTransformedEnum( + allowed_values=[ + AssetTypes.URI_FILE, + AssetTypes.URI_FOLDER, + AssetTypes.CUSTOM_MODEL, + AssetTypes.MLFLOW_MODEL, + AssetTypes.MLTABLE, + AssetTypes.TRITON_MODEL, + ] + ) + path = fields.Str() + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.entities._inputs_outputs import Output + + return Output(**data) + + @pre_dump + def check_dict(self, data, **kwargs): + from azure.ai.ml.entities._inputs_outputs import Output + + if isinstance(data, Output): + return data + # Assists with union schema + raise ValidationError("OutputSchema needs type Output to dump") + + +class StoredProcedureParamsSchema(metaclass=PatchedSchemaMeta): + name = fields.Str() + value = fields.Str() + type = fields.Str() + + @pre_dump + def check_dict(self, data, **kwargs): + for key in self.dump_fields.keys(): # pylint: disable=no-member + if data.get(key, None) is None: + msg = "StoredProcedureParams must have a {!r} value." + raise ValidationError(msg.format(key)) + return data + + +class DatabaseSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum(allowed_values=[ExternalDataType.DATABASE], required=True) + table_name = fields.Str() + query = fields.Str( + metadata={"description": "The sql query command."}, + ) + stored_procedure = fields.Str() + stored_procedure_params = fields.List(NestedField(StoredProcedureParamsSchema)) + + connection = fields.Str(required=True) + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.data_transfer import Database + + data.pop("type", None) + return Database(**data) + + @pre_dump + def check_dict(self, data, **kwargs): + from azure.ai.ml.data_transfer import Database + + if isinstance(data, Database): + return data + msg = "DatabaseSchema needs type Database to dump, but got {!r}." + raise ValidationError(msg.format(type(data))) + + +class FileSystemSchema(metaclass=PatchedSchemaMeta): + type = StringTransformedEnum( + allowed_values=[ + ExternalDataType.FILE_SYSTEM, + ], + ) + path = fields.Str() + + connection = fields.Str(required=True) + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.data_transfer import FileSystem + + data.pop("type", None) + return FileSystem(**data) + + @pre_dump + def check_dict(self, data, **kwargs): + from azure.ai.ml.data_transfer import FileSystem + + if isinstance(data, FileSystem): + return data + msg = "FileSystemSchema needs type FileSystem to dump, but got {!r}." + raise ValidationError(msg.format(type(data))) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_fields_provider.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_fields_provider.py new file mode 100644 index 00000000..7fb2e8e0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_output_fields_provider.py @@ -0,0 +1,50 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +from marshmallow import fields + +from azure.ai.ml._schema._utils.data_binding_expression import support_data_binding_expression_for_fields +from azure.ai.ml._schema.core.fields import NestedField, PrimitiveValueField, UnionField +from azure.ai.ml._schema.job.input_output_entry import ( + DataInputSchema, + InputLiteralValueSchema, + MLTableInputSchema, + ModelInputSchema, + OutputSchema, +) + + +def InputsField(*, support_databinding: bool = False, **kwargs): + value_fields = [ + NestedField(DataInputSchema), + NestedField(ModelInputSchema), + NestedField(MLTableInputSchema), + NestedField(InputLiteralValueSchema), + PrimitiveValueField(is_strict=False), + # This ordering of types for the values keyword is intentional. The ordering of types + # determines what order schema values are matched and cast in. Changing the current ordering can + # result in values being mis-cast such as 1.0 translating into True. + ] + + # As is_strict is set to True, 1 and only 1 value field must be matched. + # root level data-binding expression has already been covered by PrimitiveValueField; + # If support_databinding is True, we should only add data-binding expression support for nested fields. + if support_databinding: + for field_obj in value_fields: + if isinstance(field_obj, NestedField): + support_data_binding_expression_for_fields(field_obj.schema) + + return fields.Dict( + keys=fields.Str(), + values=UnionField(value_fields, metadata={"description": "Inputs to a job."}, is_strict=True, **kwargs), + ) + + +def OutputsField(**kwargs): + return fields.Dict( + keys=fields.Str(), + values=NestedField(nested=OutputSchema, allow_none=True), + metadata={"description": "Outputs of a job."}, + **kwargs + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_port.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_port.py new file mode 100644 index 00000000..f37b2a16 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/input_port.py @@ -0,0 +1,29 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging + +from marshmallow import fields, post_load, validate + +from azure.ai.ml.entities import InputPort + +from ..core.schema import PatchedSchemaMeta + +module_logger = logging.getLogger(__name__) + + +class InputPortSchema(metaclass=PatchedSchemaMeta): + type_string = fields.Str( + data_key="type", + validate=validate.OneOf(["path", "number", "null"]), + dump_default="null", + ) + default = fields.Str() + optional = fields.Bool() + + @post_load + def make(self, data, **kwargs): + return InputPort(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_limits.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_limits.py new file mode 100644 index 00000000..850e9b3d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_limits.py @@ -0,0 +1,45 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=unused-argument + +from marshmallow import fields, post_load, validate + +from azure.ai.ml._schema.core.schema_meta import PatchedSchemaMeta + + +class CommandJobLimitsSchema(metaclass=PatchedSchemaMeta): + timeout = fields.Int() + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.entities import CommandJobLimits + + return CommandJobLimits(**data) + + +class SweepJobLimitsSchema(metaclass=PatchedSchemaMeta): + max_concurrent_trials = fields.Int(metadata={"description": "Sweep Job max concurrent trials."}) + max_total_trials = fields.Int( + metadata={"description": "Sweep Job max total trials."}, + required=True, + ) + timeout = fields.Int( + metadata={"description": "The max run duration in seconds, after which the job will be cancelled."} + ) + trial_timeout = fields.Int(metadata={"description": "Sweep Job Trial timeout value."}) + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.sweep import SweepJobLimits + + return SweepJobLimits(**data) + + +class DoWhileLimitsSchema(metaclass=PatchedSchemaMeta): + max_iteration_count = fields.Int( + metadata={"description": "The max number of iterations for the do_while loop."}, + validate=validate.Range(min=1, max=1000), + required=True, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_output.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_output.py new file mode 100644 index 00000000..80679119 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/job_output.py @@ -0,0 +1,18 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import logging + +from marshmallow import fields + +from azure.ai.ml._schema.core.fields import ArmStr +from azure.ai.ml._schema.core.schema import PatchedSchemaMeta +from azure.ai.ml.constants._common import AzureMLResourceType + +module_logger = logging.getLogger(__name__) + + +class JobOutputSchema(metaclass=PatchedSchemaMeta): + datastore_id = ArmStr(azureml_type=AzureMLResourceType.DATASTORE) + path = fields.Str() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parallel_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parallel_job.py new file mode 100644 index 00000000..c539e407 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parallel_job.py @@ -0,0 +1,15 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- +from azure.ai.ml._schema.core.fields import StringTransformedEnum +from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField +from azure.ai.ml.constants import JobType + +from .base_job import BaseJobSchema +from .parameterized_parallel import ParameterizedParallelSchema + + +class ParallelJobSchema(ParameterizedParallelSchema, BaseJobSchema): + type = StringTransformedEnum(allowed_values=JobType.PARALLEL) + inputs = InputsField() + outputs = OutputsField() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_command.py new file mode 100644 index 00000000..1c011bc9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_command.py @@ -0,0 +1,41 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from marshmallow import fields + +from azure.ai.ml._schema.core.fields import ( + CodeField, + DistributionField, + EnvironmentField, + ExperimentalField, + NestedField, +) +from azure.ai.ml._schema.core.schema import PathAwareSchema +from azure.ai.ml._schema.job.input_output_entry import InputLiteralValueSchema +from azure.ai.ml._schema.job_resource_configuration import JobResourceConfigurationSchema +from azure.ai.ml._schema.queue_settings import QueueSettingsSchema + +from ..core.fields import UnionField + + +class ParameterizedCommandSchema(PathAwareSchema): + command = fields.Str( + metadata={ + # pylint: disable=line-too-long + "description": "The command to run and the parameters passed. This string may contain placeholders for inputs in {}. " + }, + required=True, + ) + code = CodeField() + environment = EnvironmentField(required=True) + environment_variables = UnionField( + [ + fields.Dict(keys=fields.Str(), values=fields.Str()), + # Used for binding environment variables + NestedField(InputLiteralValueSchema), + ] + ) + resources = NestedField(JobResourceConfigurationSchema) + distribution = DistributionField() + queue_settings = ExperimentalField(NestedField(QueueSettingsSchema)) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_parallel.py new file mode 100644 index 00000000..bb5cd063 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_parallel.py @@ -0,0 +1,72 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- + +from marshmallow import INCLUDE, fields + +from azure.ai.ml._schema.component.parallel_task import ComponentParallelTaskSchema +from azure.ai.ml._schema.component.retry_settings import RetrySettingsSchema +from azure.ai.ml._schema.core.fields import DumpableEnumField, NestedField +from azure.ai.ml._schema.core.schema import PathAwareSchema +from azure.ai.ml._schema.job.input_output_entry import InputLiteralValueSchema +from azure.ai.ml._schema.job_resource_configuration import JobResourceConfigurationSchema +from azure.ai.ml.constants._common import LoggingLevel + +from ..core.fields import UnionField + + +class ParameterizedParallelSchema(PathAwareSchema): + logging_level = DumpableEnumField( + allowed_values=[LoggingLevel.DEBUG, LoggingLevel.INFO, LoggingLevel.WARN], + dump_default=LoggingLevel.INFO, + metadata={ + "description": ( + "A string of the logging level name, which is defined in 'logging'. " + "Possible values are 'WARNING', 'INFO', and 'DEBUG'." + ) + }, + ) + task = NestedField(ComponentParallelTaskSchema, unknown=INCLUDE) + mini_batch_size = fields.Str( + metadata={"description": "The batch size of the current job."}, + ) + partition_keys = fields.List( + fields.Str(), metadata={"description": "The keys used to partition input data into mini-batches."} + ) + input_data = fields.Str() + resources = NestedField(JobResourceConfigurationSchema) + retry_settings = NestedField(RetrySettingsSchema, unknown=INCLUDE) + max_concurrency_per_instance = fields.Integer( + dump_default=1, + metadata={"description": "The max parallelism that each compute instance has."}, + ) + error_threshold = fields.Integer( + dump_default=-1, + metadata={ + "description": ( + "The number of item processing failures that should be ignored. " + "If the error_threshold is reached, the job terminates. " + "For a list of files as inputs, one item means one file reference. " + "This setting doesn't apply to command parallelization." + ) + }, + ) + mini_batch_error_threshold = fields.Integer( + dump_default=-1, + metadata={ + "description": ( + "The number of mini-batch processing failures that should be ignored. " + "If the mini_batch_error_threshold is reached, the job terminates. " + "For a list of files as inputs, one item means one file reference. " + "This setting can be used by either command or python function parallelization. " + "Only one error_threshold setting can be used in one job." + ) + }, + ) + environment_variables = UnionField( + [ + fields.Dict(keys=fields.Str(), values=fields.Str()), + # Used for binding environment variables + NestedField(InputLiteralValueSchema), + ] + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_spark.py new file mode 100644 index 00000000..49e9560a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/parameterized_spark.py @@ -0,0 +1,151 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- +# pylint: disable=unused-argument + +import re +from typing import Any, Dict, List + +from marshmallow import ValidationError, fields, post_dump, post_load, pre_dump, pre_load, validates + +from azure.ai.ml._schema.core.fields import CodeField, EnvironmentField, NestedField +from azure.ai.ml._schema.core.schema import PathAwareSchema +from azure.ai.ml._schema.core.schema_meta import PatchedSchemaMeta + +from ..core.fields import UnionField + +re_memory_pattern = re.compile("^\\d+[kKmMgGtTpP]$") + + +class SparkEntryFileSchema(metaclass=PatchedSchemaMeta): + file = fields.Str(required=True) + # add spark_job_entry_type and make it dump only to align with the model definition, + # so we get the expected value when calling spark._from_rest_object() + spark_job_entry_type = fields.Str(dump_only=True) + + @pre_dump + def to_dict(self, data, **kwargs): + return {"file": data.entry} + + +class SparkEntryClassSchema(metaclass=PatchedSchemaMeta): + class_name = fields.Str(required=True) + # add spark_job_entry_type and make it dump only to align with the model definition, + # so we get the expected value when calling spark._from_rest_object() + spark_job_entry_type = fields.Str(dump_only=True) + + @pre_dump + def to_dict(self, data, **kwargs): + return {"class_name": data.entry} + + +CONF_KEY_MAP = { + "driver_cores": "spark.driver.cores", + "driver_memory": "spark.driver.memory", + "executor_cores": "spark.executor.cores", + "executor_memory": "spark.executor.memory", + "executor_instances": "spark.executor.instances", + "dynamic_allocation_enabled": "spark.dynamicAllocation.enabled", + "dynamic_allocation_min_executors": "spark.dynamicAllocation.minExecutors", + "dynamic_allocation_max_executors": "spark.dynamicAllocation.maxExecutors", +} + + +def no_duplicates(name: str, value: List): + if len(value) != len(set(value)): + raise ValidationError(f"{name} must not contain duplicate entries.") + + +class ParameterizedSparkSchema(PathAwareSchema): + code = CodeField() + entry = UnionField( + [NestedField(SparkEntryFileSchema), NestedField(SparkEntryClassSchema)], + required=True, + metadata={"description": "Entry."}, + ) + py_files = fields.List(fields.Str(required=True)) + jars = fields.List(fields.Str(required=True)) + files = fields.List(fields.Str(required=True)) + archives = fields.List(fields.Str(required=True)) + conf = fields.Dict(keys=fields.Str(), values=fields.Raw()) + properties = fields.Dict(keys=fields.Str(), values=fields.Raw()) + environment = EnvironmentField(allow_none=True) + args = fields.Str(metadata={"description": "Command Line arguments."}) + + @validates("py_files") + def no_duplicate_py_files(self, value): + no_duplicates("py_files", value) + + @validates("jars") + def no_duplicate_jars(self, value): + no_duplicates("jars", value) + + @validates("files") + def no_duplicate_files(self, value): + no_duplicates("files", value) + + @validates("archives") + def no_duplicate_archives(self, value): + no_duplicates("archives", value) + + @pre_load + # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + def map_conf_field_names(self, data, **kwargs): + """Map the field names in the conf dictionary. This function must be called after YamlFileSchema.load_from_file.
+ Given that marshmallow executes pre_load functions in alphabetical order (marshmallow\\schema.py:L166; + functions are checked in alphabetical order when injected into cls._hooks), we must make sure the function + name sorts alphabetically after "load_from_file". + """ + # TODO: it's dangerous to depend on alphabetical order, we'd better move related logic out of Schema. + conf = data["conf"] if "conf" in data else None + if conf is not None: + for field_key, dict_key in CONF_KEY_MAP.items(): + value = conf.get(dict_key, None) + if dict_key in conf and value is not None: + del conf[dict_key] + conf[field_key] = value + data["conf"] = conf + return data + + @post_dump(pass_original=True) + def serialize_field_names(self, data: Dict[str, Any], original_data: Dict[str, Any], **kwargs): + conf = data["conf"] if "conf" in data else {} + if original_data.conf is not None and conf is not None: + for field_name, value in original_data.conf.items(): + if field_name not in conf: + if isinstance(value, str) and value.isdigit(): + value = int(value) + conf[field_name] = value + if conf is not None: + for field_name, dict_name in CONF_KEY_MAP.items(): + val = conf.get(field_name, None) + if field_name in conf and val is not None: + if isinstance(val, str) and val.isdigit(): + val = int(val) + del conf[field_name] + conf[dict_name] = val + data["conf"] = conf + return data + + @post_load + def demote_conf_fields(self, data, **kwargs): + conf = data["conf"] if "conf" in data else None + if conf is not None: + for field_name, _ in CONF_KEY_MAP.items(): + value = conf.get(field_name, None) + if field_name in conf and value is not None: + del conf[field_name] + data[field_name] = value + return data + + @pre_dump + def promote_conf_fields(self, data: object, **kwargs): + # copy fields from the root object into 'conf' + conf = data.conf or {} + for field_name, _ in CONF_KEY_MAP.items(): + value = data.__getattribute__(field_name) + if value is not None: + conf[field_name] = value + data.__setattr__("conf", conf) + return data diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/services.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/services.py new file mode 100644 index 00000000..f6fed8c2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/services.py @@ -0,0 +1,100 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- + +import logging + +from marshmallow import fields, post_load + +from azure.ai.ml.entities._job.job_service import ( + JobService, + SshJobService, + JupyterLabJobService, + VsCodeJobService, + TensorBoardJobService, +) +from azure.ai.ml.constants._job.job import JobServiceTypeNames +from azure.ai.ml._schema.core.fields import StringTransformedEnum, UnionField + +from ..core.schema import PathAwareSchema + +module_logger = logging.getLogger(__name__) + + +class JobServiceBaseSchema(PathAwareSchema): + port = fields.Int() + endpoint = fields.Str(dump_only=True) + status = fields.Str(dump_only=True) + nodes = fields.Str() + error_message = fields.Str(dump_only=True) + properties = fields.Dict() + + +class JobServiceSchema(JobServiceBaseSchema): + """This supports transformation of job services passed as dict type and internal job services like Custom, + Tracking, Studio set by the system.""" + + type = UnionField( + [ + StringTransformedEnum( + allowed_values=JobServiceTypeNames.NAMES_ALLOWED_FOR_PUBLIC, + pass_original=True, + ), + fields.Str(), + ] + ) + + @post_load + def make(self, data, **kwargs): # pylint: disable=unused-argument + data.pop("type", None) + return JobService(**data) + + +class TensorBoardJobServiceSchema(JobServiceBaseSchema): + type = StringTransformedEnum( + allowed_values=JobServiceTypeNames.EntityNames.TENSOR_BOARD, + pass_original=True, + ) + log_dir = fields.Str() + + @post_load
 + def make(self, data, **kwargs): # pylint: disable=unused-argument + data.pop("type", None) + return TensorBoardJobService(**data) + + +class SshJobServiceSchema(JobServiceBaseSchema): + type = StringTransformedEnum( + allowed_values=JobServiceTypeNames.EntityNames.SSH, + pass_original=True, + ) + ssh_public_keys = fields.Str() + + @post_load + def make(self, data, **kwargs): # pylint: disable=unused-argument + data.pop("type", None) + return SshJobService(**data) + + +class VsCodeJobServiceSchema(JobServiceBaseSchema): + type = StringTransformedEnum( + allowed_values=JobServiceTypeNames.EntityNames.VS_CODE, + pass_original=True, + ) + + @post_load + def make(self, data, **kwargs): # pylint: disable=unused-argument + data.pop("type", None) + return VsCodeJobService(**data) + + +class JupyterLabJobServiceSchema(JobServiceBaseSchema): + type = StringTransformedEnum( + allowed_values=JobServiceTypeNames.EntityNames.JUPYTER_LAB, + pass_original=True, + ) + + @post_load + def make(self, data, **kwargs): # pylint: disable=unused-argument + data.pop("type", None) + return JupyterLabJobService(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/spark_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/spark_job.py new file mode 100644 index 00000000..f9363175 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/job/spark_job.py @@ -0,0 +1,28 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved.
+# --------------------------------------------------------- + +from azure.ai.ml._schema.core.fields import NestedField +from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema +from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField +from azure.ai.ml._schema.spark_resource_configuration import SparkResourceConfigurationSchema +from azure.ai.ml.constants import JobType + +from ..core.fields import ComputeField, StringTransformedEnum, UnionField +from .base_job import BaseJobSchema +from .parameterized_spark import ParameterizedSparkSchema + + +class SparkJobSchema(ParameterizedSparkSchema, BaseJobSchema): + type = StringTransformedEnum(required=True, allowed_values=JobType.SPARK) + compute = ComputeField() + inputs = InputsField() + outputs = OutputsField() + resources = NestedField(SparkResourceConfigurationSchema) + identity = UnionField( + [ + NestedField(ManagedIdentitySchema), + NestedField(AMLTokenIdentitySchema), + NestedField(UserIdentitySchema), + ] + )
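For reference, the conf-key handling in ParameterizedSparkSchema above boils down to renaming Spark's dotted configuration keys to the schema's snake_case field names before validation (and reversing the rename on dump). Below is a self-contained sketch of the pre_load half, using the same CONF_KEY_MAP shown in the diff:

```python
# Self-contained sketch of ParameterizedSparkSchema.map_conf_field_names:
# Spark-style dotted conf keys are renamed to snake_case field names
# before marshmallow validation runs. Mapping copied from the diff above.
CONF_KEY_MAP = {
    "driver_cores": "spark.driver.cores",
    "driver_memory": "spark.driver.memory",
    "executor_cores": "spark.executor.cores",
    "executor_memory": "spark.executor.memory",
    "executor_instances": "spark.executor.instances",
    "dynamic_allocation_enabled": "spark.dynamicAllocation.enabled",
    "dynamic_allocation_min_executors": "spark.dynamicAllocation.minExecutors",
    "dynamic_allocation_max_executors": "spark.dynamicAllocation.maxExecutors",
}

def map_conf_field_names(data: dict) -> dict:
    conf = data.get("conf")
    if conf is not None:
        for field_key, dict_key in CONF_KEY_MAP.items():
            if conf.get(dict_key) is not None:
                conf[field_key] = conf.pop(dict_key)  # rename dotted -> snake_case
        data["conf"] = conf
    return data

print(map_conf_field_names({"conf": {"spark.driver.cores": 1, "spark.executor.instances": 2}}))
# {'conf': {'driver_cores': 1, 'executor_instances': 2}}
```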
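Finally, most schemas in this commit share one pattern: post_load turns the validated dict into an SDK entity object, while pre_dump type-guards what may be serialized so that UnionField can fall through to the next candidate schema. Below is a reduced, plain-marshmallow sketch of that round trip; the MpiDistribution class here is a local stand-in for illustration, not the SDK entity:

```python
# Reduced sketch of the post_load/pre_dump pattern used throughout this
# commit, written against plain marshmallow. MpiDistribution is a local
# stand-in for azure.ai.ml.MpiDistribution, used only for illustration.
from marshmallow import Schema, ValidationError, fields, post_load, pre_dump

class MpiDistribution:
    def __init__(self, process_count_per_instance=None):
        self.process_count_per_instance = process_count_per_instance

class MPIDistributionSchema(Schema):
    type = fields.Str(required=True)
    process_count_per_instance = fields.Int()

    @post_load
    def make(self, data, **kwargs):
        data.pop("type", None)          # 'type' only discriminates the union
        return MpiDistribution(**data)  # load() yields an entity, not a dict

    @pre_dump
    def predump(self, data, **kwargs):
        # refuse to dump anything that is not the expected entity type,
        # so a union of schemas can try the next candidate instead
        if not isinstance(data, MpiDistribution):
            raise ValidationError("Cannot dump non-MpiDistribution object")
        return data

dist = MPIDistributionSchema().load({"type": "mpi", "process_count_per_instance": 4})
print(dist.process_count_per_instance)     # 4
print(MPIDistributionSchema().dump(dist))  # {'process_count_per_instance': 4}
```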