Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders')
20 files changed, 7599 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/__init__.py new file mode 100644 index 00000000..95dfca0a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/__init__.py @@ -0,0 +1,28 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from .base_node import BaseNode, parse_inputs_outputs +from .command import Command +from .do_while import DoWhile +from .import_node import Import +from .parallel import Parallel +from .pipeline import Pipeline +from .spark import Spark +from .sweep import Sweep +from .data_transfer import DataTransfer, DataTransferCopy, DataTransferImport, DataTransferExport + +__all__ = [ + "BaseNode", + "Sweep", + "Parallel", + "Command", + "Import", + "Spark", + "Pipeline", + "parse_inputs_outputs", + "DoWhile", + "DataTransfer", + "DataTransferCopy", + "DataTransferImport", + "DataTransferExport", +] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/base_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/base_node.py new file mode 100644 index 00000000..98eba6a5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/base_node.py @@ -0,0 +1,568 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access + +import logging +import os +import uuid +from abc import abstractmethod +from enum import Enum +from functools import wraps +from typing import Any, Dict, List, Optional, Union + +from azure.ai.ml._utils._arm_id_utils import get_resource_name_from_arm_id_safe +from azure.ai.ml.constants import JobType +from azure.ai.ml.constants._common import CommonYamlFields +from azure.ai.ml.constants._component import NodeType +from azure.ai.ml.entities import Data, Model +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job._input_output_helpers import build_input_output +from azure.ai.ml.entities._job.job import Job +from azure.ai.ml.entities._job.pipeline._attr_dict import _AttrDict +from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput +from azure.ai.ml.entities._job.pipeline._io.mixin import NodeWithGroupInputMixin +from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression +from azure.ai.ml.entities._job.sweep.search_space import SweepDistribution +from azure.ai.ml.entities._mixins import YamlTranslatableMixin +from azure.ai.ml.entities._util import convert_ordered_dict_to_dict, resolve_pipeline_parameters +from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin +from azure.ai.ml.exceptions import ErrorTarget, ValidationException + +module_logger = logging.getLogger(__name__) + + +def parse_inputs_outputs(data: dict) -> dict: + """Parse inputs and outputs from data. If data is a list, parse each item in the list. 
+ + :param data: A dict that may contain "inputs" or "outputs" keys + :type data: dict + :return: Dict with parsed "inputs" and "outputs" keys + :rtype: Dict + """ + + if "inputs" in data: + data["inputs"] = {key: build_input_output(val) for key, val in data["inputs"].items()} + if "outputs" in data: + data["outputs"] = {key: build_input_output(val, inputs=False) for key, val in data["outputs"].items()} + return data + + +def pipeline_node_decorator(func: Any) -> Any: + """Wrap a function and add its return value to the current DSL pipeline. + + :param func: The function to be wrapped. + :type func: callable + :return: The wrapped function. + :rtype: callable + """ + + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + automl_job = func(*args, **kwargs) + from azure.ai.ml.dsl._pipeline_component_builder import ( + _add_component_to_current_definition_builder, + _is_inside_dsl_pipeline_func, + ) + + if _is_inside_dsl_pipeline_func(): + # Build automl job to automl node if it's defined inside DSL pipeline func. + automl_job._instance_id = str(uuid.uuid4()) + _add_component_to_current_definition_builder(automl_job) + return automl_job + + return wrapper + + +# pylint: disable=too-many-instance-attributes +class BaseNode(Job, YamlTranslatableMixin, _AttrDict, PathAwareSchemaValidatableMixin, NodeWithGroupInputMixin): + """Base class for node in pipeline, used for component version consumption. Can't be instantiated directly. + + You should not instantiate this class directly. Instead, you should + create from a builder function. + + :param type: Type of pipeline node. Defaults to JobType.COMPONENT. + :type type: str + :param component: Id or instance of the component version to be run for the step + :type component: Component + :param inputs: The inputs for the node. + :type inputs: Optional[Dict[str, Union[ + ~azure.ai.ml.entities._job.pipeline._io.PipelineInput, + ~azure.ai.ml.entities._job.pipeline._io.NodeOutput, + ~azure.ai.ml.entities.Input, + str, + bool, + int, + float, + Enum, + 'Input']]] + :param outputs: Mapping of output data bindings used in the job. + :type outputs: Optional[Dict[str, Union[str, ~azure.ai.ml.entities.Output, 'Output']]] + :param name: The name of the node. + :type name: Optional[str] + :param display_name: The display name of the node. + :type display_name: Optional[str] + :param description: The description of the node. + :type description: Optional[str] + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: Optional[Dict] + :param properties: The properties of the job. + :type properties: Optional[Dict] + :param comment: Comment of the pipeline node, which will be shown in designer canvas. + :type comment: Optional[str] + :param compute: Compute definition containing the compute information for the step. + :type compute: Optional[str] + :param experiment_name: Name of the experiment the job will be created under, + if None is provided, default will be set to current directory name. + Will be ignored as a pipeline step. + :type experiment_name: Optional[str] + :param kwargs: Additional keyword arguments for future compatibility. 
+ """ + + def __init__( + self, + *, + type: str = JobType.COMPONENT, # pylint: disable=redefined-builtin + component: Any, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + name: Optional[str] = None, + display_name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + properties: Optional[Dict] = None, + comment: Optional[str] = None, + compute: Optional[str] = None, + experiment_name: Optional[str] = None, + **kwargs: Any, + ) -> None: + self._init = True + # property _source can't be set + source = kwargs.pop("_source", None) + _from_component_func = kwargs.pop("_from_component_func", False) + self._name: Optional[str] = None + super(BaseNode, self).__init__( + type=type, + name=name, + display_name=display_name, + description=description, + tags=tags, + properties=properties, + compute=compute, + experiment_name=experiment_name, + **kwargs, + ) + self.comment = comment + + # initialize io + inputs = resolve_pipeline_parameters(inputs) + inputs, outputs = inputs or {}, outputs or {} + # parse empty dict to None so we won't pass default mode, type to backend + # add `isinstance` to avoid converting to expression + for k, v in inputs.items(): + if isinstance(v, dict) and v == {}: + inputs[k] = None + + # TODO: get rid of self._job_inputs, self._job_outputs once we have unified Input + self._job_inputs, self._job_outputs = inputs, outputs + if isinstance(component, Component): + # Build the inputs from component input definition and given inputs, unfilled inputs will be None + self._inputs = self._build_inputs_dict(inputs or {}, input_definition_dict=component.inputs) + # Build the outputs from component output definition and given outputs, unfilled outputs will be None + self._outputs = self._build_outputs_dict(outputs or {}, output_definition_dict=component.outputs) + else: + # Build inputs/outputs dict without meta when definition not available + self._inputs = self._build_inputs_dict(inputs or {}) + self._outputs = self._build_outputs_dict(outputs or {}) + + self._component = component + self._referenced_control_flow_node_instance_id: Optional[str] = None + self.kwargs = kwargs + + # Generate an id for every instance + self._instance_id = str(uuid.uuid4()) + if _from_component_func: + # add current component in pipeline stack for dsl scenario + self._register_in_current_pipeline_component_builder() + + if source is None: + if isinstance(component, Component): + source = self._component._source + else: + source = Component._resolve_component_source_from_id(id=self._component) + self._source = source + self._validate_required_input_not_provided = True + self._init = False + + @property + def name(self) -> Optional[str]: + """Get the name of the node. + + :return: The name of the node. + :rtype: str + """ + return self._name + + @name.setter + def name(self, value: str) -> None: + """Set the name of the node. + + :param value: The name to set for the node. + :type value: str + :return: None + """ + # when name is not lower case, lower it to make sure it's a valid node name + if value and value != value.lower(): + module_logger.warning( + "Changing node name %s to lower case: %s since upper case is not allowed node name.", + value, + value.lower(), + ) + value = value.lower() + self._name = value + + @classmethod + def _get_supported_inputs_types(cls) -> Any: + """Get the supported input types for node input. + + :param cls: The class (or instance) to retrieve supported input types for. 
+ :type cls: object + + :return: A tuple of supported input types. + :rtype: tuple + """ + # supported input types for node input + return ( + PipelineInput, + NodeOutput, + Input, + Data, + Model, + str, + bool, + int, + float, + Enum, + PipelineExpression, + ) + + @property + def _skip_required_compute_missing_validation(self) -> bool: + return False + + def _initializing(self) -> bool: + # use this to indicate ongoing init process so all attributes set during init process won't be set as + # arbitrary attribute in _AttrDict + # TODO: replace this hack + return self._init + + def _set_base_path(self, base_path: Optional[Union[str, os.PathLike]]) -> None: + """Set the base path for the node. + + Will be used for schema validation. If not set, will use Path.cwd() as the base path + (default logic defined in SchemaValidatableMixin._base_path_for_validation). + + :param base_path: The new base path + :type base_path: Union[str, os.PathLike] + """ + self._base_path = base_path + + def _set_referenced_control_flow_node_instance_id(self, instance_id: str) -> None: + """Set the referenced control flow node instance id. + + If this node is referenced to a control flow node, the instance_id will not be modified. + + :param instance_id: The new instance id + :type instance_id: str + """ + if not self._referenced_control_flow_node_instance_id: + self._referenced_control_flow_node_instance_id = instance_id + + def _get_component_id(self) -> Union[str, Component]: + """Return component id if possible. + + :return: The component id + :rtype: Union[str, Component] + """ + if isinstance(self._component, Component) and self._component.id: + # If component is remote, return it's asset id + return self._component.id + # Otherwise, return the component version or arm id. + res: Union[str, Component] = self._component + return res + + def _get_component_name(self) -> Optional[str]: + # first use component version/job's display name or name as component name + # make it unique when pipeline build finished. 
+ if self._component is None: + return None + if isinstance(self._component, str): + return self._component + return str(self._component.name) + + def _to_dict(self) -> Dict: + return dict(convert_ordered_dict_to_dict(self._dump_for_validation())) + + @classmethod + def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException: + return ValidationException( + message=message, + no_personal_data_message=no_personal_data_message, + target=ErrorTarget.PIPELINE, + ) + + def _validate_inputs(self) -> MutableValidationResult: + validation_result = self._create_empty_validation_result() + if self._validate_required_input_not_provided: + # validate required inputs not provided + if isinstance(self._component, Component): + for key, meta in self._component.inputs.items(): + # raise error when required input with no default value not set + if ( + not self._is_input_set(input_name=key) # input not provided + and meta.optional is not True # and it's required + and meta.default is None # and it does not have default + ): + validation_result.append_error( + yaml_path=f"inputs.{key}", + message=f"Required input {key!r} for component {self.name!r} not provided.", + ) + + inputs = self._build_inputs() + for input_name, input_obj in inputs.items(): + if isinstance(input_obj, SweepDistribution): + validation_result.append_error( + yaml_path=f"inputs.{input_name}", + message=f"Input of command {self.name} is a SweepDistribution, " + f"please use command.sweep to transform the command into a sweep node.", + ) + return validation_result + + def _customized_validate(self) -> MutableValidationResult: + """Validate the resource with customized logic. + + Override this method to add customized validation logic. + + :return: The validation result + :rtype: MutableValidationResult + """ + validate_result = self._validate_inputs() + return validate_result + + @classmethod + def _get_skip_fields_in_schema_validation(cls) -> List[str]: + return [ + "inputs", # processed separately + "outputs", # processed separately + "name", + "display_name", + "experiment_name", # name is not part of schema but may be set in dsl/yml file + "kwargs", + ] + + @classmethod + def _get_component_attr_name(cls) -> str: + return "component" + + @abstractmethod + def _to_job(self) -> Job: + """This private function is used by the CLI to get a plain job object + so that the CLI can properly serialize the object. + + It is needed as BaseNode._to_dict() dumps objects using pipeline child job schema instead of standalone job + schema, for example Command objects dump have a nested component property, which doesn't apply to stand alone + command jobs. BaseNode._to_dict() needs to be able to dump to both pipeline child job dict as well as stand + alone job dict base on context. 
+ """ + + @classmethod + def _from_rest_object(cls, obj: dict) -> "BaseNode": + if CommonYamlFields.TYPE not in obj: + obj[CommonYamlFields.TYPE] = NodeType.COMMAND + + from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory + + # todo: refine Hard code for now to support different task type for DataTransfer node + _type = obj[CommonYamlFields.TYPE] + if _type == NodeType.DATA_TRANSFER: + _type = "_".join([NodeType.DATA_TRANSFER, obj.get("task", "")]) + instance: BaseNode = pipeline_node_factory.get_create_instance_func(_type)() + init_kwargs = instance._from_rest_object_to_init_params(obj) + # TODO: Bug Item number: 2883415 + instance.__init__(**init_kwargs) # type: ignore + return instance + + @classmethod + def _from_rest_object_to_init_params(cls, obj: dict) -> Dict: + """Convert the rest object to a dict containing items to init the node. + + Will be used in _from_rest_object. Please override this method instead of _from_rest_object to make the logic + reusable. + + :param obj: The REST object + :type obj: dict + :return: The init params + :rtype: Dict + """ + inputs = obj.get("inputs", {}) + outputs = obj.get("outputs", {}) + + obj["inputs"] = BaseNode._from_rest_inputs(inputs) + obj["outputs"] = BaseNode._from_rest_outputs(outputs) + + # Change computeId -> compute + compute_id = obj.pop("computeId", None) + obj["compute"] = get_resource_name_from_arm_id_safe(compute_id) + + # Change componentId -> component. Note that sweep node has no componentId. + if "componentId" in obj: + obj["component"] = obj.pop("componentId") + + # distribution, sweep won't have distribution + if "distribution" in obj and obj["distribution"]: + from azure.ai.ml.entities._job.distribution import DistributionConfiguration + + obj["distribution"] = DistributionConfiguration._from_rest_object(obj["distribution"]) + + return obj + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + """List of fields to be picked from self._to_dict() in self._to_rest_object(). + + By default, returns an empty list. + + Override this method to add custom fields. + + :return: List of fields to pick + :rtype: List[str] + """ + + return [] + + def _to_rest_object(self, **kwargs: Any) -> dict: # pylint: disable=unused-argument + """Convert self to a rest object for remote call. + + :return: The rest object + :rtype: dict + """ + base_dict, rest_obj = self._to_dict(), {} + for key in self._picked_fields_from_dict_to_rest_object(): + if key in base_dict: + rest_obj[key] = base_dict.get(key) + + rest_obj.update( + dict( # pylint: disable=use-dict-literal + name=self.name, + type=self.type, + display_name=self.display_name, + tags=self.tags, + computeId=self.compute, + inputs=self._to_rest_inputs(), + outputs=self._to_rest_outputs(), + properties=self.properties, + _source=self._source, + # add all arbitrary attributes to support setting unknown attributes + **self._get_attrs(), + ) + ) + # only add comment in REST object when it is set + if self.comment is not None: + rest_obj.update({"comment": self.comment}) + + return dict(convert_ordered_dict_to_dict(rest_obj)) + + @property + def inputs(self) -> Dict: + """Get the inputs for the object. + + :return: A dictionary containing the inputs for the object. + :rtype: Dict[str, Union[Input, str, bool, int, float]] + """ + return self._inputs # type: ignore + + @property + def outputs(self) -> Dict: + """Get the outputs of the object. + + :return: A dictionary containing the outputs for the object. 
+ :rtype: Dict[str, Union[str, Output]] + """ + return self._outputs # type: ignore + + def __str__(self) -> str: + try: + return str(self._to_yaml()) + except BaseException: # pylint: disable=W0718 + # add try catch in case component job failed in schema parse + _obj: _AttrDict = _AttrDict() + return _obj.__str__() + + def __hash__(self) -> int: # type: ignore + return hash(self.__str__()) + + def __help__(self) -> Any: + # only show help when component has definition + if isinstance(self._component, Component): + # TODO: Bug Item number: 2883422 + return self._component.__help__() # type: ignore + return None + + def __bool__(self) -> bool: + # _attr_dict will return False if no extra attributes are set + return True + + def _get_origin_job_outputs(self) -> Dict[str, Union[str, Output]]: + """Restore outputs to JobOutput/BindingString and return them. + + :return: The origin job outputs + :rtype: Dict[str, Union[str, Output]] + """ + outputs: Dict = {} + if self.outputs is not None: + for output_name, output_obj in self.outputs.items(): + if isinstance(output_obj, NodeOutput): + outputs[output_name] = output_obj._data + else: + raise TypeError("unsupported built output type: {}: {}".format(output_name, type(output_obj))) + return outputs + + def _get_telemetry_values(self) -> Dict: + telemetry_values = {"type": self.type, "source": self._source} + return telemetry_values + + def _register_in_current_pipeline_component_builder(self) -> None: + """Register this node in current pipeline component builder by adding self to a global stack.""" + from azure.ai.ml.dsl._pipeline_component_builder import _add_component_to_current_definition_builder + + # TODO: would it be better if we make _add_component_to_current_definition_builder a public function of + # _PipelineComponentBuilderStack and make _PipelineComponentBuilderStack a singleton? + _add_component_to_current_definition_builder(self) + + def _is_input_set(self, input_name: str) -> bool: + built_inputs = self._build_inputs() + return input_name in built_inputs and built_inputs[input_name] is not None + + @classmethod + def _refine_optional_inputs_with_no_value(cls, node: "BaseNode", kwargs: Any) -> None: + """Refine optional inputs that have no default value and no value is provided when calling command/parallel + function. + + This is to align with behavior of calling component to generate a pipeline node. + + :param node: The node + :type node: BaseNode + :param kwargs: The kwargs + :type kwargs: dict + """ + for key, value in node.inputs.items(): + meta = value._data + if ( + isinstance(meta, Input) + and meta._is_primitive_type is False + and meta.optional is True + and not meta.path + and key not in kwargs + ): + value._data = None diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command.py new file mode 100644 index 00000000..0073307c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command.py @@ -0,0 +1,1017 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +# pylint: disable=protected-access,too-many-lines +import copy +import logging +import os +from enum import Enum +from os import PathLike +from typing import Any, Dict, List, Optional, Tuple, Union, cast, overload + +from marshmallow import INCLUDE, Schema + +from azure.ai.ml._restclient.v2025_01_01_preview.models import CommandJob as RestCommandJob +from azure.ai.ml._restclient.v2025_01_01_preview.models import JobBase +from azure.ai.ml._schema.core.fields import ExperimentalField, NestedField, UnionField +from azure.ai.ml._schema.job.command_job import CommandJobSchema +from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema +from azure.ai.ml._schema.job.services import JobServiceSchema +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, LOCAL_COMPUTE_PROPERTY, LOCAL_COMPUTE_TARGET +from azure.ai.ml.constants._component import ComponentSource, NodeType +from azure.ai.ml.entities._assets import Environment +from azure.ai.ml.entities._component.command_component import CommandComponent +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, + _BaseJobIdentityConfiguration, +) +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job._input_output_helpers import from_rest_data_outputs, from_rest_inputs_to_dataset_literal +from azure.ai.ml.entities._job.command_job import CommandJob +from azure.ai.ml.entities._job.distribution import ( + DistributionConfiguration, + MpiDistribution, + PyTorchDistribution, + RayDistribution, + TensorFlowDistribution, +) +from azure.ai.ml.entities._job.job_limits import CommandJobLimits +from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration +from azure.ai.ml.entities._job.job_service import ( + JobService, + JobServiceBase, + JupyterLabJobService, + SshJobService, + TensorBoardJobService, + VsCodeJobService, +) +from azure.ai.ml.entities._job.queue_settings import QueueSettings +from azure.ai.ml.entities._job.sweep.early_termination_policy import EarlyTerminationPolicy +from azure.ai.ml.entities._job.sweep.objective import Objective +from azure.ai.ml.entities._job.sweep.search_space import ( + Choice, + LogNormal, + LogUniform, + Normal, + QLogNormal, + QLogUniform, + QNormal, + QUniform, + Randint, + SweepDistribution, + Uniform, +) +from azure.ai.ml.entities._system_data import SystemData +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException + +from ..._schema import PathAwareSchema +from ..._schema.job.distribution import ( + MPIDistributionSchema, + PyTorchDistributionSchema, + RayDistributionSchema, + TensorFlowDistributionSchema, +) +from .._job.pipeline._io import NodeWithGroupInputMixin +from .._util import ( + convert_ordered_dict_to_dict, + from_rest_dict_to_dummy_rest_object, + get_rest_dict_for_node_attrs, + load_from_dict, + validate_attribute_type, +) +from .base_node import BaseNode +from .sweep import Sweep + +module_logger = logging.getLogger(__name__) + + +class Command(BaseNode, NodeWithGroupInputMixin): + """Base class for command node, used for command component version consumption. + + You should not instantiate this class directly. Instead, you should create it using the builder function: command(). + + :keyword component: The ID or instance of the command component or job to be run for the step. 
+ :paramtype component: Union[str, ~azure.ai.ml.entities.CommandComponent] + :keyword compute: The compute target the job will run on. + :paramtype compute: Optional[str] + :keyword inputs: A mapping of input names to input data sources used in the job. + :paramtype inputs: Optional[dict[str, Union[ + ~azure.ai.ml.Input, str, bool, int, float, Enum]]] + :keyword outputs: A mapping of output names to output data sources used in the job. + :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]] + :keyword limits: The limits for the command component or job. + :paramtype limits: ~azure.ai.ml.entities.CommandJobLimits + :keyword identity: The identity that the command job will use while running on compute. + :paramtype identity: Optional[Union[ + dict[str, str], + ~azure.ai.ml.entities.ManagedIdentityConfiguration, + ~azure.ai.ml.entities.AmlTokenConfiguration, + ~azure.ai.ml.entities.UserIdentityConfiguration]] + :keyword distribution: The configuration for distributed jobs. + :paramtype distribution: Optional[Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution, + ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]] + :keyword environment: The environment that the job will run in. + :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]] + :keyword environment_variables: A dictionary of environment variable names and values. + These environment variables are set on the process where the user script is being executed. + :paramtype environment_variables: Optional[dict[str, str]] + :keyword resources: The compute resource configuration for the command. + :paramtype resources: Optional[~azure.ai.ml.entities.JobResourceConfiguration] + :keyword services: The interactive services for the node. This is an experimental parameter, and may change at any + time. Please see https://aka.ms/azuremlexperimental for more information. + :paramtype services: Optional[dict[str, Union[~azure.ai.ml.entities.JobService, + ~azure.ai.ml.entities.JupyterLabJobService, + ~azure.ai.ml.entities.SshJobService, ~azure.ai.ml.entities.TensorBoardJobService, + ~azure.ai.ml.entities.VsCodeJobService]]] + :keyword queue_settings: Queue settings for the job. + :paramtype queue_settings: Optional[~azure.ai.ml.entities.QueueSettings] + :keyword parent_job_name: parent job id for command job + :paramtype parent_job_name: Optional[str] + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if Command cannot be successfully validated. + Details will be provided in the error message. 
+ """ + + # pylint: disable=too-many-instance-attributes + def __init__( + self, + *, + component: Union[str, CommandComponent], + compute: Optional[str] = None, + inputs: Optional[ + Dict[ + str, + Union[ + Input, + str, + bool, + int, + float, + Enum, + ], + ] + ] = None, + outputs: Optional[Dict[str, Union[str, Output]]] = None, + limits: Optional[CommandJobLimits] = None, + identity: Optional[ + Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration] + ] = None, + distribution: Optional[ + Union[ + Dict, + MpiDistribution, + TensorFlowDistribution, + PyTorchDistribution, + RayDistribution, + DistributionConfiguration, + ] + ] = None, + environment: Optional[Union[Environment, str]] = None, + environment_variables: Optional[Dict] = None, + resources: Optional[JobResourceConfiguration] = None, + services: Optional[ + Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]] + ] = None, + queue_settings: Optional[QueueSettings] = None, + parent_job_name: Optional[str] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + # resolve normal dict to dict[str, JobService] + services = _resolve_job_services(services) + kwargs.pop("type", None) + self._parameters: dict = kwargs.pop("parameters", {}) + BaseNode.__init__( + self, + type=NodeType.COMMAND, + inputs=inputs, + outputs=outputs, + component=component, + compute=compute, + services=services, + **kwargs, + ) + + # init mark for _AttrDict + self._init = True + # initialize command job properties + self.limits = limits + self.identity = identity + self._distribution = distribution + self.environment_variables = {} if environment_variables is None else environment_variables + self.environment: Any = environment + self._resources = resources + self._services = services + self.queue_settings = queue_settings + self.parent_job_name = parent_job_name + + if isinstance(self.component, CommandComponent): + self.resources = self.resources or self.component.resources # type: ignore[assignment] + self.distribution = self.distribution or self.component.distribution + + self._swept: bool = False + self._init = False + + @classmethod + def _get_supported_inputs_types(cls) -> Tuple: + supported_types = super()._get_supported_inputs_types() or () + return ( + SweepDistribution, + *supported_types, + ) + + @classmethod + def _get_supported_outputs_types(cls) -> Tuple: + return str, Output + + @property + def parameters(self) -> Dict[str, str]: + """MLFlow parameters to be logged during the job. + + :return: The MLFlow parameters to be logged during the job. + :rtype: dict[str, str] + """ + return self._parameters + + @property + def distribution( + self, + ) -> Optional[ + Union[ + Dict, + MpiDistribution, + TensorFlowDistribution, + PyTorchDistribution, + RayDistribution, + DistributionConfiguration, + ] + ]: + """The configuration for the distributed command component or job. + + :return: The configuration for distributed jobs. + :rtype: Union[~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution, + ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution] + """ + return self._distribution + + @distribution.setter + def distribution( + self, + value: Union[Dict, PyTorchDistribution, TensorFlowDistribution, MpiDistribution, RayDistribution], + ) -> None: + """Sets the configuration for the distributed command component or job. 
+ + :param value: The configuration for distributed jobs. + :type value: Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution, + ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution] + """ + if isinstance(value, dict): + dist_schema = UnionField( + [ + NestedField(PyTorchDistributionSchema, unknown=INCLUDE), + NestedField(TensorFlowDistributionSchema, unknown=INCLUDE), + NestedField(MPIDistributionSchema, unknown=INCLUDE), + ExperimentalField(NestedField(RayDistributionSchema, unknown=INCLUDE)), + ] + ) + value = dist_schema._deserialize(value=value, attr=None, data=None) + self._distribution = value + + @property + def resources(self) -> JobResourceConfiguration: + """The compute resource configuration for the command component or job. + + :rtype: ~azure.ai.ml.entities.JobResourceConfiguration + """ + return cast(JobResourceConfiguration, self._resources) + + @resources.setter + def resources(self, value: Union[Dict, JobResourceConfiguration]) -> None: + """Sets the compute resource configuration for the command component or job. + + :param value: The compute resource configuration for the command component or job. + :type value: Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration] + """ + if isinstance(value, dict): + value = JobResourceConfiguration(**value) + self._resources = value + + @property + def queue_settings(self) -> Optional[QueueSettings]: + """The queue settings for the command component or job. + + :return: The queue settings for the command component or job. + :rtype: ~azure.ai.ml.entities.QueueSettings + """ + return self._queue_settings + + @queue_settings.setter + def queue_settings(self, value: Union[Dict, QueueSettings]) -> None: + """Sets the queue settings for the command component or job. + + :param value: The queue settings for the command component or job. + :type value: Union[dict, ~azure.ai.ml.entities.QueueSettings] + """ + if isinstance(value, dict): + value = QueueSettings(**value) + self._queue_settings = value + + @property + def identity( + self, + ) -> Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]: + """The identity that the job will use while running on compute. + + :return: The identity that the job will use while running on compute. + :rtype: Optional[Union[~azure.ai.ml.ManagedIdentityConfiguration, ~azure.ai.ml.AmlTokenConfiguration, + ~azure.ai.ml.UserIdentityConfiguration]] + """ + return self._identity + + @identity.setter + def identity( + self, + value: Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]], + ) -> None: + """Sets the identity that the job will use while running on compute. + + :param value: The identity that the job will use while running on compute. + :type value: Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration, + ~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration] + """ + if isinstance(value, dict): + identity_schema = UnionField( + [ + NestedField(ManagedIdentitySchema, unknown=INCLUDE), + NestedField(AMLTokenIdentitySchema, unknown=INCLUDE), + NestedField(UserIdentitySchema, unknown=INCLUDE), + ] + ) + value = identity_schema._deserialize(value=value, attr=None, data=None) + self._identity = value + + @property + def services( + self, + ) -> Optional[ + Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]] + ]: + """The interactive services for the node. 
+ + This is an experimental parameter, and may change at any time. + Please see https://aka.ms/azuremlexperimental for more information. + + :rtype: dict[str, Union[~azure.ai.ml.entities.JobService, ~azure.ai.ml.entities.JupyterLabJobService, + ~azure.ai.ml.entities.SshJobService, ~azure.ai.ml.entities.TensorBoardJobService, + ~azure.ai.ml.entities.VsCodeJobService]] + """ + return self._services + + @services.setter + def services( + self, + value: Dict, + ) -> None: + """Sets the interactive services for the node. + + This is an experimental parameter, and may change at any time. + Please see https://aka.ms/azuremlexperimental for more information. + + :param value: The interactive services for the node. + :type value: dict[str, Union[~azure.ai.ml.entities.JobService, ~azure.ai.ml.entities.JupyterLabJobService, + ~azure.ai.ml.entities.SshJobService, ~azure.ai.ml.entities.TensorBoardJobService, + ~azure.ai.ml.entities.VsCodeJobService]] + """ + self._services = _resolve_job_services(value) # type: ignore[assignment] + + @property + def component(self) -> Union[str, CommandComponent]: + """The ID or instance of the command component or job to be run for the step. + + :return: The ID or instance of the command component or job to be run for the step. + :rtype: Union[str, ~azure.ai.ml.entities.CommandComponent] + """ + return self._component + + @property + def command(self) -> Optional[str]: + """The command to be executed. + + :rtype: Optional[str] + """ + # the same as code + if not isinstance(self.component, CommandComponent): + return None + + if self.component.command is None: + return None + return str(self.component.command) + + @command.setter + def command(self, value: str) -> None: + """Sets the command to be executed. + + :param value: The command to be executed. + :type value: str + """ + if isinstance(self.component, CommandComponent): + self.component.command = value + else: + msg = "Can't set command property for a registered component {}. Tried to set it to {}." + raise ValidationException( + message=msg.format(self.component, value), + no_personal_data_message=msg, + target=ErrorTarget.COMMAND_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + @property + def code(self) -> Optional[Union[str, PathLike]]: + """The source code to run the job. + + :rtype: Optional[Union[str, os.PathLike]] + """ + # BaseNode is an _AttrDict to allow dynamic attributes, so that lower version of SDK can work with attributes + # added in higher version of SDK. + # self.code will be treated as an Arbitrary attribute if it raises AttributeError in getting + # (when self.component doesn't have attribute code, self.component = 'azureml:xxx:1' e.g. + # you may check _AttrDict._is_arbitrary_attr for detailed logic for Arbitrary judgement), + # then its value will be set to _AttrDict and be deserialized as {"shape": {}} instead of None, + # which is invalid in schema validation. + if not isinstance(self.component, CommandComponent): + return None + + if self.component.code is None: + return None + + return str(self.component.code) + + @code.setter + def code(self, value: str) -> None: + """Sets the source code to run the job. + + :param value: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url + pointing to a remote location. 
+ :type value: str + """ + if isinstance(self.component, CommandComponent): + self.component.code = value + else: + msg = "Can't set code property for a registered component {}" + raise ValidationException( + message=msg.format(self.component), + no_personal_data_message=msg, + target=ErrorTarget.COMMAND_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + def set_resources( + self, + *, + instance_type: Optional[Union[str, List[str]]] = None, + instance_count: Optional[int] = None, + locations: Optional[List[str]] = None, + properties: Optional[Dict] = None, + docker_args: Optional[Union[str, List[str]]] = None, + shm_size: Optional[str] = None, + # pylint: disable=unused-argument + **kwargs: Any, + ) -> None: + """Set resources for Command. + + :keyword instance_type: The type of compute instance to run the job on. If not specified, the job will run on + the default compute target. + :paramtype instance_type: Optional[Union[str, List[str]]] + :keyword instance_count: The number of instances to run the job on. If not specified, the job will run on a + single instance. + :paramtype instance_count: Optional[int] + :keyword locations: The list of locations where the job will run. If not specified, the job will run on the + default compute target. + :paramtype locations: Optional[List[str]] + :keyword properties: The properties of the job. + :paramtype properties: Optional[dict] + :keyword docker_args: The Docker arguments for the job. + :paramtype docker_args: Optional[Union[str,List[str]]] + :keyword shm_size: The size of the docker container's shared memory block. This should be in the + format of (number)(unit) where the number has to be greater than 0 and the unit can be one of + b(bytes), k(kilobytes), m(megabytes), or g(gigabytes). + :paramtype shm_size: Optional[str] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_command_configurations.py + :start-after: [START command_set_resources] + :end-before: [END command_set_resources] + :language: python + :dedent: 8 + :caption: Setting resources on a Command. + """ + if self.resources is None: + self.resources = JobResourceConfiguration() + + if locations is not None: + self.resources.locations = locations + if instance_type is not None: + self.resources.instance_type = instance_type + if instance_count is not None: + self.resources.instance_count = instance_count + if properties is not None: + self.resources.properties = properties + if docker_args is not None: + self.resources.docker_args = docker_args + if shm_size is not None: + self.resources.shm_size = shm_size + + # Save the resources to internal component as well, otherwise calling sweep() will loose the settings + if isinstance(self.component, CommandComponent): + self.component.resources = self.resources + + def set_limits(self, *, timeout: int, **kwargs: Any) -> None: # pylint: disable=unused-argument + """Set limits for Command. + + :keyword timeout: The timeout for the job in seconds. + :paramtype timeout: int + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_command_configurations.py + :start-after: [START command_set_limits] + :end-before: [END command_set_limits] + :language: python + :dedent: 8 + :caption: Setting a timeout limit of 10 seconds on a Command. 
+ """ + if isinstance(self.limits, CommandJobLimits): + self.limits.timeout = timeout + else: + self.limits = CommandJobLimits(timeout=timeout) + + def set_queue_settings(self, *, job_tier: Optional[str] = None, priority: Optional[str] = None) -> None: + """Set QueueSettings for the job. + + :keyword job_tier: The job tier. Accepted values are "Spot", "Basic", "Standard", or "Premium". + :paramtype job_tier: Optional[str] + :keyword priority: The priority of the job on the compute. Defaults to "Medium". + :paramtype priority: Optional[str] + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_command_configurations.py + :start-after: [START command_set_queue_settings] + :end-before: [END command_set_queue_settings] + :language: python + :dedent: 8 + :caption: Configuring queue settings on a Command. + """ + if isinstance(self.queue_settings, QueueSettings): + self.queue_settings.job_tier = job_tier + self.queue_settings.priority = priority + else: + self.queue_settings = QueueSettings(job_tier=job_tier, priority=priority) + + def sweep( + self, + *, + primary_metric: str, + goal: str, + sampling_algorithm: str = "random", + compute: Optional[str] = None, + max_concurrent_trials: Optional[int] = None, + max_total_trials: Optional[int] = None, + timeout: Optional[int] = None, + trial_timeout: Optional[int] = None, + early_termination_policy: Optional[Union[EarlyTerminationPolicy, str]] = None, + search_space: Optional[ + Dict[ + str, + Union[ + Choice, LogNormal, LogUniform, Normal, QLogNormal, QLogUniform, QNormal, QUniform, Randint, Uniform + ], + ] + ] = None, + identity: Optional[ + Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration] + ] = None, + queue_settings: Optional[QueueSettings] = None, + job_tier: Optional[str] = None, + priority: Optional[str] = None, + ) -> Sweep: + """Turns the command into a sweep node with extra sweep run setting. The command component + in the current command node will be used as its trial component. A command node can sweep + multiple times, and the generated sweep node will share the same trial component. + + :keyword primary_metric: The primary metric of the sweep objective - e.g. AUC (Area Under the Curve). + The metric must be logged while running the trial component. + :paramtype primary_metric: str + :keyword goal: The goal of the Sweep objective. Accepted values are "minimize" or "maximize". + :paramtype goal: str + :keyword sampling_algorithm: The sampling algorithm to use inside the search space. + Acceptable values are "random", "grid", or "bayesian". Defaults to "random". + :paramtype sampling_algorithm: str + :keyword compute: The target compute to run the node on. If not specified, the current node's compute + will be used. + :paramtype compute: Optional[str] + :keyword max_total_trials: The maximum number of total trials to run. This value will overwrite the value in + CommandJob.limits if specified. + :paramtype max_total_trials: Optional[int] + :keyword max_concurrent_trials: The maximum number of concurrent trials for the Sweep job. + :paramtype max_concurrent_trials: Optional[int] + :keyword timeout: The maximum run duration in seconds, after which the job will be cancelled. + :paramtype timeout: Optional[int] + :keyword trial_timeout: The Sweep Job trial timeout value, in seconds. + :paramtype trial_timeout: Optional[int] + :keyword early_termination_policy: The early termination policy of the sweep node. 
Acceptable + values are "bandit", "median_stopping", or "truncation_selection". Defaults to None. + :paramtype early_termination_policy: Optional[Union[~azure.ai.ml.sweep.BanditPolicy, + ~azure.ai.ml.sweep.TruncationSelectionPolicy, ~azure.ai.ml.sweep.MedianStoppingPolicy, str]] + :keyword identity: The identity that the job will use while running on compute. + :paramtype identity: Optional[Union[ + ~azure.ai.ml.ManagedIdentityConfiguration, + ~azure.ai.ml.AmlTokenConfiguration, + ~azure.ai.ml.UserIdentityConfiguration]] + :keyword search_space: The search space to use for the sweep job. + :paramtype search_space: Optional[Dict[str, Union[ + Choice, + LogNormal, + LogUniform, + Normal, + QLogNormal, + QLogUniform, + QNormal, + QUniform, + Randint, + Uniform + + ]]] + + :keyword queue_settings: The queue settings for the job. + :paramtype queue_settings: Optional[~azure.ai.ml.entities.QueueSettings] + :keyword job_tier: **Experimental** The job tier. Accepted values are "Spot", "Basic", + "Standard", or "Premium". + :paramtype job_tier: Optional[str] + :keyword priority: **Experimental** The compute priority. Accepted values are "low", + "medium", and "high". + :paramtype priority: Optional[str] + :return: A Sweep node with the component from current Command node as its trial component. + :rtype: ~azure.ai.ml.entities.Sweep + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_sweep_configurations.py + :start-after: [START configure_sweep_job_bandit_policy] + :end-before: [END configure_sweep_job_bandit_policy] + :language: python + :dedent: 8 + :caption: Creating a Sweep node from a Command job. + """ + self._swept = True + # inputs & outputs are already built in source Command obj + inputs, inputs_search_space = Sweep._get_origin_inputs_and_search_space(self.inputs) + if search_space: + inputs_search_space.update(search_space) + + if not queue_settings: + queue_settings = self.queue_settings + if queue_settings is not None: + if job_tier is not None: + queue_settings.job_tier = job_tier + if priority is not None: + queue_settings.priority = priority + + sweep_node = Sweep( + trial=copy.deepcopy( + self.component + ), # Make a copy of the underneath Component so that the original node can still be used. 
+ compute=self.compute if compute is None else compute, + objective=Objective(goal=goal, primary_metric=primary_metric), + sampling_algorithm=sampling_algorithm, + inputs=inputs, + outputs=self._get_origin_job_outputs(), + search_space=inputs_search_space, + early_termination=early_termination_policy, + name=self.name, + description=self.description, + display_name=self.display_name, + tags=self.tags, + properties=self.properties, + experiment_name=self.experiment_name, + identity=self.identity if not identity else identity, + _from_component_func=True, + queue_settings=queue_settings, + ) + sweep_node.set_limits( + max_total_trials=max_total_trials, + max_concurrent_trials=max_concurrent_trials, + timeout=timeout, + trial_timeout=trial_timeout, + ) + return sweep_node + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "component": (str, CommandComponent), + "environment": (str, Environment), + "environment_variables": dict, + "resources": (dict, JobResourceConfiguration), + "limits": (dict, CommandJobLimits), + "code": (str, os.PathLike), + } + + def _to_job(self) -> CommandJob: + if isinstance(self.component, CommandComponent): + return CommandJob( + id=self.id, + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + properties=self.properties, + command=self.component.command, + experiment_name=self.experiment_name, + code=self.component.code, + compute=self.compute, + status=self.status, + environment=self.environment, + distribution=self.distribution, + identity=self.identity, + environment_variables=self.environment_variables, + resources=self.resources, + limits=self.limits, + inputs=self._job_inputs, + outputs=self._job_outputs, + services=self.services, + creation_context=self.creation_context, + parameters=self.parameters, + queue_settings=self.queue_settings, + parent_job_name=self.parent_job_name, + ) + + return CommandJob( + id=self.id, + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + properties=self.properties, + command=None, + experiment_name=self.experiment_name, + code=None, + compute=self.compute, + status=self.status, + environment=self.environment, + distribution=self.distribution, + identity=self.identity, + environment_variables=self.environment_variables, + resources=self.resources, + limits=self.limits, + inputs=self._job_inputs, + outputs=self._job_outputs, + services=self.services, + creation_context=self.creation_context, + parameters=self.parameters, + queue_settings=self.queue_settings, + parent_job_name=self.parent_job_name, + ) + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return ["resources", "distribution", "limits", "environment_variables", "queue_settings"] + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj = super()._to_rest_object(**kwargs) + for key, value in { + "componentId": self._get_component_id(), + "distribution": get_rest_dict_for_node_attrs(self.distribution, clear_empty_value=True), + "limits": get_rest_dict_for_node_attrs(self.limits, clear_empty_value=True), + "resources": get_rest_dict_for_node_attrs(self.resources, clear_empty_value=True), + "services": get_rest_dict_for_node_attrs(self.services), + "identity": get_rest_dict_for_node_attrs(self.identity), + "queue_settings": get_rest_dict_for_node_attrs(self.queue_settings, clear_empty_value=True), + }.items(): + if value is not None: + rest_obj[key] = value + return cast(dict, convert_ordered_dict_to_dict(rest_obj)) + + 
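For orientation, a minimal usage sketch of the command()/sweep() builders implemented above. This is an editorial sketch, not part of the diff: the script name, source folder, environment, compute target, metric name, and search-space values are hypothetical placeholders.

# Sketch only: "train.py", "./src", "azureml:my-env:1", "cpu-cluster", and
# "validation_accuracy" are illustrative names, not taken from this diff.
from azure.ai.ml import command
from azure.ai.ml.sweep import Choice, Uniform

trial = command(
    code="./src",  # assumed local folder containing train.py
    command="python train.py --lr ${{inputs.lr}} --batch ${{inputs.batch}}",
    environment="azureml:my-env:1",
    compute="cpu-cluster",
    inputs={
        # SweepDistribution inputs are accepted here; Command.sweep() later
        # splits them out into the sweep search space via
        # Sweep._get_origin_inputs_and_search_space.
        "lr": Uniform(min_value=0.001, max_value=0.1),
        "batch": Choice(values=[16, 32, 64]),
    },
)
trial.set_limits(timeout=3600)  # per-trial command limit, in seconds

sweep_node = trial.sweep(
    primary_metric="validation_accuracy",  # must be logged by the trial
    goal="maximize",
    sampling_algorithm="random",
    max_total_trials=20,
    max_concurrent_trials=4,
    timeout=7200,  # overall sweep timeout, in seconds
)

As the implementation above shows, the generated sweep node reuses a deep copy of the trial command's component and inherits its compute, identity, and queue settings unless they are overridden in the sweep() call.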
@classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Command": + from .command_func import command + + loaded_data = load_from_dict(CommandJobSchema, data, context, additional_message, **kwargs) + + # resources a limits properties are flatten in command() function, exact them and set separately + resources = loaded_data.pop("resources", None) + limits = loaded_data.pop("limits", None) + + command_job: Command = command(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data) + + command_job.resources = resources + command_job.limits = limits + return command_job + + @classmethod + def _from_rest_object_to_init_params(cls, obj: dict) -> Dict: + obj = BaseNode._from_rest_object_to_init_params(obj) + + if "resources" in obj and obj["resources"]: + obj["resources"] = JobResourceConfiguration._from_rest_object(obj["resources"]) + + # services, sweep won't have services + if "services" in obj and obj["services"]: + # pipeline node rest object are dicts while _from_rest_job_services expect RestJobService + services = {} + for service_name, service in obj["services"].items(): + # in rest object of a pipeline job, service will be transferred to a dict as + # it's attributes of a node, but JobService._from_rest_object expect a + # RestJobService, so we need to convert it back. Here we convert the dict to a + # dummy rest object which may work as a RestJobService instead. + services[service_name] = from_rest_dict_to_dummy_rest_object(service) + obj["services"] = JobServiceBase._from_rest_job_services(services) + + # handle limits + if "limits" in obj and obj["limits"]: + obj["limits"] = CommandJobLimits._from_rest_object(obj["limits"]) + + if "identity" in obj and obj["identity"]: + obj["identity"] = _BaseJobIdentityConfiguration._from_rest_object(obj["identity"]) + + if "queue_settings" in obj and obj["queue_settings"]: + obj["queue_settings"] = QueueSettings._from_rest_object(obj["queue_settings"]) + + return obj + + @classmethod + def _load_from_rest_job(cls, obj: JobBase) -> "Command": + from .command_func import command + + rest_command_job: RestCommandJob = obj.properties + + command_job: Command = command( + name=obj.name, + display_name=rest_command_job.display_name, + description=rest_command_job.description, + tags=rest_command_job.tags, + properties=rest_command_job.properties, + command=rest_command_job.command, + experiment_name=rest_command_job.experiment_name, + services=JobServiceBase._from_rest_job_services(rest_command_job.services), + status=rest_command_job.status, + creation_context=SystemData._from_rest_object(obj.system_data) if obj.system_data else None, + code=rest_command_job.code_id, + compute=rest_command_job.compute_id, + environment=rest_command_job.environment_id, + distribution=DistributionConfiguration._from_rest_object(rest_command_job.distribution), + parameters=rest_command_job.parameters, + identity=( + _BaseJobIdentityConfiguration._from_rest_object(rest_command_job.identity) + if rest_command_job.identity + else None + ), + environment_variables=rest_command_job.environment_variables, + inputs=from_rest_inputs_to_dataset_literal(rest_command_job.inputs), + outputs=from_rest_data_outputs(rest_command_job.outputs), + ) + command_job._id = obj.id + command_job.resources = cast( + JobResourceConfiguration, JobResourceConfiguration._from_rest_object(rest_command_job.resources) + ) + command_job.limits = CommandJobLimits._from_rest_object(rest_command_job.limits) + command_job.queue_settings = 
QueueSettings._from_rest_object(rest_command_job.queue_settings) + if isinstance(command_job.component, CommandComponent): + command_job.component._source = ( + ComponentSource.REMOTE_WORKSPACE_JOB + ) # This is used by pipeline job telemetries. + + # Handle special case of local job + if ( + command_job.resources is not None + and command_job.resources.properties is not None + and command_job.resources.properties.get(LOCAL_COMPUTE_PROPERTY, None) + ): + command_job.compute = LOCAL_COMPUTE_TARGET + command_job.resources.properties.pop(LOCAL_COMPUTE_PROPERTY) + return command_job + + def _build_inputs(self) -> Dict: + inputs = super(Command, self)._build_inputs() + built_inputs = {} + # Validate and remove non-specified inputs + for key, value in inputs.items(): + if value is not None: + built_inputs[key] = value + + return built_inputs + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline import CommandSchema + + return CommandSchema(context=context) + + # pylint: disable-next=docstring-missing-param + def __call__(self, *args: Any, **kwargs: Any) -> "Command": + """Call Command as a function will return a new instance each time. + + :return: A Command node + :rtype: Command + """ + if isinstance(self._component, CommandComponent): + # call this to validate inputs + node: Command = self._component(*args, **kwargs) + # merge inputs + for name, original_input in self.inputs.items(): + if name not in kwargs: + # use setattr here to make sure owner of input won't change + setattr(node.inputs, name, original_input._data) + node._job_inputs[name] = original_input._data + # get outputs + for name, original_output in self.outputs.items(): + # use setattr here to make sure owner of input won't change + if not isinstance(original_output, str): + setattr(node.outputs, name, original_output._data) + node._job_outputs[name] = original_output._data + self._refine_optional_inputs_with_no_value(node, kwargs) + # set default values: compute, environment_variables, outputs + # won't copy name to be able to distinguish if a node's name is assigned by user + # e.g. node_1 = command_func() + # In above example, node_1.name will be None so we can apply node_1 as it's name + node.compute = self.compute + node.tags = self.tags + # Pass through the display name only if the display name is not system generated. + node.display_name = self.display_name if self.display_name != self.name else None + node.environment = copy.deepcopy(self.environment) + # deep copy for complex object + node.environment_variables = copy.deepcopy(self.environment_variables) + node.limits = copy.deepcopy(self.limits) + node.distribution = copy.deepcopy(self.distribution) + node.resources = copy.deepcopy(self.resources) + node.queue_settings = copy.deepcopy(self.queue_settings) + node.services = copy.deepcopy(self.services) + node.identity = copy.deepcopy(self.identity) + return node + msg = "Command can be called as a function only when referenced component is {}, currently got {}." + raise ValidationException( + message=msg.format(type(CommandComponent), self._component), + no_personal_data_message=msg.format(type(CommandComponent), "self._component"), + target=ErrorTarget.COMMAND_JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + +@overload +def _resolve_job_services(services: Optional[Dict]): ... 
+ + +@overload +def _resolve_job_services( + services: Dict[str, Union[JobServiceBase, Dict]], +) -> Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]: ... + + +def _resolve_job_services( + services: Optional[Dict[str, Union[JobServiceBase, Dict]]], +) -> Optional[Dict]: + """Resolve normal dict to dict[str, JobService] + + :param services: A dict that maps service names to either a JobServiceBase object, or a Dict used to build one + :type services: Optional[Dict[str, Union[JobServiceBase, Dict]]] + :return: + * None if `services` is None + * A map of job service names to job services + :rtype: Optional[ + Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]] + ] + """ + if services is None: + return None + + if not isinstance(services, dict): + msg = f"Services must be a dict, got {type(services)} instead." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.COMMAND_JOB, + error_category=ErrorCategory.USER_ERROR, + ) + + result = {} + for name, service in services.items(): + if isinstance(service, dict): + service = load_from_dict(JobServiceSchema, service, context={BASE_PATH_CONTEXT_KEY: "."}) + elif not isinstance( + service, (JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService) + ): + msg = f"Service value for key {name!r} must be a dict or JobService object, got {type(service)} instead." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.COMMAND_JOB, + error_category=ErrorCategory.USER_ERROR, + ) + result[name] = service + return result diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py new file mode 100644 index 00000000..c542f880 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py @@ -0,0 +1,314 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
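Editor's illustration for the _resolve_job_services helper defined at the end of command.py above (a sketch under assumptions, not part of the SDK source; it assumes the module-private helper is in scope, and the service name and log directory are made up).

from azure.ai.ml.entities import TensorBoardJobService

resolved = _resolve_job_services({"tb": TensorBoardJobService(log_dir="./logs")})
# Typed JobService objects pass through unchanged; plain dict values are loaded via
# JobServiceSchema, and any other value type raises a ValidationException.
assert isinstance(resolved["tb"], TensorBoardJobService)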
+# --------------------------------------------------------- + +# pylint: disable=protected-access + +import os +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +from azure.ai.ml.constants._common import AssetTypes, LegacyAssetTypes +from azure.ai.ml.constants._component import ComponentSource +from azure.ai.ml.entities._assets.environment import Environment +from azure.ai.ml.entities._component.command_component import CommandComponent +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, +) +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job.distribution import ( + DistributionConfiguration, + MpiDistribution, + PyTorchDistribution, + RayDistribution, + TensorFlowDistribution, +) +from azure.ai.ml.entities._job.job_service import ( + JobService, + JupyterLabJobService, + SshJobService, + TensorBoardJobService, + VsCodeJobService, +) +from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin +from azure.ai.ml.entities._job.sweep.search_space import SweepDistribution +from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException + +from .command import Command + +SUPPORTED_INPUTS = [ + LegacyAssetTypes.PATH, + AssetTypes.URI_FILE, + AssetTypes.URI_FOLDER, + AssetTypes.CUSTOM_MODEL, + AssetTypes.MLFLOW_MODEL, + AssetTypes.MLTABLE, + AssetTypes.TRITON_MODEL, +] + + +def _parse_input(input_value: Union[Input, Dict, SweepDistribution, str, bool, int, float]) -> Tuple: + component_input = None + job_input: Optional[Union[Input, Dict, SweepDistribution, str, bool, int, float]] = None + + if isinstance(input_value, Input): + component_input = Input(**input_value._to_dict()) + input_type = input_value.type + if input_type in SUPPORTED_INPUTS: + job_input = Input(**input_value._to_dict()) + elif isinstance(input_value, dict): + # if user provided dict, we try to parse it to Input. + # for job input, only parse for path type + input_type = input_value.get("type", None) + if input_type in SUPPORTED_INPUTS: + job_input = Input(**input_value) + component_input = Input(**input_value) + elif isinstance(input_value, (SweepDistribution, str, bool, int, float)): + # Input bindings are not supported + component_input = ComponentTranslatableMixin._to_input_builder_function(input_value) + job_input = input_value + else: + msg = f"Unsupported input type: {type(input_value)}" + msg += ", only Input, dict, str, bool, int and float are supported." 
+ raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + return component_input, job_input + + +def _parse_output(output_value: Optional[Union[Output, Dict, str]]) -> Tuple: + component_output = None + job_output: Optional[Union[Output, Dict, str]] = None + + if isinstance(output_value, Output): + component_output = Output(**output_value._to_dict()) + job_output = Output(**output_value._to_dict()) + elif not output_value: + # output value can be None or empty dictionary + # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder + component_output = ComponentTranslatableMixin._to_output(output_value) + job_output = output_value + elif isinstance(output_value, dict): # When output value is a non-empty dictionary + job_output = Output(**output_value) + component_output = Output(**output_value) + elif isinstance(output_value, str): # When output is passed in from pipeline job yaml + job_output = output_value + else: + msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + return component_output, job_output + + +def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]: + component_io_dict, job_io_dict = {}, {} + if io_dict: + for key, val in io_dict.items(): + component_io, job_io = parse_func(val) + component_io_dict[key] = component_io + job_io_dict[key] = job_io + return component_io_dict, job_io_dict + + +def command( + *, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + properties: Optional[Dict] = None, + display_name: Optional[str] = None, + command: Optional[str] = None, # pylint: disable=redefined-outer-name + experiment_name: Optional[str] = None, + environment: Optional[Union[str, Environment]] = None, + environment_variables: Optional[Dict] = None, + distribution: Optional[ + Union[ + Dict, + MpiDistribution, + TensorFlowDistribution, + PyTorchDistribution, + RayDistribution, + DistributionConfiguration, + ] + ] = None, + compute: Optional[str] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + instance_count: Optional[int] = None, + instance_type: Optional[str] = None, + locations: Optional[List[str]] = None, + docker_args: Optional[Union[str, List[str]]] = None, + shm_size: Optional[str] = None, + timeout: Optional[int] = None, + code: Optional[Union[str, os.PathLike]] = None, + identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None, + is_deterministic: bool = True, + services: Optional[ + Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]] + ] = None, + job_tier: Optional[str] = None, + priority: Optional[str] = None, + parent_job_name: Optional[str] = None, + **kwargs: Any, +) -> Command: + """Creates a Command object which can be used inside a dsl.pipeline function or used as a standalone Command job. + + :keyword name: The name of the Command job or component. + :paramtype name: Optional[str] + :keyword description: The description of the Command. Defaults to None. + :paramtype description: Optional[str] + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None. 
+ :paramtype tags: Optional[dict[str, str]] + :keyword properties: The job property dictionary. Defaults to None. + :paramtype properties: Optional[dict[str, str]] + :keyword display_name: The display name of the job. Defaults to a randomly generated name. + :paramtype display_name: Optional[str] + :keyword command: The command to be executed. Defaults to None. + :paramtype command: Optional[str] + :keyword experiment_name: The name of the experiment that the job will be created under. Defaults to current + directory name. + :paramtype experiment_name: Optional[str] + :keyword environment: The environment that the job will run in. + :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]] + :keyword environment_variables: A dictionary of environment variable names and values. + These environment variables are set on the process where user script is being executed. + Defaults to None. + :paramtype environment_variables: Optional[dict[str, str]] + :keyword distribution: The configuration for distributed jobs. Defaults to None. + :paramtype distribution: Optional[Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution, + ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]] + :keyword compute: The compute target the job will run on. Defaults to default compute. + :paramtype compute: Optional[str] + :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None. + :paramtype inputs: Optional[dict[str, Union[~azure.ai.ml.Input, str, bool, int, float, Enum]]] + :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None. + :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]] + :keyword instance_count: The number of instances or nodes to be used by the compute target. Defaults to 1. + :paramtype instance_count: Optional[int] + :keyword instance_type: The type of VM to be used by the compute target. + :paramtype instance_type: Optional[str] + :keyword locations: The list of locations where the job will run. + :paramtype locations: Optional[List[str]] + :keyword docker_args: Extra arguments to pass to the Docker run command. This would override any + parameters that have already been set by the system, or in this section. This parameter is only + supported for Azure ML compute types. Defaults to None. + :paramtype docker_args: Optional[Union[str,List[str]]] + :keyword shm_size: The size of the Docker container's shared memory block. This should be in the + format of (number)(unit) where the number has to be greater than 0 and the unit can be one of + b(bytes), k(kilobytes), m(megabytes), or g(gigabytes). + :paramtype shm_size: Optional[str] + :keyword timeout: The number, in seconds, after which the job will be cancelled. + :paramtype timeout: Optional[int] + :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url + pointing to a remote location. + :paramtype code: Optional[Union[str, os.PathLike]] + :keyword identity: The identity that the command job will use while running on compute. + :paramtype identity: Optional[Union[ + ~azure.ai.ml.entities.ManagedIdentityConfiguration, + ~azure.ai.ml.entities.AmlTokenConfiguration, + ~azure.ai.ml.entities.UserIdentityConfiguration]] + :keyword is_deterministic: Specifies whether the Command will return the same output given the same input. + Defaults to True. 
When True, if a Command Component is deterministic and has been run before in the + current workspace with the same input and settings, it will reuse results from a previously submitted + job when used as a node or step in a pipeline. In that scenario, no compute resources will be used. + :paramtype is_deterministic: bool + :keyword services: The interactive services for the node. Defaults to None. This is an experimental parameter, + and may change at any time. Please see https://aka.ms/azuremlexperimental for more information. + :paramtype services: Optional[dict[str, Union[~azure.ai.ml.entities.JobService, + ~azure.ai.ml.entities.JupyterLabJobService, ~azure.ai.ml.entities.SshJobService, + ~azure.ai.ml.entities.TensorBoardJobService, ~azure.ai.ml.entities.VsCodeJobService]]] + :keyword job_tier: The job tier. Accepted values are "Spot", "Basic", "Standard", or "Premium". + :paramtype job_tier: Optional[str] + :keyword priority: The priority of the job on the compute. Accepted values are "low", "medium", and "high". + Defaults to "medium". + :paramtype priority: Optional[str] + :keyword parent_job_name: parent job id for command job + :paramtype parent_job_name: Optional[str] + :return: A Command object. + :rtype: ~azure.ai.ml.entities.Command + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_command_configurations.py + :start-after: [START command_function] + :end-before: [END command_function] + :language: python + :dedent: 8 + :caption: Creating a Command Job using the command() builder method. + """ + # pylint: disable=too-many-locals + inputs = inputs or {} + outputs = outputs or {} + component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input) + # job inputs can not be None + job_inputs = {k: v for k, v in job_inputs.items() if v is not None} + component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output) + + component = kwargs.pop("component", None) + if component is None: + component = CommandComponent( + name=name, + tags=tags, + code=code, + command=command, + environment=environment, + display_name=display_name, + description=description, + inputs=component_inputs, + outputs=component_outputs, + distribution=distribution, + environment_variables=environment_variables, + _source=ComponentSource.BUILDER, + is_deterministic=is_deterministic, + **kwargs, + ) + command_obj = Command( + component=component, + name=name, + description=description, + tags=tags, + properties=properties, + display_name=display_name, + experiment_name=experiment_name, + compute=compute, + inputs=job_inputs, + outputs=job_outputs, + identity=identity, + distribution=distribution, + environment=environment, + environment_variables=environment_variables, + services=services, + parent_job_name=parent_job_name, + **kwargs, + ) + + if ( + locations is not None + or instance_count is not None + or instance_type is not None + or docker_args is not None + or shm_size is not None + ): + command_obj.set_resources( + locations=locations, + instance_count=instance_count, + instance_type=instance_type, + docker_args=docker_args, + shm_size=shm_size, + ) + + if timeout is not None: + command_obj.set_limits(timeout=timeout) + + if job_tier is not None or priority is not None: + command_obj.set_queue_settings(job_tier=job_tier, priority=priority) + + return command_obj diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/condition_node.py 
b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/condition_node.py new file mode 100644 index 00000000..5a5ad58b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/condition_node.py @@ -0,0 +1,146 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import Any, Dict, List, Optional + +from azure.ai.ml._schema import PathAwareSchema +from azure.ai.ml._utils.utils import is_data_binding_expression +from azure.ai.ml.constants._component import ControlFlowType +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode +from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob +from azure.ai.ml.entities._job.pipeline._io import InputOutputBase +from azure.ai.ml.entities._validation import MutableValidationResult + + +class ConditionNode(ControlFlowNode): + """Conditional node in the pipeline. + + Please do not directly use this class. + + :param condition: The condition for the conditional node. + :type condition: Any + :param true_block: The list of nodes to execute when the condition is true. + :type true_block: List[~azure.ai.ml.entities._builders.BaseNode] + :param false_block: The list of nodes to execute when the condition is false. + :type false_block: List[~azure.ai.ml.entities._builders.BaseNode] + """ + + def __init__( + self, condition: Any, *, true_block: Optional[List] = None, false_block: Optional[List] = None, **kwargs: Any + ) -> None: + kwargs.pop("type", None) + super(ConditionNode, self).__init__(type=ControlFlowType.IF_ELSE, **kwargs) + self.condition = condition + if true_block and not isinstance(true_block, list): + true_block = [true_block] + self._true_block = true_block + if false_block and not isinstance(false_block, list): + false_block = [false_block] + self._false_block = false_block + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema: + from azure.ai.ml._schema.pipeline.condition_node import ConditionNodeSchema + + return ConditionNodeSchema(context=context) + + @classmethod + def _from_rest_object(cls, obj: dict) -> "ConditionNode": + return cls(**obj) + + @classmethod + def _create_instance_from_schema_dict(cls, loaded_data: Dict) -> "ConditionNode": + """Create a condition node instance from schema parsed dict. + + :param loaded_data: The loaded data + :type loaded_data: Dict + :return: The ConditionNode node + :rtype: ConditionNode + """ + return cls(**loaded_data) + + @property + def true_block(self) -> Optional[List]: + """Get the list of nodes to execute when the condition is true. + + :return: The list of nodes to execute when the condition is true. + :rtype: List[~azure.ai.ml.entities._builders.BaseNode] + """ + return self._true_block + + @property + def false_block(self) -> Optional[List]: + """Get the list of nodes to execute when the condition is false. + + :return: The list of nodes to execute when the condition is false. 
+ :rtype: List[~azure.ai.ml.entities._builders.BaseNode] + """ + return self._false_block + + def _customized_validate(self) -> MutableValidationResult: + return self._validate_params() + + def _validate_params(self) -> MutableValidationResult: + # pylint disable=protected-access + validation_result = self._create_empty_validation_result() + if not isinstance(self.condition, (str, bool, InputOutputBase)): + validation_result.append_error( + yaml_path="condition", + message=f"'condition' of dsl.condition node must be an instance of " + f"{str}, {bool} or {InputOutputBase}, got {type(self.condition)}.", + ) + + # Check if output is a control output. + # pylint: disable=protected-access + if isinstance(self.condition, InputOutputBase) and self.condition._meta is not None: + # pylint: disable=protected-access + output_definition = self.condition._meta + if output_definition is not None and not output_definition._is_primitive_type: + validation_result.append_error( + yaml_path="condition", + message=f"'condition' of dsl.condition node must be primitive type " + f"with value 'True', got {output_definition._is_primitive_type}", + ) + + # check if condition is valid binding + if isinstance(self.condition, str) and not is_data_binding_expression( + self.condition, ["parent"], is_singular=False + ): + error_tail = "for example, ${{parent.jobs.xxx.outputs.output}}" + validation_result.append_error( + yaml_path="condition", + message=f"'condition' of dsl.condition has invalid binding expression: {self.condition}, {error_tail}", + ) + + error_msg = ( + "{!r} of dsl.condition node must be an instance of " f"{BaseNode}, {AutoMLJob} or {str}," "got {!r}." + ) + blocks = self.true_block if self.true_block else [] + for block in blocks: + if block is not None and not isinstance(block, (BaseNode, AutoMLJob, str)): + validation_result.append_error( + yaml_path="true_block", message=error_msg.format("true_block", type(block)) + ) + blocks = self.false_block if self.false_block else [] + for block in blocks: + if block is not None and not isinstance(block, (BaseNode, AutoMLJob, str)): + validation_result.append_error( + yaml_path="false_block", message=error_msg.format("false_block", type(block)) + ) + + # check if true/false block is valid binding + for name, blocks in {"true_block": self.true_block, "false_block": self.false_block}.items(): # type: ignore + blocks = blocks if blocks else [] + for block in blocks: + if block is None or not isinstance(block, str): + continue + error_tail = "for example, ${{parent.jobs.xxx}}" + if not is_data_binding_expression(block, ["parent", "jobs"], is_singular=False): + validation_result.append_error( + yaml_path=name, + message=f"'{name}' of dsl.condition has invalid binding expression: {block}, {error_tail}", + ) + + return validation_result diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/control_flow_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/control_flow_node.py new file mode 100644 index 00000000..c757a1e4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/control_flow_node.py @@ -0,0 +1,170 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +import logging +import re +import uuid +from abc import ABC +from typing import Any, Dict, Union, cast # pylint: disable=unused-import + +from marshmallow import ValidationError + +from azure.ai.ml._utils.utils import is_data_binding_expression +from azure.ai.ml.constants._common import CommonYamlFields +from azure.ai.ml.constants._component import ComponentSource, ControlFlowType +from azure.ai.ml.entities._mixins import YamlTranslatableMixin +from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException + +from .._util import convert_ordered_dict_to_dict +from .base_node import BaseNode + +module_logger = logging.getLogger(__name__) + + +# ControlFlowNode did not inherit from BaseNode since it doesn't have inputs/outputs like other nodes. +class ControlFlowNode(YamlTranslatableMixin, PathAwareSchemaValidatableMixin, ABC): + """Base class for control flow node in the pipeline. + + Please do not directly use this class. + + :param kwargs: Additional keyword arguments. + :type kwargs: Dict[str, Union[Any]] + """ + + def __init__(self, **kwargs: Any) -> None: + # TODO(1979547): refactor this + _source = kwargs.pop("_source", None) + self._source = _source if _source else ComponentSource.DSL + _from_component_func = kwargs.pop("_from_component_func", False) + self._type = kwargs.get("type", None) + self._instance_id = str(uuid.uuid4()) + self.name = None + if _from_component_func: + # add current control flow node in pipeline stack for dsl scenario and remove the body from the pipeline + # stack. + self._register_in_current_pipeline_component_builder() + + @property + def type(self) -> Any: + """Get the type of the control flow node. + + :return: The type of the control flow node. + :rtype: self._type + """ + return self._type + + def _to_dict(self) -> Dict: + return dict(self._dump_for_validation()) + + def _to_rest_object(self, **kwargs: Any) -> dict: # pylint: disable=unused-argument + """Convert self to a rest object for remote call. + + :return: The rest object + :rtype: dict + """ + rest_obj = self._to_dict() + rest_obj["_source"] = self._source + return cast(dict, convert_ordered_dict_to_dict(rest_obj)) + + def _register_in_current_pipeline_component_builder(self) -> None: + """Register this node in current pipeline component builder by adding self to a global stack.""" + from azure.ai.ml.dsl._pipeline_component_builder import _add_component_to_current_definition_builder + + _add_component_to_current_definition_builder(self) # type: ignore[arg-type] + + @classmethod + def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException: + return ValidationException( + message=message, + no_personal_data_message=no_personal_data_message, + target=ErrorTarget.PIPELINE, + ) + + +class LoopNode(ControlFlowNode, ABC): + """Base class for loop node in the pipeline. + + Please do not directly use this class. + + :param body: The body of the loop node. + :type body: ~azure.ai.ml.entities._builders.BaseNode + :param kwargs: Additional keyword arguments. + :type kwargs: Dict[str, Union[Any]] + """ + + def __init__(self, *, body: BaseNode, **kwargs: Any) -> None: + self._body = body + super(LoopNode, self).__init__(**kwargs) + # always set the referenced control flow node instance id to the body. 
+ self.body._set_referenced_control_flow_node_instance_id(self._instance_id) + + @property + def body(self) -> BaseNode: + """Get the body of the loop node. + + :return: The body of the loop node. + :rtype: ~azure.ai.ml.entities._builders.BaseNode + """ + return self._body + + _extra_body_types = None + + @classmethod + def _attr_type_map(cls) -> dict: + from .command import Command + from .pipeline import Pipeline + + enable_body_type = (Command, Pipeline) + if cls._extra_body_types is not None: + enable_body_type = enable_body_type + cls._extra_body_types + return { + "body": enable_body_type, + } + + @classmethod + def _get_body_from_pipeline_jobs(cls, pipeline_jobs: Dict[str, BaseNode], body_name: str) -> BaseNode: + # Get body object from pipeline job list. + if body_name not in pipeline_jobs: + raise ValidationError( + message=f'Cannot find the do-while loop body "{body_name}" in the pipeline.', + target=ErrorTarget.PIPELINE, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + return pipeline_jobs[body_name] + + def _validate_body(self) -> MutableValidationResult: + # pylint: disable=protected-access + validation_result = self._create_empty_validation_result() + + if self._instance_id != self.body._referenced_control_flow_node_instance_id: + # When the body is used in another loop node record the error message in validation result. + validation_result.append_error("body", "The body of loop node cannot be promoted as another loop again.") + return validation_result + + def _get_body_binding_str(self) -> str: + return "${{parent.jobs.%s}}" % self.body.name + + @staticmethod + def _get_data_binding_expression_value(expression: str, regex: str) -> str: + try: + if is_data_binding_expression(expression): + return str(re.findall(regex, expression)[0]) + + return expression + except Exception: # pylint: disable=W0718 + module_logger.warning("Cannot get the value from data binding expression %s.", expression) + return expression + + @staticmethod + def _is_loop_node_dict(obj: Any) -> bool: + return obj.get(CommonYamlFields.TYPE, None) in [ControlFlowType.DO_WHILE, ControlFlowType.PARALLEL_FOR] + + @classmethod + def _from_rest_object(cls, obj: dict, pipeline_jobs: dict) -> "LoopNode": + from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory + + node_type = obj.get(CommonYamlFields.TYPE, None) + load_from_rest_obj_func = pipeline_node_factory.get_load_from_rest_object_func(_type=node_type) + return load_from_rest_obj_func(obj, pipeline_jobs) # type: ignore diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py new file mode 100644 index 00000000..83e88a48 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer.py @@ -0,0 +1,575 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=protected-access + +import logging +from typing import Any, Dict, List, Optional, Tuple, Union, cast + +from marshmallow import Schema + +from azure.ai.ml._restclient.v2022_10_01_preview.models import JobBase +from azure.ai.ml._schema.job.data_transfer_job import ( + DataTransferCopyJobSchema, + DataTransferExportJobSchema, + DataTransferImportJobSchema, +) +from azure.ai.ml._utils._experimental import experimental +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AssetTypes +from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._component.datatransfer_component import ( + DataTransferComponent, + DataTransferCopyComponent, + DataTransferExportComponent, + DataTransferImportComponent, +) +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem +from azure.ai.ml.entities._job.data_transfer.data_transfer_job import ( + DataTransferCopyJob, + DataTransferExportJob, + DataTransferImportJob, +) +from azure.ai.ml.entities._validation.core import MutableValidationResult +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException + +from ..._schema import PathAwareSchema +from .._job.pipeline._io import NodeOutput +from .._util import convert_ordered_dict_to_dict, load_from_dict, validate_attribute_type +from .base_node import BaseNode + +module_logger = logging.getLogger(__name__) + + +def _build_source_sink(io_dict: Optional[Union[Dict, Database, FileSystem]]) -> Optional[Union[Database, FileSystem]]: + if io_dict is None: + return io_dict + if isinstance(io_dict, (Database, FileSystem)): + component_io = io_dict + else: + if isinstance(io_dict, dict): + data_type = io_dict.pop("type", None) + if data_type == ExternalDataType.DATABASE: + component_io = Database(**io_dict) + elif data_type == ExternalDataType.FILE_SYSTEM: + component_io = FileSystem(**io_dict) + else: + msg = "Type in source or sink only support {} and {}, currently got {}." + raise ValidationException( + message=msg.format( + ExternalDataType.DATABASE, + ExternalDataType.FILE_SYSTEM, + data_type, + ), + no_personal_data_message=msg.format( + ExternalDataType.DATABASE, + ExternalDataType.FILE_SYSTEM, + "data_type", + ), + target=ErrorTarget.DATA_TRANSFER_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + else: + msg = "Source or sink only support dict, Database and FileSystem" + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.DATA_TRANSFER_JOB, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + return component_io + + +class DataTransfer(BaseNode): + """Base class for data transfer node, used for data transfer component version consumption. + + You should not instantiate this class directly. 
+ """ + + def __init__( + self, + *, + component: Union[str, DataTransferCopyComponent, DataTransferImportComponent], + compute: Optional[str] = None, + inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None, + outputs: Optional[Dict[str, Union[str, Output]]] = None, + **kwargs: Any, + ): + # resolve normal dict to dict[str, JobService] + kwargs.pop("type", None) + super().__init__( + type=NodeType.DATA_TRANSFER, + inputs=inputs, + outputs=outputs, + component=component, + compute=compute, + **kwargs, + ) + + @property + def component(self) -> Union[str, DataTransferComponent]: + res: Union[str, DataTransferComponent] = self._component + return res + + @classmethod + def _load_from_rest_job(cls, obj: JobBase) -> "DataTransfer": + # Todo: need update rest api + raise NotImplementedError("Not support submit standalone job for now") + + @classmethod + def _get_supported_outputs_types(cls) -> Tuple: + return str, Output + + def _build_inputs(self) -> Dict: + inputs = super(DataTransfer, self)._build_inputs() + built_inputs = {} + # Validate and remove non-specified inputs + for key, value in inputs.items(): + if value is not None: + built_inputs[key] = value + + return built_inputs + + +@experimental +class DataTransferCopy(DataTransfer): + """Base class for data transfer copy node. + + You should not instantiate this class directly. Instead, you should + create from builder function: copy_data. + + :param component: Id or instance of the data transfer component/job to be run for the step + :type component: DataTransferCopyComponent + :param inputs: Inputs to the data transfer. + :type inputs: Dict[str, Union[NodeOutput, Input, str]] + :param outputs: Mapping of output data bindings used in the job. + :type outputs: Dict[str, Union[str, Output, dict]] + :param name: Name of the data transfer. + :type name: str + :param description: Description of the data transfer. + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: dict[str, str] + :param display_name: Display name of the job. + :type display_name: str + :param experiment_name: Name of the experiment the job will be created under, + if None is provided, default will be set to current directory name. + :type experiment_name: str + :param compute: The compute target the job runs on. + :type compute: str + :param data_copy_mode: data copy mode in copy task, possible value is "merge_with_overwrite", "fail_if_conflict". + :type data_copy_mode: str + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferCopy cannot be successfully validated. + Details will be provided in the error message. 
+ """ + + def __init__( + self, + *, + component: Union[str, DataTransferCopyComponent], + compute: Optional[str] = None, + inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None, + outputs: Optional[Dict[str, Union[str, Output]]] = None, + data_copy_mode: Optional[str] = None, + **kwargs: Any, + ): + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + super().__init__( + inputs=inputs, + outputs=outputs, + component=component, + compute=compute, + **kwargs, + ) + # init mark for _AttrDict + self._init = True + self.task = DataTransferTaskType.COPY_DATA + self.data_copy_mode = data_copy_mode + is_component = isinstance(component, DataTransferCopyComponent) + if is_component: + _component: DataTransferCopyComponent = cast(DataTransferCopyComponent, component) + self.task = _component.task or self.task + self.data_copy_mode = _component.data_copy_mode or self.data_copy_mode + self._init = False + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "component": (str, DataTransferCopyComponent), + } + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline import DataTransferCopySchema + + return DataTransferCopySchema(context=context) + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return ["type", "task", "data_copy_mode"] + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj = super()._to_rest_object(**kwargs) + for key, value in { + "componentId": self._get_component_id(), + "data_copy_mode": self.data_copy_mode, + }.items(): + if value is not None: + rest_obj[key] = value + return cast(dict, convert_ordered_dict_to_dict(rest_obj)) + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> Any: + from .data_transfer_func import copy_data + + loaded_data = load_from_dict(DataTransferCopyJobSchema, data, context, additional_message, **kwargs) + data_transfer_job = copy_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data) + + return data_transfer_job + + def _to_job(self) -> DataTransferCopyJob: + return DataTransferCopyJob( + experiment_name=self.experiment_name, + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + status=self.status, + inputs=self._job_inputs, + outputs=self._job_outputs, + services=self.services, + compute=self.compute, + data_copy_mode=self.data_copy_mode, + ) + + # pylint: disable-next=docstring-missing-param + def __call__(self, *args: Any, **kwargs: Any) -> "DataTransferCopy": + """Call DataTransferCopy as a function will return a new instance each time. 
+ + :return: A DataTransferCopy node + :rtype: DataTransferCopy + """ + if isinstance(self._component, Component): + # call this to validate inputs + node: DataTransferCopy = self._component(*args, **kwargs) + # merge inputs + for name, original_input in self.inputs.items(): + if name not in kwargs: + # use setattr here to make sure owner of input won't change + setattr(node.inputs, name, original_input._data) + node._job_inputs[name] = original_input._data + # get outputs + for name, original_output in self.outputs.items(): + # use setattr here to make sure owner of input won't change + if not isinstance(original_output, str): + setattr(node.outputs, name, original_output._data) + self._refine_optional_inputs_with_no_value(node, kwargs) + # set default values: compute, environment_variables, outputs + node._name = self.name + node.compute = self.compute + node.tags = self.tags + # Pass through the display name only if the display name is not system generated. + node.display_name = self.display_name if self.display_name != self.name else None + return node + msg = "copy_data can be called as a function only when referenced component is {}, currently got {}." + raise ValidationException( + message=msg.format(type(Component), self._component), + no_personal_data_message=msg.format(type(Component), "self._component"), + target=ErrorTarget.DATA_TRANSFER_JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + +@experimental +class DataTransferImport(DataTransfer): + """Base class for data transfer import node. + + You should not instantiate this class directly. Instead, you should + create from builder function: import_data. + + :param component: Id of the data transfer built in component to be run for the step + :type component: str + :param source: The data source of file system or database + :type source: Union[Dict, Database, FileSystem] + :param outputs: Mapping of output data bindings used in the job. + :type outputs: Dict[str, Union[str, Output, dict]] + :param name: Name of the data transfer. + :type name: str + :param description: Description of the data transfer. + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: dict[str, str] + :param display_name: Display name of the job. + :type display_name: str + :param experiment_name: Name of the experiment the job will be created under, + if None is provided, default will be set to current directory name. + :type experiment_name: str + :param compute: The compute target the job runs on. + :type compute: str + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferImport cannot be successfully validated. + Details will be provided in the error message. 
+ """ + + def __init__( + self, + *, + component: Union[str, DataTransferImportComponent], + compute: Optional[str] = None, + source: Optional[Union[Dict, Database, FileSystem]] = None, + outputs: Optional[Dict[str, Union[str, Output]]] = None, + **kwargs: Any, + ): + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + super(DataTransferImport, self).__init__( + component=component, + outputs=outputs, + compute=compute, + **kwargs, + ) + # init mark for _AttrDict + self._init = True + self.task = DataTransferTaskType.IMPORT_DATA + is_component = isinstance(component, DataTransferImportComponent) + if is_component: + _component: DataTransferImportComponent = cast(DataTransferImportComponent, component) + self.task = _component.task or self.task + self.source = _build_source_sink(source) + self._init = False + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "component": (str, DataTransferImportComponent), + } + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline import DataTransferImportSchema + + return DataTransferImportSchema(context=context) + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return ["type", "task", "source"] + + def _customized_validate(self) -> MutableValidationResult: + result = super()._customized_validate() + if self.source is None: + result.append_error( + yaml_path="source", + message="Source is a required field for import data task in DataTransfer job", + ) + if len(self.outputs) != 1 or list(self.outputs.keys())[0] != "sink": + result.append_error( + yaml_path="outputs.sink", + message="Outputs field only support one output called sink in import task", + ) + if ( + "sink" in self.outputs + and not isinstance(self.outputs["sink"], str) + and isinstance(self.outputs["sink"]._data, Output) + ): + sink_output = self.outputs["sink"]._data + if self.source is not None: + + if (self.source.type == ExternalDataType.DATABASE and sink_output.type != AssetTypes.MLTABLE) or ( + self.source.type == ExternalDataType.FILE_SYSTEM and sink_output.type != AssetTypes.URI_FOLDER + ): + result.append_error( + yaml_path="outputs.sink.type", + message="Outputs field only support type {} for {} and {} for {}".format( + AssetTypes.MLTABLE, + ExternalDataType.DATABASE, + AssetTypes.URI_FOLDER, + ExternalDataType.FILE_SYSTEM, + ), + ) + return result + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj = super()._to_rest_object(**kwargs) + for key, value in { + "componentId": self._get_component_id(), + }.items(): + if value is not None: + rest_obj[key] = value + return cast(dict, convert_ordered_dict_to_dict(rest_obj)) + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferImport": + from .data_transfer_func import import_data + + loaded_data = load_from_dict(DataTransferImportJobSchema, data, context, additional_message, **kwargs) + data_transfer_job: DataTransferImport = import_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data) + + return data_transfer_job + + def _to_job(self) -> DataTransferImportJob: + return DataTransferImportJob( + experiment_name=self.experiment_name, + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + status=self.status, + source=self.source, + outputs=self._job_outputs, + services=self.services, + 
compute=self.compute, + ) + + +@experimental +class DataTransferExport(DataTransfer): + """Base class for data transfer export node. + + You should not instantiate this class directly. Instead, you should + create from builder function: export_data. + + :param component: Id of the data transfer built in component to be run for the step + :type component: str + :param sink: The sink of external data and databases. + :type sink: Union[Dict, Database, FileSystem] + :param inputs: Mapping of input data bindings used in the job. + :type inputs: Dict[str, Union[NodeOutput, Input, str, Input]] + :param name: Name of the data transfer. + :type name: str + :param description: Description of the data transfer. + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated. + :type tags: dict[str, str] + :param display_name: Display name of the job. + :type display_name: str + :param experiment_name: Name of the experiment the job will be created under, + if None is provided, default will be set to current directory name. + :type experiment_name: str + :param compute: The compute target the job runs on. + :type compute: str + :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferExport cannot be successfully validated. + Details will be provided in the error message. + """ + + def __init__( + self, + *, + component: Union[str, DataTransferCopyComponent, DataTransferImportComponent], + compute: Optional[str] = None, + sink: Optional[Union[Dict, Database, FileSystem]] = None, + inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None, + **kwargs: Any, + ): + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + super(DataTransferExport, self).__init__( + component=component, + inputs=inputs, + compute=compute, + **kwargs, + ) + # init mark for _AttrDict + self._init = True + self.task = DataTransferTaskType.EXPORT_DATA + is_component = isinstance(component, DataTransferExportComponent) + if is_component: + _component: DataTransferExportComponent = cast(DataTransferExportComponent, component) + self.task = _component.task or self.task + self.sink = sink + self._init = False + + @property + def sink(self) -> Optional[Union[Dict, Database, FileSystem]]: + """The sink of external data and databases. + + :return: The sink of external data and databases. 
+ :rtype: Union[None, Database, FileSystem] + """ + return self._sink + + @sink.setter + def sink(self, value: Union[Dict, Database, FileSystem]) -> None: + self._sink = _build_source_sink(value) + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "component": (str, DataTransferExportComponent), + } + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline import DataTransferExportSchema + + return DataTransferExportSchema(context=context) + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return ["type", "task", "sink"] + + def _customized_validate(self) -> MutableValidationResult: + result = super()._customized_validate() + if self.sink is None: + result.append_error( + yaml_path="sink", + message="Sink is a required field for export data task in DataTransfer job", + ) + if len(self.inputs) != 1 or list(self.inputs.keys())[0] != "source": + result.append_error( + yaml_path="inputs.source", + message="Inputs field only support one input called source in export task", + ) + if "source" in self.inputs and isinstance(self.inputs["source"]._data, Input): + source_input = self.inputs["source"]._data + if self.sink is not None and not isinstance(self.sink, Dict): + if (self.sink.type == ExternalDataType.DATABASE and source_input.type != AssetTypes.URI_FILE) or ( + self.sink.type == ExternalDataType.FILE_SYSTEM and source_input.type != AssetTypes.URI_FOLDER + ): + result.append_error( + yaml_path="inputs.source.type", + message="Inputs field only support type {} for {} and {} for {}".format( + AssetTypes.URI_FILE, + ExternalDataType.DATABASE, + AssetTypes.URI_FOLDER, + ExternalDataType.FILE_SYSTEM, + ), + ) + + return result + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj = super()._to_rest_object(**kwargs) + for key, value in { + "componentId": self._get_component_id(), + }.items(): + if value is not None: + rest_obj[key] = value + return cast(dict, convert_ordered_dict_to_dict(rest_obj)) + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferExport": + from .data_transfer_func import export_data + + loaded_data = load_from_dict(DataTransferExportJobSchema, data, context, additional_message, **kwargs) + data_transfer_job: DataTransferExport = export_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data) + + return data_transfer_job + + def _to_job(self) -> DataTransferExportJob: + return DataTransferExportJob( + experiment_name=self.experiment_name, + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + status=self.status, + sink=self.sink, + inputs=self._job_inputs, + services=self.services, + compute=self.compute, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer_func.py new file mode 100644 index 00000000..423c125b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/data_transfer_func.py @@ -0,0 +1,335 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +# pylint: disable=protected-access + +from typing import Any, Callable, Dict, Optional, Tuple, Union + +from azure.ai.ml._utils._experimental import experimental +from azure.ai.ml.constants._common import AssetTypes, LegacyAssetTypes +from azure.ai.ml.constants._component import ComponentSource, DataTransferBuiltinComponentUri, ExternalDataType +from azure.ai.ml.entities._builders.base_node import pipeline_node_decorator +from azure.ai.ml.entities._component.datatransfer_component import DataTransferCopyComponent +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem +from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin +from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput +from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException + +from .data_transfer import DataTransferCopy, DataTransferExport, DataTransferImport, _build_source_sink + +SUPPORTED_INPUTS = [ + LegacyAssetTypes.PATH, + AssetTypes.URI_FILE, + AssetTypes.URI_FOLDER, + AssetTypes.CUSTOM_MODEL, + AssetTypes.MLFLOW_MODEL, + AssetTypes.MLTABLE, + AssetTypes.TRITON_MODEL, +] + + +def _parse_input(input_value: Union[Input, dict, str, PipelineInput, NodeOutput]) -> Tuple: + component_input = None + job_input: Union[Input, dict, str, PipelineInput, NodeOutput] = "" + + if isinstance(input_value, Input): + component_input = Input(**input_value._to_dict()) + input_type = input_value.type + if input_type in SUPPORTED_INPUTS: + job_input = Input(**input_value._to_dict()) + elif isinstance(input_value, dict): + # if user provided dict, we try to parse it to Input. + # for job input, only parse for path type + input_type = input_value.get("type", None) + if input_type in SUPPORTED_INPUTS: + job_input = Input(**input_value) + component_input = Input(**input_value) + elif isinstance(input_value, str): + # Input bindings + component_input = ComponentTranslatableMixin._to_input_builder_function(input_value) + job_input = input_value + elif isinstance(input_value, (PipelineInput, NodeOutput)): + data: Any = None + # datatransfer node can accept PipelineInput/NodeOutput for export task. + if input_value._data is None or isinstance(input_value._data, Output): + data = Input(type=input_value.type, mode=input_value.mode) + else: + data = input_value._data + component_input, _ = _parse_input(data) + job_input = input_value + else: + msg = ( + f"Unsupported input type: {type(input_value)}, only Input, dict, str, PipelineInput and NodeOutput are " + f"supported." 
+ ) + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + return component_input, job_input + + +def _parse_output(output_value: Union[Output, Dict]) -> Tuple: + component_output = None + job_output: Union[Output, Dict] = {} + + if isinstance(output_value, Output): + component_output = Output(**output_value._to_dict()) + job_output = Output(**output_value._to_dict()) + elif not output_value: + # output value can be None or empty dictionary + # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder + component_output = ComponentTranslatableMixin._to_output(output_value) + job_output = output_value + elif isinstance(output_value, dict): # When output value is a non-empty dictionary + job_output = Output(**output_value) + component_output = Output(**output_value) + elif isinstance(output_value, str): # When output is passed in from pipeline job yaml + job_output = output_value + else: + msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + return component_output, job_output + + +def _parse_inputs_outputs(io_dict: Optional[Dict], parse_func: Callable) -> Tuple[Dict, Dict]: + component_io_dict, job_io_dict = {}, {} + if io_dict: + for key, val in io_dict.items(): + component_io, job_io = parse_func(val) + component_io_dict[key] = component_io + job_io_dict[key] = job_io + return component_io_dict, job_io_dict + + +@experimental +def copy_data( + *, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + experiment_name: Optional[str] = None, + compute: Optional[str] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + is_deterministic: bool = True, + data_copy_mode: Optional[str] = None, + **kwargs: Any, +) -> DataTransferCopy: + """Create a DataTransferCopy object which can be used inside dsl.pipeline as a function. + + :keyword name: The name of the job. + :paramtype name: str + :keyword description: Description of the job. + :paramtype description: str + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. + :paramtype tags: dict[str, str] + :keyword display_name: Display name of the job. + :paramtype display_name: str + :keyword experiment_name: Name of the experiment the job will be created under. + :paramtype experiment_name: str + :keyword compute: The compute resource the job runs on. + :paramtype compute: str + :keyword inputs: Mapping of inputs data bindings used in the job. + :paramtype inputs: dict + :keyword outputs: Mapping of outputs data bindings used in the job. + :paramtype outputs: dict + :keyword is_deterministic: Specify whether the command will return same output given same input. + If a command (component) is deterministic, when use it as a node/step in a pipeline, + it will reuse results from a previous submitted job in current workspace which has same inputs and settings. + In this case, this step will not use any compute resource. + Default to be True, specify is_deterministic=False if you would like to avoid such reuse behavior. + :paramtype is_deterministic: bool + :keyword data_copy_mode: data copy mode in copy task, possible value is "merge_with_overwrite", "fail_if_conflict". 
+ :paramtype data_copy_mode: str + :return: A DataTransferCopy object. + :rtype: ~azure.ai.ml.entities._component.datatransfer_component.DataTransferCopyComponent + """ + inputs = inputs or {} + outputs = outputs or {} + component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input) + # job inputs can not be None + job_inputs = {k: v for k, v in job_inputs.items() if v is not None} + component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output) + component = kwargs.pop("component", None) + if component is None: + component = DataTransferCopyComponent( + name=name, + tags=tags, + display_name=display_name, + description=description, + inputs=component_inputs, + outputs=component_outputs, + data_copy_mode=data_copy_mode, + _source=ComponentSource.BUILDER, + is_deterministic=is_deterministic, + **kwargs, + ) + data_transfer_copy_obj = DataTransferCopy( + component=component, + name=name, + description=description, + tags=tags, + display_name=display_name, + experiment_name=experiment_name, + compute=compute, + inputs=job_inputs, + outputs=job_outputs, + data_copy_mode=data_copy_mode, + **kwargs, + ) + return data_transfer_copy_obj + + +@experimental +@pipeline_node_decorator +def import_data( + *, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + experiment_name: Optional[str] = None, + compute: Optional[str] = None, + source: Optional[Union[Dict, Database, FileSystem]] = None, + outputs: Optional[Dict] = None, + **kwargs: Any, +) -> DataTransferImport: + """Create a DataTransferImport object which can be used inside dsl.pipeline. + + :keyword name: The name of the job. + :paramtype name: str + :keyword description: Description of the job. + :paramtype description: str + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. + :paramtype tags: dict[str, str] + :keyword display_name: Display name of the job. + :paramtype display_name: str + :keyword experiment_name: Name of the experiment the job will be created under. + :paramtype experiment_name: str + :keyword compute: The compute resource the job runs on. + :paramtype compute: str + :keyword source: The data source of file system or database. + :paramtype source: Union[Dict, ~azure.ai.ml.entities._inputs_outputs.external_data.Database, + ~azure.ai.ml.entities._inputs_outputs.external_data.FileSystem] + :keyword outputs: Mapping of outputs data bindings used in the job. + The default will be an output port with the key "sink" and type "mltable". + :paramtype outputs: dict + :return: A DataTransferImport object. 
+ :rtype: ~azure.ai.ml.entities._job.pipeline._component_translatable.DataTransferImport + """ + source = _build_source_sink(source) + outputs = outputs or {"sink": Output(type=AssetTypes.MLTABLE)} + # # job inputs can not be None + # job_inputs = {k: v for k, v in job_inputs.items() if v is not None} + _, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output) + component = kwargs.pop("component", None) + update_source = False + if component is None: + if source and source.type == ExternalDataType.DATABASE: + component = DataTransferBuiltinComponentUri.IMPORT_DATABASE + else: + component = DataTransferBuiltinComponentUri.IMPORT_FILE_SYSTEM + update_source = True + + data_transfer_import_obj = DataTransferImport( + component=component, + name=name, + description=description, + tags=tags, + display_name=display_name, + experiment_name=experiment_name, + compute=compute, + source=source, + outputs=job_outputs, + **kwargs, + ) + if update_source: + data_transfer_import_obj._source = ComponentSource.BUILTIN + + return data_transfer_import_obj + + +@experimental +@pipeline_node_decorator +def export_data( + *, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + experiment_name: Optional[str] = None, + compute: Optional[str] = None, + sink: Optional[Union[Dict, Database, FileSystem]] = None, + inputs: Optional[Dict] = None, + **kwargs: Any, +) -> DataTransferExport: + """Create a DataTransferExport object which can be used inside dsl.pipeline. + + :keyword name: The name of the job. + :paramtype name: str + :keyword description: Description of the job. + :paramtype description: str + :keyword tags: Tag dictionary. Tags can be added, removed, and updated. + :paramtype tags: dict[str, str] + :keyword display_name: Display name of the job. + :paramtype display_name: str + :keyword experiment_name: Name of the experiment the job will be created under. + :paramtype experiment_name: str + :keyword compute: The compute resource the job runs on. + :paramtype compute: str + :keyword sink: The sink of external data and databases. + :paramtype sink: Union[ + Dict, + ~azure.ai.ml.entities._inputs_outputs.external_data.Database, + ~azure.ai.ml.entities._inputs_outputs.external_data.FileSystem] + :keyword inputs: Mapping of inputs data bindings used in the job. + :paramtype inputs: dict + :return: A DataTransferExport object. + :rtype: ~azure.ai.ml.entities._job.pipeline._component_translatable.DataTransferExport + :raises ValidationException: If sink is not provided or exporting file system is not supported. + """ + sink = _build_source_sink(sink) + _, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input) + # job inputs can not be None + job_inputs = {k: v for k, v in job_inputs.items() if v is not None} + component = kwargs.pop("component", None) + update_source = False + if component is None: + if sink and sink.type == ExternalDataType.DATABASE: + component = DataTransferBuiltinComponentUri.EXPORT_DATABASE + else: + msg = "Sink is a required field for export data task and we don't support exporting file system for now." 
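As a usage sketch for the import_data builder defined above; the import path, the connection name, and the Database field names are assumptions.

from azure.ai.ml import dsl
from azure.ai.ml.data_transfer import Database, import_data  # assumed public re-exports


@dsl.pipeline()
def ingest_pipeline():
    import_step = import_data(
        source=Database(
            connection="azureml:my_snowflake_connection",  # hypothetical workspace connection
            query="select * from my_table",
        ),
        compute="my_adf_compute",  # hypothetical
        # outputs defaults to {"sink": Output(type=AssetTypes.MLTABLE)} as described above
    )
    # export_data is symmetric: it takes a database `sink` plus `inputs` instead of `outputs`.
    return {"imported_table": import_step.outputs.sink}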
+ raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + update_source = True + + data_transfer_export_obj = DataTransferExport( + component=component, + name=name, + description=description, + tags=tags, + display_name=display_name, + experiment_name=experiment_name, + compute=compute, + sink=sink, + inputs=job_inputs, + **kwargs, + ) + if update_source: + data_transfer_export_obj._source = ComponentSource.BUILTIN + + return data_transfer_export_obj diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/do_while.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/do_while.py new file mode 100644 index 00000000..ecfd51ca --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/do_while.py @@ -0,0 +1,357 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import logging +from typing import Any, Dict, Optional, Union + +from typing_extensions import Literal + +from azure.ai.ml._schema.pipeline.control_flow_job import DoWhileSchema +from azure.ai.ml.constants._component import DO_WHILE_MAX_ITERATION, ControlFlowType +from azure.ai.ml.entities._job.job_limits import DoWhileJobLimits +from azure.ai.ml.entities._job.pipeline._io import InputOutputBase, NodeInput, NodeOutput +from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob +from azure.ai.ml.entities._validation import MutableValidationResult + +from .._util import load_from_dict, validate_attribute_type +from .base_node import BaseNode +from .control_flow_node import LoopNode +from .pipeline import Pipeline + +module_logger = logging.getLogger(__name__) + + +class DoWhile(LoopNode): + """Do-while loop node in the pipeline job. By specifying the loop body and loop termination condition in this class, + a job-level do while loop can be implemented. It will be initialized when calling dsl.do_while or when loading the + pipeline yml containing do_while node. Please do not manually initialize this class. + + :param body: Pipeline job for the do-while loop body. + :type body: ~azure.ai.ml.entities._builders.pipeline.Pipeline + :param condition: Boolean type control output of body as do-while loop condition. + :type condition: ~azure.ai.ml.entities.Output + :param mapping: Output-Input mapping for each round of the do-while loop. + Key is the last round output of the body. Value is the input port for the current body. + :type mapping: dict[Union[str, ~azure.ai.ml.entities.Output], + Union[str, ~azure.ai.ml.entities.Input, list]] + :param limits: Limits in running the do-while node. + :type limits: Union[dict, ~azure.ai.ml.entities._job.job_limits.DoWhileJobLimits] + :raises ValidationError: If the initialization parameters are not of valid types. 
+ """ + + def __init__( + self, + *, + body: Union[Pipeline, BaseNode], + condition: Optional[Union[str, NodeInput, NodeOutput]], + mapping: Dict, + limits: Optional[Union[dict, DoWhileJobLimits]] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + kwargs.pop("type", None) + super(DoWhile, self).__init__( + type=ControlFlowType.DO_WHILE, + body=body, + **kwargs, + ) + + # init mark for _AttrDict + self._init = True + self._mapping = mapping or {} + self._condition = condition + self._limits = limits + self._init = False + + @property + def mapping(self) -> Dict: + """Get the output-input mapping for each round of the do-while loop. + + :return: Output-Input mapping for each round of the do-while loop. + :rtype: dict[Union[str, ~azure.ai.ml.entities.Output], + Union[str, ~azure.ai.ml.entities.Input, list]] + """ + return self._mapping + + @property + def condition(self) -> Optional[Union[str, NodeInput, NodeOutput]]: + """Get the boolean type control output of the body as the do-while loop condition. + + :return: Control output of the body as the do-while loop condition. + :rtype: ~azure.ai.ml.entities.Output + """ + return self._condition + + @property + def limits(self) -> Union[Dict, DoWhileJobLimits, None]: + """Get the limits in running the do-while node. + + :return: Limits in running the do-while node. + :rtype: Union[dict, ~azure.ai.ml.entities._job.job_limits.DoWhileJobLimits] + """ + return self._limits + + @classmethod + def _attr_type_map(cls) -> dict: + return { + **super(DoWhile, cls)._attr_type_map(), + "mapping": dict, + "limits": (dict, DoWhileJobLimits), + } + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DoWhile": + loaded_data = load_from_dict(DoWhileSchema, data, context, additional_message, **kwargs) + + return cls(**loaded_data) + + @classmethod + def _get_port_obj( + cls, body: BaseNode, port_name: str, is_input: bool = True, validate_port: bool = True + ) -> Union[str, NodeInput, NodeOutput]: + if is_input: + port = body.inputs.get(port_name, None) + else: + port = body.outputs.get(port_name, None) + if port is None: + if validate_port: + raise cls._create_validation_error( + message=f"Cannot find {port_name} in do_while loop body {'inputs' if is_input else 'outputs'}.", + no_personal_data_message=f"Miss port in do_while loop body {'inputs' if is_input else 'outputs'}.", + ) + return port_name + + res: Union[str, NodeInput, NodeOutput] = port + return res + + @classmethod + def _create_instance_from_schema_dict( + cls, pipeline_jobs: Dict[str, BaseNode], loaded_data: Dict, validate_port: bool = True + ) -> "DoWhile": + """Create a do_while instance from schema parsed dict. + + :param pipeline_jobs: The pipeline jobs + :type pipeline_jobs: Dict[str, BaseNode] + :param loaded_data: The loaded data + :type loaded_data: Dict + :param validate_port: Whether to raise if inputs/outputs are not present. Defaults to True + :type validate_port: bool + :return: The DoWhile node + :rtype: DoWhile + """ + + # Get body object from pipeline job list. 
+ body_name = cls._get_data_binding_expression_value(loaded_data.pop("body"), regex=r"\{\{.*\.jobs\.(.*)\}\}") + body = cls._get_body_from_pipeline_jobs(pipeline_jobs, body_name) + + # Convert mapping key-vault to input/output object + mapping = {} + for k, v in loaded_data.pop("mapping", {}).items(): + output_name = cls._get_data_binding_expression_value(k, regex=r"\{\{.*\.%s\.outputs\.(.*)\}\}" % body_name) + input_names = v if isinstance(v, list) else [v] + input_names = [ + cls._get_data_binding_expression_value(item, regex=r"\{\{.*\.%s\.inputs\.(.*)\}\}" % body_name) + for item in input_names + ] + mapping[output_name] = [cls._get_port_obj(body, item, validate_port=validate_port) for item in input_names] + + limits = loaded_data.pop("limits", None) + + if "condition" in loaded_data: + # Convert condition to output object + condition_name = cls._get_data_binding_expression_value( + loaded_data.pop("condition"), regex=r"\{\{.*\.%s\.outputs\.(.*)\}\}" % body_name + ) + condition_value = cls._get_port_obj(body, condition_name, is_input=False, validate_port=validate_port) + else: + condition_value = None + + do_while_instance = DoWhile( + body=body, + mapping=mapping, + condition=condition_value, + **loaded_data, + ) + do_while_instance.set_limits(**limits) + + return do_while_instance + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> DoWhileSchema: + return DoWhileSchema(context=context) + + @classmethod + def _from_rest_object(cls, obj: dict, pipeline_jobs: dict) -> "DoWhile": + # pylint: disable=protected-access + + obj = BaseNode._from_rest_object_to_init_params(obj) + return cls._create_instance_from_schema_dict(pipeline_jobs, obj, validate_port=False) + + def set_limits( + self, + *, + max_iteration_count: int, + # pylint: disable=unused-argument + **kwargs: Any, + ) -> None: + """ + Set the maximum iteration count for the do-while job. + + The range of the iteration count is (0, 1000]. + + :keyword max_iteration_count: The maximum iteration count for the do-while job. + :paramtype max_iteration_count: int + """ + if isinstance(self.limits, DoWhileJobLimits): + self.limits._max_iteration_count = max_iteration_count # pylint: disable=protected-access + else: + self._limits = DoWhileJobLimits(max_iteration_count=max_iteration_count) + + def _customized_validate(self) -> MutableValidationResult: + validation_result = self._validate_loop_condition() + validation_result.merge_with(self._validate_body()) + validation_result.merge_with(self._validate_do_while_limit()) + validation_result.merge_with(self._validate_body_output_mapping()) + return validation_result + + def _validate_port( + self, + port: Union[str, NodeInput, NodeOutput], + node_ports: Dict[str, Union[NodeInput, NodeOutput]], + port_type: Literal["input", "output"], + yaml_path: str, + ) -> MutableValidationResult: + """Validate input/output port is exist in the dowhile body. 
+
+        :param port: Either:
+          * The name of an input or output
+          * An input object
+          * An output object
+        :type port: Union[str, NodeInput, NodeOutput]
+        :param node_ports: The node inputs/outputs
+        :type node_ports: Dict[str, Union[NodeInput, NodeOutput]]
+        :param port_type: The port type
+        :type port_type: Literal["input", "output"]
+        :param yaml_path: The yaml path
+        :type yaml_path: str
+        :return: The validation result
+        :rtype: MutableValidationResult
+        """
+        validation_result = self._create_empty_validation_result()
+        if isinstance(port, str):
+            port_obj = node_ports.get(port, None)
+        else:
+            port_obj = port
+        if (
+            port_obj is not None
+            and port_obj._owner is not None  # pylint: disable=protected-access
+            and not isinstance(port_obj._owner, PipelineJob)  # pylint: disable=protected-access
+            and port_obj._owner._instance_id != self.body._instance_id  # pylint: disable=protected-access
+        ):
+            # Check that the port owner is the do_while body.
+            validation_result.append_error(
+                yaml_path=yaml_path,
+                message=(
+                    f"{port_obj._port_name} is the {port_type} of {port_obj._owner.name}, "  # pylint: disable=protected-access
+                    f"do_while only accepts the {port_type} of the body: {self.body.name}."
+                ),
+            )
+        elif port_obj is None or port_obj._port_name not in node_ports:  # pylint: disable=protected-access
+            # Check that the port exists in the do_while body.
+            validation_result.append_error(
+                yaml_path=yaml_path,
+                message=(
+                    f"The {port_type} of mapping {port_obj._port_name if port_obj else port} does not "  # pylint: disable=protected-access
+                    f"exist in {self.body.name} {port_type}, existing {port_type}: {node_ports.keys()}"
+                ),
+            )
+        return validation_result
+
+    def _validate_loop_condition(self) -> MutableValidationResult:
+        # pylint: disable=protected-access
+        validation_result = self._create_empty_validation_result()
+        if self.condition is not None:
+            # Check that the condition exists in the do_while body.
+            validation_result.merge_with(
+                self._validate_port(self.condition, self.body.outputs, port_type="output", yaml_path="condition")
+            )
+            if validation_result.passed:
+                # Check that the condition is a control output.
+                condition_name = self.condition if isinstance(self.condition, str) else self.condition._port_name
+                if not self.body._outputs[condition_name]._is_primitive_type:
+                    validation_result.append_error(
+                        yaml_path="condition",
+                        message=(
+                            f"{condition_name} is not a control output and is not a primitive type. "
+                            "The condition of do_while must be a control output or a primitive-type output of the body."
+                        ),
+                    )
+        return validation_result
+
+    def _validate_do_while_limit(self) -> MutableValidationResult:
+        validation_result = self._create_empty_validation_result()
+        if isinstance(self.limits, DoWhileJobLimits):
+            if not self.limits or self.limits.max_iteration_count is None:
+                return validation_result
+            if isinstance(self.limits.max_iteration_count, InputOutputBase):
+                validation_result.append_error(
+                    yaml_path="limit.max_iteration_count",
+                    message="The max iteration count cannot be linked with a primitive type input.",
+                )
+            elif self.limits.max_iteration_count > DO_WHILE_MAX_ITERATION or self.limits.max_iteration_count < 0:
+                validation_result.append_error(
+                    yaml_path="limit.max_iteration_count",
+                    message=f"The max iteration count cannot be less than 0 or larger than {DO_WHILE_MAX_ITERATION}.",
+                )
+        return validation_result
+
+    def _validate_body_output_mapping(self) -> MutableValidationResult:
+        # pylint: disable=protected-access
+        validation_result = self._create_empty_validation_result()
+        if not isinstance(self.mapping, dict):
+            validation_result.append_error(
+                yaml_path="mapping", message=f"Mapping expects a dict type but got a {type(self.mapping)} type."
+            )
+        else:
+            # Record the mapping relationship between inputs and outputs
+            input_output_mapping: Dict = {}
+            # Validate that the mapped inputs & outputs come from the do_while body
+            for output, inputs in self.mapping.items():
+                # pylint: disable=protected-access
+                output_name = output if isinstance(output, str) else output._port_name
+                validate_results = self._validate_port(
+                    output, self.body.outputs, port_type="output", yaml_path="mapping"
+                )
+                if validate_results.passed:
+                    is_primitive_output = self.body._outputs[output_name]._is_primitive_type
+                    inputs = inputs if isinstance(inputs, list) else [inputs]
+                    for item in inputs:
+                        input_validate_results = self._validate_port(
+                            item, self.body.inputs, port_type="input", yaml_path="mapping"
+                        )
+                        validation_result.merge_with(input_validate_results)
+                        # pylint: disable=protected-access
+                        input_name = item if isinstance(item, str) else item._port_name
+                        input_output_mapping[input_name] = input_output_mapping.get(input_name, []) + [output_name]
+                        is_primitive_type = self.body._inputs[input_name]._meta._is_primitive_type
+
+                        if input_validate_results.passed and not is_primitive_output and is_primitive_type:
+                            validate_results.append_error(
+                                yaml_path="mapping",
+                                message=(
+                                    f"{output_name} is a non-primitive type output and {input_name} "
+                                    "is a primitive input. A non-primitive type output cannot be connected "
+                                    "to a primitive type input."
+                                ),
+                            )
+
+                validation_result.merge_with(validate_results)
+            # Validate that no input is linked to multiple outputs
+            for _input, outputs in input_output_mapping.items():
+                if len(outputs) > 1:
+                    validation_result.append_error(
+                        yaml_path="mapping", message=f"Input {_input} has been linked to multiple outputs {outputs}."
+                    )
+        return validation_result diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/fl_scatter_gather.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/fl_scatter_gather.py new file mode 100644 index 00000000..0ad6b0e2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/fl_scatter_gather.py @@ -0,0 +1,886 @@ +# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
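For context on how a DoWhile node is normally created, here is a minimal dsl.do_while sketch; the component file and port names are hypothetical, and the import mirrors the one used by fl_scatter_gather.py below.

from azure.ai.ml import dsl, load_component
from azure.ai.ml.dsl._do_while import do_while  # a public alias may also exist under azure.ai.ml.dsl

train_iteration = load_component(source="train_iteration.yml")  # hypothetical loop-body component


@dsl.pipeline()
def iterative_training(initial_model):
    body = train_iteration(model_in=initial_model)
    do_while(
        body=body,
        # feed this round's output back into the next round's input
        mapping={"model_out": body.inputs.model_in},
        max_iteration_count=10,  # must stay within (0, 1000] per DoWhile.set_limits
    )
    return {"final_model": body.outputs.model_out}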
+# --------------------------------------------------------- +import re +from typing import Any, Dict, List, Optional, Tuple, Union + +from azure.ai.ml import Output +from azure.ai.ml._schema import PathAwareSchema +from azure.ai.ml._schema.pipeline.control_flow_job import FLScatterGatherSchema +from azure.ai.ml.constants import JobType +from azure.ai.ml.constants._common import AssetTypes +from azure.ai.ml.dsl import pipeline +from azure.ai.ml.dsl._do_while import do_while +from azure.ai.ml.entities._assets.federated_learning_silo import FederatedLearningSilo +from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode +from azure.ai.ml.entities._builders.pipeline import Pipeline +from azure.ai.ml.entities._component.command_component import CommandComponent +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._inputs_outputs.input import Input +from azure.ai.ml.entities._job.pipeline._io.mixin import NodeIOMixin +from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob +from azure.ai.ml.entities._util import convert_ordered_dict_to_dict +from azure.ai.ml.entities._validation import MutableValidationResult + +from .subcomponents import create_scatter_output_table + +# TODO 2293610: add support for more types of outputs besides uri_folder and mltable +# Likely types that ought to be mergeable: string, int, uri_file +MERGE_COMPONENT_MAPPING = { + "mltable": create_scatter_output_table, + "uri_folder": create_scatter_output_table, +} + + +ANCHORABLE_OUTPUT_TYPES = {AssetTypes.MLTABLE, AssetTypes.URI_FOLDER} + +ANCHORING_PATH_ROOT = "root" + + +# big TODO: For some reason, surfacing this file in __init__.py causes +# a circular import exception on the first attempted import +# In notebooks, the second import succeeds, but then causes a silent failure where the +# MLDesigner component created by the subcomponents.create_scatter_output_table function +# will produce a ComponentExecutor object instead of the actual component. +# TODO 2293541: Add telemetry of some sort +# pylint: disable=too-many-instance-attributes +class FLScatterGather(ControlFlowNode, NodeIOMixin): + """A node which creates a federated learning scatter-gather loop as a pipeline subgraph. + Intended for use inside a pipeline job. This is initialized when calling + `dsl.fl_scatter_gather()` or when loading a serialized version of this node from YAML. + Please do not manually initialize this class. + + :param silo_configs: List of federated learning silo configurations. + :type silo_configs: List[~azure.ai.ml.entities._assets.federated_learning_silo.FederatedLearningSilo] + :param silo_component: Component representing the silo for federated learning. + :type silo_component: ~azure.ai.ml.entities.Component + :param aggregation_component: Component representing the aggregation step. + :type aggregation_component: ~azure.ai.ml.entities.Component + :param aggregation_compute: The compute resource for the aggregation step. + :type aggregation_compute: str + :param aggregation_datastore: The datastore for the aggregation step. + :type aggregation_datastore: str + :param shared_silo_kwargs: Keyword arguments shared across all silos. + :type shared_silo_kwargs: dict + :param aggregation_kwargs: Keyword arguments specific to the aggregation step. + :type aggregation_kwargs: dict + :param silo_to_aggregation_argument_map: Mapping of silo to aggregation arguments. 
+ :type silo_to_aggregation_argument_map: dict + :param aggregation_to_silo_argument_map: Mapping of aggregation to silo arguments. + :type aggregation_to_silo_argument_map: dict + :param max_iterations: The maximum number of iterations for the scatter-gather loop. + :type max_iterations: int + :param create_default_mappings_if_needed: Whether to create default argument mappings if needed. + :type create_default_mappings_if_needed: bool + """ + + # See node class for input descriptions, no point maintaining + # double descriptions between a wrapper its interior. + def __init__( + self, + *, + silo_configs: List[FederatedLearningSilo], + silo_component: Component, + aggregation_component: Component, + aggregation_compute: Optional[str] = None, + aggregation_datastore: Optional[str] = None, + shared_silo_kwargs: Optional[Dict] = None, + aggregation_kwargs: Optional[Dict] = None, + silo_to_aggregation_argument_map: Optional[Dict] = None, + aggregation_to_silo_argument_map: Optional[Dict] = None, + max_iterations: int = 1, + create_default_mappings_if_needed: bool = False, + **kwargs: Any, + ) -> None: + # auto-create X_to_Y_argument_map values if allowed and needed. + if create_default_mappings_if_needed: + ( + silo_to_aggregation_argument_map, + aggregation_to_silo_argument_map, + ) = FLScatterGather._try_create_default_mappings( + silo_component, + aggregation_component, + silo_to_aggregation_argument_map, + aggregation_to_silo_argument_map, + ) + + # input validation. + FLScatterGather.validate_inputs( + silo_configs=silo_configs, + silo_component=silo_component, + aggregation_component=aggregation_component, + shared_silo_kwargs=shared_silo_kwargs, + aggregation_compute=aggregation_compute, + aggregation_datastore=aggregation_datastore, + aggregation_kwargs=aggregation_kwargs, + silo_to_aggregation_argument_map=silo_to_aggregation_argument_map, + aggregation_to_silo_argument_map=aggregation_to_silo_argument_map, + max_iterations=max_iterations, + ) + + # store inputs + self.silo_configs = silo_configs + self.aggregation_compute = aggregation_compute + self.aggregation_datastore = aggregation_datastore + self.silo_component = silo_component + self.aggregation_component = aggregation_component + self.shared_silo_kwargs = shared_silo_kwargs + self.aggregation_kwargs = aggregation_kwargs + self.silo_to_aggregation_argument_map = silo_to_aggregation_argument_map + self.aggregation_to_silo_argument_map = aggregation_to_silo_argument_map + self.max_iterations = max_iterations + self._init = True # Needed by parent class to work properly + + self.scatter_gather_graph = self.scatter_gather() + + # set SG node flag for telemetry + self.scatter_gather_graph.properties["azureml.telemetry.attribution"] = "FederatedLearningSGJobFlag" + self.scatter_gather_graph._to_rest_object() + + # set output to final aggregation step's output + self._outputs = self.scatter_gather_graph.outputs + super(FLScatterGather, self).__init__( + type=JobType.COMPONENT, + component=None, + inputs=None, + outputs=self.scatter_gather_graph.outputs, + name=None, + display_name=None, + description=None, + tags=None, + properties=None, + comment=None, + compute=None, + experiment_name=None, + ) + + def scatter_gather(self) -> PipelineJob: + """Executes the scatter-gather loop by creating and executing a pipeline subgraph. + Returns the outputs of the final aggregation step. + + :return: Outputs of the final aggregation step. 
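As a usage sketch of this node via the dsl.fl_scatter_gather entry point mentioned in the docstring above; the component files, compute/datastore names, output name, and the FederatedLearningSilo constructor keywords are assumptions, and the dsl wrapper is assumed to mirror this constructor's parameters.

from azure.ai.ml import Input, dsl, load_component
from azure.ai.ml.entities._assets.federated_learning_silo import FederatedLearningSilo

silo_step = load_component(source="silo_training.yml")              # hypothetical
aggregation_step = load_component(source="aggregate_weights.yml")   # hypothetical

silos = [
    FederatedLearningSilo(
        compute="silo1-cluster",
        datastore="silo1_store",
        inputs={"raw_data": Input(type="uri_folder", path="azureml://datastores/silo1_store/paths/data/")},
    ),
    FederatedLearningSilo(
        compute="silo2-cluster",
        datastore="silo2_store",
        inputs={"raw_data": Input(type="uri_folder", path="azureml://datastores/silo2_store/paths/data/")},
    ),
]


@dsl.pipeline()
def fl_pipeline():
    fl_node = dsl.fl_scatter_gather(
        silo_configs=silos,
        silo_component=silo_step,
        aggregation_component=aggregation_step,
        aggregation_compute="orchestrator-cluster",
        aggregation_datastore="orchestrator_store",
        shared_silo_kwargs={},
        aggregation_kwargs={},
        max_iterations=3,
        create_default_mappings_if_needed=True,  # auto-match identically named ports
    )
    return {"aggregated_model": fl_node.outputs["aggregated_model"]}  # output name hypothetical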
+ :rtype: list[~azure.ai.ml.Output] + """ + + @pipeline( + func=None, + name="Scatter gather", + description="It includes all steps that need to be executed in silo and aggregation", + ) + # pylint: disable-next=docstring-missing-return,docstring-missing-rtype + def scatter_gather_iteration_body(**silo_inputs: Input) -> PipelineJob: + """ + Performs a scatter-gather iteration by running copies of the silo step on different + computes/datstores according to this node's silo configs. The outputs of these + silo components are then merged by an internal helper component. The merged values + are then inputted into the user-provided aggregation component. Returns the executed aggregation component. + + Kwargs are a dictionary of names and Inputs to be injected into each executed silo step. This dictionary is + merged with silo-specific inputs before each executed. + """ + + silo_outputs = [] + # TODO 2293586 replace this for-loop with a parallel-for node + for silo_config in self.silo_configs: + silo_inputs.update(silo_config.inputs) + executed_silo_component = self.silo_component(**silo_inputs) + for v, k in executed_silo_component.inputs.items(): + if v in silo_config.inputs and k.type == "uri_folder": + k.mode = "ro_mount" + FLScatterGather._anchor_step( + pipeline_step=executed_silo_component, + compute=silo_config.compute, + internal_datastore=silo_config.datastore, + orchestrator_datastore=self.aggregation_datastore, + ) + # add to silo outputs list + silo_outputs.append(executed_silo_component) + + # produce internal argument-merging components and record them in local subgraph + merge_comp_mapping = self._inject_merge_components(silo_outputs) + + # produce aggregate step inputs by merging static kwargs and mapped arguments from + # internal merge components + agg_inputs: Dict = {} + if self.aggregation_kwargs is not None: + agg_inputs.update(self.aggregation_kwargs) + internal_merge_outputs = { + self._get_aggregator_input_name(k): v.outputs.aggregated_output for k, v in merge_comp_mapping.items() + } + agg_inputs.update(internal_merge_outputs) + + # run the user aggregation step + executed_aggregation_component = self.aggregation_component(**agg_inputs) + # Set mode of aggregated mltable inputs as eval mount to allow files referenced within the table + # to be accessible by the component + for name, agg_input in executed_aggregation_component.inputs.items(): + if ( + self.silo_to_aggregation_argument_map is not None + and name in self.silo_to_aggregation_argument_map.values() + and agg_input.type == "mltable" + ): + agg_input.mode = "eval_download" + + # Anchor both the internal merge components and the user-supplied aggregation step + # to the aggregation compute and datastore + if self.aggregation_compute is not None and self.aggregation_datastore is not None: + # internal merge component is also siloed to wherever the aggregation component lives. 
+ for executed_merge_component in merge_comp_mapping.values(): + FLScatterGather._anchor_step( + pipeline_step=executed_merge_component, + compute=self.aggregation_compute, + internal_datastore=self.aggregation_datastore, + orchestrator_datastore=self.aggregation_datastore, + ) + FLScatterGather._anchor_step( + pipeline_step=executed_aggregation_component, + compute=self.aggregation_compute, + internal_datastore=self.aggregation_datastore, + orchestrator_datastore=self.aggregation_datastore, + ) + res: PipelineJob = executed_aggregation_component.outputs + return res + + @pipeline(func=None, name="Scatter gather graph") + # pylint: disable-next=docstring-missing-return,docstring-missing-rtype + def create_scatter_gather_graph() -> PipelineJob: + """ + Creates a scatter-gather graph by executing the scatter_gather_iteration_body + function in a do-while loop. The loop terminates when the user-supplied + termination condition is met. + """ + + silo_inputs: Dict = {} + if self.shared_silo_kwargs is not None: + # Start with static inputs + silo_inputs.update(self.shared_silo_kwargs) + + # merge in inputs passed in from previous iteration's aggregate step) + if self.aggregation_to_silo_argument_map is not None: + silo_inputs.update({v: None for v in self.aggregation_to_silo_argument_map.values()}) + + scatter_gather_body = scatter_gather_iteration_body(**silo_inputs) + + # map aggregation outputs to scatter inputs + if self.aggregation_to_silo_argument_map is not None: + do_while_mapping = { + k: getattr(scatter_gather_body.inputs, v) for k, v in self.aggregation_to_silo_argument_map.items() + } + + do_while( + body=scatter_gather_body, # type: ignore[arg-type] + mapping=do_while_mapping, # pylint: disable=possibly-used-before-assignment + max_iteration_count=self.max_iterations, + ) + res_scatter: PipelineJob = scatter_gather_body.outputs # type: ignore[assignment] + return res_scatter + + res: PipelineJob = create_scatter_gather_graph() + return res + + @classmethod + def _get_fl_datastore_path( + cls, + datastore_name: Optional[str], + output_name: str, + unique_id: str = "${{name}}", + iteration_num: Optional[int] = None, + ) -> str: + """Construct a path string using the inputted values. The important aspect is that this produces a + path with a specified datastore. + + :param datastore_name: The datastore to use in the constructed path. + :type datastore_name: str + :param output_name: The name of the output value that this path is assumed to belong to. + Is injected into the path. + :type output_name: str + :param unique_id: An additional string to inject if needed. Defaults to ${{name}}, which is the + output name again. + :type unique_id: str + :param iteration_num: The iteration number of the current scatter-gather iteration. + If set, inject this into the resulting path string. + :type iteration_num: Optional[int] + :return: A data path string containing the various aforementioned inputs. + :rtype: str + + """ + data_path = f"azureml://datastores/{datastore_name}/paths/federated_learning/{output_name}/{unique_id}/" + if iteration_num: + data_path += f"iteration_{iteration_num}/" + return data_path + + @classmethod + def _check_datastore(cls, path: str, expected_datastore: Optional[str]) -> bool: + """Perform a simple regex check to try determine if the datastore in the inputted path string + matches the expected_datastore. + + + :param path: An output pathstring. + :type path: str + :param expected_datastore: A datastore name. 
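The internal _get_fl_datastore_path helper above produces anchoring paths of the following shape; it is shown here only to document the path format, and the datastore and output names are hypothetical.

from azure.ai.ml.entities._builders.fl_scatter_gather import FLScatterGather

path = FLScatterGather._get_fl_datastore_path(
    datastore_name="orchestrator_store", output_name="agg_model", iteration_num=3
)
# "azureml://datastores/orchestrator_store/paths/federated_learning/agg_model/${{name}}/iteration_3/"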
+ :type expected_datastore: str + :return: Whether or not the expected_datastore was found in the path at the expected location. + :rtype: bool + """ + match = re.match("(.*datastore/)([^/]*)(/.*)", path) + if match: + groups = match.groups() + if groups[1] == expected_datastore: + return True + return False + + @classmethod + def _check_or_set_datastore( + cls, + name: str, + output: Output, + target_datastore: Optional[str], + iteration_num: Optional[int] = None, + ) -> MutableValidationResult: + """Tries to assign output.path to a value which includes the target_datastore if it's not already + set. If the output's path is already set, return a warning if it doesn't match the target_datastore. + + :param name: The name of the output to modify + :type name: str + :param output: The output object to examine and potentially change the datastore of. + :type output: Output + :param target_datastore: The name of the datastore to try applying to the output + :type target_datastore: str + :param iteration_num: the current iteration in the scatter gather loop. If set, include this in the generated + path. + :type iteration_num: Optional[int] + :return: A validation result containing any problems that arose. Contains a warning if the examined output + already contains a datastore that does not match 'target_datastore'. + :rtype: MutableValidationResult + """ + validation_result = cls._create_empty_validation_result() + if not hasattr(output, "path") or not output.path: + output.path = cls._get_fl_datastore_path(target_datastore, name, iteration_num=iteration_num) + # Double check the path's datastore leads to the target if it's already set. + elif not cls._check_datastore(output.path, target_datastore): + validation_result.append_warning( + yaml_path=name, + message=f"Output '{name}' has an undetermined datastore, or a datstore" + + f" that does not match the expected datastore for this output, which is '{target_datastore}'." + + " Make sure this is intended.", + ) + return validation_result + + # TODO 2293705: Add anchoring for more resource types. + @classmethod + def _anchor_step( + cls, + pipeline_step: Union[Pipeline, CommandComponent], + compute: str, + internal_datastore: str, + orchestrator_datastore: Optional[str], + iteration: Optional[int] = 0, + _path: str = "root", + ) -> MutableValidationResult: + """Take a pipeline step and recursively enforces the right compute/datastore config. + + :param pipeline_step: a step to anchor + :type pipeline_step: Union[Pipeline, CommandComponent] + :param compute: name of the compute target + :type compute: str + :param internal_datastore: The name of the datastore that should be used for internal output anchoring. + :type internal_datastore: str + :param orchestrator_datastore: The name of the orchestrator/aggregation datastore that should be used for + 'real' output anchoring. + :type orchestrator_datastore: str + :param iteration: The current iteration number in the scatter gather loop. Defaults to 0. + :type iteration: Optional[int] + :param _path: for recursive anchoring, codes the "path" inside the pipeline for messaging + :type _path: str + :return: A validation result containing any issues that were uncovered during anchoring. This function adds + warnings when outputs already have assigned paths which don't contain the expected datastore. 
+ :rtype: MutableValidationResult + """ + + validation_result = cls._create_empty_validation_result() + + # Current step is a pipeline, which means we need to inspect its steps (jobs) and + # potentially anchor those as well. + if pipeline_step.type == "pipeline": + if hasattr(pipeline_step, "component"): + # Current step is probably not the root of the graph + # its outputs should be anchored to the internal_datastore. + for name, output in pipeline_step.outputs.items(): + if not isinstance(output, str): + if output.type in ANCHORABLE_OUTPUT_TYPES: + validation_result.merge_with( + cls._check_or_set_datastore( + name=name, + output=output, + target_datastore=orchestrator_datastore, + iteration_num=iteration, + ) + ) + + # then we need to anchor the internal component of this step + # The outputs of this sub-component are a deep copy of the outputs of this step + # This is dangerous, and we need to make sure they both use the same datastore, + # so we keep datastore types identical across this recursive call. + cls._anchor_step( + pipeline_step.component, # type: ignore + compute, + internal_datastore=internal_datastore, + orchestrator_datastore=orchestrator_datastore, + _path=f"{_path}.component", + ) + + else: + # This is a pipeline step with multiple jobs beneath it. + # Anchor its outputs... + for name, output in pipeline_step.outputs.items(): + if not isinstance(output, str): + if output.type in ANCHORABLE_OUTPUT_TYPES: + validation_result.merge_with( + cls._check_or_set_datastore( + name=name, + output=output, + target_datastore=orchestrator_datastore, + iteration_num=iteration, + ) + ) + # ...then recursively anchor each job inside the pipeline + if not isinstance(pipeline_step, CommandComponent): + for job_key in pipeline_step.jobs: + job = pipeline_step.jobs[job_key] + # replace orchestrator with internal datastore, jobs components + # should either use the local datastore + # or have already had their outputs re-assigned. + cls._anchor_step( + job, + compute, + internal_datastore=internal_datastore, + orchestrator_datastore=internal_datastore, + _path=f"{_path}.jobs.{job_key}", + ) + + elif pipeline_step.type == "command": + # if the current step is a command component + # make sure the compute corresponds to the silo + if not isinstance(pipeline_step, CommandComponent) and pipeline_step.compute is None: + pipeline_step.compute = compute + # then anchor each of the job's outputs + for name, output in pipeline_step.outputs.items(): + if not isinstance(output, str): + if output.type in ANCHORABLE_OUTPUT_TYPES: + validation_result.merge_with( + cls._check_or_set_datastore( + name=name, + output=output, + target_datastore=orchestrator_datastore, + iteration_num=iteration, + ) + ) + else: + # TODO revisit this and add support for anchoring more things + raise NotImplementedError(f"under path={_path}: step type={pipeline_step.type} is not supported") + + return validation_result + + # Making this a class method allows for easier, isolated testing, and allows careful + # users to call this as a pre-init step. + # TODO: Might be worth migrating this to a schema validation class, but out of scope for now. 
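As the comments above note, validate_inputs can also be called on its own as a pre-init check; a hedged sketch, reusing the silo configs and components from the earlier fl_scatter_gather example.

from azure.ai.ml.entities._builders.fl_scatter_gather import FLScatterGather

FLScatterGather.validate_inputs(
    silo_configs=silos,                    # as constructed in the earlier sketch
    silo_component=silo_step,
    aggregation_component=aggregation_step,
    shared_silo_kwargs={},
    aggregation_compute="orchestrator-cluster",
    aggregation_datastore="orchestrator_store",
    aggregation_kwargs={},
    silo_to_aggregation_argument_map={},
    aggregation_to_silo_argument_map={},
    max_iterations=3,
    raise_error=True,                      # raise instead of returning a failed validation result
)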
+ # pylint: disable=too-many-statements,too-many-branches, too-many-locals + @classmethod + def validate_inputs( + cls, + *, + silo_configs: List[FederatedLearningSilo], + silo_component: Component, + aggregation_component: Component, + shared_silo_kwargs: Optional[Dict], + aggregation_compute: Optional[str], + aggregation_datastore: Optional[str], + aggregation_kwargs: Optional[Dict], + silo_to_aggregation_argument_map: Optional[Dict], + aggregation_to_silo_argument_map: Optional[Dict], + max_iterations: int, + raise_error: bool = False, + ) -> MutableValidationResult: + """Validates the inputs for the scatter-gather node. + + :keyword silo_configs: List of federated learning silo configurations. + :paramtype silo_configs: List[~azure.ai.ml.entities._assets.federated_learning_silo.FederatedLearningSilo] + :keyword silo_component: Component representing the silo for federated learning. + :paramtype silo_component: ~azure.ai.ml.entities.Component + :keyword aggregation_component: Component representing the aggregation step. + :paramtype aggregation_component: ~azure.ai.ml.entities.Component + :keyword shared_silo_kwargs: Keyword arguments shared across all silos. + :paramtype shared_silo_kwargs: Dict + :keyword aggregation_compute: The compute resource for the aggregation step. + :paramtype aggregation_compute: str + :keyword aggregation_datastore: The datastore for the aggregation step. + :paramtype aggregation_datastore: str + :keyword aggregation_kwargs: Keyword arguments specific to the aggregation step. + :paramtype aggregation_kwargs: Dict + :keyword silo_to_aggregation_argument_map: Mapping of silo to aggregation arguments. + :paramtype silo_to_aggregation_argument_map: Dict + :keyword aggregation_to_silo_argument_map: Mapping of aggregation to silo arguments. + :paramtype aggregation_to_silo_argument_map: Dict + :keyword max_iterations: The maximum number of iterations for the scatter-gather loop. + :paramtype max_iterations: int + :keyword raise_error: Whether to raise an exception if validation fails. Defaults to False. + :paramtype raise_error: bool + :return: The validation result. 
+ :rtype: ~azure.ai.ml.entities._validation.MutableValidationResult + """ + validation_result = cls._create_empty_validation_result() + + # saved values for validation later on + silo_inputs = None + silo_outputs = None + agg_inputs = None + agg_outputs = None + # validate silo component + if silo_component is None: + validation_result.append_error( + yaml_path="silo_component", + message="silo_component is a required argument for the scatter gather node.", + ) + else: + # ensure that silo component has both inputs and outputs + if not hasattr(silo_component, "inputs"): + validation_result.append_error( + yaml_path="silo_component", + message="silo_component is missing 'inputs' attribute;" + + "it does not appear to be a valid component that can be used in a scatter-gather loop.", + ) + else: + silo_inputs = silo_component.inputs + if not hasattr(silo_component, "outputs"): + validation_result.append_error( + yaml_path="silo_component", + message="silo_component is missing 'outputs' attribute;" + + "it does not appear to be a valid component that can be used in a scatter-gather loop.", + ) + else: + silo_outputs = silo_component.outputs + # validate aggregation component + if aggregation_component is None: + validation_result.append_error( + yaml_path="aggregation_component", + message="aggregation_component is a required argument for the scatter gather node.", + ) + else: + # ensure that aggregation component has both inputs and outputs + if not hasattr(aggregation_component, "inputs"): + validation_result.append_error( + yaml_path="aggregation_component", + message="aggregation_component is missing 'inputs' attribute;" + + "it does not appear to be a valid component that can be used in a scatter-gather loop.", + ) + else: + agg_inputs = aggregation_component.inputs + if not hasattr(aggregation_component, "outputs"): + validation_result.append_error( + yaml_path="aggregation_component", + message="aggregation_component is missing 'outputs' attribute;" + + " it does not appear to be a valid component that can be used in a scatter-gather loop.", + ) + else: + agg_outputs = aggregation_component.outputs + + # validate silos configs + if silo_configs is None: + validation_result.append_error( + yaml_path="silo_configs", + message="silo_configs is a required argument for the scatter gather node.", + ) + elif len(silo_configs) == 0: + validation_result.append_error( + yaml_path="silo_configs", + message="silo_configs cannot be an empty list.", + ) + else: + first_silo = silo_configs[0] + expected_inputs: List = [] + if hasattr(first_silo, "inputs"): + expected_inputs = first_silo.inputs.keys() # type: ignore + num_expected_inputs = len(expected_inputs) + # pylint: disable=consider-using-enumerate + for i in range(len(silo_configs)): + silo = silo_configs[i] + if not hasattr(silo, "compute"): + validation_result.append_error( + yaml_path="silo_configs", + message=f"Silo at index {i} in silo_configs is missing its compute value.", + ) + if not hasattr(silo, "datastore"): + validation_result.append_error( + yaml_path="silo_configs", + message=f"Silo at index {i} in silo_configs is missing its datastore value.", + ) + silo_input_len = 0 + if hasattr(silo, "inputs"): + silo_input_len = len(silo.inputs) + # if inputs exist, make sure the inputs names are consistent across silo configs + for expected_input_name in expected_inputs: + if expected_input_name not in silo.inputs: + validation_result.append_error( + yaml_path="silo_configs", + message=f"Silo at index {i} has is missing inputs named 
'{expected_input_name}'," + + "which was listed in the first silo config. " + + "Silos must have consistent inputs names.", + ) + if silo_input_len != num_expected_inputs: + validation_result.append_error( + yaml_path="silo_configs", + message=f"Silo at index {i} has {silo_input_len} inputs, but the first silo established that" + + f"each silo would have {num_expected_inputs} silo-specific inputs.", + ) + + # Make sure both aggregation overrides are set, or not + if aggregation_datastore is None and aggregation_compute is not None: + validation_result.append_error( + yaml_path="aggregation_datastore", + message="aggregation_datastore cannot be unset if aggregation_compute is set.", + ) + elif aggregation_datastore is not None and aggregation_compute is None: + validation_result.append_error( + yaml_path="aggregation_compute", + message="aggregation_compute cannot be unset if aggregation_datastore is set.", + ) + + # validate component kwargs, ensuring that the relevant components contain the specified inputs + if shared_silo_kwargs is None: + validation_result.append_error( + yaml_path="shared_silo_kwargs", + message="shared_silo_kwargs should never be None. Input an empty dictionary instead.", + ) + elif silo_inputs is not None: + for k in shared_silo_kwargs.keys(): + if k not in silo_inputs: + validation_result.append_error( + yaml_path="shared_silo_kwargs", + message=f"shared_silo_kwargs keyword {k} not listed in silo_component's inputs", + ) + if aggregation_kwargs is None: + validation_result.append_error( + yaml_path="aggregation_kwargs", + message="aggregation_kwargs should never be None. Input an empty dictionary instead.", + ) + elif silo_inputs is not None: + for k in aggregation_kwargs.keys(): + if agg_inputs is not None and k not in agg_inputs: + validation_result.append_error( + yaml_path="aggregation_kwargs", + message=f"aggregation_kwargs keyword {k} not listed in aggregation_component's inputs", + ) + + # validate that argument mappings leverage inputs and outputs that actually exist + if aggregation_to_silo_argument_map is None: + validation_result.append_error( + yaml_path="aggregation_to_silo_argument_map", + message="aggregation_to_silo_argument_map should never be None. Input an empty dictionary instead.", + ) + elif silo_inputs is not None and agg_outputs is not None: + for k, v in aggregation_to_silo_argument_map.items(): + if k not in agg_outputs: + validation_result.append_error( + yaml_path="aggregation_to_silo_argument_map", + message=f"aggregation_to_silo_argument_map key {k} " + + "is not a known output of the aggregation component.", + ) + if v not in silo_inputs: + validation_result.append_error( + yaml_path="aggregation_to_silo_argument_map", + message=f"aggregation_to_silo_argument_map value {v} " + + "is not a known input of the silo component.", + ) + # and check the other mapping + if silo_to_aggregation_argument_map is None: + validation_result.append_error( + yaml_path="silo_to_aggregation_argument_map", + message="silo_to_aggregation_argument_map should never be None. 
" + + "Input an empty dictionary instead.", + ) + elif agg_inputs is not None and silo_outputs is not None: + for k, v in silo_to_aggregation_argument_map.items(): + if k not in silo_outputs: + validation_result.append_error( + yaml_path="silo_to_aggregation_argument_map", + message=f"silo_to_aggregation_argument_map key {k }" + + " is not a known output of the silo component.", + ) + if v not in agg_inputs: + validation_result.append_error( + yaml_path="silo_to_aggregation_argument_map", + message=f"silo_to_aggregation_argument_map value {v}" + + " is not a known input of the aggregation component.", + ) + + if max_iterations < 1: + validation_result.append_error( + yaml_path="max_iterations", + message=f"max_iterations must be a positive value, not '{max_iterations}'.", + ) + + return cls._try_raise(validation_result, raise_error=raise_error) + + @classmethod + def _custom_fl_data_path( + cls, + datastore_name: str, + output_name: str, + unique_id: str = "${{name}}", + iteration_num: str = "${{iteration_num}}", + ) -> str: + """Produces a path to store the data during FL training. + + :param datastore_name: name of the Azure ML datastore + :type datastore_name: str + :param output_name: a name unique to this output + :type output_name: str + :param unique_id: a unique id for the run (default: inject run id with ${{name}}) + :type unique_id: str + :param iteration_num: an iteration number if relevant + :type iteration_num: str + :return: direct url to the data path to store the data + :rtype: str + """ + data_path = f"azureml://datastores/{datastore_name}/paths/federated_learning/{output_name}/{unique_id}/" + if iteration_num is not None: + data_path += f"iteration_{iteration_num}/" + + return data_path + + def _get_aggregator_input_name(self, silo_output_name: str) -> Optional[str]: + """Retrieves the aggregator input name + + :param silo_output_name: The silo output name + :type silo_output_name: str + :return: + * Returns aggregator input name that maps to silo_output. + * Returns None if silo_output_name not in silo_to_aggregation_argument_map + :rtype: Optional[str] + """ + if self.silo_to_aggregation_argument_map is None: + return None + + return self.silo_to_aggregation_argument_map.get(silo_output_name) + + @classmethod + def _try_create_default_mappings( + cls, + silo_comp: Optional[Component], + agg_comp: Optional[Component], + silo_agg_map: Optional[Dict], + agg_silo_map: Optional[Dict], + ) -> Tuple[Optional[Dict], Optional[Dict]]: + """ + This function tries to produce dictionaries that link the silo and aggregation + components' outputs to the other's inputs. + The mapping only occurs for inputted mappings that are None, otherwise + the inputted mapping is returned unchanged. + These auto-generated mappings are naive, and simply maps all outputs of a component that have a + identically-named input in the other component. + + This function does nothing if either inputted component is None. This function will also do nothing + for a given mapping if either of the relevant inputs or outputs are None (but not empty). + + Example inputs: + silo_comp.inputs = {"silo_input" : value } + silo_comp.outputs = {"c" : ..., "silo_output2" : ... } + agg_comp.inputs = {"silo_output1" : ... } + agg_comp.outputs = {"agg_output" : ... 
} + silo_agg_map = None + agg_silo_map = {} + + Example returns: + {"silo_output1" : "silo_output1"}, {} + + :param silo_comp: The silo component + :type silo_comp: Optional[Component] + :param agg_comp: The aggregation component + :type agg_comp: Optional[Component] + :param silo_agg_map: Mapping of silo to aggregation arguments. + :type silo_agg_map: Optional[Dict] + :param agg_silo_map: Mapping of aggregation to silo arguments. + :type agg_silo_map: Optional[Dict] + :return: Returns a tuple of the potentially modified silo to aggregation mapping, followed by the aggregation + to silo mapping. + :rtype: Tuple[Optional[Dict], Optional[Dict]] + """ + if silo_comp is None or agg_comp is None: + return silo_agg_map, agg_silo_map + if silo_agg_map is None and silo_comp.outputs is not None and agg_comp.inputs is not None: + silo_agg_map = {output: output for output in silo_comp.outputs.keys() if output in agg_comp.inputs} + if agg_silo_map is None: + agg_silo_map = {output: output for output in agg_comp.outputs.keys() if output in silo_comp.inputs} + return silo_agg_map, agg_silo_map + + @staticmethod + # pylint: disable-next=docstring-missing-rtype + def _get_merge_component(output_type: str) -> Any: + """Gets the merge component to be used based on type of output + + :param output_type: The output type + :type output_type: str + :return: The merge component + """ + return MERGE_COMPONENT_MAPPING[output_type] + + def _inject_merge_components(self, executed_silo_components: Any) -> Dict: + """Add a merge component for each silo output in the silo_to_aggregation_argument_map. + These merge components act as a mediator between the user silo and aggregation steps, reducing + the variable number of silo outputs into a single input for the aggergation step. + + :param executed_silo_components: A list of executed silo steps to extract outputs from. + :type executed_silo_components: + :return: A mapping from silo output names to the corresponding newly created and executed merge component + :rtype: dict + """ + executed_component = executed_silo_components[0] + + merge_comp_mapping = {} + if self.silo_to_aggregation_argument_map is not None: + for ( + silo_output_argument_name, + _, + ) in self.silo_to_aggregation_argument_map.items(): + merge_comp = self._get_merge_component(executed_component.outputs[silo_output_argument_name].type) + merge_component_inputs = { + silo_output_argument_name + + "_silo_" + + str(i): executed_silo_components[i].outputs[silo_output_argument_name] + for i in range(0, len(executed_silo_components)) + } + executed_merge_component = merge_comp(**merge_component_inputs) + for input_obj in executed_merge_component.inputs.values(): + input_obj.mode = "direct" + for output_obj in executed_merge_component.outputs.values(): + output_obj.type = "mltable" + merge_comp_mapping.update({silo_output_argument_name: executed_merge_component}) + + return merge_comp_mapping + + # boilerplate functions - largely copied from other node builders + + @property + def outputs(self) -> Dict[str, Union[str, Output]]: + """Get the outputs of the scatter-gather node. + + :return: The outputs of the scatter-gather node. + :rtype: Dict[str, Union[str, ~azure.ai.ml.Output]] + """ + return self._outputs + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema: + return FLScatterGatherSchema(context=context) + + def _to_rest_object(self, **kwargs: Any) -> dict: + """Convert self to a rest object for remote call. 
+ + :return: The rest object + :rtype: dict + """ + rest_node = super(FLScatterGather, self)._to_rest_object(**kwargs) + rest_node.update({"outputs": self._to_rest_outputs()}) + # TODO: Bug Item number: 2897665 + res: dict = convert_ordered_dict_to_dict(rest_node) # type: ignore + return res diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_func.py new file mode 100644 index 00000000..c9ecabd8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_func.py @@ -0,0 +1,93 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +from typing import Any, Dict, Optional + +from azure.ai.ml.constants._component import ComponentSource +from azure.ai.ml.entities._component.import_component import ImportComponent +from azure.ai.ml.entities._inputs_outputs import Output +from azure.ai.ml.entities._job.import_job import ImportSource + +from .command_func import _parse_input, _parse_inputs_outputs, _parse_output +from .import_node import Import + + +def import_job( + *, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + display_name: Optional[str] = None, + experiment_name: Optional[str] = None, + source: Optional[ImportSource] = None, + output: Optional[Output] = None, + is_deterministic: bool = True, + **kwargs: Any, +) -> Import: + """Create an Import object which can be used inside dsl.pipeline as a function + and can also be created as a standalone import job. + + :keyword name: Name of the import job or component created. + :paramtype name: str + :keyword description: A friendly description of the import. + :paramtype description: str + :keyword tags: Tags to be attached to this import. + :paramtype tags: Dict + :keyword display_name: A friendly name. + :paramtype display_name: str + :keyword experiment_name: Name of the experiment the job will be created under. + If None is provided, the default will be set to the current directory name. + Will be ignored as a pipeline step. + :paramtype experiment_name: str + :keyword source: Input source parameters used by this import. + :paramtype source: ~azure.ai.ml.entities._job.import_job.ImportSource + :keyword output: The output of this import. + :paramtype output: ~azure.ai.ml.entities.Output + :keyword is_deterministic: Specify whether the command will return the same output given the same input. + If a command (component) is deterministic, when used as a node/step in a pipeline, + it will reuse results from a previously submitted job in the current workspace + which has the same inputs and settings. + In this case, this step will not use any compute resource. + Defaults to True. + :paramtype is_deterministic: bool + :returns: The Import object. 
+ :rtype: ~azure.ai.ml.entities._builders.import_node.Import + """ + inputs = source._to_job_inputs() if source else kwargs.pop("inputs") + outputs = {"output": output} if output else kwargs.pop("outputs") + component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input) + # job inputs can not be None + job_inputs = {k: v for k, v in job_inputs.items() if v is not None} + component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output) + + component = kwargs.pop("component", None) + + if component is None: + component = ImportComponent( + name=name, + tags=tags, + display_name=display_name, + description=description, + source=component_inputs, + output=component_outputs["output"], + _source=ComponentSource.BUILDER, + is_deterministic=is_deterministic, + **kwargs, + ) + + import_obj = Import( + component=component, + name=name, + description=description, + tags=tags, + display_name=display_name, + experiment_name=experiment_name, + inputs=job_inputs, + outputs=job_outputs, + **kwargs, + ) + + return import_obj diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_node.py new file mode 100644 index 00000000..144753d5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/import_node.py @@ -0,0 +1,205 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access + +import logging +from typing import Any, Dict, List, Optional, Tuple, Type, Union + +from marshmallow import Schema + +from azure.ai.ml._restclient.v2022_02_01_preview.models import CommandJob as RestCommandJob +from azure.ai.ml._restclient.v2022_02_01_preview.models import JobBaseData +from azure.ai.ml._schema.job.import_job import ImportJobSchema +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY +from azure.ai.ml.constants._component import ComponentSource, NodeType +from azure.ai.ml.constants._compute import ComputeType +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._component.import_component import ImportComponent +from azure.ai.ml.entities._inputs_outputs import Output +from azure.ai.ml.entities._job._input_output_helpers import from_rest_data_outputs, from_rest_inputs_to_dataset_literal +from azure.ai.ml.entities._job.import_job import ImportJob, ImportSource +from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException + +from ..._schema import PathAwareSchema +from .._inputs_outputs import Output +from .._util import convert_ordered_dict_to_dict, load_from_dict, validate_attribute_type +from .base_node import BaseNode + +module_logger = logging.getLogger(__name__) + + +class Import(BaseNode): + """Base class for import node, used for import component version consumption. + + You should not instantiate this class directly. Instead, you should + create from a builder function. + + :param component: Id or instance of the import component/job to be run for the step. + :type component: ~azure.ai.ml.entities._component.import_component.ImportComponent + :param inputs: Input parameters to the import. + :type inputs: Dict[str, str] + :param outputs: Mapping of output data bindings used in the job. + :type outputs: Dict[str, Union[str, ~azure.ai.ml.entities.Output]] + :param name: Name of the import. 
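A usage sketch for the import_job builder defined above in import_func.py; the input keys (type, connection, query) follow the source parameters mentioned in the Import node below, and the concrete values are hypothetical.

from azure.ai.ml import Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.entities._builders.import_func import import_job

import_node = import_job(
    name="import_sales_table",
    inputs={  # passed through when no ImportSource object is supplied
        "type": "azuresqldb",                       # hypothetical source type
        "connection": "azureml:my_sql_connection",  # hypothetical workspace connection
        "query": "select * from sales",
    },
    output=Output(type=AssetTypes.MLTABLE),
)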
+ :type name: str + :param description: Description of the import. + :type description: str + :param display_name: Display name of the job. + :type display_name: str + :param experiment_name: Name of the experiment the job will be created under, + if None is provided, the default will be set to the current directory name. + :type experiment_name: str + """ + + def __init__( + self, + *, + component: Union[str, ImportComponent], + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + kwargs.pop("type", None) + kwargs.pop("compute", None) + + self._parameters = kwargs.pop("parameters", {}) + BaseNode.__init__( + self, + type=NodeType.IMPORT, + inputs=inputs, + outputs=outputs, + component=component, + compute=ComputeType.ADF, + **kwargs, + ) + + @classmethod + def _get_supported_inputs_types(cls) -> Type[str]: + # import source parameters type, connection, query, path are always str + return str + + @classmethod + def _get_supported_outputs_types(cls) -> Tuple: + return str, Output + + @property + def component(self) -> Union[str, ImportComponent]: + res: Union[str, ImportComponent] = self._component + return res + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "component": (str, ImportComponent), + } + + def _to_job(self) -> ImportJob: + return ImportJob( + id=self.id, + name=self.name, + display_name=self.display_name, + description=self.description, + experiment_name=self.experiment_name, + status=self.status, + source=ImportSource._from_job_inputs(self._job_inputs), + output=self._job_outputs.get("output"), + creation_context=self.creation_context, + ) + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return [] + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj: dict = super()._to_rest_object(**kwargs) + rest_obj.update( + convert_ordered_dict_to_dict( + { + "componentId": self._get_component_id(), + } + ) + ) + return rest_obj + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Import": + from .import_func import import_job + + loaded_data = load_from_dict(ImportJobSchema, data, context, additional_message, **kwargs) + + _import_job: Import = import_job(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data) + + return _import_job + + @classmethod + def _load_from_rest_job(cls, obj: JobBaseData) -> "Import": + from .import_func import import_job + + rest_command_job: RestCommandJob = obj.properties + inputs = from_rest_inputs_to_dataset_literal(rest_command_job.inputs) + outputs = from_rest_data_outputs(rest_command_job.outputs) + + _import_job: Import = import_job( + name=obj.name, + display_name=rest_command_job.display_name, + description=rest_command_job.description, + experiment_name=rest_command_job.experiment_name, + status=rest_command_job.status, + creation_context=obj.system_data, + inputs=inputs, + output=outputs["output"] if "output" in outputs else None, + ) + _import_job._id = obj.id + if isinstance(_import_job.component, ImportComponent): + _import_job.component._source = ( + ComponentSource.REMOTE_WORKSPACE_JOB + ) # This is used by pipeline job telemetries. 
+ + return _import_job + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline import ImportSchema + + return ImportSchema(context=context) + + # pylint: disable-next=docstring-missing-param + def __call__(self, *args: Any, **kwargs: Any) -> "Import": + """Call Import as a function will return a new instance each time. + + :return: An Import node. + :rtype: Import + """ + if isinstance(self._component, Component): + # call this to validate inputs + node: Import = self._component(*args, **kwargs) + # merge inputs + for name, original_input in self.inputs.items(): + if name not in kwargs: + # use setattr here to make sure owner of input won't change + setattr(node.inputs, name, original_input._data) + node._job_inputs[name] = original_input._data + # get outputs + for name, original_output in self.outputs.items(): + # use setattr here to make sure owner of input won't change + if not isinstance(original_output, str): + setattr(node.outputs, name, original_output._data) + self._refine_optional_inputs_with_no_value(node, kwargs) + # set default values: compute, environment_variables, outputs + node._name = self.name + node.compute = self.compute + node.tags = self.tags + # Pass through the display name only if the display name is not system generated. + node.display_name = self.display_name if self.display_name != self.name else None + return node + msg = "Import can be called as a function only when referenced component is {}, currently got {}." + raise ValidationException( + message=msg.format(type(Component), self._component), + no_personal_data_message=msg.format(type(Component), "self._component"), + target=ErrorTarget.COMMAND_JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel.py new file mode 100644 index 00000000..db1de797 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel.py @@ -0,0 +1,551 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=protected-access + +import copy +import json +import logging +import os +import re +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast + +from marshmallow import INCLUDE, Schema + +from azure.ai.ml._schema.core.fields import NestedField, UnionField +from azure.ai.ml._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, + _BaseJobIdentityConfiguration, +) +from azure.ai.ml.entities._job.job import Job +from azure.ai.ml.entities._job.parallel.run_function import RunFunction +from azure.ai.ml.entities._job.pipeline._io import NodeOutput +from azure.ai.ml.exceptions import MlException + +from ..._schema import PathAwareSchema +from ..._utils.utils import is_data_binding_expression +from ...constants._common import ARM_ID_PREFIX +from ...constants._component import NodeType +from .._component.component import Component +from .._component.flow import FlowComponent +from .._component.parallel_component import ParallelComponent +from .._inputs_outputs import Input, Output +from .._job.job_resource_configuration import JobResourceConfiguration +from .._job.parallel.parallel_job import ParallelJob +from .._job.parallel.parallel_task import ParallelTask +from .._job.parallel.retry_settings import RetrySettings +from .._job.pipeline._io import NodeWithGroupInputMixin +from .._util import convert_ordered_dict_to_dict, get_rest_dict_for_node_attrs, validate_attribute_type +from .base_node import BaseNode + +module_logger = logging.getLogger(__name__) + + +class Parallel(BaseNode, NodeWithGroupInputMixin): # pylint: disable=too-many-instance-attributes + """Base class for parallel node, used for parallel component version consumption. + + You should not instantiate this class directly. Instead, you should + create it from the builder function: parallel. + + :param component: Id or instance of the parallel component/job to be run for the step + :type component: ~azure.ai.ml.entities._component.parallel_component.ParallelComponent + :param name: Name of the parallel node + :type name: str + :param description: Description of the parallel + :type description: str + :param tags: Tag dictionary. Tags can be added, removed, and updated + :type tags: dict[str, str] + :param properties: The job property dictionary + :type properties: dict[str, str] + :param display_name: Display name of the job + :type display_name: str + :param retry_settings: Retry settings for a failed parallel job run + :type retry_settings: BatchRetrySettings + :param logging_level: A string of the logging level name + :type logging_level: str + :param max_concurrency_per_instance: The max parallelism that each compute instance has + :type max_concurrency_per_instance: int + :param error_threshold: The number of item processing failures that should be ignored + :type error_threshold: int + :param mini_batch_error_threshold: The number of mini-batch processing failures that should be ignored + :type mini_batch_error_threshold: int + :param task: The parallel task + :type task: ParallelTask + :param mini_batch_size: For FileDataset input, this field is the number of files + a user script can process in one run() call. + For TabularDataset input, this field is the approximate size of data + the user script can process in one run() call. + Example values are 1024, 1024KB, 10MB, and 1GB.
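# [Editor's illustrative sketch - not part of the diffed file.] The mini_batch_size values described
# above ("1024", "1024KB", "10MB", "1GB") are normalized to a byte count by Parallel.__init__ further
# down. A minimal standalone re-implementation of that normalization, assuming the same "kb"/"mb"/"gb"
# suffixes, might look like this:
import re

def to_mini_batch_bytes(value: str) -> int:
    """Convert a mini_batch_size string such as '10MB' to a byte count."""
    if not re.match(r"^\d+([kKmMgG][bB])*$", value):
        raise ValueError("mini_batch_size must match ^\\d+([kKmMgG][bB])*$")
    units = {"kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}
    suffix = value[-2:].lower()
    if suffix in units:
        return int(value[:-2]) * units[suffix]
    return int(value)  # plain integer string: already a count, no unit suffix

# e.g. to_mini_batch_bytes("10MB") == 10 * 1024 * 1024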
(optional, default value is 10 files + for FileDataset and 1MB for TabularDataset.) + This value could be set through PipelineParameter + :type mini_batch_size: str + :param partition_keys: The keys used to partition dataset into mini-batches. If specified, + the data with the same key will be partitioned into the same mini-batch. + If both partition_keys and mini_batch_size are specified, + the partition keys will take effect. + The input(s) must be partitioned dataset(s), + and the partition_keys must be a subset of the keys of every input dataset for this to work. + :keyword identity: The identity that the command job will use while running on compute. + :paramtype identity: Optional[Union[ + dict[str, str], + ~azure.ai.ml.entities.ManagedIdentityConfiguration, + ~azure.ai.ml.entities.AmlTokenConfiguration, + ~azure.ai.ml.entities.UserIdentityConfiguration]] + :type partition_keys: List + :param input_data: The input data + :type input_data: str + :param inputs: Inputs of the component/job + :type inputs: dict + :param outputs: Outputs of the component/job + :type outputs: dict + """ + + # pylint: disable=too-many-statements + def __init__( + self, + *, + component: Union[ParallelComponent, str], + compute: Optional[str] = None, + inputs: Optional[Dict[str, Union[NodeOutput, Input, str, bool, int, float, Enum]]] = None, + outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None, + retry_settings: Optional[Union[RetrySettings, Dict[str, str]]] = None, + logging_level: Optional[str] = None, + max_concurrency_per_instance: Optional[int] = None, + error_threshold: Optional[int] = None, + mini_batch_error_threshold: Optional[int] = None, + input_data: Optional[str] = None, + task: Optional[Union[ParallelTask, RunFunction, Dict]] = None, + partition_keys: Optional[List] = None, + mini_batch_size: Optional[Union[str, int]] = None, + resources: Optional[JobResourceConfiguration] = None, + environment_variables: Optional[Dict] = None, + identity: Optional[ + Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration, Dict] + ] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + kwargs.pop("type", None) + + if isinstance(component, FlowComponent): + # make input definition fit actual inputs for flow component + with component._inputs._fit_inputs(inputs): # type: ignore[attr-defined] + BaseNode.__init__( + self, + type=NodeType.PARALLEL, + component=component, + inputs=inputs, + outputs=outputs, + compute=compute, + **kwargs, + ) + else: + BaseNode.__init__( + self, + type=NodeType.PARALLEL, + component=component, + inputs=inputs, + outputs=outputs, + compute=compute, + **kwargs, + ) + # init mark for _AttrDict + self._init = True + + self._task = task + + if ( + mini_batch_size is not None + and not isinstance(mini_batch_size, int) + and not is_data_binding_expression(mini_batch_size) + ): + """Convert str to int.""" # pylint: disable=pointless-string-statement + pattern = re.compile(r"^\d+([kKmMgG][bB])*$") + if not pattern.match(mini_batch_size): + raise ValueError(r"Parameter mini_batch_size must follow regex rule ^\d+([kKmMgG][bB])*$") + + try: + mini_batch_size = int(mini_batch_size) + except ValueError as e: + if not isinstance(mini_batch_size, int): + unit = mini_batch_size[-2:].lower() + if unit == "kb": + mini_batch_size = int(mini_batch_size[0:-2]) * 1024 + elif unit == "mb": + mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 + 
elif unit == "gb": + mini_batch_size = int(mini_batch_size[0:-2]) * 1024 * 1024 * 1024 + else: + raise ValueError("mini_batch_size unit must be kb, mb or gb") from e + + self.mini_batch_size = mini_batch_size + self.partition_keys = partition_keys + self.input_data = input_data + self._retry_settings = retry_settings + self.logging_level = logging_level + self.max_concurrency_per_instance = max_concurrency_per_instance + self.error_threshold = error_threshold + self.mini_batch_error_threshold = mini_batch_error_threshold + self._resources = resources + self.environment_variables = {} if environment_variables is None else environment_variables + self._identity = identity + if isinstance(self.component, ParallelComponent): + self.resources = cast(JobResourceConfiguration, self.resources) or cast( + JobResourceConfiguration, copy.deepcopy(self.component.resources) + ) + # TODO: Bug Item number: 2897665 + self.retry_settings = self.retry_settings or copy.deepcopy(self.component.retry_settings) # type: ignore + self.input_data = self.input_data or self.component.input_data + self.max_concurrency_per_instance = ( + self.max_concurrency_per_instance or self.component.max_concurrency_per_instance + ) + self.mini_batch_error_threshold = ( + self.mini_batch_error_threshold or self.component.mini_batch_error_threshold + ) + self.mini_batch_size = self.mini_batch_size or self.component.mini_batch_size + self.partition_keys = self.partition_keys or copy.deepcopy(self.component.partition_keys) + + if not self.task: + self.task = self.component.task + # task.code is based on self.component.base_path + self._base_path = self.component.base_path + + self._init = False + + @classmethod + def _get_supported_outputs_types(cls) -> Tuple: + return str, Output + + @property + def retry_settings(self) -> RetrySettings: + """Get the retry settings for the parallel job. + + :return: The retry settings for the parallel job. + :rtype: ~azure.ai.ml.entities._job.parallel.retry_settings.RetrySettings + """ + return self._retry_settings # type: ignore + + @retry_settings.setter + def retry_settings(self, value: Union[RetrySettings, Dict]) -> None: + """Set the retry settings for the parallel job. + + :param value: The retry settings for the parallel job. + :type value: ~azure.ai.ml.entities._job.parallel.retry_settings.RetrySettings or dict + """ + if isinstance(value, dict): + value = RetrySettings(**value) + self._retry_settings = value + + @property + def resources(self) -> Optional[JobResourceConfiguration]: + """Get the resource configuration for the parallel job. + + :return: The resource configuration for the parallel job. + :rtype: ~azure.ai.ml.entities._job.job_resource_configuration.JobResourceConfiguration + """ + return self._resources + + @resources.setter + def resources(self, value: Union[JobResourceConfiguration, Dict]) -> None: + """Set the resource configuration for the parallel job. + + :param value: The resource configuration for the parallel job. + :type value: ~azure.ai.ml.entities._job.job_resource_configuration.JobResourceConfiguration or dict + """ + if isinstance(value, dict): + value = JobResourceConfiguration(**value) + self._resources = value + + @property + def identity( + self, + ) -> Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration, Dict]]: + """The identity that the job will use while running on compute. + + :return: The identity that the job will use while running on compute. 
+ :rtype: Optional[Union[~azure.ai.ml.ManagedIdentityConfiguration, ~azure.ai.ml.AmlTokenConfiguration, + ~azure.ai.ml.UserIdentityConfiguration]] + """ + return self._identity + + @identity.setter + def identity( + self, + value: Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration, None], + ) -> None: + """Sets the identity that the job will use while running on compute. + + :param value: The identity that the job will use while running on compute. + :type value: Union[dict[str, str], ~azure.ai.ml.ManagedIdentityConfiguration, + ~azure.ai.ml.AmlTokenConfiguration, ~azure.ai.ml.UserIdentityConfiguration] + """ + if isinstance(value, dict): + identity_schema = UnionField( + [ + NestedField(ManagedIdentitySchema, unknown=INCLUDE), + NestedField(AMLTokenIdentitySchema, unknown=INCLUDE), + NestedField(UserIdentitySchema, unknown=INCLUDE), + ] + ) + value = identity_schema._deserialize(value=value, attr=None, data=None) + self._identity = value + + @property + def component(self) -> Union[str, ParallelComponent]: + """Get the component of the parallel job. + + :return: The component of the parallel job. + :rtype: str or ~azure.ai.ml.entities._component.parallel_component.ParallelComponent + """ + res: Union[str, ParallelComponent] = self._component + return res + + @property + def task(self) -> Optional[ParallelTask]: + """Get the parallel task. + + :return: The parallel task. + :rtype: ~azure.ai.ml.entities._job.parallel.parallel_task.ParallelTask + """ + return self._task # type: ignore + + @task.setter + def task(self, value: Union[ParallelTask, Dict]) -> None: + """Set the parallel task. + + :param value: The parallel task. + :type value: ~azure.ai.ml.entities._job.parallel.parallel_task.ParallelTask or dict + """ + # base path should be reset if task is set via sdk + self._base_path: Optional[Union[str, os.PathLike]] = None + if isinstance(value, dict): + value = ParallelTask(**value) + self._task = value + + def _set_base_path(self, base_path: Optional[Union[str, os.PathLike]]) -> None: + if self._base_path: + return + super(Parallel, self)._set_base_path(base_path) + + def set_resources( + self, + *, + instance_type: Optional[Union[str, List[str]]] = None, + instance_count: Optional[int] = None, + properties: Optional[Dict] = None, + docker_args: Optional[str] = None, + shm_size: Optional[str] = None, + # pylint: disable=unused-argument + **kwargs: Any, + ) -> None: + """Set the resources for the parallel job. + + :keyword instance_type: The instance type or a list of instance types used as supported by the compute target. + :paramtype instance_type: Union[str, List[str]] + :keyword instance_count: The number of instances or nodes used by the compute target. + :paramtype instance_count: int + :keyword properties: The property dictionary for the resources. + :paramtype properties: dict + :keyword docker_args: Extra arguments to pass to the Docker run command. + :paramtype docker_args: str + :keyword shm_size: Size of the Docker container's shared memory block. 
+ :paramtype shm_size: str + """ + if self.resources is None: + self.resources = JobResourceConfiguration() + + if instance_type is not None: + self.resources.instance_type = instance_type + if instance_count is not None: + self.resources.instance_count = instance_count + if properties is not None: + self.resources.properties = properties + if docker_args is not None: + self.resources.docker_args = docker_args + if shm_size is not None: + self.resources.shm_size = shm_size + + # Save the resources to internal component as well, otherwise calling sweep() will lose the settings + if isinstance(self.component, Component): + self.component.resources = self.resources + + @classmethod + def _attr_type_map(cls) -> dict: + return { + "component": (str, ParallelComponent, FlowComponent), + "retry_settings": (dict, RetrySettings), + "resources": (dict, JobResourceConfiguration), + "task": (dict, ParallelTask), + "logging_level": str, + "max_concurrency_per_instance": (str, int), + "error_threshold": (str, int), + "mini_batch_error_threshold": (str, int), + "environment_variables": dict, + } + + def _to_job(self) -> ParallelJob: + return ParallelJob( + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + properties=self.properties, + compute=self.compute, + resources=self.resources, + partition_keys=self.partition_keys, + mini_batch_size=self.mini_batch_size, + task=self.task, + retry_settings=self.retry_settings, + input_data=self.input_data, + logging_level=self.logging_level, + identity=self.identity, + max_concurrency_per_instance=self.max_concurrency_per_instance, + error_threshold=self.error_threshold, + mini_batch_error_threshold=self.mini_batch_error_threshold, + environment_variables=self.environment_variables, + inputs=self._job_inputs, + outputs=self._job_outputs, + ) + + def _parallel_attr_to_dict(self, attr: str, base_type: Type) -> dict: + # Convert parallel attribute to dict + rest_attr = {} + parallel_attr = getattr(self, attr) + if parallel_attr is not None: + if isinstance(parallel_attr, base_type): + rest_attr = parallel_attr._to_dict() + elif isinstance(parallel_attr, dict): + rest_attr = parallel_attr + else: + msg = f"Expecting {base_type} for {attr}, got {type(parallel_attr)} instead."
+ raise MlException(message=msg, no_personal_data_message=msg) + # TODO: Bug Item number: 2897665 + res: dict = convert_ordered_dict_to_dict(rest_attr) # type: ignore + return res + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return [ + "type", + "resources", + "error_threshold", + "mini_batch_error_threshold", + "environment_variables", + "max_concurrency_per_instance", + "task", + "input_data", + ] + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj: Dict = super(Parallel, self)._to_rest_object(**kwargs) + rest_obj.update( + convert_ordered_dict_to_dict( + { + "componentId": self._get_component_id(), + "retry_settings": get_rest_dict_for_node_attrs(self.retry_settings), + "logging_level": self.logging_level, + "mini_batch_size": self.mini_batch_size, + "partition_keys": ( + json.dumps(self.partition_keys) if self.partition_keys is not None else self.partition_keys + ), + "identity": get_rest_dict_for_node_attrs(self.identity), + "resources": get_rest_dict_for_node_attrs(self.resources), + } + ) + ) + return rest_obj + + @classmethod + def _from_rest_object_to_init_params(cls, obj: dict) -> Dict: + obj = super()._from_rest_object_to_init_params(obj) + # retry_settings + if "retry_settings" in obj and obj["retry_settings"]: + obj["retry_settings"] = RetrySettings._from_dict(obj["retry_settings"]) + + if "task" in obj and obj["task"]: + obj["task"] = ParallelTask._from_dict(obj["task"]) + task_code = obj["task"].code + task_env = obj["task"].environment + # remove azureml: prefix in code and environment which is added in _to_rest_object + if task_code and isinstance(task_code, str) and task_code.startswith(ARM_ID_PREFIX): + obj["task"].code = task_code[len(ARM_ID_PREFIX) :] + if task_env and isinstance(task_env, str) and task_env.startswith(ARM_ID_PREFIX): + obj["task"].environment = task_env[len(ARM_ID_PREFIX) :] + + if "resources" in obj and obj["resources"]: + obj["resources"] = JobResourceConfiguration._from_rest_object(obj["resources"]) + + if "partition_keys" in obj and obj["partition_keys"]: + obj["partition_keys"] = json.dumps(obj["partition_keys"]) + if "identity" in obj and obj["identity"]: + obj["identity"] = _BaseJobIdentityConfiguration._from_rest_object(obj["identity"]) + return obj + + def _build_inputs(self) -> Dict: + inputs = super(Parallel, self)._build_inputs() + built_inputs = {} + # Validate and remove non-specified inputs + for key, value in inputs.items(): + if value is not None: + built_inputs[key] = value + return built_inputs + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline import ParallelSchema + + return ParallelSchema(context=context) + + # pylint: disable-next=docstring-missing-param + def __call__(self, *args: Any, **kwargs: Any) -> "Parallel": + """Call Parallel as a function will return a new instance each time. 
+ + :return: A Parallel node + :rtype: Parallel + """ + if isinstance(self._component, Component): + # call this to validate inputs + node: Parallel = self._component(*args, **kwargs) + # merge inputs + for name, original_input in self.inputs.items(): + if name not in kwargs: + # use setattr here to make sure owner of input won't change + setattr(node.inputs, name, original_input._data) + # get outputs + for name, original_output in self.outputs.items(): + # use setattr here to make sure owner of input won't change + if not isinstance(original_output, str): + setattr(node.outputs, name, original_output._data) + self._refine_optional_inputs_with_no_value(node, kwargs) + # set default values: compute, environment_variables, outputs + node._name = self.name + node.compute = self.compute + node.tags = self.tags + node.display_name = self.display_name + node.mini_batch_size = self.mini_batch_size + node.partition_keys = self.partition_keys + node.logging_level = self.logging_level + node.max_concurrency_per_instance = self.max_concurrency_per_instance + node.error_threshold = self.error_threshold + # deep copy for complex object + node.retry_settings = copy.deepcopy(self.retry_settings) + node.input_data = self.input_data + node.task = copy.deepcopy(self.task) + node._base_path = self.base_path + node.resources = copy.deepcopy(self.resources) + node.environment_variables = copy.deepcopy(self.environment_variables) + node.identity = copy.deepcopy(self.identity) + return node + msg = f"Parallel can be called as a function only when referenced component is {type(Component)}, \ + currently got {self._component}." + raise MlException(message=msg, no_personal_data_message=msg) + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job": + raise NotImplementedError() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_for.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_for.py new file mode 100644 index 00000000..1e888f50 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_for.py @@ -0,0 +1,362 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import json +import os +from typing import Any, Dict, Optional, Union + +from azure.ai.ml import Input, Output +from azure.ai.ml._schema import PathAwareSchema +from azure.ai.ml._schema.pipeline.control_flow_job import ParallelForSchema +from azure.ai.ml._utils.utils import is_data_binding_expression +from azure.ai.ml.constants import AssetTypes +from azure.ai.ml.constants._component import ComponentParameterTypes, ControlFlowType +from azure.ai.ml.entities import Component, Pipeline +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.entities._builders.control_flow_node import LoopNode +from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput +from azure.ai.ml.entities._job.pipeline._io.mixin import NodeIOMixin +from azure.ai.ml.entities._util import convert_ordered_dict_to_dict, validate_attribute_type +from azure.ai.ml.entities._validation import MutableValidationResult +from azure.ai.ml.exceptions import UserErrorException + + +class ParallelFor(LoopNode, NodeIOMixin): + """Parallel for loop node in the pipeline job. By specifying the loop body and aggregated items, a job-level + parallel for loop can be implemented. 
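# [Editor's illustrative sketch - not part of the diffed file.] As the docstring below notes,
# ParallelFor is normally created via dsl.parallel_for rather than instantiated directly. A minimal,
# hypothetical pipeline using it (the component file, its inputs, and the output name "score_output"
# are all made up for illustration) could look like this:
from azure.ai.ml import dsl, load_component

score_component = load_component(source="./score.yml")  # hypothetical loop-body component

@dsl.pipeline()
def score_many():
    body = score_component()
    # items: one dict per iteration; keys must match the loop body's inputs (see _validate_items below)
    after_node = dsl.parallel_for(
        body=body,
        items=[{"learning_rate": 0.01}, {"learning_rate": 0.1}],
        max_concurrency=2,
    )
    return {"aggregated": after_node.outputs.score_output}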
It will be initialized when calling dsl.parallel_for or when loading the + pipeline yml containing parallel_for node. Please do not manually initialize this class. + + :param body: Pipeline job for the parallel for loop body. + :type body: ~azure.ai.ml.entities.Pipeline + :param items: The loop body's input which will bind to the loop node. + :type items: typing.Union[list, dict, str, ~azure.ai.ml.entities._job.pipeline._io.NodeOutput, + ~azure.ai.ml.entities._job.pipeline._io.PipelineInput] + :param max_concurrency: Maximum number of concurrent iterations to run. All loop body nodes will be executed + in parallel if not specified. + :type max_concurrency: int + """ + + OUT_TYPE_MAPPING = { + AssetTypes.URI_FILE: AssetTypes.MLTABLE, + AssetTypes.URI_FOLDER: AssetTypes.MLTABLE, + AssetTypes.MLTABLE: AssetTypes.MLTABLE, + AssetTypes.MLFLOW_MODEL: AssetTypes.MLTABLE, + AssetTypes.TRITON_MODEL: AssetTypes.MLTABLE, + AssetTypes.CUSTOM_MODEL: AssetTypes.MLTABLE, + # legacy path support + "path": AssetTypes.MLTABLE, + ComponentParameterTypes.NUMBER: ComponentParameterTypes.STRING, + ComponentParameterTypes.STRING: ComponentParameterTypes.STRING, + ComponentParameterTypes.BOOLEAN: ComponentParameterTypes.STRING, + ComponentParameterTypes.INTEGER: ComponentParameterTypes.STRING, + } + + def __init__( + self, + *, + body: "Pipeline", + items: Union[list, dict, str, PipelineInput, NodeOutput], + max_concurrency: Optional[int] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + + kwargs.pop("type", None) + super(ParallelFor, self).__init__( + type=ControlFlowType.PARALLEL_FOR, + body=body, + **kwargs, + ) + # loop body is incomplete in submission time, so won't validate required inputs + self.body._validate_required_input_not_provided = False + self._outputs: dict = {} + + actual_outputs = kwargs.get("outputs", {}) + # parallel for node shares output meta with body + try: + outputs = self.body._component.outputs + # transform body outputs to aggregate types when available + self._outputs = self._build_outputs_dict( + outputs=actual_outputs, output_definition_dict=self._convert_output_meta(outputs) + ) + except AttributeError: + # when body output not available, create default output builder without meta + self._outputs = self._build_outputs_dict(outputs=actual_outputs) + + self._items = items + + self.max_concurrency = max_concurrency + + @property + def outputs(self) -> Dict[str, Union[str, Output]]: + """Get the outputs of the parallel for loop. + + :return: The dictionary containing the outputs of the parallel for loop. + :rtype: dict[str, Union[str, ~azure.ai.ml.Output]] + """ + return self._outputs + + @property + def items(self) -> Union[list, dict, str, PipelineInput, NodeOutput]: + """Get the loop body's input which will bind to the loop node. + + :return: The input for the loop body. 
+ :rtype: typing.Union[list, dict, str, ~azure.ai.ml.entities._job.pipeline._io.NodeOutput, + ~azure.ai.ml.entities._job.pipeline._io.PipelineInput] + """ + return self._items + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema: + return ParallelForSchema(context=context) + + @classmethod + def _attr_type_map(cls) -> dict: + return { + **super(ParallelFor, cls)._attr_type_map(), + "items": (dict, list, str, PipelineInput, NodeOutput), + } + + @classmethod + # pylint: disable-next=docstring-missing-param + def _to_rest_item(cls, item: dict) -> dict: + """Convert item to rest object. + + :return: The rest object + :rtype: dict + """ + primitive_inputs, asset_inputs = {}, {} + # validate item + for key, val in item.items(): + if isinstance(val, Input): + asset_inputs[key] = val + elif isinstance(val, (PipelineInput, NodeOutput)): + # convert binding object to string + primitive_inputs[key] = str(val) + else: + primitive_inputs[key] = val + return { + # asset type inputs will be converted to JobInput dict: + # {"asset_param": {"uri": "xxx", "job_input_type": "uri_file"}} + **cls._input_entity_to_rest_inputs(input_entity=asset_inputs), + # primitive inputs has primitive type value like this + # {"int_param": 1} + **primitive_inputs, + } + + @classmethod + # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + def _to_rest_items(cls, items: Union[list, dict, str, NodeOutput, PipelineInput]) -> str: + """Convert items to rest object.""" + # validate items. + cls._validate_items(items=items, raise_error=True, body_component=None) + result: str = "" + # convert items to rest object + if isinstance(items, list): + rest_items_list = [cls._to_rest_item(item=i) for i in items] + result = json.dumps(rest_items_list) + elif isinstance(items, dict): + rest_items_dict = {k: cls._to_rest_item(item=v) for k, v in items.items()} + result = json.dumps(rest_items_dict) + elif isinstance(items, (NodeOutput, PipelineInput)): + result = str(items) + elif isinstance(items, str): + result = items + else: + raise UserErrorException("Unsupported items type: {}".format(type(items))) + return result + + def _to_rest_object(self, **kwargs: Any) -> dict: + """Convert self to a rest object for remote call. 
+ + :return: The rest object + :rtype: dict + """ + rest_node = super(ParallelFor, self)._to_rest_object(**kwargs) + # convert items to rest object + rest_items = self._to_rest_items(items=self.items) + rest_node.update({"items": rest_items, "outputs": self._to_rest_outputs()}) + # TODO: Bug Item number: 2897665 + res: dict = convert_ordered_dict_to_dict(rest_node) # type: ignore + return res + + @classmethod + # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + def _from_rest_item(cls, rest_item: Any) -> Dict: + """Convert rest item to item.""" + primitive_inputs, asset_inputs = {}, {} + for key, val in rest_item.items(): + if isinstance(val, dict) and val.get("job_input_type"): + asset_inputs[key] = val + else: + primitive_inputs[key] = val + return {**cls._from_rest_inputs(inputs=asset_inputs), **primitive_inputs} + + @classmethod + # pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + def _from_rest_items(cls, rest_items: str) -> Union[dict, list, str]: + """Convert items from rest object.""" + try: + items = json.loads(rest_items) + except json.JSONDecodeError: + # return original items when failed to load + return rest_items + if isinstance(items, list): + return [cls._from_rest_item(rest_item=i) for i in items] + if isinstance(items, dict): + return {k: cls._from_rest_item(rest_item=v) for k, v in items.items()} + return rest_items + + @classmethod + def _from_rest_object(cls, obj: dict, pipeline_jobs: dict) -> "ParallelFor": + # pylint: disable=protected-access + obj = BaseNode._from_rest_object_to_init_params(obj) + obj["items"] = cls._from_rest_items(rest_items=obj.get("items", "")) + return cls._create_instance_from_schema_dict(pipeline_jobs=pipeline_jobs, loaded_data=obj) + + @classmethod + def _create_instance_from_schema_dict(cls, pipeline_jobs: Dict, loaded_data: Dict, **kwargs: Any) -> "ParallelFor": + body_name = cls._get_data_binding_expression_value(loaded_data.pop("body"), regex=r"\{\{.*\.jobs\.(.*)\}\}") + + loaded_data["body"] = cls._get_body_from_pipeline_jobs(pipeline_jobs=pipeline_jobs, body_name=body_name) + return cls(**loaded_data, **kwargs) + + def _convert_output_meta(self, outputs: Dict[str, Union[NodeOutput, Output]]) -> Dict[str, Output]: + """Convert output meta to aggregate types. + + :param outputs: Output meta + :type outputs: Dict[str, Union[NodeOutput, Output]] + :return: Dictionary of aggregate types + :rtype: Dict[str, Output] + """ + # pylint: disable=protected-access + aggregate_outputs = {} + for name, output in outputs.items(): + if output.type in self.OUT_TYPE_MAPPING: + new_type = self.OUT_TYPE_MAPPING[output.type] + else: + # when loop body introduces some new output type, this will be raised as a reminder to support is in + # parallel for + raise UserErrorException( + "Referencing output with type {} is not supported in parallel_for node.".format(output.type) + ) + if isinstance(output, NodeOutput): + output = output._to_job_output() # type: ignore + if isinstance(output, Output): + out_dict = output._to_dict() + out_dict["type"] = new_type + resolved_output = Output(**out_dict) + else: + resolved_output = Output(type=new_type) + aggregate_outputs[name] = resolved_output + return aggregate_outputs + + def _customized_validate(self) -> MutableValidationResult: + """Customized validation for parallel for node. 
+ + :return: The validation result + :rtype: MutableValidationResult + """ + # pylint: disable=protected-access + validation_result = self._validate_body() + validation_result.merge_with( + self._validate_items(items=self.items, raise_error=False, body_component=self.body._component) + ) + return validation_result + + @classmethod + def _validate_items( + cls, + items: Union[list, dict, str, NodeOutput, PipelineInput], + raise_error: bool = True, + body_component: Optional[Union[str, Component]] = None, + ) -> MutableValidationResult: + validation_result = cls._create_empty_validation_result() + if items is not None: + if isinstance(items, str): + # TODO: remove the validation + # try to deserialize str if it's a json string + try: + items = json.loads(items) + except json.JSONDecodeError as e: + if not is_data_binding_expression(items, ["parent"]): + validation_result.append_error( + yaml_path="items", + message=f"Items is neither a valid JSON string (parse error: {e}) nor a binding string.", + ) + if isinstance(items, dict): + # Validate dict keys + items = list(items.values()) + if isinstance(items, list): + if len(items) > 0: + cls._validate_items_list(items, validation_result, body_component=body_component) + else: + validation_result.append_error(yaml_path="items", message="Items is an empty list/dict.") + else: + validation_result.append_error( + yaml_path="items", + message="Items is required for parallel_for node", + ) + return cls._try_raise(validation_result, raise_error=raise_error) + + @classmethod + def _validate_items_list( + cls, + items: list, + validation_result: MutableValidationResult, + body_component: Optional[Union[str, Component]] = None, + ) -> None: + meta: dict = {} + # all items have to be dict and have matched meta + for item in items: + # item has to be dict + # Note: item can be an empty dict when loop_body doesn't have foreach inputs. + if not isinstance(item, dict): + validation_result.append_error( + yaml_path="items", + message=f"Items must be a list/dict of dicts, but got {type(item)} for {item}.", + ) + else: + # item has to have matched meta + if meta.keys() != item.keys(): + if not meta.keys(): + meta = item + else: + msg = f"Items should have the same keys as the loop body inputs, but got {item.keys()} and {meta.keys()}." + validation_result.append_error(yaml_path="items", message=msg) + # items' keys should appear in body's inputs + if isinstance(body_component, Component) and (not item.keys() <= body_component.inputs.keys()): + msg = f"Item {item} has inputs that do not match the loop body component inputs {body_component.inputs}." + validation_result.append_error(yaml_path="items", message=msg) + # validate item value type + cls._validate_item_value_type(item=item, validation_result=validation_result) + + @classmethod + def _validate_item_value_type(cls, item: dict, validation_result: MutableValidationResult) -> None: + supported_types = (Input, str, bool, int, float, PipelineInput) + for _, val in item.items(): + if not isinstance(val, supported_types): + validation_result.append_error( + yaml_path="items", + message="Unsupported type {} in parallel_for items.
Supported types are: {}".format( + type(val), supported_types + ), + ) + if isinstance(val, Input): + cls._validate_input_item_value(entry=val, validation_result=validation_result) + + @classmethod + def _validate_input_item_value(cls, entry: Input, validation_result: MutableValidationResult) -> None: + if not isinstance(entry, Input): + return + if not entry.path: + validation_result.append_error( + yaml_path="items", + message=f"Input path not provided for {entry}.", + ) + if isinstance(entry.path, str) and os.path.exists(entry.path): + validation_result.append_error( + yaml_path="items", + message=f"Local file input {entry} is not supported, please create it as a dataset.", + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py new file mode 100644 index 00000000..a8f08d1e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py @@ -0,0 +1,285 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import os +from typing import Any, Dict, List, Optional, Union + +from azure.ai.ml.constants._component import ComponentSource +from azure.ai.ml.entities._component.parallel_component import ParallelComponent +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, +) +from azure.ai.ml.entities._deployment.deployment_settings import BatchRetrySettings +from azure.ai.ml.entities._job.parallel.run_function import RunFunction + +from .command_func import _parse_input, _parse_inputs_outputs, _parse_output +from .parallel import Parallel + + +def parallel_run_function( + *, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + properties: Optional[Dict] = None, + display_name: Optional[str] = None, + experiment_name: Optional[str] = None, + compute: Optional[str] = None, + retry_settings: Optional[BatchRetrySettings] = None, + environment_variables: Optional[Dict] = None, + logging_level: Optional[str] = None, + max_concurrency_per_instance: Optional[int] = None, + error_threshold: Optional[int] = None, + mini_batch_error_threshold: Optional[int] = None, + task: Optional[RunFunction] = None, + mini_batch_size: Optional[str] = None, + partition_keys: Optional[List] = None, + input_data: Optional[str] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + instance_count: Optional[int] = None, + instance_type: Optional[str] = None, + docker_args: Optional[str] = None, + shm_size: Optional[str] = None, + identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None, + is_deterministic: bool = True, + **kwargs: Any, +) -> Parallel: + """Create a Parallel object which can be used inside dsl.pipeline as a function and can also be created as a + standalone parallel job. + + For an example of using ParallelRunStep, see the notebook + https://aka.ms/parallel-example-notebook + + .. note:: + + To use parallel_run_function: + + * Create a :class:`azure.ai.ml.entities._builders.Parallel` object to specify how parallel run is performed, + with parameters to control batch size,number of nodes per compute target, and a + reference to your custom Python script. 
+ + * Build the pipeline with the parallel object as a function, defining inputs and + outputs for the step. + + * Submit the pipeline to run. + + .. code:: python + + from azure.ai.ml import Input, Output, parallel + + parallel_run = parallel_run_function( + name="batch_score_with_tabular_input", + display_name="Batch Score with Tabular Dataset", + description="parallel component for batch score", + inputs=dict( + job_data_path=Input( + type=AssetTypes.MLTABLE, + description="The data to be split and scored in parallel", + ), + score_model=Input( + type=AssetTypes.URI_FOLDER, description="The model for batch score." + ), + ), + outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), + input_data="${{inputs.job_data_path}}", + max_concurrency_per_instance=2, # Optional, default is 1 + mini_batch_size="100", # optional + mini_batch_error_threshold=5, # Optional, allowed failed count on mini batch items, default is -1 + logging_level="DEBUG", # Optional, default is INFO + error_threshold=5, # Optional, allowed failed count totally, default is -1 + retry_settings=dict(max_retries=2, timeout=60), # Optional + task=RunFunction( + code="./src", + entry_script="tabular_batch_inference.py", + environment=Environment( + image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04", + conda_file="./src/environment_parallel.yml", + ), + program_arguments="--model ${{inputs.score_model}}", + append_row_to="${{outputs.job_output_path}}", # Optional, if not set, summary_only + ), + ) + + :keyword name: Name of the parallel job or component created. + :paramtype name: str + :keyword description: A friendly description of the parallel. + :paramtype description: str + :keyword tags: Tags to be attached to this parallel. + :paramtype tags: Dict + :keyword properties: The asset property dictionary. + :paramtype properties: Dict + :keyword display_name: A friendly name. + :paramtype display_name: str + :keyword experiment_name: Name of the experiment the job will be created under, + if None is provided, default will be set to current directory name. + Will be ignored as a pipeline step. + :paramtype experiment_name: str + :keyword compute: The name of the compute where the parallel job is executed (will not be used + if the parallel is used as a component/function). + :paramtype compute: str + :keyword retry_settings: Retry settings for a failed parallel component run + :paramtype retry_settings: ~azure.ai.ml.entities._deployment.deployment_settings.BatchRetrySettings + :keyword environment_variables: A dictionary of environment variable names and values. + These environment variables are set on the process + where the user script is being executed. + :paramtype environment_variables: Dict[str, str] + :keyword logging_level: A string of the logging level name, which is defined in 'logging'. + Possible values are 'WARNING', 'INFO', and 'DEBUG'. (optional, default value is 'INFO'.) + This value could be set through PipelineParameter. + :paramtype logging_level: str + :keyword max_concurrency_per_instance: The max parallelism that each compute instance has. + :paramtype max_concurrency_per_instance: int + :keyword error_threshold: The number of record failures for Tabular Dataset and file failures for File Dataset + that should be ignored during processing. + If the error count goes above this value, then the job will be aborted. + Error threshold is for the entire input rather + than the individual mini-batch sent to run() method. + The range is [-1, int.max].
-1 indicates ignore all failures during processing + :paramtype error_threshold: int + :keyword mini_batch_error_threshold: The number of mini batch processing failures should be ignored + :paramtype mini_batch_error_threshold: int + :keyword task: The parallel task + :paramtype task: ~azure.ai.ml.entities._job.parallel.run_function.RunFunction + :keyword mini_batch_size: For FileDataset input, + this field is the number of files a user script can process in one run() call. + For TabularDataset input, this field is the approximate size of data + the user script can process in one run() call. + Example values are 1024, 1024KB, 10MB, and 1GB. + (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) + This value could be set through PipelineParameter. + :paramtype mini_batch_size: str + :keyword partition_keys: The keys used to partition dataset into mini-batches. If specified, + the data with the same key will be partitioned into the same mini-batch. + If both partition_keys and mini_batch_size are specified, + the partition keys will take effect. + The input(s) must be partitioned dataset(s), + and the partition_keys must be a subset of the keys of every input dataset for this to work + :paramtype partition_keys: List + :keyword input_data: The input data. + :paramtype input_data: str + :keyword inputs: A dict of inputs used by this parallel. + :paramtype inputs: Dict + :keyword outputs: The outputs of this parallel + :paramtype outputs: Dict + :keyword instance_count: Optional number of instances or nodes used by the compute target. + Defaults to 1 + :paramtype instance_count: int + :keyword instance_type: Optional type of VM used as supported by the compute target.. + :paramtype instance_type: str + :keyword docker_args: Extra arguments to pass to the Docker run command. + This would override any parameters that have already been set by the system, + or in this section. + This parameter is only supported for Azure ML compute types. + :paramtype docker_args: str + :keyword shm_size: Size of the docker container's shared memory block. + This should be in the format of (number)(unit) where number as to be greater than 0 + and the unit can be one of b(bytes), k(kilobytes), m(megabytes), or g(gigabytes). + :paramtype shm_size: str + :keyword identity: Identity that PRS job will use while running on compute. + :paramtype identity: Optional[Union[ + ~azure.ai.ml.entities.ManagedIdentityConfiguration, + ~azure.ai.ml.entities.AmlTokenConfiguration, + ~azure.ai.ml.entities.UserIdentityConfiguration]] + :keyword is_deterministic: Specify whether the parallel will return same output given same input. + If a parallel (component) is deterministic, when use it as a node/step in a pipeline, + it will reuse results from a previous submitted job in current workspace + which has same inputs and settings. + In this case, this step will not use any compute resource. Defaults to True, + specify is_deterministic=False if you would like to avoid such reuse behavior, + defaults to True. 
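# [Editor's illustrative sketch - not part of the diffed file.] The returned Parallel object from the
# docstring example above ("parallel_run", with inputs job_data_path/score_model and output
# job_output_path) is consumed like any component inside a dsl.pipeline, and per-node resources can be
# overridden with set_resources. The dataset/model asset names and the instance type are hypothetical.
from azure.ai.ml import Input, dsl
from azure.ai.ml.constants import AssetTypes

@dsl.pipeline()
def batch_scoring_pipeline(pipeline_job_data_path, pipeline_score_model):
    # "parallel_run" is the builder result shown in the example above.
    parallel_node = parallel_run(
        job_data_path=pipeline_job_data_path,
        score_model=pipeline_score_model,
    )
    # Optionally override per-node resources instead of relying on the component defaults.
    parallel_node.set_resources(instance_count=2, instance_type="STANDARD_D3_V2")
    return {"job_output_path": parallel_node.outputs.job_output_path}

pipeline_job = batch_scoring_pipeline(
    pipeline_job_data_path=Input(type=AssetTypes.MLTABLE, path="azureml:scoring_data:1"),  # hypothetical asset
    pipeline_score_model=Input(type=AssetTypes.URI_FOLDER, path="azureml:score_model:1"),  # hypothetical asset
)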
+ :paramtype is_deterministic: bool + :return: The parallel node + :rtype: ~azure.ai.ml._builders.parallel.Parallel + """ + # pylint: disable=too-many-locals + inputs = inputs or {} + outputs = outputs or {} + component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input) + # job inputs can not be None + job_inputs = {k: v for k, v in job_inputs.items() if v is not None} + component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output) + + component = kwargs.pop("component", None) + + if component is None: + if task is None: + component = ParallelComponent( + base_path=os.getcwd(), # base path should be current folder + name=name, + tags=tags, + code=None, + display_name=display_name, + description=description, + inputs=component_inputs, + outputs=component_outputs, + retry_settings=retry_settings, # type: ignore[arg-type] + logging_level=logging_level, + max_concurrency_per_instance=max_concurrency_per_instance, + error_threshold=error_threshold, + mini_batch_error_threshold=mini_batch_error_threshold, + task=task, + mini_batch_size=mini_batch_size, + partition_keys=partition_keys, + input_data=input_data, + _source=ComponentSource.BUILDER, + is_deterministic=is_deterministic, + **kwargs, + ) + else: + component = ParallelComponent( + base_path=os.getcwd(), # base path should be current folder + name=name, + tags=tags, + code=task.code, + display_name=display_name, + description=description, + inputs=component_inputs, + outputs=component_outputs, + retry_settings=retry_settings, # type: ignore[arg-type] + logging_level=logging_level, + max_concurrency_per_instance=max_concurrency_per_instance, + error_threshold=error_threshold, + mini_batch_error_threshold=mini_batch_error_threshold, + task=task, + mini_batch_size=mini_batch_size, + partition_keys=partition_keys, + input_data=input_data, + _source=ComponentSource.BUILDER, + is_deterministic=is_deterministic, + **kwargs, + ) + + parallel_obj = Parallel( + component=component, + name=name, + description=description, + tags=tags, + properties=properties, + display_name=display_name, + experiment_name=experiment_name, + compute=compute, + inputs=job_inputs, + outputs=job_outputs, + identity=identity, + environment_variables=environment_variables, + retry_settings=retry_settings, # type: ignore[arg-type] + logging_level=logging_level, + max_concurrency_per_instance=max_concurrency_per_instance, + error_threshold=error_threshold, + mini_batch_error_threshold=mini_batch_error_threshold, + task=task, + mini_batch_size=mini_batch_size, + partition_keys=partition_keys, + input_data=input_data, + **kwargs, + ) + + if instance_count is not None or instance_type is not None or docker_args is not None or shm_size is not None: + parallel_obj.set_resources( + instance_count=instance_count, instance_type=instance_type, docker_args=docker_args, shm_size=shm_size + ) + + return parallel_obj diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py new file mode 100644 index 00000000..188d9044 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/pipeline.py @@ -0,0 +1,225 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +import logging +from enum import Enum +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast + +from marshmallow import Schema + +from azure.ai.ml.entities._component.component import Component, NodeType +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job.job import Job +from azure.ai.ml.entities._validation import MutableValidationResult + +from ..._schema import PathAwareSchema +from .._job.pipeline.pipeline_job_settings import PipelineJobSettings +from .._util import convert_ordered_dict_to_dict, copy_output_setting, validate_attribute_type +from .base_node import BaseNode + +if TYPE_CHECKING: + from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob + +module_logger = logging.getLogger(__name__) + + +class Pipeline(BaseNode): + """Base class for pipeline node, used for pipeline component version consumption. You should not instantiate this + class directly. Instead, you should use @pipeline decorator to create a pipeline node. + + :param component: Id or instance of the pipeline component/job to be run for the step. + :type component: Union[~azure.ai.ml.entities.Component, str] + :param inputs: Inputs of the pipeline node. + :type inputs: Optional[Dict[str, Union[ + ~azure.ai.ml.entities.Input, + str, bool, int, float, Enum, "Input"]]]. + :param outputs: Outputs of the pipeline node. + :type outputs: Optional[Dict[str, Union[str, ~azure.ai.ml.entities.Output, "Output"]]] + :param settings: Setting of pipeline node, only taking effect for root pipeline job. + :type settings: Optional[~azure.ai.ml.entities._job.pipeline.pipeline_job_settings.PipelineJobSettings] + """ + + def __init__( + self, + *, + component: Union[Component, str], + inputs: Optional[ + Dict[ + str, + Union[ + Input, + str, + bool, + int, + float, + Enum, + "Input", + ], + ] + ] = None, + outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None, + settings: Optional[PipelineJobSettings] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + kwargs.pop("type", None) + + BaseNode.__init__( + self, + type=NodeType.PIPELINE, + component=component, + inputs=inputs, + outputs=outputs, + **kwargs, + ) + # copy pipeline component output's setting to node level + self._copy_pipeline_component_out_setting_to_node() + self._settings: Optional[PipelineJobSettings] = None + self.settings = settings + + @property + def component(self) -> Optional[Union[str, Component]]: + """Id or instance of the pipeline component/job to be run for the step. + + :return: Id or instance of the pipeline component/job. + :rtype: Union[str, ~azure.ai.ml.entities.Component] + """ + res: Union[str, Component] = self._component + return res + + @property + def settings(self) -> Optional[PipelineJobSettings]: + """Settings of the pipeline. + + Note: settings is available only when create node as a job. + i.e. ml_client.jobs.create_or_update(node). + + :return: Settings of the pipeline. + :rtype: ~azure.ai.ml.entities.PipelineJobSettings + """ + if self._settings is None: + self._settings = PipelineJobSettings() + return self._settings + + @settings.setter + def settings(self, value: Union[PipelineJobSettings, Dict]) -> None: + """Set the settings of the pipeline. + + :param value: The settings of the pipeline. 
+ :type value: Union[~azure.ai.ml.entities.PipelineJobSettings, dict] + :raises TypeError: If the value is not an instance of PipelineJobSettings or a dict. + """ + if value is not None: + if isinstance(value, PipelineJobSettings): + # since PipelineJobSettings inherit _AttrDict, we need add this branch to distinguish with dict + pass + elif isinstance(value, dict): + value = PipelineJobSettings(**value) + else: + raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value))) + self._settings = value + + @classmethod + def _get_supported_inputs_types(cls) -> None: + # Return None here to skip validation, + # as input could be custom class object(parameter group). + return None + + @property + def _skip_required_compute_missing_validation(self) -> bool: + return True + + @classmethod + def _get_skip_fields_in_schema_validation(cls) -> List[str]: + # pipeline component must be a file reference when loading from yaml, + # so the created object can't pass schema validation. + return ["component"] + + @classmethod + def _attr_type_map(cls) -> dict: + # Use local import to avoid recursive reference as BaseNode is imported in PipelineComponent. + from azure.ai.ml.entities import PipelineComponent + + return { + "component": (str, PipelineComponent), + } + + def _to_job(self) -> "PipelineJob": + from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob + + return PipelineJob( + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + properties=self.properties, + # Filter None out to avoid case below failed with conflict keys check: + # group: None (user not specified) + # group.xx: 1 (user specified + inputs={k: v for k, v in self._job_inputs.items() if v}, + outputs=self._job_outputs, + component=self.component, + settings=self.settings, + ) + + def _customized_validate(self) -> MutableValidationResult: + """Check unsupported settings when use as a node. + + :return: The validation result + :rtype: MutableValidationResult + """ + # Note: settings is not supported on node, + # jobs.create_or_update(node) will call node._to_job() at first, + # thus won't reach here. 
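# [Editor's illustrative sketch - not part of the diffed file.] As the settings property and the
# comment above note, settings assigned to a Pipeline node only take effect when the node is submitted
# as a root job, e.g. via ml_client.jobs.create_or_update. A minimal, hypothetical example (the
# component file, compute name, and experiment name are made up):
from azure.ai.ml import MLClient, load_component
from azure.ai.ml.entities import PipelineJobSettings
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
train_pipeline_component = load_component(source="./train_pipeline.yml")  # hypothetical pipeline component

pipeline_node = train_pipeline_component()
# The setter above accepts either a PipelineJobSettings instance or a plain dict (coerced to one).
pipeline_node.settings = PipelineJobSettings(default_compute="cpu-cluster", continue_on_step_failure=True)

submitted_job = ml_client.jobs.create_or_update(pipeline_node, experiment_name="pipeline-node-demo")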
+ # pylint: disable=protected-access + from azure.ai.ml.entities import PipelineComponent + + validation_result = super(Pipeline, self)._customized_validate() + ignored_keys = PipelineComponent._check_ignored_keys(self) + if ignored_keys: + validation_result.append_warning(message=f"{ignored_keys} ignored on node {self.name!r}.") + if isinstance(self.component, PipelineComponent): + validation_result.merge_with(self.component._customized_validate()) + return validation_result + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj: Dict = super()._to_rest_object(**kwargs) + rest_obj.update( + convert_ordered_dict_to_dict( + { + "componentId": self._get_component_id(), + } + ) + ) + return rest_obj + + def _build_inputs(self) -> Dict: + inputs = super(Pipeline, self)._build_inputs() + built_inputs = {} + # Validate and remove non-specified inputs + for key, value in inputs.items(): + if value is not None: + built_inputs[key] = value + return built_inputs + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline.pipeline_component import PipelineSchema + + return PipelineSchema(context=context) + + def _copy_pipeline_component_out_setting_to_node(self) -> None: + """Copy pipeline component output's setting to node level.""" + from azure.ai.ml.entities import PipelineComponent + from azure.ai.ml.entities._job.pipeline._io import NodeOutput + + if not isinstance(self.component, PipelineComponent): + return + for key, val in self.component.outputs.items(): + node_output = cast(NodeOutput, self.outputs.get(key)) + copy_output_setting(source=val, target=node_output) + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job": + raise NotImplementedError() diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark.py new file mode 100644 index 00000000..e72f1334 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark.py @@ -0,0 +1,663 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +# pylint: disable=protected-access, too-many-instance-attributes + +import copy +import logging +import re +from enum import Enum +from os import PathLike, path +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union, cast + +from marshmallow import INCLUDE, Schema + +from ..._restclient.v2023_04_01_preview.models import JobBase as JobBaseData +from ..._restclient.v2023_04_01_preview.models import SparkJob as RestSparkJob +from ..._schema import NestedField, PathAwareSchema, UnionField +from ..._schema.job.identity import AMLTokenIdentitySchema, ManagedIdentitySchema, UserIdentitySchema +from ..._schema.job.parameterized_spark import CONF_KEY_MAP +from ..._schema.job.spark_job import SparkJobSchema +from ..._utils.utils import is_url +from ...constants._common import ( + ARM_ID_PREFIX, + BASE_PATH_CONTEXT_KEY, + REGISTRY_URI_FORMAT, + SPARK_ENVIRONMENT_WARNING_MESSAGE, +) +from ...constants._component import NodeType +from ...constants._job.job import SparkConfKey +from ...entities._assets import Environment +from ...entities._component.component import Component +from ...entities._component.spark_component import SparkComponent +from ...entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, + _BaseJobIdentityConfiguration, +) +from ...entities._inputs_outputs import Input, Output +from ...entities._job._input_output_helpers import ( + from_rest_data_outputs, + from_rest_inputs_to_dataset_literal, + validate_inputs_for_args, +) +from ...entities._job.spark_job import SparkJob +from ...entities._job.spark_job_entry import SparkJobEntryType +from ...entities._job.spark_resource_configuration import SparkResourceConfiguration +from ...entities._validation import MutableValidationResult +from ...exceptions import ErrorCategory, ErrorTarget, ValidationException +from .._job.pipeline._io import NodeOutput +from .._job.spark_helpers import ( + _validate_compute_or_resources, + _validate_input_output_mode, + _validate_spark_configurations, +) +from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin +from .._util import convert_ordered_dict_to_dict, get_rest_dict_for_node_attrs, load_from_dict, validate_attribute_type +from .base_node import BaseNode + +module_logger = logging.getLogger(__name__) + + +class Spark(BaseNode, SparkJobEntryMixin): + """Base class for spark node, used for spark component version consumption. + + You should not instantiate this class directly. Instead, you should + create it from the builder function: spark. + + :param component: The ID or instance of the Spark component or job to be run during the step. + :type component: Union[str, ~azure.ai.ml.entities.SparkComponent] + :param identity: The identity that the Spark job will use while running on compute. + :type identity: Union[Dict[str, str], + ~azure.ai.ml.entities.ManagedIdentityConfiguration, + ~azure.ai.ml.entities.AmlTokenConfiguration, + ~azure.ai.ml.entities.UserIdentityConfiguration + + ] + + :param driver_cores: The number of cores to use for the driver process, only in cluster mode. + :type driver_cores: int + :param driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit + suffix ("k", "m", "g" or "t") (e.g. "512m", "2g"). + :type driver_memory: str + :param executor_cores: The number of cores to use on each executor. 
+ :type executor_cores: int + :param executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit + suffix ("k", "m", "g" or "t") (e.g. "512m", "2g"). + :type executor_memory: str + :param executor_instances: The initial number of executors. + :type executor_instances: int + :param dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of + executors registered with this application up and down based on the workload. + :type dynamic_allocation_enabled: bool + :param dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation + is enabled. + :type dynamic_allocation_min_executors: int + :param dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation + is enabled. + :type dynamic_allocation_max_executors: int + :param conf: A dictionary with pre-defined Spark configurations key and values. + :type conf: Dict[str, str] + :param inputs: A mapping of input names to input data sources used in the job. + :type inputs: Dict[str, Union[ + str, + bool, + int, + float, + Enum, + ~azure.ai.ml.entities._job.pipeline._io.NodeOutput, + ~azure.ai.ml.Input + + ]] + + :param outputs: A mapping of output names to output data sources used in the job. + :type outputs: Dict[str, Union[str, ~azure.ai.ml.Output]] + :param args: The arguments for the job. + :type args: str + :param compute: The compute resource the job runs on. + :type compute: str + :param resources: The compute resource configuration for the job. + :type resources: Union[Dict, ~azure.ai.ml.entities.SparkResourceConfiguration] + :param entry: The file or class entry point. + :type entry: Dict[str, str] + :param py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps. + :type py_files: List[str] + :param jars: The list of .JAR files to include on the driver and executor classpaths. + :type jars: List[str] + :param files: The list of files to be placed in the working directory of each executor. + :type files: List[str] + :param archives: The list of archives to be extracted into the working directory of each executor. 
+ :type archives: List[str] + """ + + def __init__( + self, + *, + component: Union[str, SparkComponent], + identity: Optional[ + Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration] + ] = None, + driver_cores: Optional[Union[int, str]] = None, + driver_memory: Optional[str] = None, + executor_cores: Optional[Union[int, str]] = None, + executor_memory: Optional[str] = None, + executor_instances: Optional[Union[int, str]] = None, + dynamic_allocation_enabled: Optional[Union[bool, str]] = None, + dynamic_allocation_min_executors: Optional[Union[int, str]] = None, + dynamic_allocation_max_executors: Optional[Union[int, str]] = None, + conf: Optional[Dict[str, str]] = None, + inputs: Optional[ + Dict[ + str, + Union[ + NodeOutput, + Input, + str, + bool, + int, + float, + Enum, + "Input", + ], + ] + ] = None, + outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None, + compute: Optional[str] = None, + resources: Optional[Union[Dict, SparkResourceConfiguration]] = None, + entry: Union[Dict[str, str], SparkJobEntry, None] = None, + py_files: Optional[List[str]] = None, + jars: Optional[List[str]] = None, + files: Optional[List[str]] = None, + archives: Optional[List[str]] = None, + args: Optional[str] = None, + **kwargs: Any, + ) -> None: + # validate init params are valid type + validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map()) + kwargs.pop("type", None) + + BaseNode.__init__( + self, type=NodeType.SPARK, inputs=inputs, outputs=outputs, component=component, compute=compute, **kwargs + ) + + # init mark for _AttrDict + self._init = True + SparkJobEntryMixin.__init__(self, entry=entry) + self.conf = conf + self.driver_cores = driver_cores + self.driver_memory = driver_memory + self.executor_cores = executor_cores + self.executor_memory = executor_memory + self.executor_instances = executor_instances + self.dynamic_allocation_enabled = dynamic_allocation_enabled + self.dynamic_allocation_min_executors = dynamic_allocation_min_executors + self.dynamic_allocation_max_executors = dynamic_allocation_max_executors + + is_spark_component = isinstance(component, SparkComponent) + if is_spark_component: + # conf is dict and we need copy component conf here, otherwise node conf setting will affect component + # setting + _component = cast(SparkComponent, component) + self.conf = self.conf or copy.copy(_component.conf) + self.driver_cores = self.driver_cores or _component.driver_cores + self.driver_memory = self.driver_memory or _component.driver_memory + self.executor_cores = self.executor_cores or _component.executor_cores + self.executor_memory = self.executor_memory or _component.executor_memory + self.executor_instances = self.executor_instances or _component.executor_instances + self.dynamic_allocation_enabled = self.dynamic_allocation_enabled or _component.dynamic_allocation_enabled + self.dynamic_allocation_min_executors = ( + self.dynamic_allocation_min_executors or _component.dynamic_allocation_min_executors + ) + self.dynamic_allocation_max_executors = ( + self.dynamic_allocation_max_executors or _component.dynamic_allocation_max_executors + ) + if self.executor_instances is None and str(self.dynamic_allocation_enabled).lower() == "true": + self.executor_instances = self.dynamic_allocation_min_executors + # When create standalone job or pipeline job, following fields will always get value from component or get + # default None, because we will not pass those fields to Spark. 
But in following cases, we expect to get + # correct value from spark._from_rest_object() and then following fields will get from their respective + # keyword arguments. + # 1. when we call regenerated_spark_node=Spark._from_rest_object(spark_node._to_rest_object()) in local test, + # we expect regenerated_spark_node and spark_node are identical. + # 2.when get created remote job through Job._from_rest_object(result) in job operation where component is an + # arm_id, we expect get remote returned values. + # 3.when we load a remote job, component now is an arm_id, we need get entry from node level returned from + # service + self.entry = _component.entry if is_spark_component else entry + self.py_files = _component.py_files if is_spark_component else py_files + self.jars = _component.jars if is_spark_component else jars + self.files = _component.files if is_spark_component else files + self.archives = _component.archives if is_spark_component else archives + self.args = _component.args if is_spark_component else args + self.environment: Any = _component.environment if is_spark_component else None + + self.resources = resources + self.identity = identity + self._swept = False + self._init = False + + @classmethod + def _get_supported_outputs_types(cls) -> Tuple: + return str, Output + + @property + def component(self) -> Union[str, SparkComponent]: + """The ID or instance of the Spark component or job to be run during the step. + + :rtype: ~azure.ai.ml.entities.SparkComponent + """ + res: Union[str, SparkComponent] = self._component + return res + + @property + def resources(self) -> Optional[Union[Dict, SparkResourceConfiguration]]: + """The compute resource configuration for the job. + + :rtype: ~azure.ai.ml.entities.SparkResourceConfiguration + """ + return self._resources # type: ignore + + @resources.setter + def resources(self, value: Optional[Union[Dict, SparkResourceConfiguration]]) -> None: + """Sets the compute resource configuration for the job. + + :param value: The compute resource configuration for the job. + :type value: Union[Dict[str, str], ~azure.ai.ml.entities.SparkResourceConfiguration] + """ + if isinstance(value, dict): + value = SparkResourceConfiguration(**value) + self._resources = value + + @property + def identity( + self, + ) -> Optional[Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]: + """The identity that the Spark job will use while running on compute. + + :rtype: Union[~azure.ai.ml.entities.ManagedIdentityConfiguration, ~azure.ai.ml.entities.AmlTokenConfiguration, + ~azure.ai.ml.entities.UserIdentityConfiguration] + """ + # If there is no identity from CLI/SDK input: for jobs running on synapse compute (MLCompute Clusters), the + # managed identity is the default; for jobs running on clusterless, the user identity should be the default, + # otherwise use user input identity. + if self._identity is None: + if self.compute is not None: + return ManagedIdentityConfiguration() + if self.resources is not None: + return UserIdentityConfiguration() + return self._identity + + @identity.setter + def identity( + self, + value: Union[Dict[str, str], ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration], + ) -> None: + """Sets the identity that the Spark job will use while running on compute. + + :param value: The identity that the Spark job will use while running on compute. 
+ :type value: Union[Dict[str, str], ~azure.ai.ml.entities.ManagedIdentityConfiguration, + ~azure.ai.ml.entities.AmlTokenConfiguration, ~azure.ai.ml.entities.UserIdentityConfiguration] + """ + if isinstance(value, dict): + identify_schema = UnionField( + [ + NestedField(ManagedIdentitySchema, unknown=INCLUDE), + NestedField(AMLTokenIdentitySchema, unknown=INCLUDE), + NestedField(UserIdentitySchema, unknown=INCLUDE), + ] + ) + value = identify_schema._deserialize(value=value, attr=None, data=None) + self._identity = value + + @property + def code(self) -> Optional[Union[str, PathLike]]: + """The local or remote path pointing at source code. + + :rtype: Union[str, PathLike] + """ + if isinstance(self.component, Component): + _code: Optional[Union[str, PathLike]] = self.component.code + return _code + return None + + @code.setter + def code(self, value: str) -> None: + """Sets the source code to be used for the job. + + :param value: The local or remote path pointing at source code. + :type value: Union[str, PathLike] + """ + if isinstance(self.component, Component): + self.component.code = value + else: + msg = "Can't set code property for a registered component {}" + raise ValidationException( + message=msg.format(self.component), + no_personal_data_message=msg.format(self.component), + target=ErrorTarget.SPARK_JOB, + error_category=ErrorCategory.USER_ERROR, + ) + + @classmethod + def _from_rest_object_to_init_params(cls, obj: dict) -> Dict: + obj = super()._from_rest_object_to_init_params(obj) + + if "resources" in obj and obj["resources"]: + obj["resources"] = SparkResourceConfiguration._from_rest_object(obj["resources"]) + + if "identity" in obj and obj["identity"]: + obj["identity"] = _BaseJobIdentityConfiguration._from_rest_object(obj["identity"]) + + if "entry" in obj and obj["entry"]: + obj["entry"] = SparkJobEntry._from_rest_object(obj["entry"]) + if "conf" in obj and obj["conf"]: + # get conf setting value from conf + for field_name, _ in CONF_KEY_MAP.items(): + value = obj["conf"].get(field_name, None) + if value is not None: + obj[field_name] = value + + return obj + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Spark": + from .spark_func import spark + + loaded_data = load_from_dict(SparkJobSchema, data, context, additional_message, **kwargs) + spark_job: Spark = spark(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data) + + return spark_job + + @classmethod + def _load_from_rest_job(cls, obj: JobBaseData) -> "Spark": + from .spark_func import spark + + rest_spark_job: RestSparkJob = obj.properties + rest_spark_conf = copy.copy(rest_spark_job.conf) or {} + + spark_job: Spark = spark( + name=obj.name, + id=obj.id, + entry=SparkJobEntry._from_rest_object(rest_spark_job.entry), + display_name=rest_spark_job.display_name, + description=rest_spark_job.description, + tags=rest_spark_job.tags, + properties=rest_spark_job.properties, + experiment_name=rest_spark_job.experiment_name, + services=rest_spark_job.services, + status=rest_spark_job.status, + creation_context=obj.system_data, + code=rest_spark_job.code_id, + compute=rest_spark_job.compute_id, + environment=rest_spark_job.environment_id, + identity=( + _BaseJobIdentityConfiguration._from_rest_object(rest_spark_job.identity) + if rest_spark_job.identity + else None + ), + args=rest_spark_job.args, + conf=rest_spark_conf, + driver_cores=rest_spark_conf.get( + SparkConfKey.DRIVER_CORES, None + ), # copy fields from conf into the promote attribute in spark + 
driver_memory=rest_spark_conf.get(SparkConfKey.DRIVER_MEMORY, None), + executor_cores=rest_spark_conf.get(SparkConfKey.EXECUTOR_CORES, None), + executor_memory=rest_spark_conf.get(SparkConfKey.EXECUTOR_MEMORY, None), + executor_instances=rest_spark_conf.get(SparkConfKey.EXECUTOR_INSTANCES, None), + dynamic_allocation_enabled=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None), + dynamic_allocation_min_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None), + dynamic_allocation_max_executors=rest_spark_conf.get(SparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None), + resources=SparkResourceConfiguration._from_rest_object(rest_spark_job.resources), + inputs=from_rest_inputs_to_dataset_literal(rest_spark_job.inputs), + outputs=from_rest_data_outputs(rest_spark_job.outputs), + ) + return spark_job + + @classmethod + def _attr_type_map(cls) -> dict: + return { + # hack: allow use InternalSparkComponent as component + # "component": (str, SparkComponent), + "environment": (str, Environment), + "resources": (dict, SparkResourceConfiguration), + "code": (str, PathLike), + } + + @property + def _skip_required_compute_missing_validation(self) -> bool: + return self.resources is not None + + def _to_job(self) -> SparkJob: + if isinstance(self.component, SparkComponent): + return SparkJob( + experiment_name=self.experiment_name, + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + code=self.component.code, + entry=self.entry, + py_files=self.py_files, + jars=self.jars, + files=self.files, + archives=self.archives, + identity=self.identity, + driver_cores=self.driver_cores, + driver_memory=self.driver_memory, + executor_cores=self.executor_cores, + executor_memory=self.executor_memory, + executor_instances=self.executor_instances, + dynamic_allocation_enabled=self.dynamic_allocation_enabled, + dynamic_allocation_min_executors=self.dynamic_allocation_min_executors, + dynamic_allocation_max_executors=self.dynamic_allocation_max_executors, + conf=self.conf, + environment=self.environment, + status=self.status, + inputs=self._job_inputs, + outputs=self._job_outputs, + services=self.services, + args=self.args, + compute=self.compute, + resources=self.resources, + ) + + return SparkJob( + experiment_name=self.experiment_name, + name=self.name, + display_name=self.display_name, + description=self.description, + tags=self.tags, + code=self.component, + entry=self.entry, + py_files=self.py_files, + jars=self.jars, + files=self.files, + archives=self.archives, + identity=self.identity, + driver_cores=self.driver_cores, + driver_memory=self.driver_memory, + executor_cores=self.executor_cores, + executor_memory=self.executor_memory, + executor_instances=self.executor_instances, + dynamic_allocation_enabled=self.dynamic_allocation_enabled, + dynamic_allocation_min_executors=self.dynamic_allocation_min_executors, + dynamic_allocation_max_executors=self.dynamic_allocation_max_executors, + conf=self.conf, + environment=self.environment, + status=self.status, + inputs=self._job_inputs, + outputs=self._job_outputs, + services=self.services, + args=self.args, + compute=self.compute, + resources=self.resources, + ) + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline import SparkSchema + + return SparkSchema(context=context) + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return [ + "type", + "resources", + 
"py_files", + "jars", + "files", + "archives", + "identity", + "conf", + "args", + ] + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj: dict = super()._to_rest_object(**kwargs) + rest_obj.update( + convert_ordered_dict_to_dict( + { + "componentId": self._get_component_id(), + "identity": get_rest_dict_for_node_attrs(self.identity), + "resources": get_rest_dict_for_node_attrs(self.resources), + "entry": get_rest_dict_for_node_attrs(self.entry), + } + ) + ) + return rest_obj + + def _build_inputs(self) -> dict: + inputs = super(Spark, self)._build_inputs() + built_inputs = {} + # Validate and remove non-specified inputs + for key, value in inputs.items(): + if value is not None: + built_inputs[key] = value + return built_inputs + + def _customized_validate(self) -> MutableValidationResult: + result = super()._customized_validate() + if ( + isinstance(self.component, SparkComponent) + and isinstance(self.component._environment, Environment) + and self.component._environment.image is not None + ): + result.append_warning( + yaml_path="environment.image", + message=SPARK_ENVIRONMENT_WARNING_MESSAGE, + ) + result.merge_with(self._validate_entry_exist()) + result.merge_with(self._validate_fields()) + return result + + def _validate_entry_exist(self) -> MutableValidationResult: + is_remote_code = isinstance(self.code, str) and ( + self.code.startswith("git+") + or self.code.startswith(REGISTRY_URI_FORMAT) + or self.code.startswith(ARM_ID_PREFIX) + or is_url(self.code) + or bool(self.CODE_ID_RE_PATTERN.match(self.code)) + ) + validation_result = self._create_empty_validation_result() + # validate whether component entry exists to ensure code path is correct, especially when code is default value + if self.code is None or is_remote_code or not isinstance(self.entry, SparkJobEntry): + # skip validate when code is not a local path or code is None, or self.entry is not SparkJobEntry object + pass + else: + if not path.isabs(self.code): + _component: SparkComponent = self.component # type: ignore + code_path = Path(_component.base_path) / self.code + if code_path.exists(): + code_path = code_path.resolve().absolute() + else: + validation_result.append_error( + message=f"Code path {code_path} doesn't exist.", yaml_path="component.code" + ) + entry_path = code_path / self.entry.entry + else: + entry_path = Path(self.code) / self.entry.entry + + if ( + isinstance(self.entry, SparkJobEntry) + and self.entry.entry_type == SparkJobEntryType.SPARK_JOB_FILE_ENTRY + ): + if not entry_path.exists(): + validation_result.append_error( + message=f"Entry {entry_path} doesn't exist.", yaml_path="component.entry" + ) + return validation_result + + def _validate_fields(self) -> MutableValidationResult: + validation_result = self._create_empty_validation_result() + try: + _validate_compute_or_resources(self.compute, self.resources) + except ValidationException as e: + validation_result.append_error(message=str(e), yaml_path="resources") + validation_result.append_error(message=str(e), yaml_path="compute") + + try: + _validate_input_output_mode(self.inputs, self.outputs) + except ValidationException as e: + msg = str(e) + m = re.match(r"(Input|Output) '(\w+)'", msg) + if m: + io_type, io_name = m.groups() + if io_type == "Input": + validation_result.append_error(message=msg, yaml_path=f"inputs.{io_name}") + else: + validation_result.append_error(message=msg, yaml_path=f"outputs.{io_name}") + + try: + _validate_spark_configurations(self) + except ValidationException as e: + 
validation_result.append_error(message=str(e), yaml_path="conf") + + try: + self._validate_entry() + except ValidationException as e: + validation_result.append_error(message=str(e), yaml_path="entry") + + if self.args: + try: + validate_inputs_for_args(self.args, self.inputs) + except ValidationException as e: + validation_result.append_error(message=str(e), yaml_path="args") + return validation_result + + # pylint: disable-next=docstring-missing-param + def __call__(self, *args: Any, **kwargs: Any) -> "Spark": + """Call Spark as a function will return a new instance each time. + + :return: A Spark object + :rtype: Spark + """ + if isinstance(self._component, Component): + # call this to validate inputs + node: Spark = self._component(*args, **kwargs) + # merge inputs + for name, original_input in self.inputs.items(): + if name not in kwargs: + # use setattr here to make sure owner of input won't change + setattr(node.inputs, name, original_input._data) + node._job_inputs[name] = original_input._data + # get outputs + for name, original_output in self.outputs.items(): + # use setattr here to make sure owner of output won't change + if not isinstance(original_output, str): + setattr(node.outputs, name, original_output._data) + self._refine_optional_inputs_with_no_value(node, kwargs) + node._name = self.name + node.compute = self.compute + node.environment = copy.deepcopy(self.environment) + node.resources = copy.deepcopy(self.resources) + return node + + msg = "Spark can be called as a function only when referenced component is {}, currently got {}." + raise ValidationException( + message=msg.format(type(Component), self._component), + no_personal_data_message=msg.format(type(Component), "self._component"), + target=ErrorTarget.SPARK_JOB, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py new file mode 100644 index 00000000..342f8c44 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/spark_func.py @@ -0,0 +1,306 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +# pylint: disable=protected-access, too-many-locals + +import os +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +from azure.ai.ml._restclient.v2023_04_01_preview.models import AmlToken, ManagedIdentity, UserIdentity +from azure.ai.ml.constants._common import AssetTypes +from azure.ai.ml.constants._component import ComponentSource +from azure.ai.ml.entities import Environment +from azure.ai.ml.entities._component.spark_component import SparkComponent +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin +from azure.ai.ml.entities._job.spark_job_entry import SparkJobEntry +from azure.ai.ml.entities._job.spark_resource_configuration import SparkResourceConfiguration +from azure.ai.ml.exceptions import ErrorTarget, ValidationException + +from .spark import Spark + +SUPPORTED_INPUTS = [AssetTypes.URI_FILE, AssetTypes.URI_FOLDER, AssetTypes.MLTABLE] + + +def _parse_input(input_value: Union[Input, dict, str, bool, int, float]) -> Tuple: + component_input = None + job_input: Union[Input, dict, str, bool, int, float] = "" + + if isinstance(input_value, Input): + component_input = Input(**input_value._to_dict()) + input_type = input_value.type + if input_type in SUPPORTED_INPUTS: + job_input = Input(**input_value._to_dict()) + elif isinstance(input_value, dict): + # if user provided dict, we try to parse it to Input. + # for job input, only parse for path type + input_type = input_value.get("type", None) + if input_type in SUPPORTED_INPUTS: + job_input = Input(**input_value) + component_input = Input(**input_value) + elif isinstance(input_value, (str, bool, int, float)): + # Input bindings are not supported + component_input = ComponentTranslatableMixin._to_input_builder_function(input_value) + job_input = input_value + else: + msg = f"Unsupported input type: {type(input_value)}, only Input, dict, str, bool, int and float are supported." + raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB) + return component_input, job_input + + +def _parse_output(output_value: Union[Output, dict]) -> Tuple: + component_output = None + job_output: Union[Output, dict] = {} + + if isinstance(output_value, Output): + component_output = Output(**output_value._to_dict()) + job_output = Output(**output_value._to_dict()) + elif not output_value: + # output value can be None or empty dictionary + # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder + component_output = ComponentTranslatableMixin._to_output(output_value) + job_output = output_value + elif isinstance(output_value, dict): # When output value is a non-empty dictionary + job_output = Output(**output_value) + component_output = Output(**output_value) + elif isinstance(output_value, str): # When output is passed in from pipeline job yaml + job_output = output_value + else: + msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported." 
+ raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB) + return component_output, job_output + + +def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]: + component_io_dict, job_io_dict = {}, {} + if io_dict: + for key, val in io_dict.items(): + component_io, job_io = parse_func(val) + component_io_dict[key] = component_io + job_io_dict[key] = job_io + return component_io_dict, job_io_dict + + +def spark( + *, + experiment_name: Optional[str] = None, + name: Optional[str] = None, + display_name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Dict] = None, + code: Optional[Union[str, os.PathLike]] = None, + entry: Union[Dict[str, str], SparkJobEntry, None] = None, + py_files: Optional[List[str]] = None, + jars: Optional[List[str]] = None, + files: Optional[List[str]] = None, + archives: Optional[List[str]] = None, + identity: Optional[Union[Dict[str, str], ManagedIdentity, AmlToken, UserIdentity]] = None, + driver_cores: Optional[int] = None, + driver_memory: Optional[str] = None, + executor_cores: Optional[int] = None, + executor_memory: Optional[str] = None, + executor_instances: Optional[int] = None, + dynamic_allocation_enabled: Optional[bool] = None, + dynamic_allocation_min_executors: Optional[int] = None, + dynamic_allocation_max_executors: Optional[int] = None, + conf: Optional[Dict[str, str]] = None, + environment: Optional[Union[str, Environment]] = None, + inputs: Optional[Dict] = None, + outputs: Optional[Dict] = None, + args: Optional[str] = None, + compute: Optional[str] = None, + resources: Optional[Union[Dict, SparkResourceConfiguration]] = None, + **kwargs: Any, +) -> Spark: + """Creates a Spark object which can be used inside a dsl.pipeline function or used as a standalone Spark job. + + :keyword experiment_name: The name of the experiment the job will be created under. + :paramtype experiment_name: Optional[str] + :keyword name: The name of the job. + :paramtype name: Optional[str] + :keyword display_name: The job display name. + :paramtype display_name: Optional[str] + :keyword description: The description of the job. Defaults to None. + :paramtype description: Optional[str] + :keyword tags: The dictionary of tags for the job. Tags can be added, removed, and updated. Defaults to None. + :paramtype tags: Optional[dict[str, str]] + :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url + pointing to a remote location. + :type code: Optional[Union[str, os.PathLike]] + :keyword entry: The file or class entry point. + :paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]] + :keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps. + Defaults to None. + :paramtype py_files: Optional[List[str]] + :keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None. + :paramtype jars: Optional[List[str]] + :keyword files: The list of files to be placed in the working directory of each executor. Defaults to None. + :paramtype files: Optional[List[str]] + :keyword archives: The list of archives to be extracted into the working directory of each executor. + Defaults to None. + :paramtype archives: Optional[List[str]] + :keyword identity: The identity that the Spark job will use while running on compute. 
+ :paramtype identity: Optional[Union[ + dict[str, str], + ~azure.ai.ml.entities.ManagedIdentityConfiguration, + ~azure.ai.ml.entities.AmlTokenConfiguration, + ~azure.ai.ml.entities.UserIdentityConfiguration]] + :keyword driver_cores: The number of cores to use for the driver process, only in cluster mode. + :paramtype driver_cores: Optional[int] + :keyword driver_memory: The amount of memory to use for the driver process, formatted as strings with a size unit + suffix ("k", "m", "g" or "t") (e.g. "512m", "2g"). + :paramtype driver_memory: Optional[str] + :keyword executor_cores: The number of cores to use on each executor. + :paramtype executor_cores: Optional[int] + :keyword executor_memory: The amount of memory to use per executor process, formatted as strings with a size unit + suffix ("k", "m", "g" or "t") (e.g. "512m", "2g"). + :paramtype executor_memory: Optional[str] + :keyword executor_instances: The initial number of executors. + :paramtype executor_instances: Optional[int] + :keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of + executors registered with this application up and down based on the workload. + :paramtype dynamic_allocation_enabled: Optional[bool] + :keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is + enabled. + :paramtype dynamic_allocation_min_executors: Optional[int] + :keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is + enabled. + :paramtype dynamic_allocation_max_executors: Optional[int] + :keyword conf: A dictionary with pre-defined Spark configurations key and values. Defaults to None. + :paramtype conf: Optional[dict[str, str]] + :keyword environment: The Azure ML environment to run the job in. + :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]] + :keyword inputs: A mapping of input names to input data used in the job. Defaults to None. + :paramtype inputs: Optional[dict[str, ~azure.ai.ml.Input]] + :keyword outputs: A mapping of output names to output data used in the job. Defaults to None. + :paramtype outputs: Optional[dict[str, ~azure.ai.ml.Output]] + :keyword args: The arguments for the job. + :paramtype args: Optional[str] + :keyword compute: The compute resource the job runs on. + :paramtype compute: Optional[str] + :keyword resources: The compute resource configuration for the job. + :paramtype resources: Optional[Union[dict, ~azure.ai.ml.entities.SparkResourceConfiguration]] + :return: A Spark object. + :rtype: ~azure.ai.ml.entities.Spark + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_spark_configurations.py + :start-after: [START spark_function_configuration_1] + :end-before: [END spark_function_configuration_1] + :language: python + :dedent: 8 + :caption: Configuring a SparkJob. + + .. admonition:: Example: + + .. literalinclude:: ../samples/ml_samples_spark_configurations.py + :start-after: [START spark_function_configuration_2] + :end-before: [END spark_function_configuration_2] + :language: python + :dedent: 8 + :caption: Configuring a SparkJob. + + .. admonition:: Example: + + .. 
literalinclude:: ../samples/ml_samples_spark_configurations.py + :start-after: [START spark_dsl_pipeline] + :end-before: [END spark_dsl_pipeline] + :language: python + :dedent: 8 + :caption: Building a Spark pipeline using the DSL pipeline decorator + """ + + inputs = inputs or {} + outputs = outputs or {} + component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input) + # job inputs can not be None + job_inputs = {k: v for k, v in job_inputs.items() if v is not None} + component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output) + component = kwargs.pop("component", None) + + if component is None: + component = SparkComponent( + name=name, + display_name=display_name, + tags=tags, + description=description, + code=code, + entry=entry, + py_files=py_files, + jars=jars, + files=files, + archives=archives, + driver_cores=driver_cores, + driver_memory=driver_memory, + executor_cores=executor_cores, + executor_memory=executor_memory, + executor_instances=executor_instances, + dynamic_allocation_enabled=dynamic_allocation_enabled, + dynamic_allocation_min_executors=dynamic_allocation_min_executors, + dynamic_allocation_max_executors=dynamic_allocation_max_executors, + conf=conf, + environment=environment, + inputs=component_inputs, + outputs=component_outputs, + args=args, + _source=ComponentSource.BUILDER, + **kwargs, + ) + if isinstance(component, SparkComponent): + spark_obj = Spark( + experiment_name=experiment_name, + name=name, + display_name=display_name, + tags=tags, + description=description, + component=component, + identity=identity, + driver_cores=driver_cores, + driver_memory=driver_memory, + executor_cores=executor_cores, + executor_memory=executor_memory, + executor_instances=executor_instances, + dynamic_allocation_enabled=dynamic_allocation_enabled, + dynamic_allocation_min_executors=dynamic_allocation_min_executors, + dynamic_allocation_max_executors=dynamic_allocation_max_executors, + conf=conf, + inputs=job_inputs, + outputs=job_outputs, + compute=compute, + resources=resources, + **kwargs, + ) + else: + # when we load a remote job, component now is an arm_id, we need get entry from node level returned from + # service + spark_obj = Spark( + experiment_name=experiment_name, + name=name, + display_name=display_name, + tags=tags, + description=description, + component=component, + identity=identity, + driver_cores=driver_cores, + driver_memory=driver_memory, + executor_cores=executor_cores, + executor_memory=executor_memory, + executor_instances=executor_instances, + dynamic_allocation_enabled=dynamic_allocation_enabled, + dynamic_allocation_min_executors=dynamic_allocation_min_executors, + dynamic_allocation_max_executors=dynamic_allocation_max_executors, + conf=conf, + inputs=job_inputs, + outputs=job_outputs, + compute=compute, + resources=resources, + entry=entry, + py_files=py_files, + jars=jars, + files=files, + archives=archives, + args=args, + **kwargs, + ) + return spark_obj diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/subcomponents.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/subcomponents.py new file mode 100644 index 00000000..9b9ed5d2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/subcomponents.py @@ -0,0 +1,59 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# This file contains mldesigner decorator-produced components +# that are used within node constructors. Keep imports and +# general complexity in this file to a minimum. + +from typing import List + +from mldesigner import Output, command_component + +from azure.ai.ml.constants._common import DefaultOpenEncoding + + +def save_mltable_yaml(path: str, mltable_paths: List[str]) -> None: + """Save MLTable YAML. + + :param path: The path to save the MLTable YAML file. + :type path: str + :param mltable_paths: List of paths to be included in the MLTable. + :type mltable_paths: List[str] + :raises ValueError: If the given path points to a file. + """ + import os + + path = os.path.abspath(path) + + if os.path.isfile(path): + raise ValueError(f"The given path {path} points to a file.") + + if not os.path.exists(path): + os.makedirs(path, exist_ok=True) + + save_path = os.path.join(path, "MLTable") + # Do not touch - this is MLTable syntax that is needed to mount these paths + # To the MLTable's inputs + mltable_file_content = "\n".join(["paths:"] + [f"- folder : {path}" for path in mltable_paths]) + + with open(save_path, "w", encoding=DefaultOpenEncoding.WRITE) as f: + f.write(mltable_file_content) + + +# TODO 2293610: add support for more types of outputs besides uri_folder and mltable +@command_component() +def create_scatter_output_table(aggregated_output: Output(type="mltable"), **kwargs: str) -> Output: # type: ignore + """Create scatter output table. + + This function is used by the FL scatter gather node to reduce a dynamic number of silo outputs + into a single input for the user-supplied aggregation step. + + :param aggregated_output: The aggregated output MLTable. + :type aggregated_output: ~mldesigner.Output(type="mltable") + + Keyword arguments represent input names and URI folder paths. + """ + # kwargs keys are inputs names (ex: silo_output_silo_1) + # values are uri_folder paths + save_mltable_yaml(aggregated_output, list(kwargs.values())) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/sweep.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/sweep.py new file mode 100644 index 00000000..603babbe --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/sweep.py @@ -0,0 +1,454 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- +# pylint: disable=protected-access + +import logging +from typing import Any, Dict, List, Optional, Tuple, Union + +import pydash +from marshmallow import EXCLUDE, Schema + +from azure.ai.ml._schema._sweep.sweep_fields_provider import EarlyTerminationField +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY +from azure.ai.ml.constants._component import NodeType +from azure.ai.ml.constants._job.sweep import SearchSpace +from azure.ai.ml.entities._component.command_component import CommandComponent +from azure.ai.ml.entities._credentials import ( + AmlTokenConfiguration, + ManagedIdentityConfiguration, + UserIdentityConfiguration, +) +from azure.ai.ml.entities._inputs_outputs import Input, Output +from azure.ai.ml.entities._job.job_limits import SweepJobLimits +from azure.ai.ml.entities._job.job_resource_configuration import JobResourceConfiguration +from azure.ai.ml.entities._job.pipeline._io import NodeInput +from azure.ai.ml.entities._job.queue_settings import QueueSettings +from azure.ai.ml.entities._job.sweep.early_termination_policy import ( + BanditPolicy, + EarlyTerminationPolicy, + MedianStoppingPolicy, + TruncationSelectionPolicy, +) +from azure.ai.ml.entities._job.sweep.objective import Objective +from azure.ai.ml.entities._job.sweep.parameterized_sweep import ParameterizedSweep +from azure.ai.ml.entities._job.sweep.sampling_algorithm import SamplingAlgorithm +from azure.ai.ml.entities._job.sweep.search_space import ( + Choice, + LogNormal, + LogUniform, + Normal, + QLogNormal, + QLogUniform, + QNormal, + QUniform, + Randint, + SweepDistribution, + Uniform, +) +from azure.ai.ml.exceptions import ErrorTarget, UserErrorException, ValidationErrorType, ValidationException +from azure.ai.ml.sweep import SweepJob + +from ..._restclient.v2022_10_01.models import ComponentVersion +from ..._schema import PathAwareSchema +from ..._schema._utils.data_binding_expression import support_data_binding_expression_for_fields +from ..._utils.utils import camel_to_snake +from .base_node import BaseNode + +module_logger = logging.getLogger(__name__) + + +class Sweep(ParameterizedSweep, BaseNode): + """Base class for sweep node. + + This class should not be instantiated directly. Instead, it should be created via the builder function: sweep. + + :param trial: The ID or instance of the command component or job to be run for the step. + :type trial: Union[~azure.ai.ml.entities.CommandComponent, str] + :param compute: The compute definition containing the compute information for the step. + :type compute: str + :param limits: The limits for the sweep node. + :type limits: ~azure.ai.ml.sweep.SweepJobLimits + :param sampling_algorithm: The sampling algorithm to use to sample inside the search space. + Accepted values are: "random", "grid", or "bayesian". + :type sampling_algorithm: str + :param objective: The objective used to determine the target run with the local optimal + hyperparameter in search space. + :type objective: ~azure.ai.ml.sweep.Objective + :param early_termination_policy: The early termination policy of the sweep node. + :type early_termination_policy: Union[ + + ~azure.mgmt.machinelearningservices.models.BanditPolicy, + ~azure.mgmt.machinelearningservices.models.MedianStoppingPolicy, + ~azure.mgmt.machinelearningservices.models.TruncationSelectionPolicy + + ] + + :param search_space: The hyperparameter search space to run trials in. 
+ :type search_space: Dict[str, Union[ + + ~azure.ai.ml.entities.Choice, + ~azure.ai.ml.entities.LogNormal, + ~azure.ai.ml.entities.LogUniform, + ~azure.ai.ml.entities.Normal, + ~azure.ai.ml.entities.QLogNormal, + ~azure.ai.ml.entities.QLogUniform, + ~azure.ai.ml.entities.QNormal, + ~azure.ai.ml.entities.QUniform, + ~azure.ai.ml.entities.Randint, + ~azure.ai.ml.entities.Uniform + + ]] + + :param inputs: Mapping of input data bindings used in the job. + :type inputs: Dict[str, Union[ + + ~azure.ai.ml.Input, + + str, + bool, + int, + float + + ]] + + :param outputs: Mapping of output data bindings used in the job. + :type outputs: Dict[str, Union[str, ~azure.ai.ml.Output]] + :param identity: The identity that the training job will use while running on compute. + :type identity: Union[ + + ~azure.ai.ml.ManagedIdentityConfiguration, + ~azure.ai.ml.AmlTokenConfiguration, + ~azure.ai.ml.UserIdentityConfiguration + + ] + + :param queue_settings: The queue settings for the job. + :type queue_settings: ~azure.ai.ml.entities.QueueSettings + :param resources: Compute Resource configuration for the job. + :type resources: Optional[Union[dict, ~azure.ai.ml.entities.ResourceConfiguration]] + """ + + def __init__( + self, + *, + trial: Optional[Union[CommandComponent, str]] = None, + compute: Optional[str] = None, + limits: Optional[SweepJobLimits] = None, + sampling_algorithm: Optional[Union[str, SamplingAlgorithm]] = None, + objective: Optional[Objective] = None, + early_termination: Optional[ + Union[BanditPolicy, MedianStoppingPolicy, TruncationSelectionPolicy, EarlyTerminationPolicy, str] + ] = None, + search_space: Optional[ + Dict[ + str, + Union[ + Choice, LogNormal, LogUniform, Normal, QLogNormal, QLogUniform, QNormal, QUniform, Randint, Uniform + ], + ] + ] = None, + inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None, + outputs: Optional[Dict[str, Union[str, Output]]] = None, + identity: Optional[ + Union[Dict, ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration] + ] = None, + queue_settings: Optional[QueueSettings] = None, + resources: Optional[Union[dict, JobResourceConfiguration]] = None, + **kwargs: Any, + ) -> None: + # TODO: get rid of self._job_inputs, self._job_outputs once we have general Input + self._job_inputs, self._job_outputs = inputs, outputs + + kwargs.pop("type", None) + BaseNode.__init__( + self, + type=NodeType.SWEEP, + component=trial, + inputs=inputs, + outputs=outputs, + compute=compute, + **kwargs, + ) + # init mark for _AttrDict + self._init = True + ParameterizedSweep.__init__( + self, + sampling_algorithm=sampling_algorithm, + objective=objective, + limits=limits, + early_termination=early_termination, + search_space=search_space, + queue_settings=queue_settings, + resources=resources, + ) + + self.identity: Any = identity + self._init = False + + @property + def trial(self) -> CommandComponent: + """The ID or instance of the command component or job to be run for the step. + + :rtype: ~azure.ai.ml.entities.CommandComponent + """ + res: CommandComponent = self._component + return res + + @property + def search_space( + self, + ) -> Optional[ + Dict[ + str, + Union[Choice, LogNormal, LogUniform, Normal, QLogNormal, QLogUniform, QNormal, QUniform, Randint, Uniform], + ] + ]: + """Dictionary of the hyperparameter search space. + + Each key is the name of a hyperparameter and its value is the parameter expression. 
+ + :rtype: Dict[str, Union[~azure.ai.ml.entities.Choice, ~azure.ai.ml.entities.LogNormal, + ~azure.ai.ml.entities.LogUniform, ~azure.ai.ml.entities.Normal, ~azure.ai.ml.entities.QLogNormal, + ~azure.ai.ml.entities.QLogUniform, ~azure.ai.ml.entities.QNormal, ~azure.ai.ml.entities.QUniform, + ~azure.ai.ml.entities.Randint, ~azure.ai.ml.entities.Uniform]] + """ + return self._search_space + + @search_space.setter + def search_space(self, values: Dict[str, Dict[str, Union[str, int, float, dict]]]) -> None: + """Sets the search space for the sweep job. + + :param values: The search space to set. + :type values: Dict[str, Dict[str, Union[str, int, float, dict]]] + """ + search_space: Dict = {} + for name, value in values.items(): + # If value is a SearchSpace object, directly pass it to job.search_space[name] + search_space[name] = self._value_type_to_class(value) if isinstance(value, dict) else value + self._search_space = search_space + + @classmethod + def _value_type_to_class(cls, value: Any) -> Dict: + value_type = value["type"] + search_space_dict = { + SearchSpace.CHOICE: Choice, + SearchSpace.RANDINT: Randint, + SearchSpace.LOGNORMAL: LogNormal, + SearchSpace.NORMAL: Normal, + SearchSpace.LOGUNIFORM: LogUniform, + SearchSpace.UNIFORM: Uniform, + SearchSpace.QLOGNORMAL: QLogNormal, + SearchSpace.QNORMAL: QNormal, + SearchSpace.QLOGUNIFORM: QLogUniform, + SearchSpace.QUNIFORM: QUniform, + } + + res: dict = search_space_dict[value_type](**value) + return res + + @classmethod + def _get_supported_inputs_types(cls) -> Tuple: + supported_types = super()._get_supported_inputs_types() or () + return ( + SweepDistribution, + *supported_types, + ) + + @classmethod + def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Sweep": + raise NotImplementedError("Sweep._load_from_dict is not supported") + + @classmethod + def _picked_fields_from_dict_to_rest_object(cls) -> List[str]: + return [ + "limits", + "sampling_algorithm", + "objective", + "early_termination", + "search_space", + "queue_settings", + "resources", + ] + + def _to_rest_object(self, **kwargs: Any) -> dict: + rest_obj: dict = super(Sweep, self)._to_rest_object(**kwargs) + # hack: ParameterizedSweep.early_termination is not allowed to be None + for key in ["early_termination"]: + if key in rest_obj and rest_obj[key] is None: + del rest_obj[key] + + # hack: only early termination policy does not follow yaml schema now, should be removed after server-side made + # the change + if "early_termination" in rest_obj: + _early_termination: EarlyTerminationPolicy = self.early_termination # type: ignore + rest_obj["early_termination"] = _early_termination._to_rest_object().as_dict() + + rest_obj.update( + { + "type": self.type, + "trial": self._get_trial_component_rest_obj(), + } + ) + return rest_obj + + @classmethod + def _from_rest_object_to_init_params(cls, obj: dict) -> Dict: + obj = super()._from_rest_object_to_init_params(obj) + + # hack: only early termination policy does not follow yaml schema now, should be removed after server-side made + # the change + if "early_termination" in obj and "policy_type" in obj["early_termination"]: + # can't use _from_rest_object here, because obj is a dict instead of an EarlyTerminationPolicy rest object + obj["early_termination"]["type"] = camel_to_snake(obj["early_termination"].pop("policy_type")) + + # TODO: use cls._get_schema() to load from rest object + from azure.ai.ml._schema._sweep.parameterized_sweep import ParameterizedSweepSchema + + schema = 
ParameterizedSweepSchema(context={BASE_PATH_CONTEXT_KEY: "./"}) + support_data_binding_expression_for_fields(schema, ["type", "component", "trial"]) + + base_sweep = schema.load(obj, unknown=EXCLUDE, partial=True) + for key, value in base_sweep.items(): + obj[key] = value + + # trial + trial_component_id = pydash.get(obj, "trial.componentId", None) + obj["trial"] = trial_component_id # check this + + return obj + + def _get_trial_component_rest_obj(self) -> Union[Dict, ComponentVersion, None]: + # trial component to rest object is different from usual component + trial_component_id = self._get_component_id() + if trial_component_id is None: + return None + if isinstance(trial_component_id, str): + return {"componentId": trial_component_id} + if isinstance(trial_component_id, CommandComponent): + return trial_component_id._to_rest_object() + raise UserErrorException(f"invalid trial in sweep node {self.name}: {str(self.trial)}") + + def _to_job(self) -> SweepJob: + command = self.trial.command + if self.search_space is not None: + for key, _ in self.search_space.items(): + if command is not None: + # Double curly brackets to escape + command = command.replace(f"${{{{inputs.{key}}}}}", f"${{{{search_space.{key}}}}}") + + # TODO: raise exception when the trial is a pre-registered component + if command != self.trial.command and isinstance(self.trial, CommandComponent): + self.trial.command = command + + return SweepJob( + name=self.name, + display_name=self.display_name, + description=self.description, + properties=self.properties, + tags=self.tags, + experiment_name=self.experiment_name, + trial=self.trial, + compute=self.compute, + sampling_algorithm=self.sampling_algorithm, + search_space=self.search_space, + limits=self.limits, + early_termination=self.early_termination, # type: ignore[arg-type] + objective=self.objective, + inputs=self._job_inputs, + outputs=self._job_outputs, + identity=self.identity, + queue_settings=self.queue_settings, + resources=self.resources, + ) + + @classmethod + def _get_component_attr_name(cls) -> str: + return "trial" + + def _build_inputs(self) -> Dict: + inputs = super(Sweep, self)._build_inputs() + built_inputs = {} + # Validate and remove non-specified inputs + for key, value in inputs.items(): + if value is not None: + built_inputs[key] = value + + return built_inputs + + @classmethod + def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]: + from azure.ai.ml._schema.pipeline.component_job import SweepSchema + + return SweepSchema(context=context) + + @classmethod + def _get_origin_inputs_and_search_space(cls, built_inputs: Optional[Dict[str, NodeInput]]) -> Tuple: + """Separate mixed true inputs & search space definition from inputs of + this node and return them. + + Input will be restored to Input/LiteralInput before returned. 
+ + :param built_inputs: The built inputs + :type built_inputs: Optional[Dict[str, NodeInput]] + :return: A tuple of the inputs and search space + :rtype: Tuple[ + Dict[str, Union[Input, str, bool, int, float]], + Dict[str, SweepDistribution], + ] + """ + search_space: Dict = {} + inputs: Dict = {} + if built_inputs is not None: + for input_name, input_obj in built_inputs.items(): + if isinstance(input_obj, NodeInput): + if isinstance(input_obj._data, SweepDistribution): + search_space[input_name] = input_obj._data + else: + inputs[input_name] = input_obj._data + else: + msg = "unsupported built input type: {}: {}" + raise ValidationException( + message=msg.format(input_name, type(input_obj)), + no_personal_data_message=msg.format("[input_name]", type(input_obj)), + target=ErrorTarget.SWEEP_JOB, + error_type=ValidationErrorType.INVALID_VALUE, + ) + return inputs, search_space + + def _is_input_set(self, input_name: str) -> bool: + if super(Sweep, self)._is_input_set(input_name): + return True + return self.search_space is not None and input_name in self.search_space + + def __setattr__(self, key: Any, value: Any) -> None: + super(Sweep, self).__setattr__(key, value) + if key == "early_termination" and isinstance(self.early_termination, BanditPolicy): + # only one of slack_amount and slack_factor can be specified but default value is 0.0. + # Need to keep track of which one is null. + if self.early_termination.slack_amount == 0.0: + self.early_termination.slack_amount = None # type: ignore[assignment] + if self.early_termination.slack_factor == 0.0: + self.early_termination.slack_factor = None # type: ignore[assignment] + + @property + def early_termination(self) -> Optional[Union[str, EarlyTerminationPolicy]]: + """The early termination policy for the sweep job. + + :rtype: Union[str, ~azure.ai.ml.sweep.BanditPolicy, ~azure.ai.ml.sweep.MedianStoppingPolicy, + ~azure.ai.ml.sweep.TruncationSelectionPolicy] + """ + return self._early_termination + + @early_termination.setter + def early_termination(self, value: Optional[Union[str, EarlyTerminationPolicy]]) -> None: + """Sets the early termination policy for the sweep job. + + :param value: The early termination policy for the sweep job. + :type value: Union[~azure.ai.ml.sweep.BanditPolicy, ~azure.ai.ml.sweep.MedianStoppingPolicy, + ~azure.ai.ml.sweep.TruncationSelectionPolicy, dict[str, Union[str, float, int, bool]]] + """ + if isinstance(value, dict): + early_termination_schema = EarlyTerminationField() + value = early_termination_schema._deserialize(value=value, attr=None, data=None) + self._early_termination = value # type: ignore[assignment] |
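# A minimal usage sketch for the Sweep node defined above. The Sweep class is not
# instantiated directly; one common, documented way to obtain it is to call .sweep()
# on a command job whose inputs have been rebound to search-space distributions.
# This is a hedged illustration only: the script path, environment name, compute
# target, and metric name below are placeholders, not values taken from this package.
from azure.ai.ml import command
from azure.ai.ml.sweep import BanditPolicy, Choice, Uniform

# Trial definition: a command job parameterized by learning rate and booster type.
trial = command(
    code="./src",  # placeholder folder containing train.py
    command="python train.py --lr ${{inputs.lr}} --boosting ${{inputs.boosting}}",
    inputs={"lr": 0.01, "boosting": "gbdt"},
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",  # placeholder
    compute="cpu-cluster",  # placeholder
)

# Rebind the inputs to distributions, then convert the command into a Sweep node.
trial_for_sweep = trial(
    lr=Uniform(min_value=0.01, max_value=0.9),
    boosting=Choice(values=["gbdt", "dart"]),
)
sweep_node = trial_for_sweep.sweep(
    sampling_algorithm="random",
    primary_metric="validation_loss",  # placeholder: metric logged by the trial script
    goal="Minimize",
    early_termination_policy=BanditPolicy(slack_factor=0.1, evaluation_interval=1),
)
# Limits map to the SweepJobLimits handled by ParameterizedSweep above.
sweep_node.set_limits(max_total_trials=20, max_concurrent_trials=4, timeout=7200)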

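# A hedged usage sketch for the spark() builder documented earlier in this diff
# (spark_func.py). It shows a standalone Spark node with promoted conf fields,
# direct-mode I/O bindings, and serverless Spark resources. The entry file, data
# path, instance type, and runtime version are placeholders, not values from the
# package itself.
from azure.ai.ml import Input, Output, spark
from azure.ai.ml.entities import UserIdentityConfiguration

spark_node = spark(
    display_name="wordcount-example",      # placeholder
    code="./src",                          # placeholder folder containing the entry script
    entry={"file": "wordcount.py"},        # placeholder Spark entry file
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    inputs={
        "input_data": Input(
            type="uri_file",
            path="azureml://datastores/workspaceblobstore/paths/data/words.txt",  # placeholder
            mode="direct",
        )
    },
    outputs={"wordcount_out": Output(type="uri_folder", mode="direct")},
    args="--input ${{inputs.input_data}} --output ${{outputs.wordcount_out}}",
    identity=UserIdentityConfiguration(),
    # Clusterless (serverless Spark) resources; values are placeholders.
    resources={"instance_type": "standard_e8s_v3", "runtime_version": "3.4"},
)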