| field | value | date |
|---|---|---|
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py')
| -rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py | 575 |
1 file changed, 575 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py
new file mode 100644
index 00000000..83e88a48
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py
@@ -0,0 +1,575 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+
+from marshmallow import Schema
+
+from azure.ai.ml._restclient.v2022_10_01_preview.models import JobBase
+from azure.ai.ml._schema.job.data_transfer_job import (
+    DataTransferCopyJobSchema,
+    DataTransferExportJobSchema,
+    DataTransferImportJobSchema,
+)
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AssetTypes
+from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._component.datatransfer_component import (
+    DataTransferComponent,
+    DataTransferCopyComponent,
+    DataTransferExportComponent,
+    DataTransferImportComponent,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
+from azure.ai.ml.entities._job.data_transfer.data_transfer_job import (
+    DataTransferCopyJob,
+    DataTransferExportJob,
+    DataTransferImportJob,
+)
+from azure.ai.ml.entities._validation.core import MutableValidationResult
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+
+from ..._schema import PathAwareSchema
+from .._job.pipeline._io import NodeOutput
+from .._util import convert_ordered_dict_to_dict, load_from_dict, validate_attribute_type
+from .base_node import BaseNode
+
+module_logger = logging.getLogger(__name__)
+
+
+def _build_source_sink(io_dict: Optional[Union[Dict, Database, FileSystem]]) -> Optional[Union[Database, FileSystem]]:
+    if io_dict is None:
+        return io_dict
+    if isinstance(io_dict, (Database, FileSystem)):
+        component_io = io_dict
+    else:
+        if isinstance(io_dict, dict):
+            data_type = io_dict.pop("type", None)
+            if data_type == ExternalDataType.DATABASE:
+                component_io = Database(**io_dict)
+            elif data_type == ExternalDataType.FILE_SYSTEM:
+                component_io = FileSystem(**io_dict)
+            else:
+                msg = "Type in source or sink only support {} and {}, currently got {}."
+                raise ValidationException(
+                    message=msg.format(
+                        ExternalDataType.DATABASE,
+                        ExternalDataType.FILE_SYSTEM,
+                        data_type,
+                    ),
+                    no_personal_data_message=msg.format(
+                        ExternalDataType.DATABASE,
+                        ExternalDataType.FILE_SYSTEM,
+                        "data_type",
+                    ),
+                    target=ErrorTarget.DATA_TRANSFER_JOB,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
+        else:
+            msg = "Source or sink only support dict, Database and FileSystem"
+            raise ValidationException(
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.DATA_TRANSFER_JOB,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+    return component_io
+
+
+class DataTransfer(BaseNode):
+    """Base class for data transfer node, used for data transfer component version consumption.
+
+    You should not instantiate this class directly.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferCopyComponent, DataTransferImportComponent],
+        compute: Optional[str] = None,
+        inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        **kwargs: Any,
+    ):
+        # resolve normal dict to dict[str, JobService]
+        kwargs.pop("type", None)
+        super().__init__(
+            type=NodeType.DATA_TRANSFER,
+            inputs=inputs,
+            outputs=outputs,
+            component=component,
+            compute=compute,
+            **kwargs,
+        )
+
+    @property
+    def component(self) -> Union[str, DataTransferComponent]:
+        res: Union[str, DataTransferComponent] = self._component
+        return res
+
+    @classmethod
+    def _load_from_rest_job(cls, obj: JobBase) -> "DataTransfer":
+        # Todo: need update rest api
+        raise NotImplementedError("Not support submit standalone job for now")
+
+    @classmethod
+    def _get_supported_outputs_types(cls) -> Tuple:
+        return str, Output
+
+    def _build_inputs(self) -> Dict:
+        inputs = super(DataTransfer, self)._build_inputs()
+        built_inputs = {}
+        # Validate and remove non-specified inputs
+        for key, value in inputs.items():
+            if value is not None:
+                built_inputs[key] = value
+
+        return built_inputs
+
+
+@experimental
+class DataTransferCopy(DataTransfer):
+    """Base class for data transfer copy node.
+
+    You should not instantiate this class directly. Instead, you should
+    create from builder function: copy_data.
+
+    :param component: Id or instance of the data transfer component/job to be run for the step
+    :type component: DataTransferCopyComponent
+    :param inputs: Inputs to the data transfer.
+    :type inputs: Dict[str, Union[NodeOutput, Input, str]]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, Output, dict]]
+    :param name: Name of the data transfer.
+    :type name: str
+    :param description: Description of the data transfer.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param display_name: Display name of the job.
+    :type display_name: str
+    :param experiment_name: Name of the experiment the job will be created under,
+        if None is provided, default will be set to current directory name.
+    :type experiment_name: str
+    :param compute: The compute target the job runs on.
+    :type compute: str
+    :param data_copy_mode: data copy mode in copy task, possible value is "merge_with_overwrite", "fail_if_conflict".
+    :type data_copy_mode: str
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferCopy cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferCopyComponent],
+        compute: Optional[str] = None,
+        inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        data_copy_mode: Optional[str] = None,
+        **kwargs: Any,
+    ):
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        super().__init__(
+            inputs=inputs,
+            outputs=outputs,
+            component=component,
+            compute=compute,
+            **kwargs,
+        )
+        # init mark for _AttrDict
+        self._init = True
+        self.task = DataTransferTaskType.COPY_DATA
+        self.data_copy_mode = data_copy_mode
+        is_component = isinstance(component, DataTransferCopyComponent)
+        if is_component:
+            _component: DataTransferCopyComponent = cast(DataTransferCopyComponent, component)
+            self.task = _component.task or self.task
+            self.data_copy_mode = _component.data_copy_mode or self.data_copy_mode
+        self._init = False
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, DataTransferCopyComponent),
+        }
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import DataTransferCopySchema
+
+        return DataTransferCopySchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["type", "task", "data_copy_mode"]
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        for key, value in {
+            "componentId": self._get_component_id(),
+            "data_copy_mode": self.data_copy_mode,
+        }.items():
+            if value is not None:
+                rest_obj[key] = value
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> Any:
+        from .data_transfer_func import copy_data
+
+        loaded_data = load_from_dict(DataTransferCopyJobSchema, data, context, additional_message, **kwargs)
+        data_transfer_job = copy_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return data_transfer_job
+
+    def _to_job(self) -> DataTransferCopyJob:
+        return DataTransferCopyJob(
+            experiment_name=self.experiment_name,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            status=self.status,
+            inputs=self._job_inputs,
+            outputs=self._job_outputs,
+            services=self.services,
+            compute=self.compute,
+            data_copy_mode=self.data_copy_mode,
+        )
+
+    # pylint: disable-next=docstring-missing-param
+    def __call__(self, *args: Any, **kwargs: Any) -> "DataTransferCopy":
+        """Call DataTransferCopy as a function will return a new instance each time.
+
+        :return: A DataTransferCopy node
+        :rtype: DataTransferCopy
+        """
+        if isinstance(self._component, Component):
+            # call this to validate inputs
+            node: DataTransferCopy = self._component(*args, **kwargs)
+            # merge inputs
+            for name, original_input in self.inputs.items():
+                if name not in kwargs:
+                    # use setattr here to make sure owner of input won't change
+                    setattr(node.inputs, name, original_input._data)
+                    node._job_inputs[name] = original_input._data
+            # get outputs
+            for name, original_output in self.outputs.items():
+                # use setattr here to make sure owner of input won't change
+                if not isinstance(original_output, str):
+                    setattr(node.outputs, name, original_output._data)
+            self._refine_optional_inputs_with_no_value(node, kwargs)
+            # set default values: compute, environment_variables, outputs
+            node._name = self.name
+            node.compute = self.compute
+            node.tags = self.tags
+            # Pass through the display name only if the display name is not system generated.
+            node.display_name = self.display_name if self.display_name != self.name else None
+            return node
+        msg = "copy_data can be called as a function only when referenced component is {}, currently got {}."
+        raise ValidationException(
+            message=msg.format(type(Component), self._component),
+            no_personal_data_message=msg.format(type(Component), "self._component"),
+            target=ErrorTarget.DATA_TRANSFER_JOB,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+
+
+@experimental
+class DataTransferImport(DataTransfer):
+    """Base class for data transfer import node.
+
+    You should not instantiate this class directly. Instead, you should
+    create from builder function: import_data.
+
+    :param component: Id of the data transfer built in component to be run for the step
+    :type component: str
+    :param source: The data source of file system or database
+    :type source: Union[Dict, Database, FileSystem]
+    :param outputs: Mapping of output data bindings used in the job.
+    :type outputs: Dict[str, Union[str, Output, dict]]
+    :param name: Name of the data transfer.
+    :type name: str
+    :param description: Description of the data transfer.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param display_name: Display name of the job.
+    :type display_name: str
+    :param experiment_name: Name of the experiment the job will be created under,
+        if None is provided, default will be set to current directory name.
+    :type experiment_name: str
+    :param compute: The compute target the job runs on.
+    :type compute: str
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferImport cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferImportComponent],
+        compute: Optional[str] = None,
+        source: Optional[Union[Dict, Database, FileSystem]] = None,
+        outputs: Optional[Dict[str, Union[str, Output]]] = None,
+        **kwargs: Any,
+    ):
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        super(DataTransferImport, self).__init__(
+            component=component,
+            outputs=outputs,
+            compute=compute,
+            **kwargs,
+        )
+        # init mark for _AttrDict
+        self._init = True
+        self.task = DataTransferTaskType.IMPORT_DATA
+        is_component = isinstance(component, DataTransferImportComponent)
+        if is_component:
+            _component: DataTransferImportComponent = cast(DataTransferImportComponent, component)
+            self.task = _component.task or self.task
+        self.source = _build_source_sink(source)
+        self._init = False
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, DataTransferImportComponent),
+        }
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import DataTransferImportSchema
+
+        return DataTransferImportSchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["type", "task", "source"]
+
+    def _customized_validate(self) -> MutableValidationResult:
+        result = super()._customized_validate()
+        if self.source is None:
+            result.append_error(
+                yaml_path="source",
+                message="Source is a required field for import data task in DataTransfer job",
+            )
+        if len(self.outputs) != 1 or list(self.outputs.keys())[0] != "sink":
+            result.append_error(
+                yaml_path="outputs.sink",
+                message="Outputs field only support one output called sink in import task",
+            )
+        if (
+            "sink" in self.outputs
+            and not isinstance(self.outputs["sink"], str)
+            and isinstance(self.outputs["sink"]._data, Output)
+        ):
+            sink_output = self.outputs["sink"]._data
+            if self.source is not None:
+
+                if (self.source.type == ExternalDataType.DATABASE and sink_output.type != AssetTypes.MLTABLE) or (
+                    self.source.type == ExternalDataType.FILE_SYSTEM and sink_output.type != AssetTypes.URI_FOLDER
+                ):
+                    result.append_error(
+                        yaml_path="outputs.sink.type",
+                        message="Outputs field only support type {} for {} and {} for {}".format(
+                            AssetTypes.MLTABLE,
+                            ExternalDataType.DATABASE,
+                            AssetTypes.URI_FOLDER,
+                            ExternalDataType.FILE_SYSTEM,
+                        ),
+                    )
+        return result
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        for key, value in {
+            "componentId": self._get_component_id(),
+        }.items():
+            if value is not None:
+                rest_obj[key] = value
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferImport":
+        from .data_transfer_func import import_data
+
+        loaded_data = load_from_dict(DataTransferImportJobSchema, data, context, additional_message, **kwargs)
+        data_transfer_job: DataTransferImport = import_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return data_transfer_job
+
+    def _to_job(self) -> DataTransferImportJob:
+        return DataTransferImportJob(
+            experiment_name=self.experiment_name,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            status=self.status,
+            source=self.source,
+            outputs=self._job_outputs,
+            services=self.services,
+            compute=self.compute,
+        )
+
+
+@experimental
+class DataTransferExport(DataTransfer):
+    """Base class for data transfer export node.
+
+    You should not instantiate this class directly. Instead, you should
+    create from builder function: export_data.
+
+    :param component: Id of the data transfer built in component to be run for the step
+    :type component: str
+    :param sink: The sink of external data and databases.
+    :type sink: Union[Dict, Database, FileSystem]
+    :param inputs: Mapping of input data bindings used in the job.
+    :type inputs: Dict[str, Union[NodeOutput, Input, str, Input]]
+    :param name: Name of the data transfer.
+    :type name: str
+    :param description: Description of the data transfer.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param display_name: Display name of the job.
+    :type display_name: str
+    :param experiment_name: Name of the experiment the job will be created under,
+        if None is provided, default will be set to current directory name.
+    :type experiment_name: str
+    :param compute: The compute target the job runs on.
+    :type compute: str
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferExport cannot be successfully validated.
+        Details will be provided in the error message.
+    """
+
+    def __init__(
+        self,
+        *,
+        component: Union[str, DataTransferCopyComponent, DataTransferImportComponent],
+        compute: Optional[str] = None,
+        sink: Optional[Union[Dict, Database, FileSystem]] = None,
+        inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
+        **kwargs: Any,
+    ):
+        # validate init params are valid type
+        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
+        super(DataTransferExport, self).__init__(
+            component=component,
+            inputs=inputs,
+            compute=compute,
+            **kwargs,
+        )
+        # init mark for _AttrDict
+        self._init = True
+        self.task = DataTransferTaskType.EXPORT_DATA
+        is_component = isinstance(component, DataTransferExportComponent)
+        if is_component:
+            _component: DataTransferExportComponent = cast(DataTransferExportComponent, component)
+            self.task = _component.task or self.task
+        self.sink = sink
+        self._init = False
+
+    @property
+    def sink(self) -> Optional[Union[Dict, Database, FileSystem]]:
+        """The sink of external data and databases.
+
+        :return: The sink of external data and databases.
+        :rtype: Union[None, Database, FileSystem]
+        """
+        return self._sink
+
+    @sink.setter
+    def sink(self, value: Union[Dict, Database, FileSystem]) -> None:
+        self._sink = _build_source_sink(value)
+
+    @classmethod
+    def _attr_type_map(cls) -> dict:
+        return {
+            "component": (str, DataTransferExportComponent),
+        }
+
+    @classmethod
+    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
+        from azure.ai.ml._schema.pipeline import DataTransferExportSchema
+
+        return DataTransferExportSchema(context=context)
+
+    @classmethod
+    def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
+        return ["type", "task", "sink"]
+
+    def _customized_validate(self) -> MutableValidationResult:
+        result = super()._customized_validate()
+        if self.sink is None:
+            result.append_error(
+                yaml_path="sink",
+                message="Sink is a required field for export data task in DataTransfer job",
+            )
+        if len(self.inputs) != 1 or list(self.inputs.keys())[0] != "source":
+            result.append_error(
+                yaml_path="inputs.source",
+                message="Inputs field only support one input called source in export task",
+            )
+        if "source" in self.inputs and isinstance(self.inputs["source"]._data, Input):
+            source_input = self.inputs["source"]._data
+            if self.sink is not None and not isinstance(self.sink, Dict):
+                if (self.sink.type == ExternalDataType.DATABASE and source_input.type != AssetTypes.URI_FILE) or (
+                    self.sink.type == ExternalDataType.FILE_SYSTEM and source_input.type != AssetTypes.URI_FOLDER
+                ):
+                    result.append_error(
+                        yaml_path="inputs.source.type",
+                        message="Inputs field only support type {} for {} and {} for {}".format(
+                            AssetTypes.URI_FILE,
+                            ExternalDataType.DATABASE,
+                            AssetTypes.URI_FOLDER,
+                            ExternalDataType.FILE_SYSTEM,
+                        ),
+                    )
+
+        return result
+
+    def _to_rest_object(self, **kwargs: Any) -> dict:
+        rest_obj = super()._to_rest_object(**kwargs)
+        for key, value in {
+            "componentId": self._get_component_id(),
+        }.items():
+            if value is not None:
+                rest_obj[key] = value
+        return cast(dict, convert_ordered_dict_to_dict(rest_obj))
+
+    @classmethod
+    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferExport":
+        from .data_transfer_func import export_data
+
+        loaded_data = load_from_dict(DataTransferExportJobSchema, data, context, additional_message, **kwargs)
+        data_transfer_job: DataTransferExport = export_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
+
+        return data_transfer_job
+
+    def _to_job(self) -> DataTransferExportJob:
+        return DataTransferExportJob(
+            experiment_name=self.experiment_name,
+            name=self.name,
+            display_name=self.display_name,
+            description=self.description,
+            tags=self.tags,
+            status=self.status,
+            sink=self.sink,
+            inputs=self._job_inputs,
+            services=self.services,
+            compute=self.compute,
+        )
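The docstrings above say these node classes are not instantiated directly but built through the `copy_data`, `import_data`, and `export_data` builder functions. The sketch below is not part of the diff; it is a hedged illustration of how such nodes might be assembled in a pipeline using the public `azure.ai.ml.data_transfer` builders. The connection name, query, and output paths are placeholders, and exact builder signatures may vary between SDK versions.

```python
# Hedged usage sketch (illustrative only, not taken from the diff above).
from azure.ai.ml import Output, dsl
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.data_transfer import Database, copy_data, import_data


@dsl.pipeline(description="illustrative data transfer pipeline")
def data_transfer_pipeline(folder_in):
    # Import from an external database into an MLTable sink. Per the
    # validation in DataTransferImport._customized_validate, a Database
    # source must pair with an MLTABLE output (FileSystem with URI_FOLDER).
    source = Database(query="select * from my_table", connection="azureml:my_sql_connection")
    import_node = import_data(
        source=source,
        outputs={"sink": Output(type=AssetTypes.MLTABLE)},
    )

    # Copy a folder between storage locations; data_copy_mode is one of
    # "merge_with_overwrite" or "fail_if_conflict" (see DataTransferCopy).
    copy_node = copy_data(
        inputs={"folder1": folder_in},
        outputs={"output_folder": Output(type=AssetTypes.URI_FOLDER)},
        data_copy_mode="merge_with_overwrite",
    )
    return {
        "imported": import_node.outputs.sink,
        "copied": copy_node.outputs.output_folder,
    }
```

Under this reading, the builders return `DataTransferImport` and `DataTransferCopy` nodes whose `_customized_validate` checks (required `source`/`sink`, single `sink` output, matching asset types) run when the pipeline is validated or submitted.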
