author     S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/dsl
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/dsl')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py                         9
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py                103
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py                      75
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py                       4
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py                       101
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py                        192
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py         148
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py                301
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py                     43
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py             46
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py           33
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py            22
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py                    67
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py     615
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py             327
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py                       128
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py                          222
17 files changed, 2436 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py
new file mode 100644
index 00000000..72c77310
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py
@@ -0,0 +1,9 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
+
+from azure.ai.ml.dsl._pipeline_decorator import pipeline
+
+__all__ = ["pipeline"]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py
new file mode 100644
index 00000000..69547cd1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py
@@ -0,0 +1,103 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+from typing import Any, Callable, List, Mapping
+
+from azure.ai.ml.dsl._dynamic import KwParameter, create_kw_function_from_parameters
+from azure.ai.ml.entities import Component as ComponentEntity
+from azure.ai.ml.entities._builders import Command
+from azure.ai.ml.entities._component.datatransfer_component import DataTransferImportComponent
+
+
+def get_dynamic_input_parameter(inputs: Mapping) -> List:
+ """Return the dynamic parameter of the definition's input ports.
+
+ :param inputs: The mapping of input names to input objects.
+ :type inputs: Mapping
+ :return: The list of dynamic parameters.
+ :rtype: List[~azure.ai.ml.dsl._dynamic.KwParameter]
+ """
+ return [
+ KwParameter(
+ name=name,
+ annotation=input._get_python_builtin_type_str(),
+ default=None,
+ _type=input._get_python_builtin_type_str(),
+ )
+ for name, input in inputs.items()
+ ]
+
+
+def get_dynamic_source_parameter(source: Any) -> List:
+ """Return the dynamic parameter of the definition's source port.
+
+ :param source: The source object.
+ :type source: Any
+ :return: The list of dynamic parameters.
+ :rtype: List[~azure.ai.ml.dsl._dynamic.KwParameter]
+ """
+ return [
+ KwParameter(
+ name="source",
+ annotation=source.type,
+ default=None,
+ _type=source.type,
+ )
+ ]
+
+
+def to_component_func(entity: ComponentEntity, component_creation_func: Callable) -> Callable[..., Command]:
+ """Convert a ComponentEntity to a callable component function.
+
+ :param entity: The ComponentEntity to convert.
+ :type entity: ~azure.ai.ml.entities.Component
+ :param component_creation_func: The function for creating a component.
+ :type component_creation_func: Callable
+ :return: The callable component function.
+ :rtype: Callable[..., ~azure.ai.ml.entities._builders.Command]
+ """
+ func_name = "[component] {}".format(entity.display_name)
+
+ func_docstring_lines = []
+ if entity.description is not None:
+ func_docstring_lines.append(entity.description.strip())
+
+ if isinstance(entity, DataTransferImportComponent):
+ all_params = get_dynamic_source_parameter(entity.source)
+ else:
+ all_params = get_dynamic_input_parameter(entity.inputs)
+
+ flattened_group_keys = []
+ # Flatten all group parameters, for function parameter validation.
+ from azure.ai.ml.entities._inputs_outputs import GroupInput
+
+ for name, item in entity.inputs.items():
+ if isinstance(item, GroupInput):
+ flattened_group_keys.extend(list(item.flatten(group_parameter_name=name).keys()))
+
+ doc_string = entity.description
+    # Try to add the component YAML to the doc string
+ try:
+ yaml_str = entity._yaml_str if entity._yaml_str else entity._to_yaml()
+ doc_string = "{0}\n\nComponent yaml:\n```yaml\n{1}\n```".format(doc_string, yaml_str)
+ except Exception: # pylint: disable=W0718
+ pass
+
+ params_assignment_str = ", ".join([f"{param.name}=xxx" for param in all_params])
+ example = f"component_func({params_assignment_str})"
+
+ dynamic_func = create_kw_function_from_parameters(
+ component_creation_func,
+ documentation=str(doc_string),
+ parameters=all_params,
+ func_name=func_name,
+ flattened_group_keys=flattened_group_keys,
+ )
+
+ # Bug Item number: 2883188
+ dynamic_func._func_calling_example = example # type: ignore
+ dynamic_func._has_parameters = bool(all_params) # type: ignore
+ return dynamic_func
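
`to_component_func` is an internal helper; it is reached indirectly whenever a loaded component is called like a function. A hedged sketch of that public path follows, with `train.yml` and the keyword argument names as illustrative placeholders.

.. code-block:: python

    from azure.ai.ml import Input, load_component

    # Loading a component yields a dynamically generated keyword-only function
    # whose name is "[component] <display name>" and whose docstring embeds the
    # component YAML (both assembled by to_component_func above).
    train_component = load_component(source="./train.yml")
    help(train_component)

    # Calling the function builds a node (e.g. a Command) for use inside a pipeline.
    train_step = train_component(
        input_data=Input(path="azureml:my_dataset:1"),
        learning_rate=0.01,
    )
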
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py
new file mode 100644
index 00000000..20c46169
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py
@@ -0,0 +1,75 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import List, Optional, Union
+
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.condition_node import ConditionNode
+from azure.ai.ml.entities._job.pipeline._io import InputOutputBase
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression
+from azure.ai.ml.exceptions import UserErrorException
+
+# pylint: disable=redefined-outer-name
+
+
+def condition(
+ condition: Union[str, bool, InputOutputBase, BaseNode, PipelineExpression],
+ *,
+ true_block: Optional[Union[BaseNode, List[BaseNode]]] = None,
+ false_block: Optional[Union[BaseNode, List[BaseNode]]] = None,
+) -> ConditionNode:
+ """Create a condition node to provide runtime condition graph experience.
+
+ Below is an example of using an expression result to control which step is executed.
+ If the pipeline parameter 'int_param1' is greater than 'int_param2', then 'true_step' will be executed,
+ otherwise, the 'false_step' will be executed.
+
+ .. code-block:: python
+
+ @dsl.pipeline
+ def pipeline_func(int_param1: int, int_param2: int):
+ true_step = component_func()
+ false_step = another_component_func()
+ dsl.condition(
+ int_param1 > int_param2,
+ true_block=true_step,
+ false_block=false_step
+ )
+
+ :param condition: The condition of the execution flow.
+ The value could be a boolean type control output, a node with exactly one boolean type output,
+ or a pipeline expression.
+ :type condition: Union[
+ str,
+ bool,
+ ~azure.ai.ml.entities._job.pipeline._io.InputOutputBase,
+ ~azure.ai.ml.entities._builders.BaseNode,
+ ~azure.ai.ml.entities._job.pipeline._pipeline_expression.PipelineExpression]
+ :keyword true_block: The block to be executed if the condition resolves to True.
+ :paramtype true_block: Union[
+ ~azure.ai.ml.entities._builders.BaseNode,
+ List[~azure.ai.ml.entities._builders.BaseNode]]
+ :keyword false_block: The block to be executed if the condition resolves to False.
+ :paramtype false_block: Union[
+ ~azure.ai.ml.entities._builders.BaseNode,
+ List[~azure.ai.ml.entities._builders.BaseNode]]
+ :return: The condition node.
+ :rtype: ConditionNode
+ :raises UserErrorException: Raised if the condition node has an incorrect number of outputs.
+ """
+ # resolve expression as command component
+ if isinstance(condition, PipelineExpression):
+ condition = condition.resolve()
+ if isinstance(condition, BaseNode):
+ if len(condition.outputs) != 1:
+ error_message = (
+ f"Exactly one output is expected for condition node, {len(condition.outputs)} outputs found."
+ )
+ raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+ condition = list(condition.outputs.values())[0]
+ return ConditionNode(
+ condition=condition,
+ true_block=true_block, # type: ignore[arg-type]
+ false_block=false_block, # type: ignore[arg-type]
+ _from_component_func=True,
+ )
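
In addition to the pipeline-expression form shown in the docstring above, the condition argument may be a node with exactly one boolean control output, which the code unwraps to that single output. A hedged sketch with placeholder component functions:

.. code-block:: python

    @dsl.pipeline
    def pipeline_with_control_output():
        # check_step is assumed to be a component exposing exactly one boolean control output.
        check_step = check_condition_component()
        true_step = component_func()
        false_step = another_component_func()
        dsl.condition(check_step, true_block=true_step, false_block=false_step)
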
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py
new file mode 100644
index 00000000..461c8c4c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py
@@ -0,0 +1,4 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+VALID_NAME_CHARS = set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-")
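
A sketch of how such a character set is typically consumed; the helper below is illustrative and not part of the SDK.

.. code-block:: python

    from azure.ai.ml.dsl._constants import VALID_NAME_CHARS

    def is_valid_name(name: str) -> bool:
        # A name is acceptable only when every character belongs to the allowed set.
        return bool(name) and set(name) <= VALID_NAME_CHARS
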
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py
new file mode 100644
index 00000000..59ce50e9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py
@@ -0,0 +1,101 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Dict, Optional, Union
+
+from azure.ai.ml.entities._builders import Command
+from azure.ai.ml.entities._builders.do_while import DoWhile
+from azure.ai.ml.entities._builders.pipeline import Pipeline
+from azure.ai.ml.entities._inputs_outputs import Output
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput
+
+
+def do_while(
+ body: Union[Pipeline, Command], mapping: Dict, max_iteration_count: int, condition: Optional[Output] = None
+) -> DoWhile:
+ """Build a do_while node by specifying the loop body, output-input mapping, and termination condition.
+
+ .. note::
+ The following example shows how to use the `do_while` function to create a pipeline with a `do_while` node.
+
+ .. code-block:: python
+
+ from azure.ai.ml.dsl import pipeline
+ from mldesigner.dsl import do_while
+
+ @pipeline()
+ def your_do_while_body():
+ pass
+
+ @pipeline()
+ def pipeline_with_do_while_node():
+ do_while_body = your_do_while_body()
+ do_while_node = do_while(
+ body=do_while_body,
+ condition=do_while_body.outputs.condition_output,
+ mapping={
+                    do_while_body.outputs.output1: do_while_body.inputs.input1,
+                    do_while_body.outputs.output2: [
+                        do_while_body.inputs.input2,
+                        do_while_body.inputs.input3,
+                    ],
+ },
+ )
+ # Connect to the do_while_node outputs
+ component = component_func(
+ input1=do_while_body.outputs.output1, input2=do_while_body.outputs.output2
+ )
+
+ :param body: The pipeline job or command node for the do-while loop body.
+ :type body: Union[~azure.ai.ml.entities._builders.pipeline.Pipeline, ~azure.ai.ml.entities._builders.Command]
+ :param mapping: The output-input mapping for each round of the do-while loop.
+ The key is the last round's output of the body, and the value is the input port for the current body.
+ :type mapping: Dict[
+ Union[str, ~azure.ai.ml.entities.Output],
+ Union[str, ~azure.ai.ml.entities.Input, List]]
+    :param max_iteration_count: The maximum number of iterations the do-while node may run.
+    :type max_iteration_count: int
+    :param condition: A boolean output of the body.
+        The do-while loop stops when this output evaluates to False.
+        If not specified, the loop behaves as a while-true loop.
+ :type condition: ~azure.ai.ml.entities.Output
+ :return: The do-while node.
+ :rtype: ~azure.ai.ml.entities._builders.do_while.DoWhile
+ """
+ do_while_node = DoWhile(
+ body=body,
+ condition=condition, # type: ignore[arg-type]
+ mapping=mapping,
+ _from_component_func=True,
+ )
+ do_while_node.set_limits(max_iteration_count=max_iteration_count)
+
+ def _infer_and_update_body_input_from_mapping() -> None:
+ # pylint: disable=protected-access
+ for source_output, body_input in mapping.items():
+ # handle case that mapping key is a NodeOutput
+ output_name = source_output._port_name if isinstance(source_output, NodeOutput) else source_output
+ # if loop body output type is not specified, skip as we have no place to infer
+ if body.outputs[output_name].type is None:
+ continue
+ # body input can be a list of inputs, normalize as a list to process
+ if not isinstance(body_input, list):
+ body_input = [body_input]
+ for single_input in body_input:
+ # if input type is specified, no need to infer and skip
+ if single_input.type is not None:
+ continue
+ inferred_type = body.outputs[output_name].type
+ # update node input
+ single_input._meta._is_inferred_optional = True
+ single_input.type = inferred_type
+ # update node corresponding component input
+ input_name = single_input._meta.name
+ body.component.inputs[input_name]._is_inferred_optional = True # type: ignore[union-attr]
+ body.component.inputs[input_name].type = inferred_type # type: ignore[union-attr]
+
+ # when mapping is a dictionary, infer and update for dynamic input
+ if isinstance(mapping, dict):
+ _infer_and_update_body_input_from_mapping()
+
+ return do_while_node
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py
new file mode 100644
index 00000000..3fcb42fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py
@@ -0,0 +1,192 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+import types
+from inspect import Parameter, Signature
+from typing import Any, Callable, Dict, Sequence, cast
+
+from azure.ai.ml.entities import Component
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, UnexpectedKeywordError, ValidationException
+
+module_logger = logging.getLogger(__name__)
+
+
+class KwParameter(Parameter):
+ """A keyword-only parameter with a default value.
+
+ :param name: The name of the parameter.
+ :type name: str
+ :param default: The default value of the parameter.
+ :param annotation: The annotation type of the parameter, defaults to `Parameter.empty`.
+ :type annotation: Any
+ :param _type: The type of the parameter, defaults to "str".
+ :type _type: str
+ :param _optional: Indicates if the parameter is optional, defaults to False.
+ :type _optional: bool
+ """
+
+ def __init__(
+ self, name: str, default: Any, annotation: Any = Parameter.empty, _type: str = "str", _optional: bool = False
+ ) -> None:
+ super().__init__(name, Parameter.KEYWORD_ONLY, default=default, annotation=annotation)
+ self._type = _type
+ self._optional = _optional
+
+
+def _replace_function_name(func: types.FunctionType, new_name: str) -> types.FunctionType:
+ """Replaces the name of a function with a new name
+
+ :param func: The function to update
+ :type func: types.FunctionType
+ :param new_name: The new function name
+ :type new_name: str
+ :return: The function with a replaced name, but otherwise unchanged body
+ :rtype: types.FunctionType
+ """
+ try:
+ # Use the original code of the function to initialize a new code object for the new function.
+ code_template = func.__code__
+        # For Python >= 3.8, `CodeType.replace` is recommended, since the CodeType constructor
+        # interface changed in 3.8 and the old direct-initialization code is no longer valid.
+        # See https://github.com/python/cpython/blob/384621c42f9102e31ba2c47feba144af09c989e5/Objects/codeobject.c#L646
+        # and https://github.com/python/cpython/blob/384621c42f9102e31ba2c47feba144af09c989e5/Objects/codeobject.c#L446
+ if hasattr(code_template, "replace"):
+ code = code_template.replace(co_name=new_name)
+ else:
+            # Before Python 3.8, `replace` is not available, so the code object can only be initialized as follows.
+            # https://github.com/python/cpython/blob/v3.7.8/Objects/codeobject.c#L97
+
+ # Bug Item number: 2881688
+ code = types.CodeType( # type: ignore
+ code_template.co_argcount,
+ code_template.co_kwonlyargcount,
+ code_template.co_nlocals,
+ code_template.co_stacksize,
+ code_template.co_flags,
+ code_template.co_code, # type: ignore
+ code_template.co_consts, # type: ignore
+ code_template.co_names,
+ code_template.co_varnames,
+ code_template.co_filename, # type: ignore
+ new_name, # Use the new name for the new code object.
+ code_template.co_firstlineno, # type: ignore
+ code_template.co_lnotab, # type: ignore
+ # The following two values are required for closures.
+ code_template.co_freevars, # type: ignore
+ code_template.co_cellvars, # type: ignore
+ )
+ # Initialize a new function with the code object and the new name, see the following ref for more details.
+ # https://github.com/python/cpython/blob/4901fe274bc82b95dc89bcb3de8802a3dfedab32/Objects/clinic/funcobject.c.h#L30
+ return types.FunctionType(
+ code,
+ globals=func.__globals__,
+ name=new_name,
+ argdefs=func.__defaults__,
+ # Closure must be set to make sure free variables work.
+ closure=func.__closure__,
+ )
+ except BaseException: # pylint: disable=W0718
+ # If the dynamic replacing failed in corner cases, simply set the two fields.
+ func.__name__ = func.__qualname__ = new_name
+ return func
+
+
+# pylint: disable-next=docstring-missing-param
+def _assert_arg_valid(kwargs: dict, keys: list, func_name: str) -> None:
+ """Assert the arg keys are all in keys."""
+ # pylint: disable=protected-access
+ # validate component input names
+ Component._validate_io_names(kwargs, raise_error=True)
+ lower2original_parameter_names = {x.lower(): x for x in keys}
+ kwargs_need_to_update = []
+ for key in kwargs:
+ if key not in keys:
+ lower_key = key.lower()
+ if lower_key in lower2original_parameter_names:
+ # record key that need to update
+ kwargs_need_to_update.append(key)
+ if key != lower_key:
+ # raise warning if name not match sanitize version
+ module_logger.warning(
+ "Component input name %s, treat it as %s", key, lower2original_parameter_names[lower_key]
+ )
+ else:
+ raise UnexpectedKeywordError(func_name=func_name, keyword=key, keywords=keys)
+ # update kwargs to align with yaml definition
+ for key in kwargs_need_to_update:
+ kwargs[lower2original_parameter_names[key.lower()]] = kwargs.pop(key)
+
+
+def _update_dct_if_not_exist(dst: Dict, src: Dict) -> None:
+ """Computes the union of `src` and `dst`, in-place within `dst`
+
+ If a key exists in `dst` and `src` the value in `dst` is preserved
+
+ :param dst: The destination to compute the union within
+ :type dst: Dict
+ :param src: A dictionary to include in the union
+ :type src: Dict
+ """
+ for k, v in src.items():
+ if k not in dst:
+ dst[k] = v
+
+
+def create_kw_function_from_parameters(
+ func: Callable,
+ parameters: Sequence[Parameter],
+ flattened_group_keys: list,
+ func_name: str,
+ documentation: str,
+) -> Callable:
+ """Create a new keyword-only function with provided parameters.
+
+ :param func: The original function to be wrapped.
+ :type func: Callable
+ :param parameters: The sequence of parameters for the new function.
+ :type parameters: Sequence[Parameter]
+ :param flattened_group_keys: The list of valid group keys.
+ :type flattened_group_keys: list
+ :param func_name: The name of the new function.
+ :type func_name: str
+ :param documentation: The documentation string for the new function.
+ :type documentation: str
+ :return: The new keyword-only function.
+ :rtype: Callable
+ :raises ValidationException: If the provided function parameters are not keyword-only.
+ """
+ if any(p.default == p.empty or p.kind != Parameter.KEYWORD_ONLY for p in parameters):
+        msg = "This function only accepts keyword-only parameters."
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ target=ErrorTarget.COMPONENT,
+ )
+ default_kwargs = {p.name: p.default for p in parameters}
+
+ def f(**kwargs: Any) -> Any:
+ # We need to make sure all keys of kwargs are valid.
+ # Merge valid group keys with original keys.
+ _assert_arg_valid(kwargs, [*list(default_kwargs.keys()), *flattened_group_keys], func_name=func_name)
+ # We need to put the default args to the kwargs before invoking the original function.
+ _update_dct_if_not_exist(kwargs, default_kwargs)
+ return func(**kwargs)
+
+ f = _replace_function_name(cast(types.FunctionType, f), func_name)
+ # Set the signature so jupyter notebook could have param hint by calling inspect.signature()
+ # Bug Item number: 2883223
+ f.__signature__ = Signature(parameters) # type: ignore
+ # Set doc/name/module to make sure help(f) shows following expected result.
+ # Expected help(f):
+ #
+ # Help on function FUNC_NAME:
+ # FUNC_NAME(SIGNATURE)
+ # FUNC_DOC
+ #
+ f.__doc__ = documentation # Set documentation to update FUNC_DOC in help.
+ # Set module = None to avoid showing the sentence `in module 'azure.ai.ml.component._dynamic' in help.`
+ # See https://github.com/python/cpython/blob/2145c8c9724287a310bc77a2760d4f1c0ca9eb0c/Lib/pydoc.py#L1757
+ f.__module__ = None # type: ignore
+ return f
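
A hedged sketch of how these pieces fit together, using a stand-in creation function; the imports come from this private module and the parameter names are illustrative.

.. code-block:: python

    from azure.ai.ml.dsl._dynamic import KwParameter, create_kw_function_from_parameters

    def _create_node(**kwargs):
        # Stand-in for a real component creation function.
        return kwargs

    params = [
        KwParameter(name="learning_rate", default=0.01, annotation="float", _type="float"),
        KwParameter(name="epochs", default=10, annotation="int", _type="integer"),
    ]
    train_func = create_kw_function_from_parameters(
        _create_node,
        parameters=params,
        flattened_group_keys=[],
        func_name="train_model",
        documentation="Train a model with keyword-only inputs.",
    )

    # Keyword-only call; unspecified parameters fall back to their defaults.
    node = train_func(learning_rate=0.1)
    assert node == {"learning_rate": 0.1, "epochs": 10}
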
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py
new file mode 100644
index 00000000..7f88a70b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py
@@ -0,0 +1,148 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import importlib
+
+from typing import Any, Dict, List, Optional, Union
+
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.entities import CommandComponent, PipelineJob
+from azure.ai.ml.entities._assets.federated_learning_silo import FederatedLearningSilo
+from azure.ai.ml.entities._builders.fl_scatter_gather import FLScatterGather
+
+
+def _check_for_import(package_name: str) -> None:
+ try:
+ importlib.import_module(package_name)
+ except ImportError as e:
+ raise ImportError(
+ "The DSL FL Node has an additional requirement above the rest of the "
+ + "AML SDK repo in that the mldesigner package is required. Please run `pip install mldesigner` "
+ + "and try again."
+ ) from e
+
+
+@experimental
+def fl_scatter_gather(
+ *,
+ silo_configs: List[FederatedLearningSilo],
+ silo_component: Union[PipelineJob, CommandComponent],
+ aggregation_component: Union[PipelineJob, CommandComponent],
+ aggregation_compute: Optional[str] = None,
+ aggregation_datastore: Optional[str] = None,
+ shared_silo_kwargs: Optional[Dict] = None,
+ aggregation_kwargs: Optional[Dict] = None,
+ silo_to_aggregation_argument_map: Optional[Dict] = None,
+ aggregation_to_silo_argument_map: Optional[Dict] = None,
+ max_iterations: int = 1,
+ _create_default_mappings_if_needed: bool = False,
+ **kwargs: Any,
+) -> FLScatterGather:
+ """A federated learning scatter-gather subgraph node.
+
+ It's assumed that this will be used inside of a `@pipeline`-decorated function in order to create a subgraph which
+ will:
+ - Execute a specified pipeline step multiple times (the silo step), with each execution using slightly
+ different inputs, datastores, and computes based on an inputted config.
+ - Merge the outputs of the multiple silo steps into a single input for another step (the aggregation step),
+ which will then process the values into a single unified result.
+        - Repeat the 'scatter gather' sequence above for the number of iterations given by the max_iterations
+            input, with the output of each iteration's aggregation step fed back into the silo steps of the
+            subsequent iteration.
+ - Return the outputs of the last iteration's aggregation step as the node's final output.
+
+ The process of assigning computes, datastores, and inputs to steps is called 'anchoring'. The following
+ details of the anchoring process should be noted:
+        - Computes are always overridden to the respective silo or aggregation compute.
+ - The paths of 'internal' outputs for silo steps (i.e. a component output that becomes an input for another
+ component within the silo step) are anchored to that silo's datastore. All other outputs are anchored
+ to the aggregation datastore.
+ - Some steps are automatically created by this node to merge inputs from the silo steps to the aggregation
+ step. These steps are anchored in the same manner as the aggregation step.
+ - Datastore anchoring ONLY occurs if an output's path value is empty. If a path already exists, it will NOT
+ try to replace it.
+
+ The process of trimming down inputs from the silo steps to a single input for the aggregate step has some
+ caveats:
+ - Only silo outputs of type mltable and uri_folder are supported.
+ - Both the above output types become an mltable which mounts all the silo step outputs as sub-folders.
+
+ The expected use case of this node is shown in the following code snippet:
+
+ .. code-block:: python
+
+        @pipeline
+ def fl_pipeline():
+ fl_node = fl_scatter_gather(**many_inputs)
+ return {"pipeline_output" : fl_node.outputs["aggregated_model"]}
+
+ submitted_pipeline_job = my_client.jobs.create_or_update(fl_pipeline(), experiment_name="example_fl_pipeline")
+
+ :keyword silo_configs: A list of FederatedLearningSilo objects,
+ which contain the necessary data to reconfigure components to run on specific computes and datastores,
+ while also targeting specific inputs located on the aforementioned datastores.
+ :paramtype silo_configs: List[~azure.ai.ml.entities._assets.federated_learning_silo.FederatedLearningSilo]
+ :keyword silo_component: A pipeline step that will be run multiple times across different silos, as specified
+ by the silo_configs input. In a typical horizontal federated learning context, this step is what will perform
+ model training using siloed subsets of data. Can be either a PipelineJob or a CommandComponent.
+ :paramtype silo_component: Union[~azure.ai.ml.entities.PipelineJob, ~azure.ai.ml.entities.CommandComponent]
+    :keyword aggregation_component: A pipeline step which receives inputs from the executed silo components
+        and aggregates them. In a typical horizontal federated learning context, this component merges
+        the models that were independently trained on each silo's data into a single model. Can be either a
+        PipelineJob or a CommandComponent.
+ :paramtype aggregation_component: Union[
+ ~azure.ai.ml.entities.PipelineJob,
+ ~azure.ai.ml.entities.CommandComponent]
+ :keyword aggregation_compute: The name of the compute that the aggregation component will use.
+ :paramtype aggregation_compute: str
+ :keyword aggregation_datastore: The name of the datastore that the aggregation component will use.
+ :paramtype aggregation_datastore: str
+ :keyword shared_silo_kwargs: A dictionary of string keywords to component inputs. This dictionary is treated
+ like kwargs and is injected into ALL executed silo components.
+ :paramtype shared_silo_kwargs: Dict
+ :keyword aggregation_kwargs: A dictionary of string keywords to component inputs. This dictionary is treated
+ like kwargs and is injected into ALL executed aggregation components.
+ :paramtype aggregation_kwargs: Dict
+ :keyword silo_to_aggregation_argument_map: A dictionary specifying the mapping of outputs from the silo step to
+ inputs in the aggregation step. The keys should be output names from the silo step, and the values should be
+ input names in the aggregation step. This allows for customization of the mapping between the steps.
+ :paramtype silo_to_aggregation_argument_map: Dict
+ :keyword aggregation_to_silo_argument_map: A dictionary specifying the mapping of outputs from the aggregation step
+ to inputs in the silo step. The keys should be output names from the aggregation step, and the values should be
+ input names in the silo step. This allows for customization of the mapping between the steps.
+ :paramtype aggregation_to_silo_argument_map: Dict
+ :keyword max_iterations: The maximum number of scatter gather iterations that should be performed.
+ :paramtype max_iterations: int
+ :keyword _create_default_mappings_if_needed:
+ If True, try to automatically create input/output mappings if they're unset.
+ :paramtype _create_default_mappings_if_needed: bool
+ :return: The federated learning scatter-gather subgraph node.
+ :rtype: ~azure.ai.ml.entities._builders.fl_scatter_gather.FLScatterGather
+
+ .. warning::
+
+ Using this node directly from the SDK repo requires that the user have the 'mldesigner' package installed,
+ which is not a standard dependency of this package.
+ """
+ # Private kwargs:
+ # _create_default_mappings_if_needed: if true, then try to automatically create i/o mappings if they're unset.
+
+ # check that mldesigner is available
+ _check_for_import("mldesigner")
+
+ # Like other DSL nodes, this is just a wrapper around a node builder entity initializer.
+ return FLScatterGather(
+ silo_configs=silo_configs,
+ silo_component=silo_component, # type: ignore[arg-type]
+ aggregation_component=aggregation_component, # type: ignore[arg-type]
+ shared_silo_kwargs=shared_silo_kwargs,
+ aggregation_compute=aggregation_compute,
+ aggregation_datastore=aggregation_datastore,
+ aggregation_kwargs=aggregation_kwargs,
+ silo_to_aggregation_argument_map=silo_to_aggregation_argument_map,
+ aggregation_to_silo_argument_map=aggregation_to_silo_argument_map,
+ max_iterations=max_iterations,
+ create_default_mappings_if_needed=_create_default_mappings_if_needed,
+ **kwargs,
+ )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py
new file mode 100644
index 00000000..6455f793
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py
@@ -0,0 +1,301 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use globals/locals as argument name
+
+# Attribute on customized group class to mark a value type as a group of inputs/outputs.
+import threading
+import functools
+from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union
+
+from azure.ai.ml import Input, Output
+from azure.ai.ml.constants._component import IOConstants
+from azure.ai.ml.entities._inputs_outputs import GroupInput, _get_param_with_standard_annotation
+from azure.ai.ml.entities._inputs_outputs.utils import Annotation
+
+T = TypeVar("T")
+
+
+def group(_cls: Type[T]) -> Type[T]:
+ """Group decorator to make user-defined class as a group of inputs/outputs.
+
+ Usage:
+ Group a set of component inputs make component configuration easier.
+
+ e.g. Define a group
+
+ .. code-block:: python
+
+ # define a group
+ @dsl.group
+ class ParamClass:
+ str_param: str
+ int_param: int = 1
+
+
+ # see the help of auto-gen __init__ function
+ help(ParamClass.__init__)
+
+ e.g. Define a group with inheritance
+
+ .. code-block:: python
+
+ # define the parent group
+ @dsl.group
+ class ParentClass:
+ str_param: str
+ int_param: int = 1
+
+
+ @dsl.group
+ class GroupClass(ParentClass):
+ float_param: float
+ str_param: str = "test"
+
+
+ # see the help of auto-gen __init__ function
+ help(GroupClass.__init__)
+
+ e.g. Define a multi-level group
+
+ .. code-block:: python
+
+ # define a sub group
+ @dsl.group
+ class SubGroupClass:
+ str_param: str
+ int_param: int = 1
+
+
+ @dsl.group
+ class ParentClass:
+ param_group1: SubGroupClass
+ # declare a sub group field with default
+ param_group2: SubGroupClass = SubGroupClass(str_param="test")
+
+
+ # see the help of auto-gen __init__ function
+ help(ParentClass.__init__)
+
+ e.g. Link and assign Group
+
+ .. code-block:: python
+
+ # create a sub group value
+ my_param_group = SubGroupClass(str_param="dataset", int_param=2)
+
+ # create a parent group value
+ my_parent_group = ParentClass(param_group1=my_param_group)
+
+
+ # option 1. use annotation to declare the input is a group.
+ @pipeline
+ def pipeline_func(params: SubGroupClass):
+ component = component_func(
+ string_parameter=params.str_param, int_parameter=params.int_param
+ )
+ return component.outputs
+
+
+ # create a pipeline instance
+ pipeline = pipeline_func(my_param_group)
+
+
+ # option 2. use default of input to declare itself a group.
+ @pipeline
+ def pipeline_func(params=my_param_group):
+ component = component_func(
+ string_parameter=params.str_param, int_parameter=params.int_param
+ )
+ return component.outputs
+
+
+ # create a pipeline instance
+ pipeline = pipeline_func()
+
+
+ # use multi-level group in pipeline.
+ @pipeline
+ def parent_func(params: ParentClass):
+ # pass sub group to sub pipeline.
+ pipeline = pipeline_func(params.param_group1)
+ return pipeline.outputs
+
+ Supported Types:
+ * Primitive type: int, str, float, bool
+ * Declare with python type annotation: `param: int`
+ * Declare with Input annotation: `param: Input(type='integer', min=1, max=5)`
+
+ * For sub group class
+ * Sub group class should be decorated with `dsl.group`
+
+ Restrictions:
+ * Each group member's name must be public (not start with '_').
+        * When using a group as a pipeline input, the user **MUST** write the type annotation
+          or give it a non-None default value so the group class can be inferred.
+
+ :param _cls: The class to decorate
+ :type _cls: Type[T]
+ :return: The decorated class
+ :rtype: Type[T]
+ """
+
+ T2 = TypeVar("T2")
+
+ def _create_fn(
+ name: str,
+ args: Union[List, str],
+ body: Union[List, str],
+ *,
+ globals: Optional[Dict[str, Any]] = None,
+ locals: Optional[Dict[str, Any]] = None,
+ return_type: Optional[Type[T2]],
+ ) -> Callable[..., T2]:
+ """To generate function in class.
+
+ :param name: The name of the new function
+ :type name: str
+ :param args: The parameter names of the new function
+ :type args: List[str]
+ :param body: The source code of the body of the new function
+ :type body: List[str]
+ :keyword globals: The global variables to make available to the new function.
+ :paramtype globals: Dict[str, Any]
+ :keyword locals: The local variables to make available to the new function
+ :paramtype locals: Dict[str, Any]
+ :keyword return_type: The return type of the new function
+ :paramtype return_type: Type[T2]
+ :return: The created function
+ :rtype: Callable[..., T2]
+ """
+ # Reference: Source code of dataclasses.dataclass
+ # Doc link: https://docs.python.org/3/library/dataclasses.html
+ # Reference code link:
+ # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L412
+ # Note that we mutate locals when exec() is called
+ if locals is None:
+ locals = {}
+ if "BUILTINS" not in locals:
+ import builtins
+
+ locals["BUILTINS"] = builtins
+ locals["_return_type"] = return_type
+ return_annotation = "->_return_type"
+ args = ",".join(args)
+ body = "\n".join(f" {b}" for b in body)
+ # Compute the text of the entire function.
+ txt = f" def {name}({args}){return_annotation}:\n{body}"
+ local_vars = ", ".join(locals.keys())
+ txt = f"def __create_fn__({local_vars}):\n{txt}\n return {name}"
+ ns: Dict = {}
+ exec(txt, globals, ns) # pylint: disable=exec-used # nosec
+ res: Callable = ns["__create_fn__"](**locals)
+ return res
+
+ def _create_init_fn( # pylint: disable=unused-argument
+ cls: Type[T], fields: Dict[str, Union[Annotation, Input, Output]]
+ ) -> Callable[..., None]:
+ """Generate the __init__ function for user-defined class.
+
+ :param cls: The class to update
+ :type cls: Type[T]
+ :param fields: The fields
+ :type fields: Dict[str, Union[Annotation, Input, Output]]
+ :return: The __init__ function
+ :rtype: Callable[..., None]
+ """
+
+ # Reference code link:
+ # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L523
+ def _get_data_type_from_annotation(anno: Any) -> Any:
+ if isinstance(anno, GroupInput):
+ return anno._group_class
+ # keep original annotation for Outputs
+ if isinstance(anno, Output):
+ return anno
+ try:
+ # convert to primitive type annotation if possible
+ return IOConstants.PRIMITIVE_STR_2_TYPE[anno.type]
+ except KeyError:
+ # otherwise, keep original annotation
+ return anno
+
+ def _get_default(key: str) -> Any:
+            # Use None as the default value when no default exists, so the init params do not need reordering.
+ val = fields[key]
+ if val is not None and hasattr(val, "default"):
+ return val.default
+ return None
+
+ locals = {f"_type_{key}": _get_data_type_from_annotation(val) for key, val in fields.items()}
+ # Collect field defaults if val is parameter and is optional
+ defaults = {f"_default_{key}": _get_default(key) for key, val in fields.items()}
+ locals.update(defaults)
+ # Ban positional init as we reordered the parameter.
+ _init_param = ["self", "*"]
+ for key in fields:
+ _init_param.append(f"{key}:_type_{key}=_default_{key}")
+
+ body_lines = [f"self.{key}={key}" for key in fields]
+ # If no body lines, use 'pass'.
+ if not body_lines:
+ body_lines = ["pass"]
+ return _create_fn("__init__", _init_param, body_lines, locals=locals, return_type=None)
+
+ def _create_repr_fn(fields: Dict[str, Union[Annotation, Input, Output]]) -> Callable[..., str]:
+ """Generate the __repr__ function for user-defined class.
+
+ :param fields: The fields
+ :type fields: Dict[str, Union[Annotation, Input, Output]]
+ :return: The __repr__ function
+ :rtype: Callable[..., str]
+ """
+ # Reference code link:
+ # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L582
+ fn = _create_fn(
+ "__repr__",
+ [
+ "self",
+ ],
+ ['return self.__class__.__qualname__ + f"(' + ", ".join([f"{k}={{self.{k}!r}}" for k in fields]) + ')"'],
+ return_type=str,
+ )
+
+ # This function's logic is copied from "recursive_repr" function in
+ # reprlib module to avoid dependency.
+ def _recursive_repr(user_function: Any) -> Any:
+ # Decorator to make a repr function return "..." for a recursive
+ # call.
+ repr_running = set()
+
+ @functools.wraps(user_function)
+ def wrapper(self: Any) -> Any:
+ key = id(self), threading.get_ident()
+ if key in repr_running:
+ return "..."
+ repr_running.add(key)
+ try:
+ result = user_function(self)
+ finally:
+ repr_running.discard(key)
+ return result
+
+ return wrapper
+
+ res: Callable = _recursive_repr(fn)
+ return res
+
+ def _process_class(cls: Type[T], all_fields: Dict[str, Union[Annotation, Input, Output]]) -> Type[T]:
+ setattr(cls, "__init__", _create_init_fn(cls, all_fields))
+ setattr(cls, "__repr__", _create_repr_fn(all_fields))
+ return cls
+
+ def _wrap(cls: Type[T]) -> Type[T]:
+ all_fields = _get_param_with_standard_annotation(cls)
+ # Set group info on cls
+ setattr(cls, IOConstants.GROUP_ATTR_NAME, GroupInput(all_fields, _group_class=cls))
+ # Add init, repr, eq to class with user-defined annotation type
+ return _process_class(cls, all_fields)
+
+ return _wrap(_cls)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py
new file mode 100644
index 00000000..074c0502
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py
@@ -0,0 +1,43 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+from typing import Any, Callable
+
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.entities._builders import Command
+from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+
+
+# pylint: disable=unused-argument
+def to_component(*, job: ComponentTranslatableMixin, **kwargs: Any) -> Callable[..., Command]:
+ """Translate a job object to a component function, provided job should be able to translate to a component.
+
+ For example:
+
+ .. code-block:: python
+
+ # Load a local command job to a component function.
+ my_job = load_job("my_job.yaml")
+ component_func = dsl.to_component(my_job)
+ # Load a remote command job component to a component function.
+ my_job = ml_client.jobs.get("my_job")
+ component_func = dsl.to_component(my_job)
+
+ # Consuming the component func
+ component = component_func(param1=xxx, param2=xxx)
+
+ :keyword job: Job load from local or remote.
+ :paramtype job: ~azure.ai.ml.entities._job.pipeline._component_translatable.ComponentTranslatableMixin
+
+ :return: Wrapped function call.
+ :rtype: typing.Callable[..., ~azure.ai.ml.entities._builders.command.Command]
+ """
+ from pathlib import Path
+
+    # Set the default base path to "./": if the code path is relative and the base path is None,
+    # an error is raised when resolving the ARM id of the Code.
+ res: Callable = job._to_component(context={BASE_PATH_CONTEXT_KEY: Path("./")}) # type: ignore[arg-type, assignment]
+ return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py
new file mode 100644
index 00000000..2732cd75
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py
@@ -0,0 +1,46 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+"""This file stores functions and objects that will be used in mldesigner package.
+
+DO NOT change the module names in "all" list. If the interface has changed in source code, wrap it here and keep
+original function/module names the same as before, otherwise mldesigner will be broken by this change.
+"""
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
+
+from azure.ai.ml._internal.entities import InternalComponent
+from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes
+from azure.ai.ml._utils._asset_utils import get_ignore_file
+from azure.ai.ml._utils.utils import try_enable_internal_components
+from azure.ai.ml.dsl._condition import condition
+from azure.ai.ml.dsl._do_while import do_while
+from azure.ai.ml.dsl._group_decorator import group
+from azure.ai.ml.dsl._parallel_for import ParallelFor, parallel_for
+from azure.ai.ml.entities._component.component_factory import component_factory
+from azure.ai.ml.entities._inputs_outputs import _get_param_with_standard_annotation
+from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function
+
+from ._constants import V1_COMPONENT_TO_NODE
+
+component_factory_load_from_dict = component_factory.load_from_dict
+
+
+__all__ = [
+ # to be put in main package
+ "condition",
+ "do_while",
+ "group",
+ "parallel_for",
+ "ParallelFor",
+ # must keep
+ "get_ignore_file",
+ "_get_param_with_standard_annotation",
+ "_generate_component_function",
+ "component_factory_load_from_dict",
+ "V1_COMPONENT_TO_NODE",
+ "try_enable_internal_components",
+ "InternalAdditionalIncludes",
+ "InternalComponent",
+]
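
A hedged sketch of the import surface this shim is intended to keep stable; the names come from the `__all__` list above.

.. code-block:: python

    # mldesigner-style consumption: import through the shim so that internal
    # reorganizations inside azure-ai-ml do not break downstream code.
    from azure.ai.ml.dsl._mldesigner import (
        V1_COMPONENT_TO_NODE,
        condition,
        do_while,
        group,
        parallel_for,
    )
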
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py
new file mode 100644
index 00000000..f770c97d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py
@@ -0,0 +1,33 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+"""This file stores constants that will be used in mldesigner package."""
+from azure.ai.ml._internal._schema.component import NodeType as V1NodeType
+from azure.ai.ml._internal.entities import (
+ Ae365exepool,
+ AetherBridge,
+ Command as InternalCommand,
+ Parallel as InternalParallel,
+ Pipeline as InternalPipeline,
+ DataTransfer,
+ Distributed,
+ HDInsight,
+ Hemera,
+ Scope,
+ Starlite,
+)
+
+V1_COMPONENT_TO_NODE = {
+ V1NodeType.SCOPE: Scope,
+ V1NodeType.COMMAND: InternalCommand,
+ V1NodeType.PARALLEL: InternalParallel,
+ V1NodeType.PIPELINE: InternalPipeline,
+ V1NodeType.DATA_TRANSFER: DataTransfer,
+ V1NodeType.DISTRIBUTED: Distributed,
+ V1NodeType.HDI: HDInsight,
+ V1NodeType.STARLITE: Starlite,
+ V1NodeType.HEMERA: Hemera,
+ V1NodeType.AE365EXEPOOL: Ae365exepool,
+ V1NodeType.AETHER_BRIDGE: AetherBridge,
+}
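
A hedged sketch of the lookup this mapping supports; the call site is illustrative and not part of this diff.

.. code-block:: python

    from azure.ai.ml._internal._schema.component import NodeType as V1NodeType
    from azure.ai.ml.dsl._mldesigner._constants import V1_COMPONENT_TO_NODE

    # Resolve the internal node class registered for a given v1 component type.
    node_cls = V1_COMPONENT_TO_NODE[V1NodeType.COMMAND]
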
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py
new file mode 100644
index 00000000..6460ec6f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py
@@ -0,0 +1,22 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Mapping, Optional
+
+
+class OverrideDefinition(dict):
+ """Definition of a overridable field of a component job."""
+
+
+def get_override_definition_from_schema(
+ schema: str, # pylint: disable=unused-argument
+) -> Optional[Mapping[str, OverrideDefinition]]:
+ """Ger override definition from a json schema.
+
+ :param schema: Json schema of component job.
+ :type schema: str
+ :return: A dictionary from a override definition name to a override definition.
+ """
+ # TODO: gen override definition
+ return None
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py
new file mode 100644
index 00000000..4158f744
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py
@@ -0,0 +1,67 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Any, Dict, List, Union
+
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.parallel_for import ParallelFor
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
+
+
+def parallel_for(
+ *, body: BaseNode, items: Union[List, Dict, str, PipelineInput, NodeOutput], **kwargs: Any
+) -> ParallelFor:
+ """Build a parallel for loop by specifying the loop body and input items.
+
+ .. note::
+        The following example shows how to use the parallel_for API to create a pipeline with a parallel_for node.
+
+ .. code-block:: python
+
+ from azure.ai.ml.dsl import pipeline
+ from mldesigner.dsl import parallel_for
+
+
+ @pipeline
+ def your_loop_body(input):
+ pass
+
+
+ @pipeline
+ def pipeline_with_parallel_for_node():
+ # specify fixed inputs and leave loop config inputs unprovided
+ loop_body = your_loop_body()
+
+ # link loop body & loop config to loop node
+ loop_node = parallel_for(
+ body=loop_body,
+ items=[
+ {"loop_body_input": 1},
+ {"loop_body_input": 2},
+ ],
+ )
+
+ # collect aggregated output from loop body
+ aggregate_node = collect_aggregated_outputs_from_for_each_node(
+ input=loop_node.outputs.output,
+ )
+
+ :keyword body: Node to execute as the loop body.
+ :paramtype body: ~azure.ai.ml.entities._builders.BaseNode
+ :keyword items: The loop body's input which will bind to the loop node.
+ :paramtype items: Union[
+ list,
+ dict,
+ str,
+ ~azure.ai.ml.entities._job.pipeline._io.PipelineInput,
+ ~azure.ai.ml.entities._job.pipeline._io.NodeOutput]
+ :return: The parallel for loop
+ :rtype: ~azure.ai.ml.entities._builders.parallel_for.ParallelFor
+ """
+ parallel_for_node = ParallelFor(
+ body=body, # type: ignore[arg-type]
+ items=items,
+ _from_component_func=True,
+ **kwargs,
+ )
+ return parallel_for_node
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py
new file mode 100644
index 00000000..44026174
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py
@@ -0,0 +1,615 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+import copy
+import inspect
+import logging
+import typing
+from collections import OrderedDict
+from inspect import Parameter, signature
+from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union
+
+from azure.ai.ml._utils._func_utils import get_outputs_and_locals
+from azure.ai.ml._utils.utils import is_valid_node_name, parse_args_description_from_docstring
+from azure.ai.ml.constants._component import ComponentSource, IOConstants
+from azure.ai.ml.constants._job.pipeline import COMPONENT_IO_KEYWORDS
+from azure.ai.ml.dsl._utils import _sanitize_python_variable_name
+from azure.ai.ml.entities import PipelineJob
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._component.pipeline_component import PipelineComponent
+from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output, _get_param_with_standard_annotation
+from azure.ai.ml.entities._inputs_outputs.utils import _get_annotation_by_value, is_group
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, PipelineOutput, _GroupAttrDict
+from azure.ai.ml.entities._util import copy_output_setting
+
+# We need to limit the pipeline depth to avoid the built graph going too deep and to prevent a potential
+# stack overflow in dsl.pipeline.
+from azure.ai.ml.exceptions import UserErrorException
+
+_BUILDER_STACK_MAX_DEPTH = 100
+
+module_logger = logging.getLogger(__name__)
+
+
+class _PipelineComponentBuilderStack:
+ def __init__(self) -> None:
+ self.items: List["PipelineComponentBuilder"] = []
+
+ def top(self) -> Optional["PipelineComponentBuilder"]:
+ if self.is_empty():
+ return None
+ return self.items[-1]
+
+ def pop(self) -> Optional["PipelineComponentBuilder"]:
+ if self.is_empty():
+ return None
+ return self.items.pop()
+
+ def push(self, item: object) -> None:
+ error_msg = f"{self.__class__.__name__} only " f"allows pushing `{PipelineComponentBuilder.__name__}` element"
+ assert isinstance(item, PipelineComponentBuilder), error_msg
+
+ # TODO: validate cycle
+ self.items.append(item)
+ if self.size() >= _BUILDER_STACK_MAX_DEPTH:
+ current_pipeline = self.items[0].name
+ # clear current pipeline stack
+ self.items = []
+ msg = "Pipeline {} depth exceeds limitation. Max depth: {}"
+ raise UserErrorException(
+ message=msg.format(current_pipeline, _BUILDER_STACK_MAX_DEPTH),
+ no_personal_data_message=msg.format("[current_pipeline]", _BUILDER_STACK_MAX_DEPTH),
+ )
+
+ def is_empty(self) -> bool:
+ return len(self.items) == 0
+
+ def size(self) -> int:
+ return len(self.items)
+
+
+# This collection is used to record pipeline component builders in current call stack
+_definition_builder_stack = _PipelineComponentBuilderStack()
+
+
+def _is_inside_dsl_pipeline_func() -> bool:
+ """Checks whether executing within a dsl pipeline func
+
+ :return: True if is inside DSL pipeline func.
+ :rtype: bool
+ """
+ return _definition_builder_stack.size() > 0
+
+
+def _add_component_to_current_definition_builder(component: Union[BaseNode, AutoMLJob]) -> None:
+ if _is_inside_dsl_pipeline_func():
+ builder = _definition_builder_stack.top()
+ if builder is not None:
+ builder.add_node(component)
+
+
+class PipelineComponentBuilder:
+ # map from python built-in type to component type
+ # pylint: disable=too-many-instance-attributes
+ DEFAULT_DATA_TYPE_MAPPING = {
+ "float": "number",
+ "int": "integer",
+ "bool": "boolean",
+ "str": "string",
+ }
+ DEFAULT_OUTPUT_NAME = "output"
+
+ def __init__(
+ self,
+ func: Callable,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ display_name: Optional[str] = None,
+ description: Optional[str] = None,
+ default_datastore: Any = None,
+ tags: Optional[Union[Dict[str, str], str]] = None,
+ source_path: Optional[str] = None,
+ non_pipeline_inputs: Optional[List] = None,
+ ):
+ self.func = func
+ name = name if name is not None else func.__name__
+ display_name = display_name if display_name else name
+ description = description if description else func.__doc__
+ self._args_description = parse_args_description_from_docstring(func.__doc__)
+        # List of nodes, ordered by their creation order in the pipeline.
+ self.nodes: List = []
+ self.non_pipeline_parameter_names = non_pipeline_inputs or []
+ # A dict of inputs name to InputDefinition.
+ # TODO: infer pipeline component input meta from assignment
+ self.inputs = self._build_inputs(func)
+ self.output_annotation = self._get_output_annotation(func)
+ self._name = name
+ self.version = version
+ self.display_name = display_name
+ self.description = description
+ self.default_datastore = default_datastore
+ self.tags = tags
+ self.source_path = source_path
+
+ @property
+ def name(self) -> str:
+ """Name of pipeline builder, it's name will be same as the pipeline definition it builds.
+
+ :return: Pipeline builder name
+ :rtype: str
+ """
+ return self._name
+
+ def add_node(self, node: Union[BaseNode, AutoMLJob]) -> None:
+ """Add node to pipeline builder.
+
+ :param node: A pipeline node.
+ :type node: Union[BaseNode, AutoMLJob]
+ """
+ self.nodes.append(node)
+
+ def build(
+ self,
+ *,
+ user_provided_kwargs: Optional[Dict] = None,
+ non_pipeline_inputs_dict: Optional[Dict] = None,
+ non_pipeline_inputs: Optional[List] = None,
+ ) -> PipelineComponent:
+ """Build a pipeline component from current pipeline builder.
+
+        :keyword user_provided_kwargs: The kwargs the user provided to the dsl pipeline function. None if not provided.
+        :keyword non_pipeline_inputs_dict: The provided non-pipeline input key-value pairs. None if none exist.
+        :keyword non_pipeline_inputs: List of non-pipeline input names. None if none exist.
+ :return: The built PipelineComponent
+ :rtype: PipelineComponent
+ """
+ if user_provided_kwargs is None:
+ user_provided_kwargs = {}
+ # Clear nodes as we may call build multiple times.
+ self.nodes = []
+
+ kwargs = _build_pipeline_parameter(
+ func=self.func,
+ user_provided_kwargs=user_provided_kwargs,
+ # TODO: support result() for pipeline input inside parameter group
+ group_default_kwargs=self._get_group_parameter_defaults(),
+ non_pipeline_inputs=non_pipeline_inputs,
+ )
+ kwargs.update(non_pipeline_inputs_dict or {})
+
+ # Use a dict to store all variables in self.func
+ # We use this stack to store the dsl pipeline definition hierarchy
+ _definition_builder_stack.push(self)
+
+ try:
+ outputs, _locals = get_outputs_and_locals(self.func, kwargs)
+ finally:
+ _definition_builder_stack.pop()
+
+ if outputs is None:
+ outputs = {}
+
+ jobs: Dict = self._update_nodes_variable_names(_locals)
+ pipeline_component = PipelineComponent(
+ name=self.name,
+ version=self.version,
+ display_name=self.display_name,
+ description=self.description,
+ inputs=self.inputs,
+ jobs=jobs,
+ tags=self.tags, # type: ignore[arg-type]
+ source_path=self.source_path,
+ _source=ComponentSource.DSL,
+ )
+ # TODO: Refine here. The outputs cannot be built first and then passed into pipeline component creation;
+ # an exception would be raised in component.build_validate_io().
+ pipeline_component._outputs = self._build_pipeline_outputs(outputs)
+ return pipeline_component
+
+ def _validate_group_annotation(self, name: str, val: GroupInput) -> None:
+ for k, v in val.values.items():
+ if isinstance(v, GroupInput):
+ self._validate_group_annotation(k, v)
+ elif isinstance(v, Output):
+ # TODO(2097468): automatically change it to Input when used in input annotation
+ raise UserErrorException("Output annotation cannot be used in @pipeline.")
+ elif isinstance(v, Input):
+ if v.type not in IOConstants.PRIMITIVE_STR_2_TYPE:
+ # TODO(2097478): support port type groups
+ raise UserErrorException(f"Only primitive types can be used as input of group, got {v.type}")
+ else:
+ raise UserErrorException(f"Unsupported annotation type {type(v)} for group field {name}.{k}")
+
+ def _build_inputs(self, func: Union[Callable, Type]) -> Dict:
+ inputs: Dict = _get_param_with_standard_annotation(
+ func, is_func=True, skip_params=self.non_pipeline_parameter_names
+ )
+ for k, v in inputs.items():
+ if isinstance(v, GroupInput):
+ self._validate_group_annotation(name=k, val=v)
+ # add arg description
+ if k in self._args_description:
+ v["description"] = self._args_description[k]
+ return inputs
+
+ def _build_pipeline_outputs(self, outputs: typing.Dict[str, NodeOutput]) -> Dict[str, PipelineOutput]:
+ """Validate if dsl.pipeline returns valid outputs and set output binding. Create PipelineOutput as pipeline's
+ output definition based on node outputs from return.
+
+ :param outputs: Outputs of pipeline
+ :type outputs: Mapping[str, azure.ai.ml.Output]
+ :return: The mapping of output names to PipelineOutput
+ :rtype: Dict[str, PipelineOutput]
+ """
+ error_msg = (
+ "The return type of dsl.pipeline decorated function should be a mapping from output name to "
+ "azure.ai.ml.Output with owner."
+ )
+ if is_group(outputs):
+ outputs = {key: val for key, val in outputs.__dict__.items() if val}
+ if not isinstance(outputs, dict):
+ raise UserErrorException(message=error_msg, no_personal_data_message=error_msg)
+ output_dict = {}
+ output_meta_dict = {}
+ for key, value in outputs.items():
+ if not isinstance(key, str) or not isinstance(value, NodeOutput) or value._owner is None:
+ raise UserErrorException(message=error_msg, no_personal_data_message=error_msg)
+ if value._meta is not None:
+ meta = value._meta
+ else:
+ meta = Output(type=value.type, path=value.path, mode=value.mode, description=value.description)
+
+ # Hack: map internal output type to pipeline output type
+ def _map_internal_output_type(_meta: Output) -> str:
+ """Map component output type to valid pipeline output type.
+
+ :param _meta: The output
+ :type _meta: Output
+ :return: Output type
+ :rtype: str
+ """
+ if type(_meta).__name__ != "InternalOutput":
+ return str(_meta.type)
+ return str(_meta.map_pipeline_output_type()) # type: ignore[attr-defined]
+
+ # Note: Here we set PipelineOutput as Pipeline's output definition as we need output binding.
+ output_meta = Output(
+ type=_map_internal_output_type(meta), # type: ignore[arg-type]
+ description=meta.description,
+ mode=meta.mode,
+ )
+ pipeline_output = PipelineOutput(
+ port_name=key,
+ data=None,
+ # meta is used to create the pipeline component; store it here to keep the pipeline component and
+ # inner node output types consistent
+ meta=output_meta,
+ owner="pipeline",
+ description=self._args_description.get(key, None),
+ # store original node output to be able to trace back to inner node from a pipeline output builder.
+ binding_output=value,
+ )
+ # copy node level output setting to pipeline output
+ copy_output_setting(
+ source=value._owner.outputs[value._port_name], target=pipeline_output # type: ignore[arg-type]
+ )
+
+ value._owner.outputs[value._port_name]._data = pipeline_output # type: ignore[union-attr]
+
+ output_dict[key] = pipeline_output
+ output_meta_dict[key] = output_meta._to_dict()
+
+ self._validate_inferred_outputs(output_meta_dict, output_dict)
+ return output_dict
+
+ def _get_group_parameter_defaults(self) -> Dict:
+ group_defaults = {}
+ for key, val in self.inputs.items():
+ if not isinstance(val, GroupInput):
+ continue
+ # Copy and insert top-level parameter name into group names for all items
+ group_defaults[key] = copy.deepcopy(val.default)
+ group_defaults[key].insert_group_name_for_items(key)
+ return group_defaults
+
+ def _update_nodes_variable_names(self, func_variables: dict) -> Dict[str, Union[BaseNode, AutoMLJob]]:
+ """Update nodes list to ordered dict with variable name key and component object value.
+
+ Variable naming priority:
+ 1. Specified by using xxx.name.
+ e.g.
+ module1 = module_func()
+ module1.name = "node1" # final node name is "node1"
+
+ 2. Variable name
+ e.g.
+ my_node = module_func() # final node name is "my_node"
+
+ 3. Anonymous node, but another node with same component.name has user-defined name
+ e.g.
+ my_node = module_func() # final node name is "my_node"
+ module_func() # final node name is "my_node_1"
+ module_func() # final node name is "my_node_2"
+
+ 4. Anonymous node
+ e.g.
+ my_node = module_func() # final node name is "my_node"
+ module_func_1() # final node name is its component name
+
+ :param func_variables: The function variables
+ :type func_variables: dict
+ :return: Map of variable name to component object
+ :rtype: Dict[str, Union[BaseNode, AutoMLJob]]
+ """
+
+ def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]) -> Optional[Union[str, Component]]:
+ # TODO(1979547): refactor this
+ if isinstance(node, AutoMLJob):
+ return node.name or _sanitize_python_variable_name(node.__class__.__name__)
+ if isinstance(node, ControlFlowNode):
+ return _sanitize_python_variable_name(node.__class__.__name__)
+ return node.name or node._get_component_name()
+
+ valid_component_ids = set(item._instance_id for item in self.nodes)
+ id_name_dict = {}
+ name_count_dict = {}
+ compname_udfname_dict = {}
+ local_names = set()
+ result = OrderedDict()
+
+ for k, v in func_variables.items():
+ # TODO(1979547): refactor this
+ if not isinstance(v, (BaseNode, AutoMLJob, PipelineJob, ControlFlowNode)):
+ continue
+ instance_id = getattr(v, "_instance_id", None)
+ if instance_id not in valid_component_ids:
+ continue
+ name = getattr(v, "name", None) or k
+ # for node name _, treat it as anonymous node with name unset
+ if name == "_":
+ continue
+
+ # User defined name must be valid python identifier
+ if not is_valid_node_name(name):
+ raise UserErrorException(
+ f"Invalid node name found: {name!r}. Node name must start with a lower letter or underscore, "
+ "and can only contain lower letters, numbers and underscore."
+ )
+
+ # Raise an error when setting a name that already exists, which likely conflicts with a variable name
+ if name in local_names and instance_id not in id_name_dict:
+ raise UserErrorException(
+ f"Duplicate node name found in pipeline: {self.name!r}, "
+ f"node name: {name!r}. Duplicate check is case-insensitive."
+ )
+ local_names.add(name)
+ id_name_dict[v._instance_id] = name # type: ignore[union-attr]
+ name_count_dict[name] = 1
+
+ # Find the last user-defined name for the same type of components
+ for node in self.nodes:
+ _id = node._instance_id
+ if _id in id_name_dict:
+ compname_udfname_dict[_get_name_or_component_name(node)] = id_name_dict[_id]
+
+ # Refine and fill default name
+ # If component name is same, append '_{count}' suffix
+ for node in self.nodes:
+ _id = node._instance_id
+ if _id not in id_name_dict:
+ target_name = _get_name_or_component_name(node)
+ if node.name is None and target_name in compname_udfname_dict:
+ target_name = compname_udfname_dict[target_name]
+ if target_name not in name_count_dict:
+ name_count_dict[target_name] = 0
+ name_count_dict[target_name] += 1
+ suffix = "" if name_count_dict[target_name] == 1 else f"_{name_count_dict[target_name] - 1}"
+ id_name_dict[_id] = f"{_sanitize_python_variable_name(str(target_name))}{suffix}"
+ final_name = id_name_dict[_id]
+ node.name = final_name
+ result[final_name] = node
+
+ # Validate the node's IO names against the final node name, and log a warning if a reserved keyword is used.
+ self._validate_keyword_in_node_io(node)
+ return result
+
+ def _update_inputs(self, pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]) -> None:
+ """Update the pipeline inputs by the dict.
+
+ :param pipeline_inputs: The pipeline inputs
+ :type pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]
+ """
+ for input_name, value in pipeline_inputs.items():
+ anno: Any = None
+ if input_name not in self.inputs:
+ if isinstance(value, PipelineInput):
+ value = value._data
+ if isinstance(value, Input):
+ anno = copy.copy(value)
+ elif isinstance(value, NodeOutput):
+ anno = Input(type=value.type)
+ else:
+ anno = _get_annotation_by_value(value)
+ anno.name = input_name
+ anno.description = self._args_description.get(input_name)
+ self.inputs[input_name] = anno
+
+ @classmethod
+ def _get_output_annotation(cls, func: Callable) -> Dict[str, Dict]:
+ """Get the output annotation of the function, validate & refine it.
+
+ :param func: The function to retrieve output annotations from
+ :type func: Callable
+ :return: A dict of output annotations
+ :rtype: Dict[str, Dict]
+ """
+ return_annotation = inspect.signature(func).return_annotation
+
+ if is_group(return_annotation):
+ outputs = _get_param_with_standard_annotation(return_annotation, is_func=False)
+ elif isinstance(return_annotation, Output):
+ outputs = {cls.DEFAULT_OUTPUT_NAME: return_annotation}
+ else:
+ # skip if return annotation is not group or output
+ return {}
+
+ output_annotations = {}
+ for key, val in outputs.items():
+ if isinstance(val, GroupInput):
+ raise UserErrorException(message="Nested group annotation is not supported in pipeline output.")
+ # normalize annotation since currently annotation in @group will be converted to Input
+ if isinstance(val, Input):
+ val = Output(type=val.type)
+ if not isinstance(val, Output):
+ raise UserErrorException(
+ message="Invalid output annotation. "
+ f"Only Output annotation in return annotation is supported. Got {type(val)}."
+ )
+ output_annotations[key] = val._to_dict()
+ return output_annotations
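+
+ # For example (a quick sketch, not part of the SDK source; the function name is hypothetical):
+ # for a pipeline function annotated as
+ #     def my_pipeline() -> Output(type="uri_folder"): ...
+ # this method is expected to return {"output": {..., "type": "uri_folder", ...}}, using
+ # DEFAULT_OUTPUT_NAME as the key.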
+
+ def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: Dict[str, PipelineOutput]) -> None:
+ """Validate inferred output dict against annotation.
+
+ :param output_meta_dict: The output meta dict
+ :type output_meta_dict: dict
+ :param output_dict: The output dict
+ :type output_dict: Dict[str, PipelineOutput]
+ """
+ if not self.output_annotation:
+ return
+ error_prefix = "Unmatched outputs between actual pipeline output and output in annotation"
+ if output_meta_dict.keys() != self.output_annotation.keys():
+ raise UserErrorException(
+ "{}: actual pipeline component outputs: {}, annotation outputs: {}".format(
+ error_prefix, output_meta_dict.keys(), self.output_annotation.keys()
+ )
+ )
+
+ unmatched_outputs = []
+ for key, actual_output in output_meta_dict.items():
+ expected_output = self.output_annotation[key]
+ actual_output.pop("description", None)
+ expected_description = expected_output.pop("description", None)
+ # skip comparing mode since, when the component is from remote, output mode is not available
+ actual_output.pop("mode", None)
+ expected_mode = expected_output.pop("mode", None)
+ if expected_output != actual_output:
+ unmatched_outputs.append(
+ f"{key}: pipeline component output: {actual_output} != annotation output {expected_output}"
+ )
+ res = output_dict[key]._meta
+ if expected_description:
+ if res is not None:
+ res.description = expected_description
+ # also copy the description to pipeline job
+ output_dict[key].description = expected_description
+ if expected_mode:
+ if res is not None:
+ res.mode = expected_mode
+ # also copy the mode to pipeline job
+ output_dict[key].mode = expected_mode
+
+ if unmatched_outputs:
+ raise UserErrorException(f"{error_prefix}: {unmatched_outputs}")
+
+ @staticmethod
+ def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]) -> None:
+ if has_attr_safe(node, "inputs"):
+ for input_name in set(node.inputs) & COMPONENT_IO_KEYWORDS: # type: ignore[arg-type]
+ module_logger.warning(
+ 'Reserved word "%s" is used as input name in node "%s", '
+ "can only be accessed with '%s.inputs[\"%s\"]'",
+ input_name,
+ node.name,
+ node.name,
+ input_name,
+ )
+ if has_attr_safe(node, "outputs"):
+ for output_name in set(node.outputs) & COMPONENT_IO_KEYWORDS: # type: ignore[arg-type]
+ module_logger.warning(
+ 'Reserved word "%s" is used as output name in node "%s", '
+ "can only be accessed with '%s.outputs[\"%s\"]'",
+ output_name,
+ node.name,
+ node.name,
+ output_name,
+ )
+
+
+def _build_pipeline_parameter(
+ func: Optional[Callable],
+ *,
+ user_provided_kwargs: Dict,
+ group_default_kwargs: Optional[Dict] = None,
+ non_pipeline_inputs: Optional[List] = None,
+) -> Dict:
+ # Pass group defaults into kwargs so that group.item can be used even if the function parameter has no default.
+ # example:
+ # @group
+ # class Group:
+ # key = 'val'
+ #
+ # @pipeline
+ # def pipeline_func(param: Group):
+ # component_func(input=param.key) <--- param.key should be val.
+
+ # transform kwargs
+ transformed_kwargs, non_pipeline_inputs = {}, non_pipeline_inputs or []
+ if group_default_kwargs:
+ transformed_kwargs.update(
+ {
+ key: _wrap_pipeline_parameter(key, default_value=value, actual_value=value)
+ for key, value in group_default_kwargs.items()
+ if key not in non_pipeline_inputs
+ }
+ )
+
+ def all_params(parameters: Any) -> Generator:
+ yield from parameters.values()
+
+ if func is None:
+ return transformed_kwargs
+
+ parameters = all_params(signature(func).parameters)
+ # transform default values
+ for left_args in parameters:
+ if (
+ left_args.name not in transformed_kwargs
+ and left_args.kind != Parameter.VAR_KEYWORD
+ and left_args.name not in non_pipeline_inputs
+ ):
+ default_value = left_args.default if left_args.default is not Parameter.empty else None
+ actual_value = user_provided_kwargs.get(left_args.name)
+ transformed_kwargs[left_args.name] = _wrap_pipeline_parameter(
+ key=left_args.name, default_value=default_value, actual_value=actual_value
+ )
+ # Add variable kwargs to transformed_kwargs.
+ for key, value in user_provided_kwargs.items():
+ if key not in transformed_kwargs:
+ transformed_kwargs[key] = _wrap_pipeline_parameter(key=key, default_value=None, actual_value=value)
+ return transformed_kwargs
+
+
+def _wrap_pipeline_parameter(
+ key: str, default_value: Any, actual_value: Any, group_names: Optional[List[str]] = None
+) -> Union[_GroupAttrDict, PipelineInput]:
+ # Append parameter path in group
+ group_names = [*group_names] if group_names else []
+ if isinstance(default_value, _GroupAttrDict):
+ group_names.append(key)
+ return _GroupAttrDict(
+ {
+ k: _wrap_pipeline_parameter(k, default_value=v, actual_value=v, group_names=group_names)
+ for k, v in default_value.items()
+ }
+ )
+ # Note: this PipelineInput object is built to mark input as a data binding.
+ # It only exists in dsl.pipeline function execution time and won't store in pipeline job or pipeline component.
+ return PipelineInput(name=key, meta=None, default_data=default_value, data=actual_value, group_names=group_names)
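+
+
+# Illustrative sketch (not part of the SDK source; the function and parameter names are hypothetical):
+# given a pipeline function such as
+#
+#     def my_pipeline(learning_rate: float = 0.01): ...
+#
+# _build_pipeline_parameter(func=my_pipeline, user_provided_kwargs={}) is expected to return roughly
+# {"learning_rate": PipelineInput(name="learning_rate", meta=None, default_data=0.01, data=None,
+# group_names=[])}, i.e. each parameter is wrapped so it behaves as a data binding while the
+# decorated function body executes.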
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py
new file mode 100644
index 00000000..21b5b6e3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py
@@ -0,0 +1,327 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import inspect
+import logging
+from collections import OrderedDict
+from functools import wraps
+from inspect import Parameter, signature
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Optional, TypeVar, Union, overload
+
+from typing_extensions import ParamSpec
+
+from azure.ai.ml._utils.utils import is_private_preview_enabled
+from azure.ai.ml.entities import Data, Model, PipelineJob, PipelineJobSettings
+from azure.ai.ml.entities._builders.pipeline import Pipeline
+from azure.ai.ml.entities._inputs_outputs import Input, is_group
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, _GroupAttrDict
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression
+from azure.ai.ml.exceptions import (
+ MultipleValueError,
+ ParamValueNotExistsError,
+ TooManyPositionalArgsError,
+ UnexpectedKeywordError,
+ UnsupportedParameterKindError,
+ UserErrorException,
+)
+
+from ..entities._builders import BaseNode
+from ._pipeline_component_builder import PipelineComponentBuilder, _is_inside_dsl_pipeline_func
+from ._settings import _dsl_settings_stack
+from ._utils import _resolve_source_file
+
+SUPPORTED_INPUT_TYPES = (
+ PipelineInput,
+ NodeOutput,
+ Input,
+ Model,
+ Data, # If a Data object is used as an input, we will convert it to an Input object
+ Pipeline, # If a pipeline node is used as the input, we use its single output as the real input.
+ str,
+ bool,
+ int,
+ float,
+ PipelineExpression,
+ _GroupAttrDict,
+)
+module_logger = logging.getLogger(__name__)
+
+T = TypeVar("T")
+P = ParamSpec("P")
+
+
+# Overload that returns a decorator when func is None
+@overload
+def pipeline(
+ func: None,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ display_name: Optional[str] = None,
+ description: Optional[str] = None,
+ experiment_name: Optional[str] = None,
+ tags: Optional[Dict[str, str]] = None,
+ **kwargs: Any,
+) -> Callable[[Callable[P, T]], Callable[P, PipelineJob]]: ...
+
+
+# Overload that returns the decorated function when func isn't None
+@overload
+def pipeline(
+ func: Callable[P, T],
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ display_name: Optional[str] = None,
+ description: Optional[str] = None,
+ experiment_name: Optional[str] = None,
+ tags: Optional[Dict[str, str]] = None,
+ **kwargs: Any,
+) -> Callable[P, PipelineJob]: ...
+
+
+def pipeline(
+ func: Optional[Callable[P, T]] = None,
+ *,
+ name: Optional[str] = None,
+ version: Optional[str] = None,
+ display_name: Optional[str] = None,
+ description: Optional[str] = None,
+ experiment_name: Optional[str] = None,
+ tags: Optional[Union[Dict[str, str], str]] = None,
+ **kwargs: Any,
+) -> Union[Callable[[Callable[P, T]], Callable[P, PipelineJob]], Callable[P, PipelineJob]]:
+ """Build a pipeline which contains all component nodes defined in this function.
+
+ :param func: The user pipeline function to be decorated.
+ :type func: types.FunctionType
+ :keyword name: The name of pipeline component, defaults to function name.
+ :paramtype name: str
+ :keyword version: The version of pipeline component, defaults to "1".
+ :paramtype version: str
+ :keyword display_name: The display name of pipeline component, defaults to function name.
+ :paramtype display_name: str
+ :keyword description: The description of the built pipeline.
+ :paramtype description: str
+ :keyword experiment_name: Name of the experiment the job will be created under; \
+ if None is provided, the experiment name defaults to the name of the current directory.
+ :paramtype experiment_name: str
+ :keyword tags: The tags of pipeline component.
+ :paramtype tags: dict[str, str]
+ :return: Either
+ * A decorator, if `func` is None
+ * The decorated `func`
+
+ :rtype: Union[
+ Callable[[Callable], Callable[..., ~azure.ai.ml.entities.PipelineJob]],
+ Callable[P, ~azure.ai.ml.entities.PipelineJob]
+
+ ]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py
+ :start-after: [START configure_pipeline]
+ :end-before: [END configure_pipeline]
+ :language: python
+ :dedent: 8
+ :caption: Shows how to create a pipeline using this decorator.
+ """
+
+ def pipeline_decorator(func: Callable[P, T]) -> Callable:
+ if not isinstance(func, Callable): # type: ignore
+ raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.")
+
+ non_pipeline_inputs = kwargs.get("non_pipeline_inputs", []) or kwargs.get("non_pipeline_parameters", [])
+ # The compute parameter name changed over time: default_compute_target -> compute -> default_compute -> none.
+ # To support legacy usage, we accept all of them, with the priority order below.
+ compute = kwargs.get("compute", None)
+ default_compute_target = kwargs.get("default_compute_target", None)
+ default_compute_target = kwargs.get("default_compute", None) or default_compute_target
+ continue_on_step_failure = kwargs.get("continue_on_step_failure", None)
+ on_init = kwargs.get("on_init", None)
+ on_finalize = kwargs.get("on_finalize", None)
+
+ default_datastore = kwargs.get("default_datastore", None)
+ force_rerun = kwargs.get("force_rerun", None)
+ job_settings = {
+ "default_datastore": default_datastore,
+ "continue_on_step_failure": continue_on_step_failure,
+ "force_rerun": force_rerun,
+ "default_compute": default_compute_target,
+ "on_init": on_init,
+ "on_finalize": on_finalize,
+ }
+ func_entry_path = _resolve_source_file()
+ if not func_entry_path:
+ func_path = Path(inspect.getfile(func))
+ # in a notebook, func_path may be a fake path, and resolving such a fake path would raise an error
+ if func_path.exists():
+ func_entry_path = func_path.resolve().absolute()
+
+ job_settings = {k: v for k, v in job_settings.items() if v is not None}
+ pipeline_builder = PipelineComponentBuilder(
+ func=func,
+ name=name,
+ version=version,
+ display_name=display_name,
+ description=description,
+ default_datastore=default_datastore,
+ tags=tags,
+ source_path=str(func_entry_path),
+ non_pipeline_inputs=non_pipeline_inputs,
+ )
+
+ @wraps(func)
+ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Union[Pipeline, PipelineJob]:
+ # Default args will be added here.
+ # Note: push/pop the stack here instead of inside build(),
+ # because we only want to enable dsl settings on the top-level pipeline.
+ _dsl_settings_stack.push() # use this stack to track on_init/on_finalize settings
+ try:
+ # Convert args to kwargs
+ provided_positional_kwargs = _validate_args(func, args, kwargs, non_pipeline_inputs)
+
+ # When pipeline supports variable params, update pipeline component to support the inputs in **kwargs.
+ pipeline_parameters = {
+ k: v for k, v in provided_positional_kwargs.items() if k not in non_pipeline_inputs
+ }
+ pipeline_builder._update_inputs(pipeline_parameters)
+
+ non_pipeline_params_dict = {
+ k: v for k, v in provided_positional_kwargs.items() if k in non_pipeline_inputs
+ }
+
+ # TODO: cache built pipeline component
+ pipeline_component = pipeline_builder.build(
+ user_provided_kwargs=provided_positional_kwargs,
+ non_pipeline_inputs_dict=non_pipeline_params_dict,
+ non_pipeline_inputs=non_pipeline_inputs,
+ )
+ finally:
+ # use `finally` to ensure pop operation from the stack
+ dsl_settings = _dsl_settings_stack.pop()
+
+ # update on_init/on_finalize settings if init/finalize job is set
+ if dsl_settings.init_job_set:
+ job_settings["on_init"] = dsl_settings.init_job_name(pipeline_component.jobs)
+ if dsl_settings.finalize_job_set:
+ job_settings["on_finalize"] = dsl_settings.finalize_job_name(pipeline_component.jobs)
+
+ # TODO: pass compute & default_compute separately?
+ common_init_args: Any = {
+ "experiment_name": experiment_name,
+ "component": pipeline_component,
+ "inputs": pipeline_parameters,
+ "tags": tags,
+ }
+ built_pipeline: Any = None
+ if _is_inside_dsl_pipeline_func():
+ # on_init/on_finalize is not supported for pipeline component
+ if job_settings.get("on_init") is not None or job_settings.get("on_finalize") is not None:
+ raise UserErrorException("On_init/on_finalize is not supported for pipeline component.")
+ # Build pipeline node instead of pipeline job if inside dsl.
+ built_pipeline = Pipeline(_from_component_func=True, **common_init_args)
+ if job_settings:
+ module_logger.warning(
+ ("Job settings %s on pipeline function %r are ignored when using inside PipelineJob."),
+ job_settings,
+ func.__name__,
+ )
+ else:
+ built_pipeline = PipelineJob(
+ jobs=pipeline_component.jobs,
+ compute=compute,
+ settings=PipelineJobSettings(**job_settings),
+ **common_init_args,
+ )
+
+ return built_pipeline
+
+ # Bug Item number: 2883169
+ wrapper._is_dsl_func = True # type: ignore
+ wrapper._job_settings = job_settings # type: ignore
+ wrapper._pipeline_builder = pipeline_builder # type: ignore
+ return wrapper
+
+ # enable using the decorator without "()" when all arguments have default values
+ if func is not None:
+ return pipeline_decorator(func)
+ return pipeline_decorator
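+
+
+# Illustrative usage sketch (not part of the SDK source); the component, its input/output names,
+# and the compute/data names below are hypothetical:
+#
+#     from azure.ai.ml import Input, load_component
+#     from azure.ai.ml.dsl import pipeline
+#
+#     train_component = load_component(source="./train.yml")
+#
+#     @pipeline(name="training_pipeline", default_compute="cpu-cluster")
+#     def training_pipeline(training_data):
+#         train_node = train_component(data=training_data)
+#         return {"model": train_node.outputs.model_output}
+#
+#     # Calling the decorated function returns a PipelineJob that can be submitted via MLClient.
+#     job = training_pipeline(training_data=Input(path="azureml:my-data:1"))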
+
+
+# pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+def _validate_args(func: Callable, args: Any, kwargs: Dict, non_pipeline_inputs: List) -> OrderedDict:
+ """Validate customer function args and convert them to kwargs."""
+ if not isinstance(non_pipeline_inputs, List) or any(not isinstance(param, str) for param in non_pipeline_inputs):
+ msg = "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string"
+ raise UserErrorException(message=msg, no_personal_data_message=msg)
+ # Positional arguments validate
+ is_support_variable_params = is_private_preview_enabled()
+ all_parameters = signature(func).parameters
+ # Implicit parameters are *args
+ var_positional_arg = next(filter(lambda param: param.kind == param.VAR_POSITIONAL, all_parameters.values()), None)
+ if var_positional_arg:
+ raise UnsupportedParameterKindError(func.__name__, parameter_kind=f"*{var_positional_arg.name}")
+
+ non_pipeline_inputs = non_pipeline_inputs or []
+ unexpected_non_pipeline_inputs = [param for param in non_pipeline_inputs if param not in all_parameters]
+ if unexpected_non_pipeline_inputs:
+ raise ParamValueNotExistsError(func.__name__, unexpected_non_pipeline_inputs)
+
+ named_parameters = [
+ param for param in all_parameters.values() if param.kind not in [param.VAR_KEYWORD, param.VAR_POSITIONAL]
+ ]
+ empty_parameters = {param.name: param for param in named_parameters if param.default is Parameter.empty}
+ # Implicit parameters are *args and **kwargs
+ if not is_support_variable_params:
+ if len(all_parameters.values()) != len(named_parameters):
+ raise UnsupportedParameterKindError(func.__name__)
+
+ min_num = len(empty_parameters)
+ max_num = len(named_parameters)
+ if len(args) > max_num:
+ raise TooManyPositionalArgsError(func.__name__, min_num, max_num, len(args))
+
+ func_args, provided_args = list(args), OrderedDict({})
+ provided_args = OrderedDict({param.name: func_args.pop(0) for param in named_parameters if len(func_args) > 0})
+ for _k, _v in kwargs.items():
+ if not is_support_variable_params and _k not in all_parameters:
+ raise UnexpectedKeywordError(func.__name__, _k, all_parameters.keys())
+ if _k in provided_args.keys():
+ raise MultipleValueError(func.__name__, _k)
+ provided_args[_k] = _v
+
+ def _is_supported_data_type(_data: object) -> bool:
+ return isinstance(_data, SUPPORTED_INPUT_TYPES) or is_group(_data)
+
+ for pipeline_input_name in provided_args:
+ data = provided_args[pipeline_input_name]
+ if isinstance(data, BaseNode):
+ if len(data.outputs) != 1:
+ raise ValueError(
+ "Provided input {} is not a single output node, cannot be used as a node input.".format(
+ pipeline_input_name
+ )
+ )
+ data = next(iter(data.outputs.values()))
+ provided_args[pipeline_input_name] = data
+
+ if data is not None and not _is_supported_data_type(data) and pipeline_input_name not in non_pipeline_inputs:
+ msg = (
+ "Pipeline input expected an azure.ai.ml.Input or primitive types (str, bool, int or float), "
+ "but got type {}."
+ )
+ raise UserErrorException(
+ message=msg.format(type(data)),
+ no_personal_data_message=msg.format("[type(pipeline_input_name)]"),
+ )
+
+ # Note: unprovided required inputs won't cause exception when calling pipeline func
+ # the exception will be raised in pipeline's customized validate.
+ return provided_args
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py
new file mode 100644
index 00000000..784aaece
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py
@@ -0,0 +1,128 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+from collections import deque
+from typing import Any, Deque, Dict, Optional, Union
+
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.exceptions import UserErrorException
+
+module_logger = logging.getLogger(__name__)
+
+
+class _DSLSettingsStack:
+ def __init__(self) -> None:
+ self._stack: Deque["_DSLSettings"] = deque()
+
+ def push(self) -> None:
+ self._stack.append(_DSLSettings())
+
+ def pop(self) -> "_DSLSettings":
+ self._check_stack()
+ return self._stack.pop()
+
+ def top(self) -> "_DSLSettings":
+ self._check_stack()
+ return self._stack[-1]
+
+ def _check_stack(self) -> None:
+ # Stack push & pop operations are managed separately, so if the stack is empty here,
+ # the user must have called `set_pipeline_settings` outside of a `pipeline`-decorated function;
+ # in that case, raise a user error directly.
+ if len(self._stack) == 0:
+ error_message = "Please call `set_pipeline_settings` inside a `pipeline` decorated function."
+ raise UserErrorException(
+ message=error_message,
+ no_personal_data_message=error_message,
+ )
+
+
+_dsl_settings_stack = _DSLSettingsStack()
+
+
+class _DSLSettings:
+ """Initialization & finalization job settings for DSL pipeline job.
+
+ Store settings from `dsl.set_pipeline_settings` during pipeline definition.
+ """
+
+ def __init__(self) -> None:
+ self._init_job: Any = None
+ self._finalize_job: Any = None
+
+ @property
+ def init_job(self) -> Union[BaseNode, str]:
+ return self._init_job
+
+ @init_job.setter
+ def init_job(self, value: Optional[Union[BaseNode, str]]) -> None:
+ # pylint: disable=logging-fstring-interpolation
+ if isinstance(value, (BaseNode, str)):
+ self._init_job = value
+ else:
+ module_logger.warning(f"Initialization job setting is ignored as input parameter type is {type(value)!r}.")
+
+ @property
+ def init_job_set(self) -> bool:
+ # note: need an `is not None` check here, as `bool()` of a BaseNode may return False
+ return self._init_job is not None
+
+ def init_job_name(self, jobs: Dict[str, BaseNode]) -> Optional[str]:
+ if isinstance(self._init_job, str):
+ return self._init_job
+ for name, job in jobs.items():
+ if id(self.init_job) == id(job):
+ return name
+ module_logger.warning("Initialization job setting is ignored as cannot find corresponding job node.")
+ return None
+
+ @property
+ def finalize_job(self) -> Union[BaseNode, str]:
+ return self._finalize_job
+
+ @finalize_job.setter
+ def finalize_job(self, value: Optional[Union[BaseNode, str]]) -> None:
+ # pylint: disable=logging-fstring-interpolation
+ if isinstance(value, (BaseNode, str)):
+ self._finalize_job = value
+ else:
+ module_logger.warning(f"Finalization job setting is ignored as input parameter type is {type(value)!r}.")
+
+ @property
+ def finalize_job_set(self) -> bool:
+ # note: need an `is not None` check here, as `bool()` of a BaseNode may return False
+ return self._finalize_job is not None
+
+ def finalize_job_name(self, jobs: Dict[str, BaseNode]) -> Optional[str]:
+ if isinstance(self._finalize_job, str):
+ return self._finalize_job
+ for name, job in jobs.items():
+ if id(self.finalize_job) == id(job):
+ return name
+ module_logger.warning("Finalization job setting is ignored as cannot find corresponding job node.")
+ return None
+
+
+def set_pipeline_settings(
+ *,
+ on_init: Optional[Union[BaseNode, str]] = None,
+ on_finalize: Optional[Union[BaseNode, str]] = None,
+) -> None:
+ """Set pipeline settings for current `dsl.pipeline` definition.
+
+ This function should be called inside a `dsl.pipeline` decorated function; otherwise an exception will be raised.
+
+ :keyword on_init: The on_init job node or name. The on_init job will be executed before all other jobs; \
+ it should not have data connections to regular nodes.
+ :paramtype on_init: Union[BaseNode, str]
+ :keyword on_finalize: The on_finalize job node or name. The on_finalize job will be executed after the \
+ pipeline run finishes (completed/failed/canceled); it should not have data connections to regular nodes.
+ :paramtype on_finalize: Union[BaseNode, str]
+ :return:
+ """
+ dsl_settings = _dsl_settings_stack.top()
+ if on_init is not None:
+ dsl_settings.init_job = on_init
+ if on_finalize is not None:
+ dsl_settings.finalize_job = on_finalize
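+
+
+# Illustrative usage sketch (not part of the SDK source); the component functions are hypothetical:
+#
+#     from azure.ai.ml.dsl import pipeline
+#     from azure.ai.ml.dsl._settings import set_pipeline_settings
+#
+#     @pipeline()
+#     def my_pipeline():
+#         init_node = init_component()
+#         train_node = train_component()
+#         cleanup_node = cleanup_component()
+#         set_pipeline_settings(on_init=init_node, on_finalize=cleanup_node)
+#
+# The nodes passed as on_init/on_finalize are resolved to their job names via init_job_name /
+# finalize_job_name when the pipeline job settings are built.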
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py
new file mode 100644
index 00000000..6faf669e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py
@@ -0,0 +1,222 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import contextlib
+import importlib
+import inspect
+import os
+import re
+import sys
+import types
+from pathlib import Path
+from typing import Generator, Optional, Union
+
+from azure.ai.ml.dsl._constants import VALID_NAME_CHARS
+from azure.ai.ml.exceptions import ComponentException, ErrorCategory, ErrorTarget
+
+
+def _normalize_identifier_name(name: str) -> str:
+ normalized_name = name.lower()
+ normalized_name = re.sub(r"[\W_]", " ", normalized_name) # No non-word characters
+ normalized_name = re.sub(" +", " ", normalized_name).strip() # No double spaces, leading or trailing spaces
+ if re.match(r"\d", normalized_name):
+ normalized_name = "n" + normalized_name # No leading digits
+ return normalized_name
+
+
+def _sanitize_python_variable_name(name: str) -> str:
+ return _normalize_identifier_name(name).replace(" ", "_")
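+
+# For example (a quick behaviour sketch, not part of the SDK source):
+#     _sanitize_python_variable_name("My Component-Name")  # -> "my_component_name"
+#     _sanitize_python_variable_name("2nd step")            # -> "n2nd_step"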
+
+
+def is_valid_name(name: str) -> bool:
+ """Indicate whether the name is a valid component name.
+
+ :param name: The component name
+ :type name: str
+ :return: True if name is a valid name for a component, False otherwise
+ :rtype: bool
+ """
+ return all(c in VALID_NAME_CHARS for c in name)
+
+
+def _resolve_source_directory() -> Optional[Union[str, Path]]:
+ """Resolve source directory as last customer frame's module file dir position.
+
+ :return: The directory path of the last customer owner frame in the callstack
+ :rtype: Optional[Union[str, Path]]
+ """
+ source_file = _resolve_source_file()
+ # Fall back to current working directory if not found
+ return os.getcwd() if not source_file else Path(os.path.dirname(source_file)).absolute()
+
+
+def _resolve_source_file() -> Optional[Path]:
+ """Resolve source file as last customer frame's module file position.
+
+
+ :return: The filepath of the last customer owner frame in the callstack
+ :rtype: Optional[Path]
+ """
+ try:
+ frame_list = inspect.stack()
+ # We find the last frame that is in SDK code rather than customer code or dependency code
+ # by checking whether the package name of the frame belongs to azure.ai.ml.
+ pattern = r"(^azure\.ai\.ml(?=\..*|$).*)"
+ for frame, last_frame in zip(frame_list, frame_list[1:]):
+ if _assert_frame_package_name(pattern, frame.frame) and not _assert_frame_package_name(
+ pattern, last_frame.frame
+ ):
+ module = inspect.getmodule(last_frame.frame)
+ return Path(str(module.__file__)).absolute() if module else None
+ except Exception: # pylint: disable=W0718
+ pass
+ return None
+
+
+def _assert_frame_package_name(pattern: str, frame: types.FrameType) -> bool:
+ """Check the package name of frame is match pattern.
+
+ :param pattern: The pattern to match the package name of `frame` against.
+ :type pattern: str
+ :param frame: The stack frame
+ :type frame: types.FrameType
+ :return: True if the package name of the frame matches pattern, False otherwise
+ :rtype: bool
+ """
+ # f_globals records the module globals of the frame's function, and the module's __package__ must be set.
+ # https://docs.python.org/3/reference/import.html#__package__
+ # Although __package__ is set on import, it may be missing from globals when the code is run via exec.
+ package_name = frame.f_globals.get("__package__", "")
+ return bool(package_name and re.match(pattern, package_name))
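+
+# For example (a quick sketch, not part of the SDK source): with the pattern used in
+# _resolve_source_file, a frame whose __package__ is "azure.ai.ml.dsl" matches, while a frame
+# whose __package__ is "azure.ai.mlfoo" or "" does not.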
+
+
+def _relative_to(
+ path: Union[str, os.PathLike], basedir: Union[str, os.PathLike], raises_if_impossible: bool = False
+) -> Optional[Path]:
+ """Compute the relative path under basedir.
+
+ This is a wrapper around Path.relative_to. By default Path.relative_to raises if path is not under basedir;
+ this function instead returns None when raises_if_impossible=False, and raises otherwise.
+
+ :param path: A path
+ :type path: Union[str, os.PathLike]
+ :param basedir: The base path to compute `path` relative to
+ :type basedir: Union[str, os.PathLike]
+ :param raises_if_impossible: Whether to raise if :attr:`pathlib.Path.relative_to` throws. Defaults to False.
+ :type raises_if_impossible: bool
+ :return:
+ * None if raises_if_impossible is False and basedir is not a parent of path
+ * path.relative_to(basedir) otherwise
+ :rtype: Optional[Path]
+ """
+ # The second resolve handles possible Windows short paths.
+ path = Path(path).resolve().absolute().resolve()
+ basedir = Path(basedir).resolve().absolute().resolve()
+ try:
+ return path.relative_to(basedir)
+ except ValueError:
+ if raises_if_impossible:
+ raise
+ return None
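+
+# For example (a quick sketch assuming POSIX-style paths, not part of the SDK source):
+#     _relative_to("/data/project/src/component.py", "/data/project")  # -> Path("src/component.py")
+#     _relative_to("/tmp/other.py", "/data/project")                   # -> None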
+
+
+@contextlib.contextmanager
+def inject_sys_path(path: object) -> Generator:
+ original_sys_path = sys.path.copy()
+ sys.path.insert(0, str(path))
+ try:
+ yield
+ finally:
+ sys.path = original_sys_path
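+
+# Illustrative usage (not part of the SDK source); the path and module name are hypothetical:
+#     with inject_sys_path("/path/to/components"):
+#         import my_component_module  # importable only while the path is on sys.path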
+
+
+def _force_reload_module(module: types.ModuleType) -> types.ModuleType:
+ # Reload the module, except when module.__spec__ is None.
+ # When module.__spec__ is None (e.g. the module is __main__), reload would raise an exception.
+ if module.__spec__ is None:
+ return module
+ path = Path(module.__spec__.loader.path).parent # type: ignore
+ with inject_sys_path(path):
+ return importlib.reload(module)
+
+
+@contextlib.contextmanager
+# pylint: disable-next=docstring-missing-return,docstring-missing-rtype
+def _change_working_dir(path: Union[str, os.PathLike], mkdir: bool = True) -> Generator:
+ """Context manager for changing the current working directory.
+
+ :param path: The path to change to
+ :type path: Union[str, os.PathLike]
+ :param mkdir: Whether to ensure `path` exists, creating it if it doesn't exist. Defaults to True.
+ :type mkdir: bool
+ """
+
+ saved_path = os.getcwd()
+ if mkdir:
+ os.makedirs(path, exist_ok=True)
+ os.chdir(str(path))
+ try:
+ yield
+ finally:
+ os.chdir(saved_path)
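+
+# Illustrative usage (not part of the SDK source); the directory name is hypothetical:
+#     with _change_working_dir("./build_output"):
+#         ...  # code here runs with ./build_output as the current working directory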
+
+
+def _import_component_with_working_dir(
+ module_name: str, working_dir: Optional[str] = None, force_reload: bool = False
+) -> types.ModuleType:
+ if working_dir is None:
+ working_dir = os.getcwd()
+ working_dir = str(Path(working_dir).resolve().absolute())
+
+ with _change_working_dir(working_dir, mkdir=False), inject_sys_path(working_dir):
+ try:
+ py_module = importlib.import_module(module_name)
+ except Exception as e:
+ raise ComponentException(
+ message=str(e),
+ no_personal_data_message="Failure importing component with working directory",
+ target=ErrorTarget.COMPONENT,
+ error=e,
+ error_category=ErrorCategory.SYSTEM_ERROR,
+ ) from e
+ except BaseException as e:
+ # re-raise BaseException subclasses such as SystemExit as a normal exception
+ raise ComponentException(
+ message=str(e),
+ no_personal_data_message="Failure importing component with working directory",
+ target=ErrorTarget.COMPONENT,
+ error=e,
+ error_category=ErrorCategory.USER_ERROR,
+ ) from e
+ loaded_module_file = Path(str(py_module.__file__)).resolve().absolute().as_posix()
+ posix_working_dir = Path(working_dir).absolute().as_posix()
+ if _relative_to(loaded_module_file, posix_working_dir) is None:
+ if force_reload:
+ # If force_reload is True, reload the module instead of raising an exception.
+ # This is used when we don't care about the original module with the same name.
+ return importlib.reload(py_module)
+ raise RuntimeError(
+ "Could not import module: '{}' because module with the same name has been loaded.\n"
+ "Path of the module: {}\n"
+ "Working dir: {}".format(module_name, loaded_module_file, posix_working_dir)
+ )
+ return py_module
+
+
+@contextlib.contextmanager
+def environment_variable_overwrite(key: str, val: str) -> Generator:
+ if key in os.environ:
+ backup_value = os.environ[key]
+ else:
+ backup_value = None
+ os.environ[key] = val
+
+ try:
+ yield
+ finally:
+ # Use an explicit None check so that an originally empty value is restored instead of being dropped.
+ if backup_value is not None:
+ os.environ[key] = backup_value
+ else:
+ os.environ.pop(key)
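+
+
+# Illustrative usage (not part of the SDK source); the variable name and value are hypothetical:
+#     with environment_variable_overwrite("AZUREML_EXAMPLE_FLAG", "1"):
+#         ...  # os.environ["AZUREML_EXAMPLE_FLAG"] == "1" inside the block
+#     # the previous value (or absence) of the variable is restored on exit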