Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline')
14 files changed, 1560 insertions, 0 deletions
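The schemas added below back YAML load and dump for pipeline jobs and their nodes. As a rough sketch of the load path they sit on (assuming `azure-ai-ml` is installed and a local `pipeline.yml` exists; both names are illustrative, not taken from this diff):

```python
# Hedged sketch: loading a pipeline job YAML routes through PipelineJobSchema
# and the node schemas in this package. "pipeline.yml" is an assumed local file.
from azure.ai.ml import load_job

pipeline_job = load_job(source="pipeline.yml")   # marshmallow load via PipelineJobSchema
print(type(pipeline_job).__name__)               # typically "PipelineJob"
print(list(pipeline_job.jobs))                   # node names collected by PipelineJobsField()
```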
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/__init__.py new file mode 100644 index 00000000..a19931cd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/__init__.py @@ -0,0 +1,17 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=unused-import +__path__ = __import__("pkgutil").extend_path(__path__, __name__) + +from .component_job import ( + CommandSchema, + ImportSchema, + ParallelSchema, + SparkSchema, + DataTransferCopySchema, + DataTransferImportSchema, + DataTransferExportSchema, +) +from .pipeline_job import PipelineJobSchema +from .settings import PipelineJobSettingsSchema diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/automl_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/automl_node.py new file mode 100644 index 00000000..4b815db7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/automl_node.py @@ -0,0 +1,148 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=unused-argument,protected-access +from typing import List + +from marshmallow import fields, post_dump, post_load, pre_dump + +from azure.ai.ml._schema._utils.data_binding_expression import support_data_binding_expression_for_fields +from azure.ai.ml._schema.automl import AutoMLClassificationSchema, AutoMLForecastingSchema, AutoMLRegressionSchema +from azure.ai.ml._schema.automl.image_vertical.image_classification import ( + ImageClassificationMultilabelSchema, + ImageClassificationSchema, +) +from azure.ai.ml._schema.automl.image_vertical.image_object_detection import ( + ImageInstanceSegmentationSchema, + ImageObjectDetectionSchema, +) +from azure.ai.ml._schema.automl.nlp_vertical.text_classification import TextClassificationSchema +from azure.ai.ml._schema.automl.nlp_vertical.text_classification_multilabel import TextClassificationMultilabelSchema +from azure.ai.ml._schema.automl.nlp_vertical.text_ner import TextNerSchema +from azure.ai.ml._schema.core.fields import ComputeField, NestedField, UnionField +from azure.ai.ml._schema.core.schema import PathAwareSchema +from azure.ai.ml._schema.job.input_output_entry import MLTableInputSchema, OutputSchema +from azure.ai.ml._schema.pipeline.pipeline_job_io import OutputBindingStr + + +class AutoMLNodeMixin(PathAwareSchema): + """Inherit this mixin to change automl job schemas to automl node schema. + + eg: Compute is required for automl job but not required for automl node in pipeline. + Note: Inherit this before BaseJobSchema to make sure optional takes affect. 
+ """ + + def __init__(self, **kwargs): + super(AutoMLNodeMixin, self).__init__(**kwargs) + # update field objects and add data binding support, won't bind task & type as data binding + support_data_binding_expression_for_fields(self, attrs_to_skip=["task_type", "type"]) + + compute = ComputeField(required=False) + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True), + ) + + @pre_dump + def resolve_outputs(self, job: "AutoMLJob", **kwargs): + # Try resolve object's inputs & outputs and return a resolved new object + import copy + + result = copy.copy(job) + result._outputs = job._build_outputs() + return result + + @post_dump(pass_original=True) + # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + def resolve_nested_data(self, job_dict: dict, job: "AutoMLJob", **kwargs): + """Resolve nested data into flatten format.""" + from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob + + if not isinstance(job, AutoMLJob): + return job_dict + # change output to rest output dicts + job_dict["outputs"] = job._to_rest_outputs() + return job_dict + + @post_load + def make(self, data, **kwargs): + data["task"] = data.pop("task_type") + return data + + +class AutoMLClassificationNodeSchema(AutoMLNodeMixin, AutoMLClassificationSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + test_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class AutoMLRegressionNodeSchema(AutoMLNodeMixin, AutoMLRegressionSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + test_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class AutoMLForecastingNodeSchema(AutoMLNodeMixin, AutoMLForecastingSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + test_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class AutoMLTextClassificationNode(AutoMLNodeMixin, TextClassificationSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class AutoMLTextClassificationMultilabelNode(AutoMLNodeMixin, TextClassificationMultilabelSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class AutoMLTextNerNode(AutoMLNodeMixin, TextNerSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class ImageClassificationMulticlassNodeSchema(AutoMLNodeMixin, ImageClassificationSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class ImageClassificationMultilabelNodeSchema(AutoMLNodeMixin, ImageClassificationMultilabelSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class ImageObjectDetectionNodeSchema(AutoMLNodeMixin, 
ImageObjectDetectionSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +class ImageInstanceSegmentationNodeSchema(AutoMLNodeMixin, ImageInstanceSegmentationSchema): + training_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + validation_data = UnionField([fields.Str(), NestedField(MLTableInputSchema)]) + + +def AutoMLNodeSchema(**kwargs) -> List[fields.Field]: + """Get the list of all nested schema for all AutoML nodes. + + :return: The list of fields + :rtype: List[fields.Field] + """ + return [ + # region: automl node schemas + NestedField(AutoMLClassificationNodeSchema, **kwargs), + NestedField(AutoMLRegressionNodeSchema, **kwargs), + NestedField(AutoMLForecastingNodeSchema, **kwargs), + # Vision + NestedField(ImageClassificationMulticlassNodeSchema, **kwargs), + NestedField(ImageClassificationMultilabelNodeSchema, **kwargs), + NestedField(ImageObjectDetectionNodeSchema, **kwargs), + NestedField(ImageInstanceSegmentationNodeSchema, **kwargs), + # NLP + NestedField(AutoMLTextClassificationNode, **kwargs), + NestedField(AutoMLTextClassificationMultilabelNode, **kwargs), + NestedField(AutoMLTextNerNode, **kwargs), + # endregion + ] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/component_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/component_job.py new file mode 100644 index 00000000..8f179479 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/component_job.py @@ -0,0 +1,554 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import logging + +from marshmallow import INCLUDE, ValidationError, fields, post_dump, post_load, pre_dump, validates + +from ..._schema.component import ( + AnonymousCommandComponentSchema, + AnonymousDataTransferCopyComponentSchema, + AnonymousImportComponentSchema, + AnonymousParallelComponentSchema, + AnonymousSparkComponentSchema, + ComponentFileRefField, + ComponentYamlRefField, + DataTransferCopyComponentFileRefField, + ImportComponentFileRefField, + ParallelComponentFileRefField, + SparkComponentFileRefField, +) +from ..._utils.utils import is_data_binding_expression +from ...constants._common import AzureMLResourceType +from ...constants._component import DataTransferTaskType, NodeType +from ...entities._inputs_outputs import Input +from ...entities._job.pipeline._attr_dict import _AttrDict +from ...exceptions import ValidationException +from .._sweep.parameterized_sweep import ParameterizedSweepSchema +from .._utils.data_binding_expression import support_data_binding_expression_for_fields +from ..component.flow import FlowComponentSchema +from ..core.fields import ( + ArmVersionedStr, + ComputeField, + EnvironmentField, + NestedField, + RegistryStr, + StringTransformedEnum, + TypeSensitiveUnionField, + UnionField, +) +from ..core.schema import PathAwareSchema +from ..job import ParameterizedCommandSchema, ParameterizedParallelSchema, ParameterizedSparkSchema +from ..job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema +from ..job.input_output_entry import DatabaseSchema, FileSystemSchema, OutputSchema +from ..job.input_output_fields_provider import InputsField +from ..job.job_limits import CommandJobLimitsSchema +from 
..job.parameterized_spark import SparkEntryClassSchema, SparkEntryFileSchema +from ..job.services import ( + JobServiceSchema, + JupyterLabJobServiceSchema, + SshJobServiceSchema, + TensorBoardJobServiceSchema, + VsCodeJobServiceSchema, +) +from ..pipeline.pipeline_job_io import OutputBindingStr +from ..spark_resource_configuration import SparkResourceConfigurationForNodeSchema + +module_logger = logging.getLogger(__name__) + + +# do inherit PathAwareSchema to support relative path & default partial load (allow None value if not specified) +class BaseNodeSchema(PathAwareSchema): + """Base schema for all node schemas.""" + + unknown = INCLUDE + + inputs = InputsField(support_databinding=True) + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField([OutputBindingStr, NestedField(OutputSchema)], allow_none=True), + ) + properties = fields.Dict(keys=fields.Str(), values=fields.Str(allow_none=True)) + comment = fields.Str() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # data binding expression is not supported inside component field, while validation error + # message will be very long when component is an object as error message will include + # str(component), so just add component to skip list. The same to trial in Sweep. + support_data_binding_expression_for_fields(self, ["type", "component", "trial", "inputs"]) + + @post_dump(pass_original=True) + # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + def add_user_setting_attr_dict(self, data, original_data, **kwargs): # pylint: disable=unused-argument + """Support serializing unknown fields for pipeline node.""" + if isinstance(original_data, _AttrDict): + user_setting_attr_dict = original_data._get_attrs() + # TODO: dump _AttrDict values to serializable data like dict instead of original object + # skip fields that are already serialized + for key, value in user_setting_attr_dict.items(): + if key not in data: + data[key] = value + return data + + # an alternative would be set schema property to be load_only, but sub-schemas like CommandSchema usually also + # inherit from other schema classes which also have schema property. Set post dump here would be more efficient. 
+ @post_dump() + def remove_meaningless_key_for_node( + self, + data, + **kwargs, # pylint: disable=unused-argument + ): + data.pop("$schema", None) + return data + + +def _delete_type_for_binding(io): + for key in io: + if isinstance(io[key], Input) and io[key].path and is_data_binding_expression(io[key].path): + io[key].type = None + + +def _resolve_inputs(result, original_job): + result._inputs = original_job._build_inputs() + # delete type for literal binding input + _delete_type_for_binding(result._inputs) + + +def _resolve_outputs(result, original_job): + result._outputs = original_job._build_outputs() + # delete type for literal binding output + _delete_type_for_binding(result._outputs) + + +def _resolve_inputs_outputs(job): + # Try resolve object's inputs & outputs and return a resolved new object + import copy + + result = copy.copy(job) + _resolve_inputs(result, job) + _resolve_outputs(result, job) + + return result + + +class CommandSchema(BaseNodeSchema, ParameterizedCommandSchema): + """Schema for Command.""" + + # pylint: disable=unused-argument + component = TypeSensitiveUnionField( + { + NodeType.COMMAND: [ + # inline component or component file reference starting with FILE prefix + NestedField(AnonymousCommandComponentSchema, unknown=INCLUDE), + # component file reference + ComponentFileRefField(), + ], + }, + plain_union_fields=[ + # for registry type assets + RegistryStr(), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + # code is directly linked to component.code, so no need to validate or dump it + code = fields.Str(allow_none=True, load_only=True) + type = StringTransformedEnum(allowed_values=[NodeType.COMMAND]) + compute = ComputeField() + # do not promote it as CommandComponent has no field named 'limits' + limits = NestedField(CommandJobLimitsSchema) + # Change required fields to optional + command = fields.Str( + metadata={ + "description": "The command run and the parameters passed. \ + This string may contain place holders of inputs in {}. " + }, + load_only=True, + ) + environment = EnvironmentField() + services = fields.Dict( + keys=fields.Str(), + values=UnionField( + [ + NestedField(SshJobServiceSchema), + NestedField(JupyterLabJobServiceSchema), + NestedField(TensorBoardJobServiceSchema), + NestedField(VsCodeJobServiceSchema), + # JobServiceSchema should be the last in the list. + # To support types not set by users like Custom, Tracking, Studio. + NestedField(JobServiceSchema), + ], + is_strict=True, + ), + ) + identity = UnionField( + [ + NestedField(ManagedIdentitySchema), + NestedField(AMLTokenIdentitySchema), + NestedField(UserIdentitySchema), + ] + ) + + @post_load + def make(self, data, **kwargs) -> "Command": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.command_func import command + + # parse inputs/outputs + data = parse_inputs_outputs(data) + try: + command_node = command(**data) + except ValidationException as e: + # It may raise ValidationError during initialization, command._validate_io e.g. 
raise ValidationError + # instead in marshmallow function, so it won't break SchemaValidatable._schema_validate + raise ValidationError(e.message) from e + return command_node + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) + + +class SweepSchema(BaseNodeSchema, ParameterizedSweepSchema): + """Schema for Sweep.""" + + # pylint: disable=unused-argument + type = StringTransformedEnum(allowed_values=[NodeType.SWEEP]) + compute = ComputeField() + trial = TypeSensitiveUnionField( + { + NodeType.SWEEP: [ + # inline component or component file reference starting with FILE prefix + NestedField(AnonymousCommandComponentSchema, unknown=INCLUDE), + # component file reference + ComponentFileRefField(), + ], + }, + plain_union_fields=[ + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + + @post_load + def make(self, data, **kwargs) -> "Sweep": + from azure.ai.ml.entities._builders import Sweep, parse_inputs_outputs + + # parse inputs/outputs + data = parse_inputs_outputs(data) + return Sweep(**data) + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) + + +class ParallelSchema(BaseNodeSchema, ParameterizedParallelSchema): + """ + Schema for Parallel. + """ + + # pylint: disable=unused-argument + compute = ComputeField() + component = TypeSensitiveUnionField( + { + NodeType.PARALLEL: [ + # inline component or component file reference starting with FILE prefix + NestedField(AnonymousParallelComponentSchema, unknown=INCLUDE), + # component file reference + ParallelComponentFileRefField(), + ], + NodeType.FLOW_PARALLEL: [ + NestedField(FlowComponentSchema, unknown=INCLUDE, dump_only=True), + ComponentYamlRefField(), + ], + }, + plain_union_fields=[ + # for registry type assets + RegistryStr(), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + identity = UnionField( + [ + NestedField(ManagedIdentitySchema), + NestedField(AMLTokenIdentitySchema), + NestedField(UserIdentitySchema), + ] + ) + type = StringTransformedEnum(allowed_values=[NodeType.PARALLEL]) + + @post_load + def make(self, data, **kwargs) -> "Parallel": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.parallel_func import parallel_run_function + + data = parse_inputs_outputs(data) + parallel_node = parallel_run_function(**data) + return parallel_node + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) + + +class ImportSchema(BaseNodeSchema): + """ + Schema for Import. 
+ """ + + # pylint: disable=unused-argument + component = TypeSensitiveUnionField( + { + NodeType.IMPORT: [ + # inline component or component file reference starting with FILE prefix + NestedField(AnonymousImportComponentSchema, unknown=INCLUDE), + # component file reference + ImportComponentFileRefField(), + ], + }, + plain_union_fields=[ + # for registry type assets + RegistryStr(), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + type = StringTransformedEnum(allowed_values=[NodeType.IMPORT]) + + @post_load + def make(self, data, **kwargs) -> "Import": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.import_func import import_job + + # parse inputs/outputs + data = parse_inputs_outputs(data) + import_node = import_job(**data) + return import_node + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) + + +class SparkSchema(BaseNodeSchema, ParameterizedSparkSchema): + """ + Schema for Spark. + """ + + # pylint: disable=unused-argument + component = TypeSensitiveUnionField( + { + NodeType.SPARK: [ + # inline component or component file reference starting with FILE prefix + NestedField(AnonymousSparkComponentSchema, unknown=INCLUDE), + # component file reference + SparkComponentFileRefField(), + ], + }, + plain_union_fields=[ + # for registry type assets + RegistryStr(), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + type = StringTransformedEnum(allowed_values=[NodeType.SPARK]) + compute = ComputeField() + resources = NestedField(SparkResourceConfigurationForNodeSchema) + entry = UnionField( + [NestedField(SparkEntryFileSchema), NestedField(SparkEntryClassSchema)], + metadata={"description": "Entry."}, + ) + py_files = fields.List(fields.Str()) + jars = fields.List(fields.Str()) + files = fields.List(fields.Str()) + archives = fields.List(fields.Str()) + identity = UnionField( + [ + NestedField(ManagedIdentitySchema), + NestedField(AMLTokenIdentitySchema), + NestedField(UserIdentitySchema), + ] + ) + + # code is directly linked to component.code, so no need to validate or dump it + code = fields.Str(allow_none=True, load_only=True) + + @post_load + def make(self, data, **kwargs) -> "Spark": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.spark_func import spark + + # parse inputs/outputs + data = parse_inputs_outputs(data) + try: + spark_node = spark(**data) + except ValidationException as e: + # It may raise ValidationError during initialization, command._validate_io e.g. raise ValidationError + # instead in marshmallow function, so it won't break SchemaValidatable._schema_validate + raise ValidationError(e.message) from e + return spark_node + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) + + +class DataTransferCopySchema(BaseNodeSchema): + """ + Schema for DataTransferCopy. 
+ """ + + # pylint: disable=unused-argument + component = TypeSensitiveUnionField( + { + NodeType.DATA_TRANSFER: [ + # inline component or component file reference starting with FILE prefix + NestedField(AnonymousDataTransferCopyComponentSchema, unknown=INCLUDE), + # component file reference + DataTransferCopyComponentFileRefField(), + ], + }, + plain_union_fields=[ + # for registry type assets + RegistryStr(), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + task = StringTransformedEnum(allowed_values=[DataTransferTaskType.COPY_DATA], required=True) + type = StringTransformedEnum(allowed_values=[NodeType.DATA_TRANSFER], required=True) + compute = ComputeField() + + @post_load + def make(self, data, **kwargs) -> "DataTransferCopy": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.data_transfer_func import copy_data + + # parse inputs/outputs + data = parse_inputs_outputs(data) + try: + data_transfer_node = copy_data(**data) + except ValidationException as e: + # It may raise ValidationError during initialization, data_transfer._validate_io e.g. raise ValidationError + # instead in marshmallow function, so it won't break SchemaValidatable._schema_validate + raise ValidationError(e.message) from e + return data_transfer_node + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) + + +class DataTransferImportSchema(BaseNodeSchema): + # pylint: disable=unused-argument + component = UnionField( + [ + # for registry type assets + RegistryStr(), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + task = StringTransformedEnum(allowed_values=[DataTransferTaskType.IMPORT_DATA], required=True) + type = StringTransformedEnum(allowed_values=[NodeType.DATA_TRANSFER], required=True) + compute = ComputeField() + source = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False) + outputs = fields.Dict( + keys=fields.Str(), values=UnionField([OutputBindingStr, NestedField(OutputSchema)]), allow_none=False + ) + + @validates("inputs") + def inputs_key(self, value): + raise ValidationError(f"inputs field is not a valid filed in task type " f"{DataTransferTaskType.IMPORT_DATA}.") + + @validates("outputs") + def outputs_key(self, value): + if len(value) != 1 or list(value.keys())[0] != "sink": + raise ValidationError( + f"outputs field only support one output called sink in task type " + f"{DataTransferTaskType.IMPORT_DATA}." + ) + + @post_load + def make(self, data, **kwargs) -> "DataTransferImport": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.data_transfer_func import import_data + + # parse inputs/outputs + data = parse_inputs_outputs(data) + try: + data_transfer_node = import_data(**data) + except ValidationException as e: + # It may raise ValidationError during initialization, data_transfer._validate_io e.g. 
raise ValidationError + # instead in marshmallow function, so it won't break SchemaValidatable._schema_validate + raise ValidationError(e.message) from e + return data_transfer_node + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) + + +class DataTransferExportSchema(BaseNodeSchema): + # pylint: disable=unused-argument + component = UnionField( + [ + # for registry type assets + RegistryStr(), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + ], + required=True, + ) + task = StringTransformedEnum(allowed_values=[DataTransferTaskType.EXPORT_DATA]) + type = StringTransformedEnum(allowed_values=[NodeType.DATA_TRANSFER]) + compute = ComputeField() + inputs = InputsField(support_databinding=True, allow_none=False) + sink = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False) + + @validates("inputs") + def inputs_key(self, value): + if len(value) != 1 or list(value.keys())[0] != "source": + raise ValidationError( + f"inputs field only support one input called source in task type " + f"{DataTransferTaskType.EXPORT_DATA}." + ) + + @validates("outputs") + def outputs_key(self, value): + raise ValidationError( + f"outputs field is not a valid filed in task type " f"{DataTransferTaskType.EXPORT_DATA}." + ) + + @post_load + def make(self, data, **kwargs) -> "DataTransferExport": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.data_transfer_func import export_data + + # parse inputs/outputs + data = parse_inputs_outputs(data) + try: + data_transfer_node = export_data(**data) + except ValidationException as e: + # It may raise ValidationError during initialization, data_transfer._validate_io e.g. raise ValidationError + # instead in marshmallow function, so it won't break SchemaValidatable._schema_validate + raise ValidationError(e.message) from e + return data_transfer_node + + @pre_dump + def resolve_inputs_outputs(self, job, **kwargs): + return _resolve_inputs_outputs(job) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/condition_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/condition_node.py new file mode 100644 index 00000000..a1d2901c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/condition_node.py @@ -0,0 +1,48 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from marshmallow import fields, post_dump, ValidationError + +from azure.ai.ml._schema import StringTransformedEnum +from azure.ai.ml._schema.core.fields import DataBindingStr, NodeBindingStr, UnionField +from azure.ai.ml._schema.pipeline.control_flow_job import ControlFlowSchema +from azure.ai.ml.constants._component import ControlFlowType + + +# ConditionNodeSchema did not inherit from BaseNodeSchema since it doesn't have inputs/outputs like other nodes. 
+class ConditionNodeSchema(ControlFlowSchema): + type = StringTransformedEnum(allowed_values=[ControlFlowType.IF_ELSE]) + condition = UnionField([DataBindingStr(), fields.Bool()]) + true_block = UnionField([NodeBindingStr(), fields.List(NodeBindingStr())]) + false_block = UnionField([NodeBindingStr(), fields.List(NodeBindingStr())]) + + @post_dump + def simplify_blocks(self, data, **kwargs): # pylint: disable=unused-argument + # simplify true_block and false_block to single node if there is only one node in the list + # this is to make sure the request to backend won't change after we support list true/false blocks + block_keys = ["true_block", "false_block"] + for block in block_keys: + if isinstance(data.get(block), list) and len(data.get(block)) == 1: + data[block] = data.get(block)[0] + + # validate blocks intersection + def _normalize_blocks(key): + blocks = data.get(key, []) + if blocks: + if not isinstance(blocks, list): + blocks = [blocks] + else: + blocks = [] + return blocks + + true_block = _normalize_blocks("true_block") + false_block = _normalize_blocks("false_block") + + if not true_block and not false_block: + raise ValidationError("True block and false block cannot be empty at the same time.") + + intersection = set(true_block).intersection(set(false_block)) + if intersection: + raise ValidationError(f"True block and false block cannot contain same nodes: {intersection}") + + return data diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/control_flow_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/control_flow_job.py new file mode 100644 index 00000000..3d1e3e4a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/control_flow_job.py @@ -0,0 +1,147 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +import copy +import json + +from marshmallow import INCLUDE, fields, pre_dump, pre_load + +from azure.ai.ml._schema.core.fields import DataBindingStr, NestedField, StringTransformedEnum, UnionField +from azure.ai.ml._schema.core.schema import PathAwareSchema +from azure.ai.ml.constants._component import ControlFlowType + +from ..job.input_output_entry import OutputSchema +from ..job.input_output_fields_provider import InputsField +from ..job.job_limits import DoWhileLimitsSchema +from .component_job import _resolve_outputs +from .pipeline_job_io import OutputBindingStr + +# pylint: disable=protected-access + + +class ControlFlowSchema(PathAwareSchema): + unknown = INCLUDE + + +class BaseLoopSchema(ControlFlowSchema): + unknown = INCLUDE + body = DataBindingStr() + + @pre_dump + def convert_control_flow_body_to_binding_str(self, data, **kwargs): # pylint: disable= unused-argument + result = copy.copy(data) + # Update body object to data_binding_str + result._body = data._get_body_binding_str() + return result + + +class DoWhileSchema(BaseLoopSchema): + # pylint: disable=unused-argument + type = StringTransformedEnum(allowed_values=[ControlFlowType.DO_WHILE]) + condition = UnionField( + [ + DataBindingStr(), + fields.Str(), + ] + ) + mapping = fields.Dict( + keys=fields.Str(), + values=UnionField( + [ + fields.List(fields.Str()), + fields.Str(), + ] + ), + required=True, + ) + limits = NestedField(DoWhileLimitsSchema, required=True) + + @pre_dump + def resolve_inputs_outputs(self, data, **kwargs): + # Try resolve object's mapping and condition and return a resolved new object + result = copy.copy(data) + mapping = {} + for k, v in result.mapping.items(): + v = v if isinstance(v, list) else [v] + mapping[k] = [item._port_name for item in v] + result._mapping = mapping + + try: + result._condition = result._condition._port_name + except AttributeError: + result._condition = result._condition + + return result + + @pre_dump + def convert_control_flow_body_to_binding_str(self, data, **kwargs): + return super(DoWhileSchema, self).convert_control_flow_body_to_binding_str(data, **kwargs) + + +class ParallelForSchema(BaseLoopSchema): + type = StringTransformedEnum(allowed_values=[ControlFlowType.PARALLEL_FOR]) + items = UnionField( + [ + fields.Dict(keys=fields.Str(), values=InputsField()), + fields.List(InputsField()), + # put str in last to make sure other type items won't become string when dumps. 
+ # TODO: only support binding here + fields.Str(), + ], + required=True, + ) + max_concurrency = fields.Int() + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField([OutputBindingStr, NestedField(OutputSchema)], allow_none=True), + ) + + @pre_load + def load_items(self, data, **kwargs): # pylint: disable= unused-argument + # load items from json to convert the assets in it to rest + try: + items = data["items"] + if isinstance(items, str): + items = json.loads(items) + data["items"] = items + except Exception: # pylint: disable=W0718 + pass + return data + + @pre_dump + def convert_control_flow_body_to_binding_str(self, data, **kwargs): + return super(ParallelForSchema, self).convert_control_flow_body_to_binding_str(data, **kwargs) + + @pre_dump + def resolve_outputs(self, job, **kwargs): # pylint: disable=unused-argument + result = copy.copy(job) + _resolve_outputs(result, job) + return result + + @pre_dump + def serialize_items(self, data, **kwargs): # pylint: disable= unused-argument + # serialize items to json string to avoid being removed by _dump_for_validation + from azure.ai.ml.entities._job.pipeline._io import InputOutputBase + + def _binding_handler(obj): + if isinstance(obj, InputOutputBase): + return str(obj) + return repr(obj) + + result = copy.copy(data) + if isinstance(result.items, (dict, list)): + # use str to serialize input/output builder + result._items = json.dumps(result.items, default=_binding_handler) + return result + + +class FLScatterGatherSchema(ControlFlowSchema): + # TODO determine serialization, or if this is actually needed + + # @pre_dump + def serialize_items(self, data, **kwargs): + pass + + # @pre_dump + def resolve_outputs(self, job, **kwargs): + pass diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_command_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_command_job.py new file mode 100644 index 00000000..c2b96f85 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_command_job.py @@ -0,0 +1,31 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging +from typing import Any + +from marshmallow import fields, post_load + +from azure.ai.ml._schema.core.fields import ComputeField, EnvironmentField, NestedField, UnionField +from azure.ai.ml._schema.job.command_job import CommandJobSchema +from azure.ai.ml._schema.job.input_output_entry import OutputSchema + +module_logger = logging.getLogger(__name__) + + +class PipelineCommandJobSchema(CommandJobSchema): + compute = ComputeField() + environment = EnvironmentField() + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField([NestedField(OutputSchema), fields.Str()], allow_none=True), + ) + + @post_load + def make(self, data: Any, **kwargs: Any): + from azure.ai.ml.entities import CommandJob + + return CommandJob(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_component.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_component.py new file mode 100644 index 00000000..05096e99 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_component.py @@ -0,0 +1,297 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=protected-access +from copy import deepcopy + +import yaml +from marshmallow import INCLUDE, fields, post_load, pre_dump + +from azure.ai.ml._schema._utils.utils import _resolve_group_inputs_for_component +from azure.ai.ml._schema.assets.asset import AnonymousAssetSchema +from azure.ai.ml._schema.component.component import ComponentSchema +from azure.ai.ml._schema.component.input_output import OutputPortSchema, PrimitiveOutputSchema +from azure.ai.ml._schema.core.fields import ( + ArmVersionedStr, + FileRefField, + NestedField, + PipelineNodeNameStr, + RegistryStr, + StringTransformedEnum, + TypeSensitiveUnionField, + UnionField, +) +from azure.ai.ml._schema.pipeline.automl_node import AutoMLNodeSchema +from azure.ai.ml._schema.pipeline.component_job import ( + BaseNodeSchema, + CommandSchema, + DataTransferCopySchema, + DataTransferExportSchema, + DataTransferImportSchema, + ImportSchema, + ParallelSchema, + SparkSchema, + SweepSchema, + _resolve_inputs_outputs, +) +from azure.ai.ml._schema.pipeline.condition_node import ConditionNodeSchema +from azure.ai.ml._schema.pipeline.control_flow_job import DoWhileSchema, ParallelForSchema +from azure.ai.ml._schema.pipeline.pipeline_command_job import PipelineCommandJobSchema +from azure.ai.ml._schema.pipeline.pipeline_datatransfer_job import ( + PipelineDataTransferCopyJobSchema, + PipelineDataTransferExportJobSchema, + PipelineDataTransferImportJobSchema, +) +from azure.ai.ml._schema.pipeline.pipeline_import_job import PipelineImportJobSchema +from azure.ai.ml._schema.pipeline.pipeline_parallel_job import PipelineParallelJobSchema +from azure.ai.ml._schema.pipeline.pipeline_spark_job import PipelineSparkJobSchema +from azure.ai.ml._utils.utils import is_private_preview_enabled +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AzureMLResourceType +from azure.ai.ml.constants._component import ( + CONTROL_FLOW_TYPES, + ComponentSource, + ControlFlowType, + DataTransferTaskType, + NodeType, +) + + +class NodeNameStr(PipelineNodeNameStr): + def _get_field_name(self) -> str: + return "Pipeline node" + + +def PipelineJobsField(): + pipeline_enable_job_type = { + NodeType.COMMAND: [ + NestedField(CommandSchema, unknown=INCLUDE), + NestedField(PipelineCommandJobSchema), + ], + NodeType.IMPORT: [ + NestedField(ImportSchema, unknown=INCLUDE), + NestedField(PipelineImportJobSchema), + ], + NodeType.SWEEP: [NestedField(SweepSchema, unknown=INCLUDE)], + NodeType.PARALLEL: [ + # ParallelSchema support parallel pipeline yml with "component" + NestedField(ParallelSchema, unknown=INCLUDE), + NestedField(PipelineParallelJobSchema, unknown=INCLUDE), + ], + NodeType.PIPELINE: [NestedField("PipelineSchema", unknown=INCLUDE)], + NodeType.AUTOML: AutoMLNodeSchema(unknown=INCLUDE), + NodeType.SPARK: [ + NestedField(SparkSchema, unknown=INCLUDE), + NestedField(PipelineSparkJobSchema), + ], + } + + # Note: the private node types only available when private preview flag opened before init of pipeline job + # schema class. 
+ if is_private_preview_enabled(): + pipeline_enable_job_type[ControlFlowType.DO_WHILE] = [NestedField(DoWhileSchema, unknown=INCLUDE)] + pipeline_enable_job_type[ControlFlowType.IF_ELSE] = [NestedField(ConditionNodeSchema, unknown=INCLUDE)] + pipeline_enable_job_type[ControlFlowType.PARALLEL_FOR] = [NestedField(ParallelForSchema, unknown=INCLUDE)] + + # Todo: Put data_transfer logic to the last to avoid error message conflict, open a item to track: + # https://msdata.visualstudio.com/Vienna/_workitems/edit/2244262/ + pipeline_enable_job_type[NodeType.DATA_TRANSFER] = [ + TypeSensitiveUnionField( + { + DataTransferTaskType.COPY_DATA: [ + NestedField(DataTransferCopySchema, unknown=INCLUDE), + NestedField(PipelineDataTransferCopyJobSchema), + ], + DataTransferTaskType.IMPORT_DATA: [ + NestedField(DataTransferImportSchema, unknown=INCLUDE), + NestedField(PipelineDataTransferImportJobSchema), + ], + DataTransferTaskType.EXPORT_DATA: [ + NestedField(DataTransferExportSchema, unknown=INCLUDE), + NestedField(PipelineDataTransferExportJobSchema), + ], + }, + type_field_name="task", + unknown=INCLUDE, + ) + ] + + pipeline_job_field = fields.Dict( + keys=NodeNameStr(), + values=TypeSensitiveUnionField(pipeline_enable_job_type), + ) + return pipeline_job_field + + +# pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype +def _post_load_pipeline_jobs(context, data: dict) -> dict: + """Silently convert Job in pipeline jobs to node.""" + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.condition_node import ConditionNode + from azure.ai.ml.entities._builders.do_while import DoWhile + from azure.ai.ml.entities._builders.parallel_for import ParallelFor + from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob + from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin + + # parse inputs/outputs + data = parse_inputs_outputs(data) + # convert JobNode to Component here + jobs = data.get("jobs", {}) + + for key, job_instance in jobs.items(): + if isinstance(job_instance, dict): + # convert AutoML job dict to instance + if job_instance.get("type") == NodeType.AUTOML: + job_instance = AutoMLJob._create_instance_from_schema_dict( + loaded_data=job_instance, + ) + elif job_instance.get("type") in CONTROL_FLOW_TYPES: + # Set source to yaml job for control flow node. + job_instance["_source"] = ComponentSource.YAML_JOB + + job_type = job_instance.get("type") + if job_type == ControlFlowType.IF_ELSE: + # Convert to if-else node. + job_instance = ConditionNode._create_instance_from_schema_dict(loaded_data=job_instance) + elif job_instance.get("type") == ControlFlowType.DO_WHILE: + # Convert to do-while node. + job_instance = DoWhile._create_instance_from_schema_dict( + pipeline_jobs=jobs, loaded_data=job_instance + ) + elif job_instance.get("type") == ControlFlowType.PARALLEL_FOR: + # Convert to do-while node. + job_instance = ParallelFor._create_instance_from_schema_dict( + pipeline_jobs=jobs, loaded_data=job_instance + ) + jobs[key] = job_instance + + for key, job_instance in jobs.items(): + # Translate job to node if translatable and overrides to_node. 
+ if isinstance(job_instance, ComponentTranslatableMixin) and "_to_node" in type(job_instance).__dict__: + # set source as YAML + job_instance = job_instance._to_node( + context=context, + pipeline_job_dict=data, + ) + if job_instance.type == NodeType.DATA_TRANSFER and job_instance.task != DataTransferTaskType.COPY_DATA: + job_instance._source = ComponentSource.BUILTIN + else: + job_instance.component._source = ComponentSource.YAML_JOB + job_instance._source = job_instance.component._source + jobs[key] = job_instance + # update job instance name to key + job_instance.name = key + return data + + +class PipelineComponentSchema(ComponentSchema): + type = StringTransformedEnum(allowed_values=[NodeType.PIPELINE]) + jobs = PipelineJobsField() + + # primitive output is only supported for command component & pipeline component + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField( + [ + NestedField(PrimitiveOutputSchema, unknown=INCLUDE), + NestedField(OutputPortSchema), + ] + ), + ) + + @post_load + def make(self, data, **kwargs): # pylint: disable=unused-argument + return _post_load_pipeline_jobs(self.context, data) + + +class RestPipelineComponentSchema(PipelineComponentSchema): + """When component load from rest, won't validate on name since there might + be existing component with invalid name.""" + + name = fields.Str(required=True) + + +class _AnonymousPipelineComponentSchema(AnonymousAssetSchema, PipelineComponentSchema): + """Anonymous pipeline component schema. + + Note that do not support inline define anonymous pipeline component + directly. Inheritance follows order: AnonymousAssetSchema, + PipelineComponentSchema because we need name and version to be + dump_only(marshmallow collects fields follows method resolution + order). + """ + + @post_load + def make(self, data, **kwargs): + from azure.ai.ml.entities._component.pipeline_component import PipelineComponent + + # pipeline jobs post process is required before init of pipeline component: it converts control node dict + # to entity. + # however @post_load invocation order is not guaranteed, so we need to call it explicitly here. + _post_load_pipeline_jobs(self.context, data) + + return PipelineComponent( + base_path=self.context[BASE_PATH_CONTEXT_KEY], + **data, + ) + + +class PipelineComponentFileRefField(FileRefField): + # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + def _serialize(self, value, attr, obj, **kwargs): + """FileRefField does not support serialize. + + Call AnonymousPipelineComponent schema to serialize. This + function is overwrite because we need Pipeline can be dumped. + """ + # Update base_path to parent path of component file. + component_schema_context = deepcopy(self.context) + value = _resolve_group_inputs_for_component(value) + return _AnonymousPipelineComponentSchema(context=component_schema_context)._serialize(value, **kwargs) + + def _deserialize(self, value, attr, data, **kwargs): + # Get component info from component yaml file. + data = super()._deserialize(value, attr, data, **kwargs) + component_dict = yaml.safe_load(data) + source_path = self.context[BASE_PATH_CONTEXT_KEY] / value + + # Update base_path to parent path of component file. 
+ component_schema_context = deepcopy(self.context) + component_schema_context[BASE_PATH_CONTEXT_KEY] = source_path.parent + component = _AnonymousPipelineComponentSchema(context=component_schema_context).load( + component_dict, unknown=INCLUDE + ) + component._source_path = source_path + component._source = ComponentSource.YAML_COMPONENT + return component + + +# Note: PipelineSchema is defined here instead of component_job.py is to +# resolve circular import and support recursive schema. +class PipelineSchema(BaseNodeSchema): + # pylint: disable=unused-argument + # do not support inline define a pipeline node + component = UnionField( + [ + # for registry type assets + RegistryStr(azureml_type=AzureMLResourceType.COMPONENT), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + # component file reference + PipelineComponentFileRefField(), + ], + required=True, + ) + type = StringTransformedEnum(allowed_values=[NodeType.PIPELINE]) + + @post_load + def make(self, data, **kwargs) -> "Pipeline": + from azure.ai.ml.entities._builders import parse_inputs_outputs + from azure.ai.ml.entities._builders.pipeline import Pipeline + + data = parse_inputs_outputs(data) + return Pipeline(**data) + + @pre_dump + def resolve_inputs_outputs(self, data, **kwargs): + return _resolve_inputs_outputs(data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py new file mode 100644 index 00000000..a63e687d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py @@ -0,0 +1,55 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging +from typing import Any + +from marshmallow import fields, post_load + +from azure.ai.ml._schema.core.fields import NestedField, UnionField +from azure.ai.ml._schema.job.input_output_entry import OutputSchema +from azure.ai.ml._schema.pipeline.pipeline_job_io import OutputBindingStr +from azure.ai.ml._schema.job.data_transfer_job import ( + DataTransferCopyJobSchema, + DataTransferImportJobSchema, + DataTransferExportJobSchema, +) + +module_logger = logging.getLogger(__name__) + + +class PipelineDataTransferCopyJobSchema(DataTransferCopyJobSchema): + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True), + ) + + @post_load + def make(self, data: Any, **kwargs: Any): + from azure.ai.ml.entities._job.data_transfer.data_transfer_job import DataTransferCopyJob + + return DataTransferCopyJob(**data) + + +class PipelineDataTransferImportJobSchema(DataTransferImportJobSchema): + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True), + ) + + @post_load + def make(self, data: Any, **kwargs: Any): + from azure.ai.ml.entities._job.data_transfer.data_transfer_job import DataTransferImportJob + + return DataTransferImportJob(**data) + + +class PipelineDataTransferExportJobSchema(DataTransferExportJobSchema): + @post_load + def make(self, data: Any, **kwargs: Any): + from azure.ai.ml.entities._job.data_transfer.data_transfer_job import DataTransferExportJob + + return DataTransferExportJob(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_import_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_import_job.py new file mode 100644 index 00000000..ae338597 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_import_job.py @@ -0,0 +1,25 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging +from typing import Any + +from marshmallow import post_load + +from azure.ai.ml._schema.job.import_job import ImportJobSchema + +module_logger = logging.getLogger(__name__) + + +class PipelineImportJobSchema(ImportJobSchema): + class Meta: + exclude = ["compute"] # compute property not applicable to import job + + @post_load + def make(self, data: Any, **kwargs: Any): + from azure.ai.ml.entities._job.import_job import ImportJob + + return ImportJob(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job.py new file mode 100644 index 00000000..46daeb92 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job.py @@ -0,0 +1,76 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging + +from marshmallow import INCLUDE, ValidationError, post_load, pre_dump, pre_load + +from azure.ai.ml._schema.core.fields import ( + ArmVersionedStr, + ComputeField, + NestedField, + RegistryStr, + StringTransformedEnum, + UnionField, +) +from azure.ai.ml._schema.job import BaseJobSchema +from azure.ai.ml._schema.job.input_output_fields_provider import InputsField, OutputsField +from azure.ai.ml._schema.pipeline.component_job import _resolve_inputs_outputs +from azure.ai.ml._schema.pipeline.pipeline_component import ( + PipelineComponentFileRefField, + PipelineJobsField, + _post_load_pipeline_jobs, +) +from azure.ai.ml._schema.pipeline.settings import PipelineJobSettingsSchema +from azure.ai.ml.constants import JobType +from azure.ai.ml.constants._common import AzureMLResourceType + +module_logger = logging.getLogger(__name__) + + +class PipelineJobSchema(BaseJobSchema): + type = StringTransformedEnum(allowed_values=[JobType.PIPELINE]) + compute = ComputeField() + settings = NestedField(PipelineJobSettingsSchema, unknown=INCLUDE) + # Support databinding in inputs as we support macro like ${{name}} + inputs = InputsField(support_databinding=True) + outputs = OutputsField() + jobs = PipelineJobsField() + component = UnionField( + [ + # for registry type assets + RegistryStr(azureml_type=AzureMLResourceType.COMPONENT), + # existing component + ArmVersionedStr(azureml_type=AzureMLResourceType.COMPONENT, allow_default_version=True), + # component file reference + PipelineComponentFileRefField(), + ], + ) + + @pre_dump() + def backup_jobs_and_remove_component(self, job, **kwargs): + # pylint: disable=protected-access + job_copy = _resolve_inputs_outputs(job) + if not isinstance(job_copy.component, str): + # If component is pipeline component object, + # copy jobs to job and remove component. + if not job_copy._jobs: + job_copy._jobs = job_copy.component.jobs + job_copy.component = None + return job_copy + + @pre_load() + def check_exclusive_fields(self, data: dict, **kwargs) -> dict: + error_msg = "'jobs' and 'component' are mutually exclusive fields in pipeline job." + # When loading from yaml, data["component"] must be a local path (str) + # Otherwise, data["component"] can be a PipelineComponent so data["jobs"] must exist + if isinstance(data.get("component"), str) and data.get("jobs"): + raise ValidationError(error_msg) + return data + + @post_load + def make(self, data: dict, **kwargs) -> dict: + return _post_load_pipeline_jobs(self.context, data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job_io.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job_io.py new file mode 100644 index 00000000..3fb6a7b7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_job_io.py @@ -0,0 +1,51 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +import logging +import re + +from marshmallow import ValidationError, fields + +from azure.ai.ml.constants._component import ComponentJobConstants + +module_logger = logging.getLogger(__name__) + + +class OutputBindingStr(fields.Field): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def _jsonschema_type_mapping(self): + schema = {"type": "string", "pattern": ComponentJobConstants.OUTPUT_PATTERN} + if self.name is not None: + schema["title"] = self.name + if self.dump_only: + schema["readonly"] = True + return schema + + def _serialize(self, value, attr, obj, **kwargs): + if isinstance(value, str) and re.match(ComponentJobConstants.OUTPUT_PATTERN, value): + return value + # _to_job_output in io.py will return Output + # add this branch to judge whether original value is a simple binding or Output + if ( + isinstance(value.path, str) + and re.match(ComponentJobConstants.OUTPUT_PATTERN, value.path) + and value.mode is None + ): + return value.path + raise ValidationError(f"Invalid output binding string '{value}' passed") + + def _deserialize(self, value, attr, data, **kwargs): + if ( + isinstance(value, dict) + and "path" in value + and "mode" not in value + and "name" not in value + and "version" not in value + ): + value = value["path"] + if isinstance(value, str) and re.match(ComponentJobConstants.OUTPUT_PATTERN, value): + return value + raise ValidationError(f"Invalid output binding string '{value}' passed") diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_parallel_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_parallel_job.py new file mode 100644 index 00000000..3b30fb66 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_parallel_job.py @@ -0,0 +1,40 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging +from typing import Any + +from marshmallow import post_load + +from azure.ai.ml._schema.core.fields import ComputeField, EnvironmentField, StringTransformedEnum +from azure.ai.ml._schema.job import ParameterizedParallelSchema +from azure.ai.ml._schema.pipeline.component_job import BaseNodeSchema + +from ...constants._component import NodeType + +module_logger = logging.getLogger(__name__) + + +# parallel job inherits parallel attributes from ParameterizedParallelSchema and node functionality from BaseNodeSchema +class PipelineParallelJobSchema(BaseNodeSchema, ParameterizedParallelSchema): + """Schema for ParallelJob in PipelineJob/PipelineComponent.""" + + type = StringTransformedEnum(allowed_values=NodeType.PARALLEL) + compute = ComputeField() + environment = EnvironmentField() + + @post_load + def make(self, data: Any, **kwargs: Any): + """Construct a ParallelJob from deserialized data. + + :param data: The deserialized data. + :type data: dict[str, Any] + :return: A ParallelJob. 
+ :rtype: azure.ai.ml.entities._job.parallel.ParallelJob + """ + from azure.ai.ml.entities._job.parallel.parallel_job import ParallelJob + + return ParallelJob(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_spark_job.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_spark_job.py new file mode 100644 index 00000000..69d58255 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/pipeline_spark_job.py @@ -0,0 +1,29 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=unused-argument + +import logging +from typing import Any + +from marshmallow import fields, post_load + +from azure.ai.ml._schema.core.fields import NestedField, UnionField +from azure.ai.ml._schema.job.input_output_entry import OutputSchema +from azure.ai.ml._schema.job.spark_job import SparkJobSchema + +module_logger = logging.getLogger(__name__) + + +class PipelineSparkJobSchema(SparkJobSchema): + outputs = fields.Dict( + keys=fields.Str(), + values=UnionField([NestedField(OutputSchema), fields.Str()], allow_none=True), + ) + + @post_load + def make(self, data: Any, **kwargs: Any): + from azure.ai.ml.entities._job.spark_job import SparkJob + + return SparkJob(**data) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/settings.py new file mode 100644 index 00000000..1e5227b0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_schema/pipeline/settings.py @@ -0,0 +1,42 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=unused-argument + +from marshmallow import INCLUDE, Schema, fields, post_dump, post_load + +from azure.ai.ml._schema.core.fields import ArmStr, StringTransformedEnum, UnionField +from azure.ai.ml._schema.pipeline.pipeline_component import NodeNameStr +from azure.ai.ml._utils.utils import is_private_preview_enabled +from azure.ai.ml.constants._common import AzureMLResourceType, SERVERLESS_COMPUTE + + +class PipelineJobSettingsSchema(Schema): + class Meta: + unknown = INCLUDE + + default_datastore = ArmStr(azureml_type=AzureMLResourceType.DATASTORE) + default_compute = UnionField( + [ + StringTransformedEnum(allowed_values=[SERVERLESS_COMPUTE]), + ArmStr(azureml_type=AzureMLResourceType.COMPUTE), + ] + ) + continue_on_step_failure = fields.Bool() + force_rerun = fields.Bool() + + # move init/finalize under private preview flag to hide them in spec + if is_private_preview_enabled(): + on_init = NodeNameStr() + on_finalize = NodeNameStr() + + @post_load + def make(self, data, **kwargs) -> "PipelineJobSettings": + from azure.ai.ml.entities import PipelineJobSettings + + return PipelineJobSettings(**data) + + @post_dump + def remove_none(self, data, **kwargs): + return {key: value for key, value in data.items() if value is not None} |
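For reference, the `OutputBindingStr` field defined in `pipeline_job_io.py` above only round-trips strings that match `ComponentJobConstants.OUTPUT_PATTERN`. The simplified regex below is an assumption used purely to illustrate the `${{parent...}}` binding form, not the SDK's actual pattern:

```python
# Illustration only: a simplified stand-in for ComponentJobConstants.OUTPUT_PATTERN.
import re

SIMPLIFIED_OUTPUT_PATTERN = r"^\$\{\{parent\.(jobs\.[^.]+\.outputs|outputs)\.[^.}]+\}\}$"

examples = [
    "${{parent.jobs.train_step.outputs.model_dir}}",  # node output binding
    "${{parent.outputs.trained_model}}",              # pipeline-level output binding
    "azureml:my_data:1",                              # plain asset reference, not a binding
]
for value in examples:
    print(value, "->", bool(re.match(SIMPLIFIED_OUTPUT_PATTERN, value)))
```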