author    | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
----------|--------------------|--------------------------
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
commit    | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/dsl
parent    | cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download  | gn-ai-master.tar.gz
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/dsl')
17 files changed, 2436 insertions, 0 deletions
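
The diff below vendors the `azure.ai.ml.dsl` package, whose public entry point is the `pipeline` decorator re-exported from `__init__.py` (alongside helpers such as `condition`, `do_while`, `group`, and `parallel_for` defined in the other modules). As a minimal sketch of how this package is typically consumed — the component YAML path, output name, and workspace coordinates are placeholders, not part of this commit:

```python
from azure.ai.ml import MLClient, load_component
from azure.ai.ml.dsl import pipeline
from azure.identity import DefaultAzureCredential

# Hypothetical component definition; not part of this diff.
train = load_component(source="./train_component.yml")


@pipeline(description="Example pipeline built with the vendored dsl package")
def example_pipeline(learning_rate: float = 0.01):
    # Each call to a component function adds a node to the pipeline graph.
    train_step = train(learning_rate=learning_rate)
    # Assumes the component declares an output named "model".
    return {"model": train_step.outputs.model}


pipeline_job = example_pipeline(learning_rate=0.1)

# Submission requires real workspace coordinates (placeholders below).
ml_client = MLClient(
    DefaultAzureCredential(),
    "<subscription-id>",
    "<resource-group>",
    "<workspace-name>",
)
# ml_client.jobs.create_or_update(pipeline_job, experiment_name="dsl-example")
```
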
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py new file mode 100644 index 00000000..72c77310 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py @@ -0,0 +1,9 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) + +from azure.ai.ml.dsl._pipeline_decorator import pipeline + +__all__ = ["pipeline"] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py new file mode 100644 index 00000000..69547cd1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py @@ -0,0 +1,103 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +from typing import Any, Callable, List, Mapping + +from azure.ai.ml.dsl._dynamic import KwParameter, create_kw_function_from_parameters +from azure.ai.ml.entities import Component as ComponentEntity +from azure.ai.ml.entities._builders import Command +from azure.ai.ml.entities._component.datatransfer_component import DataTransferImportComponent + + +def get_dynamic_input_parameter(inputs: Mapping) -> List: + """Return the dynamic parameter of the definition's input ports. + + :param inputs: The mapping of input names to input objects. + :type inputs: Mapping + :return: The list of dynamic parameters. + :rtype: List[~azure.ai.ml.dsl._dynamic.KwParameter] + """ + return [ + KwParameter( + name=name, + annotation=input._get_python_builtin_type_str(), + default=None, + _type=input._get_python_builtin_type_str(), + ) + for name, input in inputs.items() + ] + + +def get_dynamic_source_parameter(source: Any) -> List: + """Return the dynamic parameter of the definition's source port. + + :param source: The source object. + :type source: Any + :return: The list of dynamic parameters. + :rtype: List[~azure.ai.ml.dsl._dynamic.KwParameter] + """ + return [ + KwParameter( + name="source", + annotation=source.type, + default=None, + _type=source.type, + ) + ] + + +def to_component_func(entity: ComponentEntity, component_creation_func: Callable) -> Callable[..., Command]: + """Convert a ComponentEntity to a callable component function. + + :param entity: The ComponentEntity to convert. + :type entity: ~azure.ai.ml.entities.Component + :param component_creation_func: The function for creating a component. + :type component_creation_func: Callable + :return: The callable component function. + :rtype: Callable[..., ~azure.ai.ml.entities._builders.Command] + """ + func_name = "[component] {}".format(entity.display_name) + + func_docstring_lines = [] + if entity.description is not None: + func_docstring_lines.append(entity.description.strip()) + + if isinstance(entity, DataTransferImportComponent): + all_params = get_dynamic_source_parameter(entity.source) + else: + all_params = get_dynamic_input_parameter(entity.inputs) + + flattened_group_keys = [] + # Flatten all group parameters, for function parameter validation. 
+ from azure.ai.ml.entities._inputs_outputs import GroupInput + + for name, item in entity.inputs.items(): + if isinstance(item, GroupInput): + flattened_group_keys.extend(list(item.flatten(group_parameter_name=name).keys())) + + doc_string = entity.description + # Try add yaml to doc string + try: + yaml_str = entity._yaml_str if entity._yaml_str else entity._to_yaml() + doc_string = "{0}\n\nComponent yaml:\n```yaml\n{1}\n```".format(doc_string, yaml_str) + except Exception: # pylint: disable=W0718 + pass + + params_assignment_str = ", ".join([f"{param.name}=xxx" for param in all_params]) + example = f"component_func({params_assignment_str})" + + dynamic_func = create_kw_function_from_parameters( + component_creation_func, + documentation=str(doc_string), + parameters=all_params, + func_name=func_name, + flattened_group_keys=flattened_group_keys, + ) + + # Bug Item number: 2883188 + dynamic_func._func_calling_example = example # type: ignore + dynamic_func._has_parameters = bool(all_params) # type: ignore + return dynamic_func diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py new file mode 100644 index 00000000..20c46169 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py @@ -0,0 +1,75 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import List, Optional, Union + +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.entities._builders.condition_node import ConditionNode +from azure.ai.ml.entities._job.pipeline._io import InputOutputBase +from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression +from azure.ai.ml.exceptions import UserErrorException + +# pylint: disable=redefined-outer-name + + +def condition( + condition: Union[str, bool, InputOutputBase, BaseNode, PipelineExpression], + *, + true_block: Optional[Union[BaseNode, List[BaseNode]]] = None, + false_block: Optional[Union[BaseNode, List[BaseNode]]] = None, +) -> ConditionNode: + """Create a condition node to provide runtime condition graph experience. + + Below is an example of using an expression result to control which step is executed. + If the pipeline parameter 'int_param1' is greater than 'int_param2', then 'true_step' will be executed, + otherwise, the 'false_step' will be executed. + + .. code-block:: python + + @dsl.pipeline + def pipeline_func(int_param1: int, int_param2: int): + true_step = component_func() + false_step = another_component_func() + dsl.condition( + int_param1 > int_param2, + true_block=true_step, + false_block=false_step + ) + + :param condition: The condition of the execution flow. + The value could be a boolean type control output, a node with exactly one boolean type output, + or a pipeline expression. + :type condition: Union[ + str, + bool, + ~azure.ai.ml.entities._job.pipeline._io.InputOutputBase, + ~azure.ai.ml.entities._builders.BaseNode, + ~azure.ai.ml.entities._job.pipeline._pipeline_expression.PipelineExpression] + :keyword true_block: The block to be executed if the condition resolves to True. + :paramtype true_block: Union[ + ~azure.ai.ml.entities._builders.BaseNode, + List[~azure.ai.ml.entities._builders.BaseNode]] + :keyword false_block: The block to be executed if the condition resolves to False. 
+ :paramtype false_block: Union[ + ~azure.ai.ml.entities._builders.BaseNode, + List[~azure.ai.ml.entities._builders.BaseNode]] + :return: The condition node. + :rtype: ConditionNode + :raises UserErrorException: Raised if the condition node has an incorrect number of outputs. + """ + # resolve expression as command component + if isinstance(condition, PipelineExpression): + condition = condition.resolve() + if isinstance(condition, BaseNode): + if len(condition.outputs) != 1: + error_message = ( + f"Exactly one output is expected for condition node, {len(condition.outputs)} outputs found." + ) + raise UserErrorException(message=error_message, no_personal_data_message=error_message) + condition = list(condition.outputs.values())[0] + return ConditionNode( + condition=condition, + true_block=true_block, # type: ignore[arg-type] + false_block=false_block, # type: ignore[arg-type] + _from_component_func=True, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py new file mode 100644 index 00000000..461c8c4c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py @@ -0,0 +1,4 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +VALID_NAME_CHARS = set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-") diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py new file mode 100644 index 00000000..59ce50e9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py @@ -0,0 +1,101 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import Dict, Optional, Union + +from azure.ai.ml.entities._builders import Command +from azure.ai.ml.entities._builders.do_while import DoWhile +from azure.ai.ml.entities._builders.pipeline import Pipeline +from azure.ai.ml.entities._inputs_outputs import Output +from azure.ai.ml.entities._job.pipeline._io import NodeOutput + + +def do_while( + body: Union[Pipeline, Command], mapping: Dict, max_iteration_count: int, condition: Optional[Output] = None +) -> DoWhile: + """Build a do_while node by specifying the loop body, output-input mapping, and termination condition. + + .. note:: + The following example shows how to use the `do_while` function to create a pipeline with a `do_while` node. + + .. code-block:: python + + from azure.ai.ml.dsl import pipeline + from mldesigner.dsl import do_while + + @pipeline() + def your_do_while_body(): + pass + + @pipeline() + def pipeline_with_do_while_node(): + do_while_body = your_do_while_body() + do_while_node = do_while( + body=do_while_body, + condition=do_while_body.outputs.condition_output, + mapping={ + do_while_body.outputs.output1: do_while_body_inputs.input1, + do_while_body.outputs.output2: [ + do_while_body_inputs.input2, + do_while_body_inputs.input3, + ], + }, + ) + # Connect to the do_while_node outputs + component = component_func( + input1=do_while_body.outputs.output1, input2=do_while_body.outputs.output2 + ) + + :param body: The pipeline job or command node for the do-while loop body. 
+ :type body: Union[~azure.ai.ml.entities._builders.pipeline.Pipeline, ~azure.ai.ml.entities._builders.Command] + :param mapping: The output-input mapping for each round of the do-while loop. + The key is the last round's output of the body, and the value is the input port for the current body. + :type mapping: Dict[ + Union[str, ~azure.ai.ml.entities.Output], + Union[str, ~azure.ai.ml.entities.Input, List]] + :param max_iteration_count: The limit on running the do-while node. + :type max_iteration_count: int + :param condition: The name of a boolean output of the body. + The do-while loop stops if its value is evaluated to be negative. + If not specified, it handles as a while-true loop. + :type condition: ~azure.ai.ml.entities.Output + :return: The do-while node. + :rtype: ~azure.ai.ml.entities._builders.do_while.DoWhile + """ + do_while_node = DoWhile( + body=body, + condition=condition, # type: ignore[arg-type] + mapping=mapping, + _from_component_func=True, + ) + do_while_node.set_limits(max_iteration_count=max_iteration_count) + + def _infer_and_update_body_input_from_mapping() -> None: + # pylint: disable=protected-access + for source_output, body_input in mapping.items(): + # handle case that mapping key is a NodeOutput + output_name = source_output._port_name if isinstance(source_output, NodeOutput) else source_output + # if loop body output type is not specified, skip as we have no place to infer + if body.outputs[output_name].type is None: + continue + # body input can be a list of inputs, normalize as a list to process + if not isinstance(body_input, list): + body_input = [body_input] + for single_input in body_input: + # if input type is specified, no need to infer and skip + if single_input.type is not None: + continue + inferred_type = body.outputs[output_name].type + # update node input + single_input._meta._is_inferred_optional = True + single_input.type = inferred_type + # update node corresponding component input + input_name = single_input._meta.name + body.component.inputs[input_name]._is_inferred_optional = True # type: ignore[union-attr] + body.component.inputs[input_name].type = inferred_type # type: ignore[union-attr] + + # when mapping is a dictionary, infer and update for dynamic input + if isinstance(mapping, dict): + _infer_and_update_body_input_from_mapping() + + return do_while_node diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py new file mode 100644 index 00000000..3fcb42fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py @@ -0,0 +1,192 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import logging +import types +from inspect import Parameter, Signature +from typing import Any, Callable, Dict, Sequence, cast + +from azure.ai.ml.entities import Component +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, UnexpectedKeywordError, ValidationException + +module_logger = logging.getLogger(__name__) + + +class KwParameter(Parameter): + """A keyword-only parameter with a default value. + + :param name: The name of the parameter. + :type name: str + :param default: The default value of the parameter. + :param annotation: The annotation type of the parameter, defaults to `Parameter.empty`. + :type annotation: Any + :param _type: The type of the parameter, defaults to "str". 
+ :type _type: str + :param _optional: Indicates if the parameter is optional, defaults to False. + :type _optional: bool + """ + + def __init__( + self, name: str, default: Any, annotation: Any = Parameter.empty, _type: str = "str", _optional: bool = False + ) -> None: + super().__init__(name, Parameter.KEYWORD_ONLY, default=default, annotation=annotation) + self._type = _type + self._optional = _optional + + +def _replace_function_name(func: types.FunctionType, new_name: str) -> types.FunctionType: + """Replaces the name of a function with a new name + + :param func: The function to update + :type func: types.FunctionType + :param new_name: The new function name + :type new_name: str + :return: The function with a replaced name, but otherwise unchanged body + :rtype: types.FunctionType + """ + try: + # Use the original code of the function to initialize a new code object for the new function. + code_template = func.__code__ + # For python>=3.8, it is recommended to use `CodeType.replace`, since the interface is change in py3.8 + # See https://github.com/python/cpython/blob/384621c42f9102e31ba2c47feba144af09c989e5/Objects/codeobject.c#L646 + # The interface has been changed in py3.8, so the CodeType initializing code is invalid. + # See https://github.com/python/cpython/blob/384621c42f9102e31ba2c47feba144af09c989e5/Objects/codeobject.c#L446 + if hasattr(code_template, "replace"): + code = code_template.replace(co_name=new_name) + else: + # Before python<3.8, replace is not available, we can only initialize the code as following. + # https://github.com/python/cpython/blob/v3.7.8/Objects/codeobject.c#L97 + + # Bug Item number: 2881688 + code = types.CodeType( # type: ignore + code_template.co_argcount, + code_template.co_kwonlyargcount, + code_template.co_nlocals, + code_template.co_stacksize, + code_template.co_flags, + code_template.co_code, # type: ignore + code_template.co_consts, # type: ignore + code_template.co_names, + code_template.co_varnames, + code_template.co_filename, # type: ignore + new_name, # Use the new name for the new code object. + code_template.co_firstlineno, # type: ignore + code_template.co_lnotab, # type: ignore + # The following two values are required for closures. + code_template.co_freevars, # type: ignore + code_template.co_cellvars, # type: ignore + ) + # Initialize a new function with the code object and the new name, see the following ref for more details. + # https://github.com/python/cpython/blob/4901fe274bc82b95dc89bcb3de8802a3dfedab32/Objects/clinic/funcobject.c.h#L30 + return types.FunctionType( + code, + globals=func.__globals__, + name=new_name, + argdefs=func.__defaults__, + # Closure must be set to make sure free variables work. + closure=func.__closure__, + ) + except BaseException: # pylint: disable=W0718 + # If the dynamic replacing failed in corner cases, simply set the two fields. 
+ func.__name__ = func.__qualname__ = new_name + return func + + +# pylint: disable-next=docstring-missing-param +def _assert_arg_valid(kwargs: dict, keys: list, func_name: str) -> None: + """Assert the arg keys are all in keys.""" + # pylint: disable=protected-access + # validate component input names + Component._validate_io_names(kwargs, raise_error=True) + lower2original_parameter_names = {x.lower(): x for x in keys} + kwargs_need_to_update = [] + for key in kwargs: + if key not in keys: + lower_key = key.lower() + if lower_key in lower2original_parameter_names: + # record key that need to update + kwargs_need_to_update.append(key) + if key != lower_key: + # raise warning if name not match sanitize version + module_logger.warning( + "Component input name %s, treat it as %s", key, lower2original_parameter_names[lower_key] + ) + else: + raise UnexpectedKeywordError(func_name=func_name, keyword=key, keywords=keys) + # update kwargs to align with yaml definition + for key in kwargs_need_to_update: + kwargs[lower2original_parameter_names[key.lower()]] = kwargs.pop(key) + + +def _update_dct_if_not_exist(dst: Dict, src: Dict) -> None: + """Computes the union of `src` and `dst`, in-place within `dst` + + If a key exists in `dst` and `src` the value in `dst` is preserved + + :param dst: The destination to compute the union within + :type dst: Dict + :param src: A dictionary to include in the union + :type src: Dict + """ + for k, v in src.items(): + if k not in dst: + dst[k] = v + + +def create_kw_function_from_parameters( + func: Callable, + parameters: Sequence[Parameter], + flattened_group_keys: list, + func_name: str, + documentation: str, +) -> Callable: + """Create a new keyword-only function with provided parameters. + + :param func: The original function to be wrapped. + :type func: Callable + :param parameters: The sequence of parameters for the new function. + :type parameters: Sequence[Parameter] + :param flattened_group_keys: The list of valid group keys. + :type flattened_group_keys: list + :param func_name: The name of the new function. + :type func_name: str + :param documentation: The documentation string for the new function. + :type documentation: str + :return: The new keyword-only function. + :rtype: Callable + :raises ValidationException: If the provided function parameters are not keyword-only. + """ + if any(p.default == p.empty or p.kind != Parameter.KEYWORD_ONLY for p in parameters): + msg = "This function only accept keyword only parameters." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + error_category=ErrorCategory.USER_ERROR, + target=ErrorTarget.COMPONENT, + ) + default_kwargs = {p.name: p.default for p in parameters} + + def f(**kwargs: Any) -> Any: + # We need to make sure all keys of kwargs are valid. + # Merge valid group keys with original keys. + _assert_arg_valid(kwargs, [*list(default_kwargs.keys()), *flattened_group_keys], func_name=func_name) + # We need to put the default args to the kwargs before invoking the original function. + _update_dct_if_not_exist(kwargs, default_kwargs) + return func(**kwargs) + + f = _replace_function_name(cast(types.FunctionType, f), func_name) + # Set the signature so jupyter notebook could have param hint by calling inspect.signature() + # Bug Item number: 2883223 + f.__signature__ = Signature(parameters) # type: ignore + # Set doc/name/module to make sure help(f) shows following expected result. 
+ # Expected help(f): + # + # Help on function FUNC_NAME: + # FUNC_NAME(SIGNATURE) + # FUNC_DOC + # + f.__doc__ = documentation # Set documentation to update FUNC_DOC in help. + # Set module = None to avoid showing the sentence `in module 'azure.ai.ml.component._dynamic' in help.` + # See https://github.com/python/cpython/blob/2145c8c9724287a310bc77a2760d4f1c0ca9eb0c/Lib/pydoc.py#L1757 + f.__module__ = None # type: ignore + return f diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py new file mode 100644 index 00000000..7f88a70b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py @@ -0,0 +1,148 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import importlib + +from typing import Any, Dict, List, Optional, Union + +from azure.ai.ml._utils._experimental import experimental +from azure.ai.ml.entities import CommandComponent, PipelineJob +from azure.ai.ml.entities._assets.federated_learning_silo import FederatedLearningSilo +from azure.ai.ml.entities._builders.fl_scatter_gather import FLScatterGather + + +def _check_for_import(package_name: str) -> None: + try: + importlib.import_module(package_name) + except ImportError as e: + raise ImportError( + "The DSL FL Node has an additional requirement above the rest of the " + + "AML SDK repo in that the mldesigner package is required. Please run `pip install mldesigner` " + + "and try again." + ) from e + + +@experimental +def fl_scatter_gather( + *, + silo_configs: List[FederatedLearningSilo], + silo_component: Union[PipelineJob, CommandComponent], + aggregation_component: Union[PipelineJob, CommandComponent], + aggregation_compute: Optional[str] = None, + aggregation_datastore: Optional[str] = None, + shared_silo_kwargs: Optional[Dict] = None, + aggregation_kwargs: Optional[Dict] = None, + silo_to_aggregation_argument_map: Optional[Dict] = None, + aggregation_to_silo_argument_map: Optional[Dict] = None, + max_iterations: int = 1, + _create_default_mappings_if_needed: bool = False, + **kwargs: Any, +) -> FLScatterGather: + """A federated learning scatter-gather subgraph node. + + It's assumed that this will be used inside of a `@pipeline`-decorated function in order to create a subgraph which + will: + - Execute a specified pipeline step multiple times (the silo step), with each execution using slightly + different inputs, datastores, and computes based on an inputted config. + - Merge the outputs of the multiple silo steps into a single input for another step (the aggregation step), + which will then process the values into a single unified result. + - With the process above being a 'scatter gather' sequence, iterate and perform the scatter gather + a number of times according to the max_iterations input, with the output of any given iteration's + aggregation step being fed back into the silo steps of the subsequent iteration. + - Return the outputs of the last iteration's aggregation step as the node's final output. + + The process of assigning computes, datastores, and inputs to steps is called 'anchoring'. The following + details of the anchoring process should be noted: + - Computes will always be overridden to their respective compute. + - The paths of 'internal' outputs for silo steps (i.e. 
a component output that becomes an input for another + component within the silo step) are anchored to that silo's datastore. All other outputs are anchored + to the aggregation datastore. + - Some steps are automatically created by this node to merge inputs from the silo steps to the aggregation + step. These steps are anchored in the same manner as the aggregation step. + - Datastore anchoring ONLY occurs if an output's path value is empty. If a path already exists, it will NOT + try to replace it. + + The process of trimming down inputs from the silo steps to a single input for the aggregate step has some + caveats: + - Only silo outputs of type mltable and uri_folder are supported. + - Both the above output types become an mltable which mounts all the silo step outputs as sub-folders. + + The expected use case of this node is shown in the following code snippet: + + .. code-block:: python + + @experimental + def fl_pipeline(): + fl_node = fl_scatter_gather(**many_inputs) + return {"pipeline_output" : fl_node.outputs["aggregated_model"]} + + submitted_pipeline_job = my_client.jobs.create_or_update(fl_pipeline(), experiment_name="example_fl_pipeline") + + :keyword silo_configs: A list of FederatedLearningSilo objects, + which contain the necessary data to reconfigure components to run on specific computes and datastores, + while also targeting specific inputs located on the aforementioned datastores. + :paramtype silo_configs: List[~azure.ai.ml.entities._assets.federated_learning_silo.FederatedLearningSilo] + :keyword silo_component: A pipeline step that will be run multiple times across different silos, as specified + by the silo_configs input. In a typical horizontal federated learning context, this step is what will perform + model training using siloed subsets of data. Can be either a PipelineJob or a CommandComponent. + :paramtype silo_component: Union[~azure.ai.ml.entities.PipelineJob, ~azure.ai.ml.entities.CommandComponent] + :keyword aggregation_component: A pipeline step which receives inputs from the myriad executed silo components, + and does something with them. In a typical horizontal federated learning context, this component will merge + the models that were independently trained on each silo's data in a single model. Can be either a + PipelineJob or a CommandComponent. + :paramtype aggregation_component: Union[ + ~azure.ai.ml.entities.PipelineJob, + ~azure.ai.ml.entities.CommandComponent] + :keyword aggregation_compute: The name of the compute that the aggregation component will use. + :paramtype aggregation_compute: str + :keyword aggregation_datastore: The name of the datastore that the aggregation component will use. + :paramtype aggregation_datastore: str + :keyword shared_silo_kwargs: A dictionary of string keywords to component inputs. This dictionary is treated + like kwargs and is injected into ALL executed silo components. + :paramtype shared_silo_kwargs: Dict + :keyword aggregation_kwargs: A dictionary of string keywords to component inputs. This dictionary is treated + like kwargs and is injected into ALL executed aggregation components. + :paramtype aggregation_kwargs: Dict + :keyword silo_to_aggregation_argument_map: A dictionary specifying the mapping of outputs from the silo step to + inputs in the aggregation step. The keys should be output names from the silo step, and the values should be + input names in the aggregation step. This allows for customization of the mapping between the steps. 
+ :paramtype silo_to_aggregation_argument_map: Dict + :keyword aggregation_to_silo_argument_map: A dictionary specifying the mapping of outputs from the aggregation step + to inputs in the silo step. The keys should be output names from the aggregation step, and the values should be + input names in the silo step. This allows for customization of the mapping between the steps. + :paramtype aggregation_to_silo_argument_map: Dict + :keyword max_iterations: The maximum number of scatter gather iterations that should be performed. + :paramtype max_iterations: int + :keyword _create_default_mappings_if_needed: + If True, try to automatically create input/output mappings if they're unset. + :paramtype _create_default_mappings_if_needed: bool + :return: The federated learning scatter-gather subgraph node. + :rtype: ~azure.ai.ml.entities._builders.fl_scatter_gather.FLScatterGather + + .. warning:: + + Using this node directly from the SDK repo requires that the user have the 'mldesigner' package installed, + which is not a standard dependency of this package. + """ + # Private kwargs: + # _create_default_mappings_if_needed: if true, then try to automatically create i/o mappings if they're unset. + + # check that mldesigner is available + _check_for_import("mldesigner") + + # Like other DSL nodes, this is just a wrapper around a node builder entity initializer. + return FLScatterGather( + silo_configs=silo_configs, + silo_component=silo_component, # type: ignore[arg-type] + aggregation_component=aggregation_component, # type: ignore[arg-type] + shared_silo_kwargs=shared_silo_kwargs, + aggregation_compute=aggregation_compute, + aggregation_datastore=aggregation_datastore, + aggregation_kwargs=aggregation_kwargs, + silo_to_aggregation_argument_map=silo_to_aggregation_argument_map, + aggregation_to_silo_argument_map=aggregation_to_silo_argument_map, + max_iterations=max_iterations, + create_default_mappings_if_needed=_create_default_mappings_if_needed, + **kwargs, + ) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py new file mode 100644 index 00000000..6455f793 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py @@ -0,0 +1,301 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +# pylint: disable=protected-access, redefined-builtin +# disable redefined-builtin to use globals/locals as argument name + +# Attribute on customized group class to mark a value type as a group of inputs/outputs. +import threading +import functools +from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union + +from azure.ai.ml import Input, Output +from azure.ai.ml.constants._component import IOConstants +from azure.ai.ml.entities._inputs_outputs import GroupInput, _get_param_with_standard_annotation +from azure.ai.ml.entities._inputs_outputs.utils import Annotation + +T = TypeVar("T") + + +def group(_cls: Type[T]) -> Type[T]: + """Group decorator to make user-defined class as a group of inputs/outputs. + + Usage: + Group a set of component inputs make component configuration easier. + + e.g. Define a group + + .. code-block:: python + + # define a group + @dsl.group + class ParamClass: + str_param: str + int_param: int = 1 + + + # see the help of auto-gen __init__ function + help(ParamClass.__init__) + + e.g. 
Define a group with inheritance + + .. code-block:: python + + # define the parent group + @dsl.group + class ParentClass: + str_param: str + int_param: int = 1 + + + @dsl.group + class GroupClass(ParentClass): + float_param: float + str_param: str = "test" + + + # see the help of auto-gen __init__ function + help(GroupClass.__init__) + + e.g. Define a multi-level group + + .. code-block:: python + + # define a sub group + @dsl.group + class SubGroupClass: + str_param: str + int_param: int = 1 + + + @dsl.group + class ParentClass: + param_group1: SubGroupClass + # declare a sub group field with default + param_group2: SubGroupClass = SubGroupClass(str_param="test") + + + # see the help of auto-gen __init__ function + help(ParentClass.__init__) + + e.g. Link and assign Group + + .. code-block:: python + + # create a sub group value + my_param_group = SubGroupClass(str_param="dataset", int_param=2) + + # create a parent group value + my_parent_group = ParentClass(param_group1=my_param_group) + + + # option 1. use annotation to declare the input is a group. + @pipeline + def pipeline_func(params: SubGroupClass): + component = component_func( + string_parameter=params.str_param, int_parameter=params.int_param + ) + return component.outputs + + + # create a pipeline instance + pipeline = pipeline_func(my_param_group) + + + # option 2. use default of input to declare itself a group. + @pipeline + def pipeline_func(params=my_param_group): + component = component_func( + string_parameter=params.str_param, int_parameter=params.int_param + ) + return component.outputs + + + # create a pipeline instance + pipeline = pipeline_func() + + + # use multi-level group in pipeline. + @pipeline + def parent_func(params: ParentClass): + # pass sub group to sub pipeline. + pipeline = pipeline_func(params.param_group1) + return pipeline.outputs + + Supported Types: + * Primitive type: int, str, float, bool + * Declare with python type annotation: `param: int` + * Declare with Input annotation: `param: Input(type='integer', min=1, max=5)` + + * For sub group class + * Sub group class should be decorated with `dsl.group` + + Restrictions: + * Each group member's name must be public (not start with '_'). + * When use group as a pipeline input, user **MUST** write the type annotation + or give it a non-None default value to infer the group class. + + :param _cls: The class to decorate + :type _cls: Type[T] + :return: The decorated class + :rtype: Type[T] + """ + + T2 = TypeVar("T2") + + def _create_fn( + name: str, + args: Union[List, str], + body: Union[List, str], + *, + globals: Optional[Dict[str, Any]] = None, + locals: Optional[Dict[str, Any]] = None, + return_type: Optional[Type[T2]], + ) -> Callable[..., T2]: + """To generate function in class. + + :param name: The name of the new function + :type name: str + :param args: The parameter names of the new function + :type args: List[str] + :param body: The source code of the body of the new function + :type body: List[str] + :keyword globals: The global variables to make available to the new function. 
+ :paramtype globals: Dict[str, Any] + :keyword locals: The local variables to make available to the new function + :paramtype locals: Dict[str, Any] + :keyword return_type: The return type of the new function + :paramtype return_type: Type[T2] + :return: The created function + :rtype: Callable[..., T2] + """ + # Reference: Source code of dataclasses.dataclass + # Doc link: https://docs.python.org/3/library/dataclasses.html + # Reference code link: + # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L412 + # Note that we mutate locals when exec() is called + if locals is None: + locals = {} + if "BUILTINS" not in locals: + import builtins + + locals["BUILTINS"] = builtins + locals["_return_type"] = return_type + return_annotation = "->_return_type" + args = ",".join(args) + body = "\n".join(f" {b}" for b in body) + # Compute the text of the entire function. + txt = f" def {name}({args}){return_annotation}:\n{body}" + local_vars = ", ".join(locals.keys()) + txt = f"def __create_fn__({local_vars}):\n{txt}\n return {name}" + ns: Dict = {} + exec(txt, globals, ns) # pylint: disable=exec-used # nosec + res: Callable = ns["__create_fn__"](**locals) + return res + + def _create_init_fn( # pylint: disable=unused-argument + cls: Type[T], fields: Dict[str, Union[Annotation, Input, Output]] + ) -> Callable[..., None]: + """Generate the __init__ function for user-defined class. + + :param cls: The class to update + :type cls: Type[T] + :param fields: The fields + :type fields: Dict[str, Union[Annotation, Input, Output]] + :return: The __init__ function + :rtype: Callable[..., None] + """ + + # Reference code link: + # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L523 + def _get_data_type_from_annotation(anno: Any) -> Any: + if isinstance(anno, GroupInput): + return anno._group_class + # keep original annotation for Outputs + if isinstance(anno, Output): + return anno + try: + # convert to primitive type annotation if possible + return IOConstants.PRIMITIVE_STR_2_TYPE[anno.type] + except KeyError: + # otherwise, keep original annotation + return anno + + def _get_default(key: str) -> Any: + # will set None as default value when default not exist so won't need to reorder the init params + val = fields[key] + if val is not None and hasattr(val, "default"): + return val.default + return None + + locals = {f"_type_{key}": _get_data_type_from_annotation(val) for key, val in fields.items()} + # Collect field defaults if val is parameter and is optional + defaults = {f"_default_{key}": _get_default(key) for key, val in fields.items()} + locals.update(defaults) + # Ban positional init as we reordered the parameter. + _init_param = ["self", "*"] + for key in fields: + _init_param.append(f"{key}:_type_{key}=_default_{key}") + + body_lines = [f"self.{key}={key}" for key in fields] + # If no body lines, use 'pass'. + if not body_lines: + body_lines = ["pass"] + return _create_fn("__init__", _init_param, body_lines, locals=locals, return_type=None) + + def _create_repr_fn(fields: Dict[str, Union[Annotation, Input, Output]]) -> Callable[..., str]: + """Generate the __repr__ function for user-defined class. 
+ + :param fields: The fields + :type fields: Dict[str, Union[Annotation, Input, Output]] + :return: The __repr__ function + :rtype: Callable[..., str] + """ + # Reference code link: + # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L582 + fn = _create_fn( + "__repr__", + [ + "self", + ], + ['return self.__class__.__qualname__ + f"(' + ", ".join([f"{k}={{self.{k}!r}}" for k in fields]) + ')"'], + return_type=str, + ) + + # This function's logic is copied from "recursive_repr" function in + # reprlib module to avoid dependency. + def _recursive_repr(user_function: Any) -> Any: + # Decorator to make a repr function return "..." for a recursive + # call. + repr_running = set() + + @functools.wraps(user_function) + def wrapper(self: Any) -> Any: + key = id(self), threading.get_ident() + if key in repr_running: + return "..." + repr_running.add(key) + try: + result = user_function(self) + finally: + repr_running.discard(key) + return result + + return wrapper + + res: Callable = _recursive_repr(fn) + return res + + def _process_class(cls: Type[T], all_fields: Dict[str, Union[Annotation, Input, Output]]) -> Type[T]: + setattr(cls, "__init__", _create_init_fn(cls, all_fields)) + setattr(cls, "__repr__", _create_repr_fn(all_fields)) + return cls + + def _wrap(cls: Type[T]) -> Type[T]: + all_fields = _get_param_with_standard_annotation(cls) + # Set group info on cls + setattr(cls, IOConstants.GROUP_ATTR_NAME, GroupInput(all_fields, _group_class=cls)) + # Add init, repr, eq to class with user-defined annotation type + return _process_class(cls, all_fields) + + return _wrap(_cls) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py new file mode 100644 index 00000000..074c0502 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py @@ -0,0 +1,43 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +from typing import Any, Callable + +from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY +from azure.ai.ml.entities._builders import Command +from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin + + +# pylint: disable=unused-argument +def to_component(*, job: ComponentTranslatableMixin, **kwargs: Any) -> Callable[..., Command]: + """Translate a job object to a component function, provided job should be able to translate to a component. + + For example: + + .. code-block:: python + + # Load a local command job to a component function. + my_job = load_job("my_job.yaml") + component_func = dsl.to_component(my_job) + # Load a remote command job component to a component function. + my_job = ml_client.jobs.get("my_job") + component_func = dsl.to_component(my_job) + + # Consuming the component func + component = component_func(param1=xxx, param2=xxx) + + :keyword job: Job load from local or remote. + :paramtype job: ~azure.ai.ml.entities._job.pipeline._component_translatable.ComponentTranslatableMixin + + :return: Wrapped function call. + :rtype: typing.Callable[..., ~azure.ai.ml.entities._builders.command.Command] + """ + from pathlib import Path + + # set default base path as "./". 
Because if code path is relative path and base path is None, will raise error when + # get arm id of Code + res: Callable = job._to_component(context={BASE_PATH_CONTEXT_KEY: Path("./")}) # type: ignore[arg-type, assignment] + return res diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py new file mode 100644 index 00000000..2732cd75 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py @@ -0,0 +1,46 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +"""This file stores functions and objects that will be used in mldesigner package. + +DO NOT change the module names in "all" list. If the interface has changed in source code, wrap it here and keep +original function/module names the same as before, otherwise mldesigner will be broken by this change. +""" + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) + +from azure.ai.ml._internal.entities import InternalComponent +from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes +from azure.ai.ml._utils._asset_utils import get_ignore_file +from azure.ai.ml._utils.utils import try_enable_internal_components +from azure.ai.ml.dsl._condition import condition +from azure.ai.ml.dsl._do_while import do_while +from azure.ai.ml.dsl._group_decorator import group +from azure.ai.ml.dsl._parallel_for import ParallelFor, parallel_for +from azure.ai.ml.entities._component.component_factory import component_factory +from azure.ai.ml.entities._inputs_outputs import _get_param_with_standard_annotation +from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function + +from ._constants import V1_COMPONENT_TO_NODE + +component_factory_load_from_dict = component_factory.load_from_dict + + +__all__ = [ + # to be put in main package + "condition", + "do_while", + "group", + "parallel_for", + "ParallelFor", + # must keep + "get_ignore_file", + "_get_param_with_standard_annotation", + "_generate_component_function", + "component_factory_load_from_dict", + "V1_COMPONENT_TO_NODE", + "try_enable_internal_components", + "InternalAdditionalIncludes", + "InternalComponent", +] diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py new file mode 100644 index 00000000..f770c97d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py @@ -0,0 +1,33 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +"""This file stores constants that will be used in mldesigner package.""" +from azure.ai.ml._internal._schema.component import NodeType as V1NodeType +from azure.ai.ml._internal.entities import ( + Ae365exepool, + AetherBridge, + Command as InternalCommand, + Parallel as InternalParallel, + Pipeline as InternalPipeline, + DataTransfer, + Distributed, + HDInsight, + Hemera, + Scope, + Starlite, +) + +V1_COMPONENT_TO_NODE = { + V1NodeType.SCOPE: Scope, + V1NodeType.COMMAND: InternalCommand, + V1NodeType.PARALLEL: InternalParallel, + V1NodeType.PIPELINE: InternalPipeline, + V1NodeType.DATA_TRANSFER: DataTransfer, + V1NodeType.DISTRIBUTED: Distributed, + V1NodeType.HDI: HDInsight, + V1NodeType.STARLITE: Starlite, + V1NodeType.HEMERA: Hemera, + V1NodeType.AE365EXEPOOL: Ae365exepool, + V1NodeType.AETHER_BRIDGE: AetherBridge, +} diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py new file mode 100644 index 00000000..6460ec6f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py @@ -0,0 +1,22 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +from typing import Mapping, Optional + + +class OverrideDefinition(dict): + """Definition of a overridable field of a component job.""" + + +def get_override_definition_from_schema( + schema: str, # pylint: disable=unused-argument +) -> Optional[Mapping[str, OverrideDefinition]]: + """Ger override definition from a json schema. + + :param schema: Json schema of component job. + :type schema: str + :return: A dictionary from a override definition name to a override definition. + """ + # TODO: gen override definition + return None diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py new file mode 100644 index 00000000..4158f744 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py @@ -0,0 +1,67 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +from typing import Any, Dict, List, Union + +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.entities._builders.parallel_for import ParallelFor +from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput + + +def parallel_for( + *, body: BaseNode, items: Union[List, Dict, str, PipelineInput, NodeOutput], **kwargs: Any +) -> ParallelFor: + """Build a parallel for loop by specifying the loop body and input items. + + .. note:: + The following example shows how to use parallel for API to create a pipeline with parallel for node. + + .. 
code-block:: python + + from azure.ai.ml.dsl import pipeline + from mldesigner.dsl import parallel_for + + + @pipeline + def your_loop_body(input): + pass + + + @pipeline + def pipeline_with_parallel_for_node(): + # specify fixed inputs and leave loop config inputs unprovided + loop_body = your_loop_body() + + # link loop body & loop config to loop node + loop_node = parallel_for( + body=loop_body, + items=[ + {"loop_body_input": 1}, + {"loop_body_input": 2}, + ], + ) + + # collect aggregated output from loop body + aggregate_node = collect_aggregated_outputs_from_for_each_node( + input=loop_node.outputs.output, + ) + + :keyword body: Node to execute as the loop body. + :paramtype body: ~azure.ai.ml.entities._builders.BaseNode + :keyword items: The loop body's input which will bind to the loop node. + :paramtype items: Union[ + list, + dict, + str, + ~azure.ai.ml.entities._job.pipeline._io.PipelineInput, + ~azure.ai.ml.entities._job.pipeline._io.NodeOutput] + :return: The parallel for loop + :rtype: ~azure.ai.ml.entities._builders.parallel_for.ParallelFor + """ + parallel_for_node = ParallelFor( + body=body, # type: ignore[arg-type] + items=items, + _from_component_func=True, + **kwargs, + ) + return parallel_for_node diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py new file mode 100644 index 00000000..44026174 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py @@ -0,0 +1,615 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access +import copy +import inspect +import logging +import typing +from collections import OrderedDict +from inspect import Parameter, signature +from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union + +from azure.ai.ml._utils._func_utils import get_outputs_and_locals +from azure.ai.ml._utils.utils import is_valid_node_name, parse_args_description_from_docstring +from azure.ai.ml.constants._component import ComponentSource, IOConstants +from azure.ai.ml.constants._job.pipeline import COMPONENT_IO_KEYWORDS +from azure.ai.ml.dsl._utils import _sanitize_python_variable_name +from azure.ai.ml.entities import PipelineJob +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode +from azure.ai.ml.entities._component.component import Component +from azure.ai.ml.entities._component.pipeline_component import PipelineComponent +from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output, _get_param_with_standard_annotation +from azure.ai.ml.entities._inputs_outputs.utils import _get_annotation_by_value, is_group +from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob +from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe +from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, PipelineOutput, _GroupAttrDict +from azure.ai.ml.entities._util import copy_output_setting + +# We need to limit the depth of pipeline to avoid the built graph goes too deep and prevent potential +# stack overflow in dsl.pipeline. 
+from azure.ai.ml.exceptions import UserErrorException + +_BUILDER_STACK_MAX_DEPTH = 100 + +module_logger = logging.getLogger(__name__) + + +class _PipelineComponentBuilderStack: + def __init__(self) -> None: + self.items: List["PipelineComponentBuilder"] = [] + + def top(self) -> Optional["PipelineComponentBuilder"]: + if self.is_empty(): + return None + return self.items[-1] + + def pop(self) -> Optional["PipelineComponentBuilder"]: + if self.is_empty(): + return None + return self.items.pop() + + def push(self, item: object) -> None: + error_msg = f"{self.__class__.__name__} only " f"allows pushing `{PipelineComponentBuilder.__name__}` element" + assert isinstance(item, PipelineComponentBuilder), error_msg + + # TODO: validate cycle + self.items.append(item) + if self.size() >= _BUILDER_STACK_MAX_DEPTH: + current_pipeline = self.items[0].name + # clear current pipeline stack + self.items = [] + msg = "Pipeline {} depth exceeds limitation. Max depth: {}" + raise UserErrorException( + message=msg.format(current_pipeline, _BUILDER_STACK_MAX_DEPTH), + no_personal_data_message=msg.format("[current_pipeline]", _BUILDER_STACK_MAX_DEPTH), + ) + + def is_empty(self) -> bool: + return len(self.items) == 0 + + def size(self) -> int: + return len(self.items) + + +# This collection is used to record pipeline component builders in current call stack +_definition_builder_stack = _PipelineComponentBuilderStack() + + +def _is_inside_dsl_pipeline_func() -> bool: + """Checks whether executing within a dsl pipeline func + + :return: True if is inside DSL pipeline func. + :rtype: bool + """ + return _definition_builder_stack.size() > 0 + + +def _add_component_to_current_definition_builder(component: Union[BaseNode, AutoMLJob]) -> None: + if _is_inside_dsl_pipeline_func(): + builder = _definition_builder_stack.top() + if builder is not None: + builder.add_node(component) + + +class PipelineComponentBuilder: + # map from python built-in type to component type + # pylint: disable=too-many-instance-attributes + DEFAULT_DATA_TYPE_MAPPING = { + "float": "number", + "int": "integer", + "bool": "boolean", + "str": "string", + } + DEFAULT_OUTPUT_NAME = "output" + + def __init__( + self, + func: Callable, + name: Optional[str] = None, + version: Optional[str] = None, + display_name: Optional[str] = None, + description: Optional[str] = None, + default_datastore: Any = None, + tags: Optional[Union[Dict[str, str], str]] = None, + source_path: Optional[str] = None, + non_pipeline_inputs: Optional[List] = None, + ): + self.func = func + name = name if name is not None else func.__name__ + display_name = display_name if display_name else name + description = description if description else func.__doc__ + self._args_description = parse_args_description_from_docstring(func.__doc__) + # List of nodes, order by it's creation order in pipeline. + self.nodes: List = [] + self.non_pipeline_parameter_names = non_pipeline_inputs or [] + # A dict of inputs name to InputDefinition. + # TODO: infer pipeline component input meta from assignment + self.inputs = self._build_inputs(func) + self.output_annotation = self._get_output_annotation(func) + self._name = name + self.version = version + self.display_name = display_name + self.description = description + self.default_datastore = default_datastore + self.tags = tags + self.source_path = source_path + + @property + def name(self) -> str: + """Name of pipeline builder, it's name will be same as the pipeline definition it builds. 
+ + :return: Pipeline builder name + :rtype: str + """ + return self._name + + def add_node(self, node: Union[BaseNode, AutoMLJob]) -> None: + """Add node to pipeline builder. + + :param node: A pipeline node. + :type node: Union[BaseNode, AutoMLJob] + """ + self.nodes.append(node) + + def build( + self, + *, + user_provided_kwargs: Optional[Dict] = None, + non_pipeline_inputs_dict: Optional[Dict] = None, + non_pipeline_inputs: Optional[List] = None, + ) -> PipelineComponent: + """Build a pipeline component from current pipeline builder. + + :keyword user_provided_kwargs: The kwargs user provided to dsl pipeline function. None if not provided. + :keyword non_pipeline_inputs_dict: The non-pipeline input provided key-value. None if not exist. + :keyword non_pipeline_inputs: List of non-pipeline input name. None if not exist. + :return: The built PipelineComponent + :rtype: PipelineComponent + """ + if user_provided_kwargs is None: + user_provided_kwargs = {} + # Clear nodes as we may call build multiple times. + self.nodes = [] + + kwargs = _build_pipeline_parameter( + func=self.func, + user_provided_kwargs=user_provided_kwargs, + # TODO: support result() for pipeline input inside parameter group + group_default_kwargs=self._get_group_parameter_defaults(), + non_pipeline_inputs=non_pipeline_inputs, + ) + kwargs.update(non_pipeline_inputs_dict or {}) + + # Use a dict to store all variables in self.func + # We use this stack to store the dsl pipeline definition hierarchy + _definition_builder_stack.push(self) + + try: + outputs, _locals = get_outputs_and_locals(self.func, kwargs) + finally: + _definition_builder_stack.pop() + + if outputs is None: + outputs = {} + + jobs: Dict = self._update_nodes_variable_names(_locals) + pipeline_component = PipelineComponent( + name=self.name, + version=self.version, + display_name=self.display_name, + description=self.description, + inputs=self.inputs, + jobs=jobs, + tags=self.tags, # type: ignore[arg-type] + source_path=self.source_path, + _source=ComponentSource.DSL, + ) + # TODO: Refine here. The output can not be built first then pass into pipeline component creation, + # exception will be raised in component.build_validate_io(). 
+ pipeline_component._outputs = self._build_pipeline_outputs(outputs) + return pipeline_component + + def _validate_group_annotation(self, name: str, val: GroupInput) -> None: + for k, v in val.values.items(): + if isinstance(v, GroupInput): + self._validate_group_annotation(k, v) + elif isinstance(v, Output): + # TODO(2097468): automatically change it to Input when used in input annotation + raise UserErrorException("Output annotation cannot be used in @pipeline.") + elif isinstance(v, Input): + if v.type not in IOConstants.PRIMITIVE_STR_2_TYPE: + # TODO(2097478): support port type groups + raise UserErrorException(f"Only primitive types can be used as input of group, got {v.type}") + else: + raise UserErrorException(f"Unsupported annotation type {type(v)} for group field {name}.{k}") + + def _build_inputs(self, func: Union[Callable, Type]) -> Dict: + inputs: Dict = _get_param_with_standard_annotation( + func, is_func=True, skip_params=self.non_pipeline_parameter_names + ) + for k, v in inputs.items(): + if isinstance(v, GroupInput): + self._validate_group_annotation(name=k, val=v) + # add arg description + if k in self._args_description: + v["description"] = self._args_description[k] + return inputs + + def _build_pipeline_outputs(self, outputs: typing.Dict[str, NodeOutput]) -> Dict[str, PipelineOutput]: + """Validate if dsl.pipeline returns valid outputs and set output binding. Create PipelineOutput as pipeline's + output definition based on node outputs from return. + + :param outputs: Outputs of pipeline + :type outputs: Mapping[str, azure.ai.ml.Output] + :return: The mapping of output names to PipelineOutput + :rtype: Dict[str, PipelineOutput] + """ + error_msg = ( + "The return type of dsl.pipeline decorated function should be a mapping from output name to " + "azure.ai.ml.Output with owner." + ) + if is_group(outputs): + outputs = {key: val for key, val in outputs.__dict__.items() if val} + if not isinstance(outputs, dict): + raise UserErrorException(message=error_msg, no_personal_data_message=error_msg) + output_dict = {} + output_meta_dict = {} + for key, value in outputs.items(): + if not isinstance(key, str) or not isinstance(value, NodeOutput) or value._owner is None: + raise UserErrorException(message=error_msg, no_personal_data_message=error_msg) + if value._meta is not None: + meta = value._meta + else: + meta = Output(type=value.type, path=value.path, mode=value.mode, description=value.description) + + # Hack: map internal output type to pipeline output type + def _map_internal_output_type(_meta: Output) -> str: + """Map component output type to valid pipeline output type. + + :param _meta: The output + :type _meta: Output + :return: Output type + :rtype: str + """ + if type(_meta).__name__ != "InternalOutput": + return str(_meta.type) + return str(_meta.map_pipeline_output_type()) # type: ignore[attr-defined] + + # Note: Here we set PipelineOutput as Pipeline's output definition as we need output binding. + output_meta = Output( + type=_map_internal_output_type(meta), # type: ignore[arg-type] + description=meta.description, + mode=meta.mode, + ) + pipeline_output = PipelineOutput( + port_name=key, + data=None, + # meta is used to create pipeline component, store it here to make sure pipeline component and inner + # node output type are consistent + meta=output_meta, + owner="pipeline", + description=self._args_description.get(key, None), + # store original node output to be able to trace back to inner node from a pipeline output builder. 
+ binding_output=value, + ) + # copy node level output setting to pipeline output + copy_output_setting( + source=value._owner.outputs[value._port_name], target=pipeline_output # type: ignore[arg-type] + ) + + value._owner.outputs[value._port_name]._data = pipeline_output # type: ignore[union-attr] + + output_dict[key] = pipeline_output + output_meta_dict[key] = output_meta._to_dict() + + self._validate_inferred_outputs(output_meta_dict, output_dict) + return output_dict + + def _get_group_parameter_defaults(self) -> Dict: + group_defaults = {} + for key, val in self.inputs.items(): + if not isinstance(val, GroupInput): + continue + # Copy and insert top-level parameter name into group names for all items + group_defaults[key] = copy.deepcopy(val.default) + group_defaults[key].insert_group_name_for_items(key) + return group_defaults + + def _update_nodes_variable_names(self, func_variables: dict) -> Dict[str, Union[BaseNode, AutoMLJob]]: + """Update nodes list to ordered dict with variable name key and component object value. + + Variable naming priority: + 1. Specified by using xxx.name. + e.g. + module1 = module_func() + module1.name = "node1" # final node name is "node1" + + 2. Variable name + e.g. + my_node = module_func() # final node name is "my_node" + + 3. Anonymous node, but another node with same component.name has user-defined name + e.g. + my_node = module_func() # final node name is "my_node" + module_fun() # final node name is "my_node_1" + module_fun() # final node name is "my_node_2" + + 4. Anonymous node + e.g. + my_node = module_func() # final node name is "my_node" + module_func_1() # final node name is its component name + + :param func_variables: The function variables + :type func_variables: dict + :return: Map of variable name to component object + :rtype: Dict[str, Union[BaseNode, AutoMLJob]] + """ + + def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]) -> Optional[Union[str, Component]]: + # TODO(1979547): refactor this + if isinstance(node, AutoMLJob): + return node.name or _sanitize_python_variable_name(node.__class__.__name__) + if isinstance(node, ControlFlowNode): + return _sanitize_python_variable_name(node.__class__.__name__) + return node.name or node._get_component_name() + + valid_component_ids = set(item._instance_id for item in self.nodes) + id_name_dict = {} + name_count_dict = {} + compname_udfname_dict = {} + local_names = set() + result = OrderedDict() + + for k, v in func_variables.items(): + # TODO(1979547): refactor this + if not isinstance(v, (BaseNode, AutoMLJob, PipelineJob, ControlFlowNode)): + continue + instance_id = getattr(v, "_instance_id", None) + if instance_id not in valid_component_ids: + continue + name = getattr(v, "name", None) or k + # for node name _, treat it as anonymous node with name unset + if name == "_": + continue + + # User defined name must be valid python identifier + if not is_valid_node_name(name): + raise UserErrorException( + f"Invalid node name found: {name!r}. Node name must start with a lower letter or underscore, " + "and can only contain lower letters, numbers and underscore." + ) + + # Raise error when setting a name that already exists, likely conflict with a variable name + if name in local_names and instance_id not in id_name_dict: + raise UserErrorException( + f"Duplicate node name found in pipeline: {self.name!r}, " + f"node name: {name!r}. Duplicate check is case-insensitive." 
+ ) + local_names.add(name) + id_name_dict[v._instance_id] = name # type: ignore[union-attr] + name_count_dict[name] = 1 + + # Find the last user-defined name for the same type of components + for node in self.nodes: + _id = node._instance_id + if _id in id_name_dict: + compname_udfname_dict[_get_name_or_component_name(node)] = id_name_dict[_id] + + # Refine and fill default name + # If component name is same, append '_{count}' suffix + for node in self.nodes: + _id = node._instance_id + if _id not in id_name_dict: + target_name = _get_name_or_component_name(node) + if node.name is None and target_name in compname_udfname_dict: + target_name = compname_udfname_dict[target_name] + if target_name not in name_count_dict: + name_count_dict[target_name] = 0 + name_count_dict[target_name] += 1 + suffix = "" if name_count_dict[target_name] == 1 else f"_{name_count_dict[target_name] - 1}" + id_name_dict[_id] = f"{_sanitize_python_variable_name(str(target_name))}{suffix}" + final_name = id_name_dict[_id] + node.name = final_name + result[final_name] = node + + # Validate IO name of node with correct node name, and log warning if there is keyword. + self._validate_keyword_in_node_io(node) + return result + + def _update_inputs(self, pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]) -> None: + """Update the pipeline inputs by the dict. + + :param pipeline_inputs: The pipeline inputs + :type pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]] + """ + for input_name, value in pipeline_inputs.items(): + anno: Any = None + if input_name not in self.inputs: + if isinstance(value, PipelineInput): + value = value._data + if isinstance(value, Input): + anno = copy.copy(value) + elif isinstance(value, NodeOutput): + anno = Input(type=value.type) + else: + anno = _get_annotation_by_value(value) + anno.name = input_name + anno.description = self._args_description.get(input_name) + self.inputs[input_name] = anno + + @classmethod + def _get_output_annotation(cls, func: Callable) -> Dict[str, Dict]: + """Get the output annotation of the function, validate & refine it. + + :param func: The function to retrieve output annotations from + :type func: Callable + :return: A dict of output annotations + :rtype: Dict[str, Dict] + """ + return_annotation = inspect.signature(func).return_annotation + + if is_group(return_annotation): + outputs = _get_param_with_standard_annotation(return_annotation, is_func=False) + elif isinstance(return_annotation, Output): + outputs = {cls.DEFAULT_OUTPUT_NAME: return_annotation} + else: + # skip if return annotation is not group or output + return {} + + output_annotations = {} + for key, val in outputs.items(): + if isinstance(val, GroupInput): + raise UserErrorException(message="Nested group annotation is not supported in pipeline output.") + # normalize annotation since currently annotation in @group will be converted to Input + if isinstance(val, Input): + val = Output(type=val.type) + if not isinstance(val, Output): + raise UserErrorException( + message="Invalid output annotation. " + f"Only Output annotation in return annotation is supported. Got {type(val)}." + ) + output_annotations[key] = val._to_dict() + return output_annotations + + def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: Dict[str, PipelineOutput]) -> None: + """Validate inferred output dict against annotation. 
+ + :param output_meta_dict: The output meta dict + :type output_meta_dict: dict + :param output_dict: The output dict + :type output_dict: Dict[str, PipelineOutput] + """ + if not self.output_annotation: + return + error_prefix = "Unmatched outputs between actual pipeline output and output in annotation" + if output_meta_dict.keys() != self.output_annotation.keys(): + raise UserErrorException( + "{}: actual pipeline component outputs: {}, annotation outputs: {}".format( + error_prefix, output_meta_dict.keys(), self.output_annotation.keys() + ) + ) + + unmatched_outputs = [] + for key, actual_output in output_meta_dict.items(): + expected_output = self.output_annotation[key] + actual_output.pop("description", None) + expected_description = expected_output.pop("description", None) + # skip comparing mode since when component's from remote, output mode is not available + actual_output.pop("mode", None) + expected_mode = expected_output.pop("mode", None) + if expected_output != actual_output: + unmatched_outputs.append( + f"{key}: pipeline component output: {actual_output} != annotation output {expected_output}" + ) + res = output_dict[key]._meta + if expected_description: + if res is not None: + res.description = expected_description + # also copy the description to pipeline job + output_dict[key].description = expected_description + if expected_mode: + if res is not None: + res.mode = expected_mode + # also copy the mode to pipeline job + output_dict[key].mode = expected_mode + + if unmatched_outputs: + raise UserErrorException(f"{error_prefix}: {unmatched_outputs}") + + @staticmethod + def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]) -> None: + if has_attr_safe(node, "inputs"): + for input_name in set(node.inputs) & COMPONENT_IO_KEYWORDS: # type: ignore[arg-type] + module_logger.warning( + 'Reserved word "%s" is used as input name in node "%s", ' + "can only be accessed with '%s.inputs[\"%s\"]'", + input_name, + node.name, + node.name, + input_name, + ) + if has_attr_safe(node, "outputs"): + for output_name in set(node.outputs) & COMPONENT_IO_KEYWORDS: # type: ignore[arg-type] + module_logger.warning( + 'Reserved word "%s" is used as output name in node "%s", ' + "can only be accessed with '%s.outputs[\"%s\"]'", + output_name, + node.name, + node.name, + output_name, + ) + + +def _build_pipeline_parameter( + func: Optional[Callable], + *, + user_provided_kwargs: Dict, + group_default_kwargs: Optional[Dict] = None, + non_pipeline_inputs: Optional[List] = None, +) -> Dict: + # Pass group defaults into kwargs to support group.item can be used even if no default on function. + # example: + # @group + # class Group: + # key = 'val' + # + # @pipeline + # def pipeline_func(param: Group): + # component_func(input=param.key) <--- param.key should be val. 
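+ # Illustrative note (a sketch, not executed here): for the Group example above, the wrapped
+ # kwargs end up roughly as
+ #   {"param": _GroupAttrDict({"key": PipelineInput(name="key", group_names=["param"], ...)})}
+ # so that `param.key` inside the pipeline function resolves to a PipelineInput data binding.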
+ + # transform kwargs + transformed_kwargs, non_pipeline_inputs = {}, non_pipeline_inputs or [] + if group_default_kwargs: + transformed_kwargs.update( + { + key: _wrap_pipeline_parameter(key, default_value=value, actual_value=value) + for key, value in group_default_kwargs.items() + if key not in non_pipeline_inputs + } + ) + + def all_params(parameters: Any) -> Generator: + yield from parameters.values() + + if func is None: + return transformed_kwargs + + parameters = all_params(signature(func).parameters) + # transform default values + for left_args in parameters: + if ( + left_args.name not in transformed_kwargs + and left_args.kind != Parameter.VAR_KEYWORD + and left_args.name not in non_pipeline_inputs + ): + default_value = left_args.default if left_args.default is not Parameter.empty else None + actual_value = user_provided_kwargs.get(left_args.name) + transformed_kwargs[left_args.name] = _wrap_pipeline_parameter( + key=left_args.name, default_value=default_value, actual_value=actual_value + ) + # Add variable kwargs to transformed_kwargs. + for key, value in user_provided_kwargs.items(): + if key not in transformed_kwargs: + transformed_kwargs[key] = _wrap_pipeline_parameter(key=key, default_value=None, actual_value=value) + return transformed_kwargs + + +def _wrap_pipeline_parameter( + key: str, default_value: Any, actual_value: Any, group_names: Optional[List[str]] = None +) -> Union[_GroupAttrDict, PipelineInput]: + # Append parameter path in group + group_names = [*group_names] if group_names else [] + if isinstance(default_value, _GroupAttrDict): + group_names.append(key) + return _GroupAttrDict( + { + k: _wrap_pipeline_parameter(k, default_value=v, actual_value=v, group_names=group_names) + for k, v in default_value.items() + } + ) + # Note: this PipelineInput object is built to mark input as a data binding. + # It only exists in dsl.pipeline function execution time and won't store in pipeline job or pipeline component. + return PipelineInput(name=key, meta=None, default_data=default_value, data=actual_value, group_names=group_names) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py new file mode 100644 index 00000000..21b5b6e3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py @@ -0,0 +1,327 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=protected-access + +import inspect +import logging +from collections import OrderedDict +from functools import wraps +from inspect import Parameter, signature +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, TypeVar, Union, overload + +from typing_extensions import ParamSpec + +from azure.ai.ml._utils.utils import is_private_preview_enabled +from azure.ai.ml.entities import Data, Model, PipelineJob, PipelineJobSettings +from azure.ai.ml.entities._builders.pipeline import Pipeline +from azure.ai.ml.entities._inputs_outputs import Input, is_group +from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, _GroupAttrDict +from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression +from azure.ai.ml.exceptions import ( + MultipleValueError, + ParamValueNotExistsError, + TooManyPositionalArgsError, + UnexpectedKeywordError, + UnsupportedParameterKindError, + UserErrorException, +) + +from ..entities._builders import BaseNode +from ._pipeline_component_builder import PipelineComponentBuilder, _is_inside_dsl_pipeline_func +from ._settings import _dsl_settings_stack +from ._utils import _resolve_source_file + +SUPPORTED_INPUT_TYPES = ( + PipelineInput, + NodeOutput, + Input, + Model, + Data, # For the case use a Data object as an input, we will convert it to Input object + Pipeline, # For the case use a pipeline node as the input, we use its only one output as the real input. + str, + bool, + int, + float, + PipelineExpression, + _GroupAttrDict, +) +module_logger = logging.getLogger(__name__) + +T = TypeVar("T") +P = ParamSpec("P") + + +# Overload the returns a decorator when func is None +@overload +def pipeline( + func: None, + *, + name: Optional[str] = None, + version: Optional[str] = None, + display_name: Optional[str] = None, + description: Optional[str] = None, + experiment_name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + **kwargs: Any, +) -> Callable[[Callable[P, T]], Callable[P, PipelineJob]]: ... + + +# Overload the returns a decorated function when func isn't None +@overload +def pipeline( + func: Callable[P, T], + *, + name: Optional[str] = None, + version: Optional[str] = None, + display_name: Optional[str] = None, + description: Optional[str] = None, + experiment_name: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + **kwargs: Any, +) -> Callable[P, PipelineJob]: ... + + +def pipeline( + func: Optional[Callable[P, T]] = None, + *, + name: Optional[str] = None, + version: Optional[str] = None, + display_name: Optional[str] = None, + description: Optional[str] = None, + experiment_name: Optional[str] = None, + tags: Optional[Union[Dict[str, str], str]] = None, + **kwargs: Any, +) -> Union[Callable[[Callable[P, T]], Callable[P, PipelineJob]], Callable[P, PipelineJob]]: + """Build a pipeline which contains all component nodes defined in this function. + + :param func: The user pipeline function to be decorated. + :type func: types.FunctionType + :keyword name: The name of pipeline component, defaults to function name. + :paramtype name: str + :keyword version: The version of pipeline component, defaults to "1". + :paramtype version: str + :keyword display_name: The display name of pipeline component, defaults to function name. + :paramtype display_name: str + :keyword description: The description of the built pipeline. 
+ :paramtype description: str
+ :keyword experiment_name: Name of the experiment the job will be created under, \
+ if None is provided, the experiment will be set to the current directory.
+ :paramtype experiment_name: str
+ :keyword tags: The tags of the pipeline component.
+ :paramtype tags: dict[str, str]
+ :return: Either
+ * A decorator, if `func` is None
+ * The decorated `func`
+
+ :rtype: Union[
+ Callable[[Callable], Callable[..., ~azure.ai.ml.entities.PipelineJob]],
+ Callable[P, ~azure.ai.ml.entities.PipelineJob]
+
+ ]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py
+ :start-after: [START configure_pipeline]
+ :end-before: [END configure_pipeline]
+ :language: python
+ :dedent: 8
+ :caption: Shows how to create a pipeline using this decorator.
+ """
+
+ def pipeline_decorator(func: Callable[P, T]) -> Callable:
+ if not isinstance(func, Callable): # type: ignore
+ raise UserErrorException(f"Dsl pipeline decorator accepts only function type, got {type(func)}.")
+
+ non_pipeline_inputs = kwargs.get("non_pipeline_inputs", []) or kwargs.get("non_pipeline_parameters", [])
+ # The compute keyword changed over time: default_compute_target -> compute -> default_compute -> none.
+ # To support legacy usage, we honor them in priority order.
+ compute = kwargs.get("compute", None)
+ default_compute_target = kwargs.get("default_compute_target", None)
+ default_compute_target = kwargs.get("default_compute", None) or default_compute_target
+ continue_on_step_failure = kwargs.get("continue_on_step_failure", None)
+ on_init = kwargs.get("on_init", None)
+ on_finalize = kwargs.get("on_finalize", None)
+
+ default_datastore = kwargs.get("default_datastore", None)
+ force_rerun = kwargs.get("force_rerun", None)
+ job_settings = {
+ "default_datastore": default_datastore,
+ "continue_on_step_failure": continue_on_step_failure,
+ "force_rerun": force_rerun,
+ "default_compute": default_compute_target,
+ "on_init": on_init,
+ "on_finalize": on_finalize,
+ }
+ func_entry_path = _resolve_source_file()
+ if not func_entry_path:
+ func_path = Path(inspect.getfile(func))
+ # In a notebook, func_path may be a fake path; resolving such a path would raise an error.
+ if func_path.exists():
+ func_entry_path = func_path.resolve().absolute()
+
+ job_settings = {k: v for k, v in job_settings.items() if v is not None}
+ pipeline_builder = PipelineComponentBuilder(
+ func=func,
+ name=name,
+ version=version,
+ display_name=display_name,
+ description=description,
+ default_datastore=default_datastore,
+ tags=tags,
+ source_path=str(func_entry_path),
+ non_pipeline_inputs=non_pipeline_inputs,
+ )
+
+ @wraps(func)
+ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Union[Pipeline, PipelineJob]:
+ # Default args will be added here.
+ # Note: push/pop the stack here instead of inside build(),
+ # because we only want to enable dsl settings on the top-level pipeline.
+ _dsl_settings_stack.push() # use this stack to track on_init/on_finalize settings
+ try:
+ # Convert args to kwargs
+ provided_positional_kwargs = _validate_args(func, args, kwargs, non_pipeline_inputs)
+
+ # When pipeline supports variable params, update pipeline component to support the inputs in **kwargs.
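+ # Descriptive note: the validated arguments are split below into pipeline inputs (registered on
+ # the builder via _update_inputs) and non-pipeline parameters (passed through to build() unchanged).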
+ pipeline_parameters = { + k: v for k, v in provided_positional_kwargs.items() if k not in non_pipeline_inputs + } + pipeline_builder._update_inputs(pipeline_parameters) + + non_pipeline_params_dict = { + k: v for k, v in provided_positional_kwargs.items() if k in non_pipeline_inputs + } + + # TODO: cache built pipeline component + pipeline_component = pipeline_builder.build( + user_provided_kwargs=provided_positional_kwargs, + non_pipeline_inputs_dict=non_pipeline_params_dict, + non_pipeline_inputs=non_pipeline_inputs, + ) + finally: + # use `finally` to ensure pop operation from the stack + dsl_settings = _dsl_settings_stack.pop() + + # update on_init/on_finalize settings if init/finalize job is set + if dsl_settings.init_job_set: + job_settings["on_init"] = dsl_settings.init_job_name(pipeline_component.jobs) + if dsl_settings.finalize_job_set: + job_settings["on_finalize"] = dsl_settings.finalize_job_name(pipeline_component.jobs) + + # TODO: pass compute & default_compute separately? + common_init_args: Any = { + "experiment_name": experiment_name, + "component": pipeline_component, + "inputs": pipeline_parameters, + "tags": tags, + } + built_pipeline: Any = None + if _is_inside_dsl_pipeline_func(): + # on_init/on_finalize is not supported for pipeline component + if job_settings.get("on_init") is not None or job_settings.get("on_finalize") is not None: + raise UserErrorException("On_init/on_finalize is not supported for pipeline component.") + # Build pipeline node instead of pipeline job if inside dsl. + built_pipeline = Pipeline(_from_component_func=True, **common_init_args) + if job_settings: + module_logger.warning( + ("Job settings %s on pipeline function %r are ignored when using inside PipelineJob."), + job_settings, + func.__name__, + ) + else: + built_pipeline = PipelineJob( + jobs=pipeline_component.jobs, + compute=compute, + settings=PipelineJobSettings(**job_settings), + **common_init_args, + ) + + return built_pipeline + + # Bug Item number: 2883169 + wrapper._is_dsl_func = True # type: ignore + wrapper._job_settings = job_settings # type: ignore + wrapper._pipeline_builder = pipeline_builder # type: ignore + return wrapper + + # enable use decorator without "()" if all arguments are default values + if func is not None: + return pipeline_decorator(func) + return pipeline_decorator + + +# pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype +def _validate_args(func: Callable, args: Any, kwargs: Dict, non_pipeline_inputs: List) -> OrderedDict: + """Validate customer function args and convert them to kwargs.""" + if not isinstance(non_pipeline_inputs, List) or any(not isinstance(param, str) for param in non_pipeline_inputs): + msg = "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string" + raise UserErrorException(message=msg, no_personal_data_message=msg) + # Positional arguments validate + is_support_variable_params = is_private_preview_enabled() + all_parameters = signature(func).parameters + # Implicit parameter are *args + var_positional_arg = next(filter(lambda param: param.kind == param.VAR_POSITIONAL, all_parameters.values()), None) + if var_positional_arg: + raise UnsupportedParameterKindError(func.__name__, parameter_kind=f"*{var_positional_arg.name}") + + non_pipeline_inputs = non_pipeline_inputs or [] + unexpected_non_pipeline_inputs = [param for param in non_pipeline_inputs if param not in all_parameters] + if unexpected_non_pipeline_inputs: + raise ParamValueNotExistsError(func.__name__, 
unexpected_non_pipeline_inputs) + + named_parameters = [ + param for param in all_parameters.values() if param.kind not in [param.VAR_KEYWORD, param.VAR_POSITIONAL] + ] + empty_parameters = {param.name: param for param in named_parameters if param.default is Parameter.empty} + # Implicit parameter are *args and **kwargs + if not is_support_variable_params: + if len(all_parameters.values()) != len(named_parameters): + raise UnsupportedParameterKindError(func.__name__) + + min_num = len(empty_parameters) + max_num = len(named_parameters) + if len(args) > max_num: + raise TooManyPositionalArgsError(func.__name__, min_num, max_num, len(args)) + + func_args, provided_args = list(args), OrderedDict({}) + provided_args = OrderedDict({param.name: func_args.pop(0) for param in named_parameters if len(func_args) > 0}) + for _k, _v in kwargs.items(): + if not is_support_variable_params and _k not in all_parameters: + raise UnexpectedKeywordError(func.__name__, _k, all_parameters.keys()) + if _k in provided_args.keys(): + raise MultipleValueError(func.__name__, _k) + provided_args[_k] = _v + + def _is_supported_data_type(_data: object) -> bool: + return isinstance(_data, SUPPORTED_INPUT_TYPES) or is_group(_data) + + for pipeline_input_name in provided_args: + data = provided_args[pipeline_input_name] + # for input_key, input_value in kwargs.items(): + if isinstance(data, BaseNode): + if len(data.outputs) != 1: + raise ValueError( + "Provided input {} is not a single output node, cannot be used as a node input.".format( + pipeline_input_name + ) + ) + data = next(iter(data.outputs.values())) + provided_args[pipeline_input_name] = data + + if data is not None and not _is_supported_data_type(data) and pipeline_input_name not in non_pipeline_inputs: + msg = ( + "Pipeline input expected an azure.ai.ml.Input or primitive types (str, bool, int or float), " + "but got type {}." + ) + raise UserErrorException( + message=msg.format(type(data)), + no_personal_data_message=msg.format("[type(pipeline_input_name)]"), + ) + + # Note: unprovided required inputs won't cause exception when calling pipeline func + # the exception will be raised in pipeline's customized validate. + return provided_args diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py new file mode 100644 index 00000000..784aaece --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py @@ -0,0 +1,128 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +import logging +from collections import deque +from typing import Any, Deque, Dict, Optional, Union + +from azure.ai.ml.entities._builders import BaseNode +from azure.ai.ml.exceptions import UserErrorException + +module_logger = logging.getLogger(__name__) + + +class _DSLSettingsStack: + def __init__(self) -> None: + self._stack: Deque["_DSLSettings"] = deque() + + def push(self) -> None: + self._stack.append(_DSLSettings()) + + def pop(self) -> "_DSLSettings": + self._check_stack() + return self._stack.pop() + + def top(self) -> "_DSLSettings": + self._check_stack() + return self._stack[-1] + + def _check_stack(self) -> None: + # as there is a separate stack push & pop operation management, if stack is empty, + # it should be the scenario that user call `set_pipeline_settings` out of `pipeline` decorator, + # then directly raise user error. 
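+ # Illustrative sketch of the guarded misuse (set_pipeline_settings is defined later in this
+ # module; the job name below is arbitrary):
+ #     from azure.ai.ml.dsl._settings import set_pipeline_settings
+ #     set_pipeline_settings(on_init="prep_job")  # called outside @pipeline -> UserErrorException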
+ if len(self._stack) == 0: + error_message = "Please call `set_pipeline_settings` inside a `pipeline` decorated function." + raise UserErrorException( + message=error_message, + no_personal_data_message=error_message, + ) + + +_dsl_settings_stack = _DSLSettingsStack() + + +class _DSLSettings: + """Initialization & finalization job settings for DSL pipeline job. + + Store settings from `dsl.set_pipeline_settings` during pipeline definition. + """ + + def __init__(self) -> None: + self._init_job: Any = None + self._finalize_job: Any = None + + @property + def init_job(self) -> Union[BaseNode, str]: + return self._init_job + + @init_job.setter + def init_job(self, value: Optional[Union[BaseNode, str]]) -> None: + # pylint: disable=logging-fstring-interpolation + if isinstance(value, (BaseNode, str)): + self._init_job = value + else: + module_logger.warning(f"Initialization job setting is ignored as input parameter type is {type(value)!r}.") + + @property + def init_job_set(self) -> bool: + # note: need to use `BaseNode is not None` as `bool(BaseNode)` will return False + return self._init_job is not None + + def init_job_name(self, jobs: Dict[str, BaseNode]) -> Optional[str]: + if isinstance(self._init_job, str): + return self._init_job + for name, job in jobs.items(): + if id(self.init_job) == id(job): + return name + module_logger.warning("Initialization job setting is ignored as cannot find corresponding job node.") + return None + + @property + def finalize_job(self) -> Union[BaseNode, str]: + return self._finalize_job + + @finalize_job.setter + def finalize_job(self, value: Optional[Union[BaseNode, str]]) -> None: + # pylint: disable=logging-fstring-interpolation + if isinstance(value, (BaseNode, str)): + self._finalize_job = value + else: + module_logger.warning(f"Finalization job setting is ignored as input parameter type is {type(value)!r}.") + + @property + def finalize_job_set(self) -> bool: + # note: need to use `BaseNode is not None` as `bool(BaseNode)` will return False + return self._finalize_job is not None + + def finalize_job_name(self, jobs: Dict[str, BaseNode]) -> Optional[str]: + if isinstance(self._finalize_job, str): + return self._finalize_job + for name, job in jobs.items(): + if id(self.finalize_job) == id(job): + return name + module_logger.warning("Finalization job setting is ignored as cannot find corresponding job node.") + return None + + +def set_pipeline_settings( + *, + on_init: Optional[Union[BaseNode, str]] = None, + on_finalize: Optional[Union[BaseNode, str]] = None, +) -> None: + """Set pipeline settings for current `dsl.pipeline` definition. + + This function should be called inside a `dsl.pipeline` decorated function, otherwise will raise exception. + + :keyword on_init: On_init job node or name. On init job will be executed before all other jobs, \ + it should not have data connections to regular nodes. + :paramtype on_init: Union[BaseNode, str] + :keyword on_finalize: On_finalize job node or name. On finalize job will be executed after pipeline run \ + finishes (completed/failed/canceled), it should not have data connections to regular nodes. 
+ :paramtype on_finalize: Union[BaseNode, str] + :return: + """ + dsl_settings = _dsl_settings_stack.top() + if on_init is not None: + dsl_settings.init_job = on_init + if on_finalize is not None: + dsl_settings.finalize_job = on_finalize diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py new file mode 100644 index 00000000..6faf669e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py @@ -0,0 +1,222 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +import contextlib +import importlib +import inspect +import os +import re +import sys +import types +from pathlib import Path +from typing import Generator, Optional, Union + +from azure.ai.ml.dsl._constants import VALID_NAME_CHARS +from azure.ai.ml.exceptions import ComponentException, ErrorCategory, ErrorTarget + + +def _normalize_identifier_name(name: str) -> str: + normalized_name = name.lower() + normalized_name = re.sub(r"[\W_]", " ", normalized_name) # No non-word characters + normalized_name = re.sub(" +", " ", normalized_name).strip() # No double spaces, leading or trailing spaces + if re.match(r"\d", normalized_name): + normalized_name = "n" + normalized_name # No leading digits + return normalized_name + + +def _sanitize_python_variable_name(name: str) -> str: + return _normalize_identifier_name(name).replace(" ", "_") + + +def is_valid_name(name: str) -> bool: + """Indicate whether the name is a valid component name. + + :param name: The component name + :type name: str + :return: True if name is a valid name for a component, False otherwise + :rtype: bool + """ + return all(c in VALID_NAME_CHARS for c in name) + + +def _resolve_source_directory() -> Optional[Union[str, Path]]: + """Resolve source directory as last customer frame's module file dir position. + + :return: The directory path of the last customer owner frame in the callstack + :rtype: Optional[Union[str, Path]] + """ + source_file = _resolve_source_file() + # Fall back to current working directory if not found + return os.getcwd() if not source_file else Path(os.path.dirname(source_file)).absolute() + + +def _resolve_source_file() -> Optional[Path]: + """Resolve source file as last customer frame's module file position. + + + :return: The filepath of the last customer owner frame in the callstack + :rtype: Optional[Path] + """ + try: + frame_list = inspect.stack() + # We find the last frame which is in SDK code instead of customer code or dependencies code + # by checking whether the package name of the frame belongs to azure.ai.ml.component. + pattern = r"(^azure\.ai\.ml(?=\..*|$).*)" + for frame, last_frame in zip(frame_list, frame_list[1:]): + if _assert_frame_package_name(pattern, frame.frame) and not _assert_frame_package_name( + pattern, last_frame.frame + ): + module = inspect.getmodule(last_frame.frame) + return Path(str(module.__file__)).absolute() if module else None + except Exception: # pylint: disable=W0718 + pass + return None + + +def _assert_frame_package_name(pattern: str, frame: types.FrameType) -> bool: + """Check the package name of frame is match pattern. + + :param pattern: The pattern to match the package name of `frame` against. 
+ :type pattern: str
+ :param frame: The stack frame
+ :type frame: types.FrameType
+ :return: True if the package name of the frame matches pattern, False otherwise
+ :rtype: bool
+ """
+ # f_globals records the module-level globals of the frame's function, and the module's __package__ must be set.
+ # https://docs.python.org/3/reference/import.html#__package__
+ # Although __package__ is set when importing, __package__ may be missing from globals
+ # when the code is executed via exec.
+ package_name = frame.f_globals.get("__package__", "")
+ return bool(package_name and re.match(pattern, package_name))
+
+
+def _relative_to(
+ path: Union[str, os.PathLike], basedir: Union[str, os.PathLike], raises_if_impossible: bool = False
+) -> Optional[Path]:
+ """Compute the relative path under basedir.
+
+ This is a wrapper around Path.relative_to. By default, Path.relative_to raises if path is not under basedir;
+ this function instead returns None when raises_if_impossible=False, and raises otherwise.
+
+ :param path: A path
+ :type path: Union[str, os.PathLike]
+ :param basedir: The base path to compute `path` relative to
+ :type basedir: Union[str, os.PathLike]
+ :param raises_if_impossible: Whether to raise if :attr:`pathlib.Path.relative_to` throws. Defaults to False.
+ :type raises_if_impossible: bool
+ :return:
+ * None if raises_if_impossible is False and basedir is not a parent of path
+ * path.relative_to(basedir) otherwise
+ :rtype: Optional[Path]
+ """
+ # The second resolve handles possible Windows short paths.
+ path = Path(path).resolve().absolute().resolve()
+ basedir = Path(basedir).resolve().absolute().resolve()
+ try:
+ return path.relative_to(basedir)
+ except ValueError:
+ if raises_if_impossible:
+ raise
+ return None
+
+
+@contextlib.contextmanager
+def inject_sys_path(path: object) -> Generator:
+ original_sys_path = sys.path.copy()
+ sys.path.insert(0, str(path))
+ try:
+ yield
+ finally:
+ sys.path = original_sys_path
+
+
+def _force_reload_module(module: types.ModuleType) -> types.ModuleType:
+ # Reload the module unless module.__spec__ is None.
+ # When module.__spec__ is None (e.g. the module is __main__), reload would raise an exception.
+ if module.__spec__ is None:
+ return module
+ path = Path(module.__spec__.loader.path).parent # type: ignore
+ with inject_sys_path(path):
+ return importlib.reload(module)
+
+
+@contextlib.contextmanager
+# pylint: disable-next=docstring-missing-return,docstring-missing-rtype
+def _change_working_dir(path: Union[str, os.PathLike], mkdir: bool = True) -> Generator:
+ """Context manager for changing the current working directory.
+
+ :param path: The path to change to
+ :type path: Union[str, os.PathLike]
+ :param mkdir: Whether to ensure `path` exists, creating it if it doesn't exist. Defaults to True.
+ :type mkdir: bool
+ """
+
+ saved_path = os.getcwd()
+ if mkdir:
+ os.makedirs(path, exist_ok=True)
+ os.chdir(str(path))
+ try:
+ yield
+ finally:
+ os.chdir(saved_path)
+
+
+def _import_component_with_working_dir(
+ module_name: str, working_dir: Optional[str] = None, force_reload: bool = False
+) -> types.ModuleType:
+ if working_dir is None:
+ working_dir = os.getcwd()
+ working_dir = str(Path(working_dir).resolve().absolute())
+
+ with _change_working_dir(working_dir, mkdir=False), inject_sys_path(working_dir):
+ try:
+ py_module = importlib.import_module(module_name)
+ except Exception as e:
+ raise ComponentException(
+ message=str(e),
+ no_personal_data_message="Failure importing component with working directory",
+ target=ErrorTarget.COMPONENT,
+ error=e,
+ error_category=ErrorCategory.SYSTEM_ERROR,
+ ) from e
+ except BaseException as e:
+ # Re-raise a BaseException (e.g. SystemExit) as a normal exception.
+ raise ComponentException(
+ message=str(e),
+ no_personal_data_message="Failure importing component with working directory",
+ target=ErrorTarget.COMPONENT,
+ error=e,
+ error_category=ErrorCategory.USER_ERROR,
+ ) from e
+ loaded_module_file = Path(str(py_module.__file__)).resolve().absolute().as_posix()
+ posix_working_dir = Path(working_dir).absolute().as_posix()
+ if _relative_to(loaded_module_file, posix_working_dir) is None:
+ if force_reload:
+ # If force_reload is True, reload the module instead of raising an exception.
+ # This is used when we don't care about the originally loaded module with the same name.
+ return importlib.reload(py_module)
+ raise RuntimeError(
+ "Could not import module: '{}' because a module with the same name has already been loaded.\n"
+ "Path of the module: {}\n"
+ "Working dir: {}".format(module_name, loaded_module_file, posix_working_dir)
+ )
+ return py_module
+
+
+@contextlib.contextmanager
+def environment_variable_overwrite(key: str, val: str) -> Generator:
+ if key in os.environ:
+ backup_value = os.environ[key]
+ else:
+ backup_value = None
+ os.environ[key] = val
+
+ try:
+ yield
+ finally:
+ # Restore the original value; compare against None so an original empty-string value is restored too.
+ if backup_value is not None:
+ os.environ[key] = backup_value
+ else:
+ os.environ.pop(key)
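+ # Usage sketch (illustrative only, not part of the upstream module; the variable name and
+ # directory below are made up): both helpers above are context managers, so typical use is
+ #     with environment_variable_overwrite("MY_FLAG", "1"):
+ #         ...  # code here sees MY_FLAG=1; the previous value (or absence) is restored on exit
+ #     with _change_working_dir("/tmp/workdir"):
+ #         ...  # cwd is temporarily changed (and created first when mkdir=True)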