path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl
author     S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/dsl
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/dsl')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py                    |   9
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py             | 103
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py                  |  75
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py                  |   4
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py                   | 101
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py                    | 192
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py     | 148
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py            | 301
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py                |  43
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py        |  46
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py      |  33
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py       |  22
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py               |  67
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py | 615
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py         | 327
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py                   | 128
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py                      | 222
17 files changed, 2436 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py
new file mode 100644
index 00000000..72c77310
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/__init__.py
@@ -0,0 +1,9 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
+
+from azure.ai.ml.dsl._pipeline_decorator import pipeline
+
+__all__ = ["pipeline"]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py
new file mode 100644
index 00000000..69547cd1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_component_func.py
@@ -0,0 +1,103 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+from typing import Any, Callable, List, Mapping
+
+from azure.ai.ml.dsl._dynamic import KwParameter, create_kw_function_from_parameters
+from azure.ai.ml.entities import Component as ComponentEntity
+from azure.ai.ml.entities._builders import Command
+from azure.ai.ml.entities._component.datatransfer_component import DataTransferImportComponent
+
+
+def get_dynamic_input_parameter(inputs: Mapping) -> List:
+    """Return the dynamic parameter of the definition's input ports.
+
+    :param inputs: The mapping of input names to input objects.
+    :type inputs: Mapping
+    :return: The list of dynamic parameters.
+    :rtype: List[~azure.ai.ml.dsl._dynamic.KwParameter]
+    """
+    return [
+        KwParameter(
+            name=name,
+            annotation=input._get_python_builtin_type_str(),
+            default=None,
+            _type=input._get_python_builtin_type_str(),
+        )
+        for name, input in inputs.items()
+    ]
+
+
+def get_dynamic_source_parameter(source: Any) -> List:
+    """Return the dynamic parameter of the definition's source port.
+
+    :param source: The source object.
+    :type source: Any
+    :return: The list of dynamic parameters.
+    :rtype: List[~azure.ai.ml.dsl._dynamic.KwParameter]
+    """
+    return [
+        KwParameter(
+            name="source",
+            annotation=source.type,
+            default=None,
+            _type=source.type,
+        )
+    ]
+
+
+def to_component_func(entity: ComponentEntity, component_creation_func: Callable) -> Callable[..., Command]:
+    """Convert a ComponentEntity to a callable component function.
+
+    :param entity: The ComponentEntity to convert.
+    :type entity: ~azure.ai.ml.entities.Component
+    :param component_creation_func: The function for creating a component.
+    :type component_creation_func: Callable
+    :return: The callable component function.
+    :rtype: Callable[..., ~azure.ai.ml.entities._builders.Command]
+    """
+    func_name = "[component] {}".format(entity.display_name)
+
+    func_docstring_lines = []
+    if entity.description is not None:
+        func_docstring_lines.append(entity.description.strip())
+
+    if isinstance(entity, DataTransferImportComponent):
+        all_params = get_dynamic_source_parameter(entity.source)
+    else:
+        all_params = get_dynamic_input_parameter(entity.inputs)
+
+    flattened_group_keys = []
+    # Flatten all group parameters, for function parameter validation.
+    from azure.ai.ml.entities._inputs_outputs import GroupInput
+
+    for name, item in entity.inputs.items():
+        if isinstance(item, GroupInput):
+            flattened_group_keys.extend(list(item.flatten(group_parameter_name=name).keys()))
+
+    doc_string = entity.description
+    # Try to add the component YAML to the doc string
+    try:
+        yaml_str = entity._yaml_str if entity._yaml_str else entity._to_yaml()
+        doc_string = "{0}\n\nComponent yaml:\n```yaml\n{1}\n```".format(doc_string, yaml_str)
+    except Exception:  # pylint: disable=W0718
+        pass
+
+    params_assignment_str = ", ".join([f"{param.name}=xxx" for param in all_params])
+    example = f"component_func({params_assignment_str})"
+
+    dynamic_func = create_kw_function_from_parameters(
+        component_creation_func,
+        documentation=str(doc_string),
+        parameters=all_params,
+        func_name=func_name,
+        flattened_group_keys=flattened_group_keys,
+    )
+
+    # Bug Item number: 2883188
+    dynamic_func._func_calling_example = example  # type: ignore
+    dynamic_func._has_parameters = bool(all_params)  # type: ignore
+    return dynamic_func
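
Editor's note: as a rough illustration of how the helpers above see a component's inputs, here is a minimal sketch; the CommandComponent definition (name, command, environment, inputs) is hypothetical, and the printed builtin type strings are indicative only.

    from azure.ai.ml import Input
    from azure.ai.ml.dsl._component_func import get_dynamic_input_parameter
    from azure.ai.ml.entities import CommandComponent

    # A component entity defined purely in code (all values are placeholders).
    comp = CommandComponent(
        name="train",
        command="echo ${{inputs.learning_rate}} ${{inputs.epochs}}",
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
        inputs={"learning_rate": Input(type="number"), "epochs": Input(type="integer")},
    )

    # Each input port becomes a keyword-only KwParameter carrying its Python builtin type string.
    params = get_dynamic_input_parameter(comp.inputs)
    print([(p.name, p._type) for p in params])   # e.g. [('learning_rate', 'float'), ('epochs', 'int')]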
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py
new file mode 100644
index 00000000..20c46169
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_condition.py
@@ -0,0 +1,75 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import List, Optional, Union
+
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.condition_node import ConditionNode
+from azure.ai.ml.entities._job.pipeline._io import InputOutputBase
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression
+from azure.ai.ml.exceptions import UserErrorException
+
+# pylint: disable=redefined-outer-name
+
+
+def condition(
+    condition: Union[str, bool, InputOutputBase, BaseNode, PipelineExpression],
+    *,
+    true_block: Optional[Union[BaseNode, List[BaseNode]]] = None,
+    false_block: Optional[Union[BaseNode, List[BaseNode]]] = None,
+) -> ConditionNode:
+    """Create a condition node to provide runtime condition graph experience.
+
+    Below is an example of using an expression result to control which step is executed.
+    If the pipeline parameter 'int_param1' is greater than 'int_param2', then 'true_step' will be executed;
+    otherwise, 'false_step' will be executed.
+
+    .. code-block:: python
+
+        @dsl.pipeline
+        def pipeline_func(int_param1: int, int_param2: int):
+            true_step = component_func()
+            false_step = another_component_func()
+            dsl.condition(
+                int_param1 > int_param2,
+                true_block=true_step,
+                false_block=false_step
+            )
+
+    :param condition: The condition of the execution flow.
+        The value could be a boolean type control output, a node with exactly one boolean type output,
+        or a pipeline expression.
+    :type condition: Union[
+        str,
+        bool,
+        ~azure.ai.ml.entities._job.pipeline._io.InputOutputBase,
+        ~azure.ai.ml.entities._builders.BaseNode,
+        ~azure.ai.ml.entities._job.pipeline._pipeline_expression.PipelineExpression]
+    :keyword true_block: The block to be executed if the condition resolves to True.
+    :paramtype true_block: Union[
+        ~azure.ai.ml.entities._builders.BaseNode,
+        List[~azure.ai.ml.entities._builders.BaseNode]]
+    :keyword false_block: The block to be executed if the condition resolves to False.
+    :paramtype false_block: Union[
+        ~azure.ai.ml.entities._builders.BaseNode,
+        List[~azure.ai.ml.entities._builders.BaseNode]]
+    :return: The condition node.
+    :rtype: ConditionNode
+    :raises UserErrorException: Raised if the condition node has an incorrect number of outputs.
+    """
+    # resolve expression as command component
+    if isinstance(condition, PipelineExpression):
+        condition = condition.resolve()
+    if isinstance(condition, BaseNode):
+        if len(condition.outputs) != 1:
+            error_message = (
+                f"Exactly one output is expected for condition node, {len(condition.outputs)} outputs found."
+            )
+            raise UserErrorException(message=error_message, no_personal_data_message=error_message)
+        condition = list(condition.outputs.values())[0]
+    return ConditionNode(
+        condition=condition,
+        true_block=true_block,  # type: ignore[arg-type]
+        false_block=false_block,  # type: ignore[arg-type]
+        _from_component_func=True,
+    )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py
new file mode 100644
index 00000000..461c8c4c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_constants.py
@@ -0,0 +1,4 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+VALID_NAME_CHARS = set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-")
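
Editor's note: the constant above is the character allow-list the DSL uses for names. A minimal sketch of how such a set can be applied; the helper function itself is an assumption, not part of the SDK.

    from azure.ai.ml.dsl._constants import VALID_NAME_CHARS

    def is_valid_name(name: str) -> bool:
        # Accept only non-empty names whose every character is in the allowed set.
        return bool(name) and all(ch in VALID_NAME_CHARS for ch in name)

    print(is_valid_name("my-component_v1.0"))   # True
    print(is_valid_name("my component!"))       # False (space and '!' are not allowed)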
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py
new file mode 100644
index 00000000..59ce50e9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_do_while.py
@@ -0,0 +1,101 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Dict, Optional, Union
+
+from azure.ai.ml.entities._builders import Command
+from azure.ai.ml.entities._builders.do_while import DoWhile
+from azure.ai.ml.entities._builders.pipeline import Pipeline
+from azure.ai.ml.entities._inputs_outputs import Output
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput
+
+
+def do_while(
+    body: Union[Pipeline, Command], mapping: Dict, max_iteration_count: int, condition: Optional[Output] = None
+) -> DoWhile:
+    """Build a do_while node by specifying the loop body, output-input mapping, and termination condition.
+
+    .. note::
+        The following example shows how to use the `do_while` function to create a pipeline with a `do_while` node.
+
+        .. code-block:: python
+
+            from azure.ai.ml.dsl import pipeline
+            from mldesigner.dsl import do_while
+
+            @pipeline()
+            def your_do_while_body():
+                pass
+
+            @pipeline()
+            def pipeline_with_do_while_node():
+                do_while_body = your_do_while_body()
+                do_while_node = do_while(
+                    body=do_while_body,
+                    condition=do_while_body.outputs.condition_output,
+                    mapping={
+                        do_while_body.outputs.output1: do_while_body.inputs.input1,
+                        do_while_body.outputs.output2: [
+                            do_while_body.inputs.input2,
+                            do_while_body.inputs.input3,
+                        ],
+                    },
+                )
+                # Connect to the do_while_node outputs
+                component = component_func(
+                    input1=do_while_body.outputs.output1, input2=do_while_body.outputs.output2
+                )
+
+    :param body: The pipeline job or command node for the do-while loop body.
+    :type body: Union[~azure.ai.ml.entities._builders.pipeline.Pipeline, ~azure.ai.ml.entities._builders.Command]
+    :param mapping: The output-input mapping for each round of the do-while loop.
+        The key is the last round's output of the body, and the value is the input port for the current body.
+    :type mapping: Dict[
+        Union[str,  ~azure.ai.ml.entities.Output],
+        Union[str, ~azure.ai.ml.entities.Input, List]]
+    :param max_iteration_count: The maximum number of iterations of the do-while loop.
+    :type max_iteration_count: int
+    :param condition: The name of a boolean output of the body.
+        The do-while loop stops when this output evaluates to False.
+        If not specified, the loop behaves as a while-true loop.
+    :type condition:  ~azure.ai.ml.entities.Output
+    :return: The do-while node.
+    :rtype: ~azure.ai.ml.entities._builders.do_while.DoWhile
+    """
+    do_while_node = DoWhile(
+        body=body,
+        condition=condition,  # type: ignore[arg-type]
+        mapping=mapping,
+        _from_component_func=True,
+    )
+    do_while_node.set_limits(max_iteration_count=max_iteration_count)
+
+    def _infer_and_update_body_input_from_mapping() -> None:
+        # pylint: disable=protected-access
+        for source_output, body_input in mapping.items():
+            # handle case that mapping key is a NodeOutput
+            output_name = source_output._port_name if isinstance(source_output, NodeOutput) else source_output
+            # if loop body output type is not specified, skip as we have no place to infer
+            if body.outputs[output_name].type is None:
+                continue
+            # body input can be a list of inputs, normalize as a list to process
+            if not isinstance(body_input, list):
+                body_input = [body_input]
+            for single_input in body_input:
+                # if input type is specified, no need to infer and skip
+                if single_input.type is not None:
+                    continue
+                inferred_type = body.outputs[output_name].type
+                # update node input
+                single_input._meta._is_inferred_optional = True
+                single_input.type = inferred_type
+                # update node corresponding component input
+                input_name = single_input._meta.name
+                body.component.inputs[input_name]._is_inferred_optional = True  # type: ignore[union-attr]
+                body.component.inputs[input_name].type = inferred_type  # type: ignore[union-attr]
+
+    # when mapping is a dictionary, infer and update for dynamic input
+    if isinstance(mapping, dict):
+        _infer_and_update_body_input_from_mapping()
+
+    return do_while_node
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py
new file mode 100644
index 00000000..3fcb42fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_dynamic.py
@@ -0,0 +1,192 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+import types
+from inspect import Parameter, Signature
+from typing import Any, Callable, Dict, Sequence, cast
+
+from azure.ai.ml.entities import Component
+from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, UnexpectedKeywordError, ValidationException
+
+module_logger = logging.getLogger(__name__)
+
+
+class KwParameter(Parameter):
+    """A keyword-only parameter with a default value.
+
+    :param name: The name of the parameter.
+    :type name: str
+    :param default: The default value of the parameter.
+    :param annotation: The annotation type of the parameter, defaults to `Parameter.empty`.
+    :type annotation: Any
+    :param _type: The type of the parameter, defaults to "str".
+    :type _type: str
+    :param _optional: Indicates if the parameter is optional, defaults to False.
+    :type _optional: bool
+    """
+
+    def __init__(
+        self, name: str, default: Any, annotation: Any = Parameter.empty, _type: str = "str", _optional: bool = False
+    ) -> None:
+        super().__init__(name, Parameter.KEYWORD_ONLY, default=default, annotation=annotation)
+        self._type = _type
+        self._optional = _optional
+
+
+def _replace_function_name(func: types.FunctionType, new_name: str) -> types.FunctionType:
+    """Replaces the name of a function with a new name
+
+    :param func: The function to update
+    :type func: types.FunctionType
+    :param new_name: The new function name
+    :type new_name: str
+    :return: The function with a replaced name, but otherwise unchanged body
+    :rtype: types.FunctionType
+    """
+    try:
+        # Use the original code of the function to initialize a new code object for the new function.
+        code_template = func.__code__
+        # For Python >= 3.8, `CodeType.replace` is the recommended way to do this, since the CodeType
+        # constructor interface changed in 3.8, which makes the manual initialization below invalid there.
+        # See https://github.com/python/cpython/blob/384621c42f9102e31ba2c47feba144af09c989e5/Objects/codeobject.c#L646
+        # and https://github.com/python/cpython/blob/384621c42f9102e31ba2c47feba144af09c989e5/Objects/codeobject.c#L446
+        if hasattr(code_template, "replace"):
+            code = code_template.replace(co_name=new_name)
+        else:
+            # For Python < 3.8, `replace` is not available, so we can only initialize the code object as follows.
+            # https://github.com/python/cpython/blob/v3.7.8/Objects/codeobject.c#L97
+
+            # Bug Item number: 2881688
+            code = types.CodeType(  # type: ignore
+                code_template.co_argcount,
+                code_template.co_kwonlyargcount,
+                code_template.co_nlocals,
+                code_template.co_stacksize,
+                code_template.co_flags,
+                code_template.co_code,  # type: ignore
+                code_template.co_consts,  # type: ignore
+                code_template.co_names,
+                code_template.co_varnames,
+                code_template.co_filename,  # type: ignore
+                new_name,  # Use the new name for the new code object.
+                code_template.co_firstlineno,  # type: ignore
+                code_template.co_lnotab,  # type: ignore
+                # The following two values are required for closures.
+                code_template.co_freevars,  # type: ignore
+                code_template.co_cellvars,  # type: ignore
+            )
+        # Initialize a new function with the code object and the new name, see the following ref for more details.
+        # https://github.com/python/cpython/blob/4901fe274bc82b95dc89bcb3de8802a3dfedab32/Objects/clinic/funcobject.c.h#L30
+        return types.FunctionType(
+            code,
+            globals=func.__globals__,
+            name=new_name,
+            argdefs=func.__defaults__,
+            # Closure must be set to make sure free variables work.
+            closure=func.__closure__,
+        )
+    except BaseException:  # pylint: disable=W0718
+        # If the dynamic replacement fails in corner cases, simply set the two name fields.
+        func.__name__ = func.__qualname__ = new_name
+        return func
+
+
+# pylint: disable-next=docstring-missing-param
+def _assert_arg_valid(kwargs: dict, keys: list, func_name: str) -> None:
+    """Assert the arg keys are all in keys."""
+    # pylint: disable=protected-access
+    # validate component input names
+    Component._validate_io_names(kwargs, raise_error=True)
+    lower2original_parameter_names = {x.lower(): x for x in keys}
+    kwargs_need_to_update = []
+    for key in kwargs:
+        if key not in keys:
+            lower_key = key.lower()
+            if lower_key in lower2original_parameter_names:
+                # record keys that need to be updated
+                kwargs_need_to_update.append(key)
+                if key != lower_key:
+                    # warn if the name does not match the sanitized version
+                    module_logger.warning(
+                        "Component input name %s, treat it as %s", key, lower2original_parameter_names[lower_key]
+                    )
+            else:
+                raise UnexpectedKeywordError(func_name=func_name, keyword=key, keywords=keys)
+    # update kwargs to align with yaml definition
+    for key in kwargs_need_to_update:
+        kwargs[lower2original_parameter_names[key.lower()]] = kwargs.pop(key)
+
+
+def _update_dct_if_not_exist(dst: Dict, src: Dict) -> None:
+    """Computes the union of `src` and `dst`, in-place within `dst`
+
+    If a key exists in `dst` and `src` the value in `dst` is preserved
+
+    :param dst: The destination to compute the union within
+    :type dst: Dict
+    :param src: A dictionary to include in the union
+    :type src: Dict
+    """
+    for k, v in src.items():
+        if k not in dst:
+            dst[k] = v
+
+
+def create_kw_function_from_parameters(
+    func: Callable,
+    parameters: Sequence[Parameter],
+    flattened_group_keys: list,
+    func_name: str,
+    documentation: str,
+) -> Callable:
+    """Create a new keyword-only function with provided parameters.
+
+    :param func: The original function to be wrapped.
+    :type func: Callable
+    :param parameters: The sequence of parameters for the new function.
+    :type parameters: Sequence[Parameter]
+    :param flattened_group_keys: The list of valid group keys.
+    :type flattened_group_keys: list
+    :param func_name: The name of the new function.
+    :type func_name: str
+    :param documentation: The documentation string for the new function.
+    :type documentation: str
+    :return: The new keyword-only function.
+    :rtype: Callable
+    :raises ValidationException: If the provided function parameters are not keyword-only.
+    """
+    if any(p.default == p.empty or p.kind != Parameter.KEYWORD_ONLY for p in parameters):
+        msg = "This function only accept keyword only parameters."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            error_category=ErrorCategory.USER_ERROR,
+            target=ErrorTarget.COMPONENT,
+        )
+    default_kwargs = {p.name: p.default for p in parameters}
+
+    def f(**kwargs: Any) -> Any:
+        # We need to make sure all keys of kwargs are valid.
+        # Merge valid group keys with original keys.
+        _assert_arg_valid(kwargs, [*list(default_kwargs.keys()), *flattened_group_keys], func_name=func_name)
+        # We need to put the default args to the kwargs before invoking the original function.
+        _update_dct_if_not_exist(kwargs, default_kwargs)
+        return func(**kwargs)
+
+    f = _replace_function_name(cast(types.FunctionType, f), func_name)
+    # Set the signature so that Jupyter notebooks can show parameter hints via inspect.signature().
+    # Bug Item number: 2883223
+    f.__signature__ = Signature(parameters)  # type: ignore
+    # Set doc/name/module to make sure help(f) shows the following expected result.
+    # Expected help(f):
+    #
+    # Help on function FUNC_NAME:
+    # FUNC_NAME(SIGNATURE)
+    #     FUNC_DOC
+    #
+    f.__doc__ = documentation  # Set documentation to update FUNC_DOC in help.
+    # Set __module__ = None to avoid showing "in module 'azure.ai.ml.component._dynamic'" in help.
+    # See https://github.com/python/cpython/blob/2145c8c9724287a310bc77a2760d4f1c0ca9eb0c/Lib/pydoc.py#L1757
+    f.__module__ = None  # type: ignore
+    return f
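
Editor's note: to make the dynamic-function machinery above concrete, here is a minimal sketch that builds a keyword-only wrapper by hand; the parameter names and the `_create` callback are hypothetical stand-ins for what `to_component_func` normally supplies.

    from azure.ai.ml.dsl._dynamic import KwParameter, create_kw_function_from_parameters

    params = [
        KwParameter("learning_rate", default=None, annotation="float", _type="float"),
        KwParameter("epochs", default=None, annotation="int", _type="int"),
    ]

    def _create(**kwargs):
        # In the SDK this would build a node; here we simply echo the received kwargs.
        return kwargs

    train = create_kw_function_from_parameters(
        _create,
        parameters=params,
        flattened_group_keys=[],
        func_name="train_component",
        documentation="Hypothetical training component.",
    )

    print(train(learning_rate=0.01, epochs=3))   # {'learning_rate': 0.01, 'epochs': 3}
    print(train.__signature__)                   # keyword-only signature, visible to inspect/Jupyter
    # train(unknown_param=1) would raise UnexpectedKeywordError via _assert_arg_valid.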
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py
new file mode 100644
index 00000000..7f88a70b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_fl_scatter_gather_node.py
@@ -0,0 +1,148 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import importlib
+
+from typing import Any, Dict, List, Optional, Union
+
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.entities import CommandComponent, PipelineJob
+from azure.ai.ml.entities._assets.federated_learning_silo import FederatedLearningSilo
+from azure.ai.ml.entities._builders.fl_scatter_gather import FLScatterGather
+
+
+def _check_for_import(package_name: str) -> None:
+    try:
+        importlib.import_module(package_name)
+    except ImportError as e:
+        raise ImportError(
+            "The DSL FL Node has an additional requirement above the rest of the "
+            + "AML SDK repo in that the mldesigner package is required. Please run `pip install mldesigner` "
+            + "and try again."
+        ) from e
+
+
+@experimental
+def fl_scatter_gather(
+    *,
+    silo_configs: List[FederatedLearningSilo],
+    silo_component: Union[PipelineJob, CommandComponent],
+    aggregation_component: Union[PipelineJob, CommandComponent],
+    aggregation_compute: Optional[str] = None,
+    aggregation_datastore: Optional[str] = None,
+    shared_silo_kwargs: Optional[Dict] = None,
+    aggregation_kwargs: Optional[Dict] = None,
+    silo_to_aggregation_argument_map: Optional[Dict] = None,
+    aggregation_to_silo_argument_map: Optional[Dict] = None,
+    max_iterations: int = 1,
+    _create_default_mappings_if_needed: bool = False,
+    **kwargs: Any,
+) -> FLScatterGather:
+    """A federated learning scatter-gather subgraph node.
+
+    It's assumed that this will be used inside a `@pipeline`-decorated function in order to create a subgraph which
+    will:
+        - Execute a specified pipeline step multiple times (the silo step), with each execution using slightly
+            different inputs, datastores, and computes based on a supplied config.
+        - Merge the outputs of the multiple silo steps into a single input for another step (the aggregation step),
+            which will then process the values into a single unified result.
+        - Repeat this 'scatter gather' sequence up to max_iterations times, with the output of any given
+            iteration's aggregation step being fed back into the silo steps of the subsequent iteration.
+        - Return the outputs of the last iteration's aggregation step as the node's final output.
+
+    The process of assigning computes, datastores, and inputs to steps is called 'anchoring'. The following
+    details of the anchoring process should be noted:
+        - Computes will always be overridden to their respective compute.
+        - The paths of 'internal' outputs for silo steps (i.e. a component output that becomes an input for another
+            component within the silo step) are anchored to that silo's datastore. All other outputs are anchored
+            to the aggregation datastore.
+        - Some steps are automatically created by this node to merge inputs from the silo steps to the aggregation
+            step. These steps are anchored in the same manner as the aggregation step.
+        - Datastore anchoring ONLY occurs if an output's path value is empty. If a path already exists, it will NOT
+            try to replace it.
+
+    The process of trimming down inputs from the silo steps to a single input for the aggregate step has some
+    caveats:
+        - Only silo outputs of type mltable and uri_folder are supported.
+        - Both the above output types become an mltable which mounts all the silo step outputs as sub-folders.
+
+    The expected use case of this node is shown in the following code snippet:
+
+    .. code-block:: python
+
+        @pipeline
+        def fl_pipeline():
+            fl_node = fl_scatter_gather(**many_inputs)
+            return {"pipeline_output" : fl_node.outputs["aggregated_model"]}
+
+        submitted_pipeline_job = my_client.jobs.create_or_update(fl_pipeline(), experiment_name="example_fl_pipeline")
+
+    :keyword silo_configs: A list of FederatedLearningSilo objects,
+        which contain the necessary data to reconfigure components to run on specific computes and datastores,
+        while also targeting specific inputs located on the aforementioned datastores.
+    :paramtype silo_configs: List[~azure.ai.ml.entities._assets.federated_learning_silo.FederatedLearningSilo]
+    :keyword silo_component: A pipeline step that will be run multiple times across different silos, as specified
+        by the silo_configs input. In a typical horizontal federated learning context, this step is what will perform
+        model training using siloed subsets of data. Can be either a PipelineJob or a CommandComponent.
+    :paramtype silo_component: Union[~azure.ai.ml.entities.PipelineJob, ~azure.ai.ml.entities.CommandComponent]
+    :keyword aggregation_component: A pipeline step which receives inputs from the myriad executed silo components,
+        and does something with them. In a typical horizontal federated learning context, this component will merge
+        the models that were independently trained on each silo's data in a single model. Can be either a
+        PipelineJob or a CommandComponent.
+    :paramtype aggregation_component: Union[
+        ~azure.ai.ml.entities.PipelineJob,
+        ~azure.ai.ml.entities.CommandComponent]
+    :keyword aggregation_compute: The name of the compute that the aggregation component will use.
+    :paramtype aggregation_compute: str
+    :keyword aggregation_datastore: The name of the datastore that the aggregation component will use.
+    :paramtype aggregation_datastore: str
+    :keyword shared_silo_kwargs: A dictionary of string keywords to component inputs. This dictionary is treated
+        like kwargs and is injected into ALL executed silo components.
+    :paramtype shared_silo_kwargs: Dict
+    :keyword aggregation_kwargs: A dictionary of string keywords to component inputs. This dictionary is treated
+        like kwargs and is injected into ALL executed aggregation components.
+    :paramtype aggregation_kwargs: Dict
+    :keyword silo_to_aggregation_argument_map: A dictionary specifying the mapping of outputs from the silo step to
+        inputs in the aggregation step. The keys should be output names from the silo step, and the values should be
+        input names in the aggregation step. This allows for customization of the mapping between the steps.
+    :paramtype silo_to_aggregation_argument_map: Dict
+    :keyword aggregation_to_silo_argument_map: A dictionary specifying the mapping of outputs from the aggregation step
+        to inputs in the silo step. The keys should be output names from the aggregation step, and the values should be
+        input names in the silo step. This allows for customization of the mapping between the steps.
+    :paramtype aggregation_to_silo_argument_map: Dict
+    :keyword max_iterations: The maximum number of scatter gather iterations that should be performed.
+    :paramtype max_iterations: int
+    :keyword _create_default_mappings_if_needed:
+        If True, try to automatically create input/output mappings if they're unset.
+    :paramtype _create_default_mappings_if_needed: bool
+    :return: The federated learning scatter-gather subgraph node.
+    :rtype: ~azure.ai.ml.entities._builders.fl_scatter_gather.FLScatterGather
+
+    .. warning::
+
+        Using this node directly from the SDK repo requires that the user have the 'mldesigner' package installed,
+        which is not a standard dependency of this package.
+    """
+    # Private kwargs:
+    # _create_default_mappings_if_needed: if true, then try to automatically create i/o mappings if they're unset.
+
+    # check that mldesigner is available
+    _check_for_import("mldesigner")
+
+    # Like other DSL nodes, this is just a wrapper around a node builder entity initializer.
+    return FLScatterGather(
+        silo_configs=silo_configs,
+        silo_component=silo_component,  # type: ignore[arg-type]
+        aggregation_component=aggregation_component,  # type: ignore[arg-type]
+        shared_silo_kwargs=shared_silo_kwargs,
+        aggregation_compute=aggregation_compute,
+        aggregation_datastore=aggregation_datastore,
+        aggregation_kwargs=aggregation_kwargs,
+        silo_to_aggregation_argument_map=silo_to_aggregation_argument_map,
+        aggregation_to_silo_argument_map=aggregation_to_silo_argument_map,
+        max_iterations=max_iterations,
+        create_default_mappings_if_needed=_create_default_mappings_if_needed,
+        **kwargs,
+    )
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py
new file mode 100644
index 00000000..6455f793
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_group_decorator.py
@@ -0,0 +1,301 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+# pylint: disable=protected-access, redefined-builtin
+# disable redefined-builtin to use globals/locals as argument name
+
+# Attribute on customized group class to mark a value type as a group of inputs/outputs.
+import threading
+import functools
+from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union
+
+from azure.ai.ml import Input, Output
+from azure.ai.ml.constants._component import IOConstants
+from azure.ai.ml.entities._inputs_outputs import GroupInput, _get_param_with_standard_annotation
+from azure.ai.ml.entities._inputs_outputs.utils import Annotation
+
+T = TypeVar("T")
+
+
+def group(_cls: Type[T]) -> Type[T]:
+    """Group decorator to make user-defined class as a group of inputs/outputs.
+
+    Usage:
+        Group a set of component inputs make component configuration easier.
+
+        e.g. Define a group
+
+        .. code-block:: python
+
+            # define a group
+            @dsl.group
+            class ParamClass:
+                str_param: str
+                int_param: int = 1
+
+
+            # see the help of auto-gen __init__ function
+            help(ParamClass.__init__)
+
+        e.g. Define a group with inheritance
+
+        .. code-block:: python
+
+            # define the parent group
+            @dsl.group
+            class ParentClass:
+                str_param: str
+                int_param: int = 1
+
+
+            @dsl.group
+            class GroupClass(ParentClass):
+                float_param: float
+                str_param: str = "test"
+
+
+            # see the help of auto-gen __init__ function
+            help(GroupClass.__init__)
+
+        e.g. Define a multi-level group
+
+        .. code-block:: python
+
+            # define a sub group
+            @dsl.group
+            class SubGroupClass:
+                str_param: str
+                int_param: int = 1
+
+
+            @dsl.group
+            class ParentClass:
+                param_group1: SubGroupClass
+                # declare a sub group field with default
+                param_group2: SubGroupClass = SubGroupClass(str_param="test")
+
+
+            # see the help of auto-gen __init__ function
+            help(ParentClass.__init__)
+
+        e.g. Link and assign Group
+
+        .. code-block:: python
+
+            # create a sub group value
+            my_param_group = SubGroupClass(str_param="dataset", int_param=2)
+
+            # create a parent group value
+            my_parent_group = ParentClass(param_group1=my_param_group)
+
+
+            # option 1. use annotation to declare the input is a group.
+            @pipeline
+            def pipeline_func(params: SubGroupClass):
+                component = component_func(
+                    string_parameter=params.str_param, int_parameter=params.int_param
+                )
+                return component.outputs
+
+
+            # create a pipeline instance
+            pipeline = pipeline_func(my_param_group)
+
+
+            # option 2. use default of input to declare itself a group.
+            @pipeline
+            def pipeline_func(params=my_param_group):
+                component = component_func(
+                    string_parameter=params.str_param, int_parameter=params.int_param
+                )
+                return component.outputs
+
+
+            # create a pipeline instance
+            pipeline = pipeline_func()
+
+
+            # use multi-level group in pipeline.
+            @pipeline
+            def parent_func(params: ParentClass):
+                # pass sub group to sub pipeline.
+                pipeline = pipeline_func(params.param_group1)
+                return pipeline.outputs
+
+    Supported Types:
+        * Primitive type: int, str, float, bool
+            * Declare with python type annotation: `param: int`
+            * Declare with Input annotation: `param: Input(type='integer', min=1, max=5)`
+
+        * For sub group class
+            * Sub group class should be decorated with `dsl.group`
+
+    Restrictions:
+        * Each group member's name must be public (not start with '_').
+        * When using a group as a pipeline input, the user **MUST** write the type annotation
+          or give it a non-None default value so the group class can be inferred.
+
+    :param _cls: The class to decorate
+    :type _cls: Type[T]
+    :return: The decorated class
+    :rtype: Type[T]
+    """
+
+    T2 = TypeVar("T2")
+
+    def _create_fn(
+        name: str,
+        args: Union[List, str],
+        body: Union[List, str],
+        *,
+        globals: Optional[Dict[str, Any]] = None,
+        locals: Optional[Dict[str, Any]] = None,
+        return_type: Optional[Type[T2]],
+    ) -> Callable[..., T2]:
+        """To generate function in class.
+
+        :param name: The name of the new function
+        :type name: str
+        :param args: The parameter names of the new function
+        :type args: List[str]
+        :param body: The source code of the body of the new function
+        :type body: List[str]
+        :keyword globals: The global variables to make available to the new function.
+        :paramtype globals: Dict[str, Any]
+        :keyword locals: The local variables to make available to the new function
+        :paramtype locals: Dict[str, Any]
+        :keyword return_type: The return type of the new function
+        :paramtype return_type: Type[T2]
+        :return: The created function
+        :rtype: Callable[..., T2]
+        """
+        # Reference: Source code of dataclasses.dataclass
+        # Doc link: https://docs.python.org/3/library/dataclasses.html
+        # Reference code link:
+        # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L412
+        # Note that we mutate locals when exec() is called
+        if locals is None:
+            locals = {}
+        if "BUILTINS" not in locals:
+            import builtins
+
+            locals["BUILTINS"] = builtins
+        locals["_return_type"] = return_type
+        return_annotation = "->_return_type"
+        args = ",".join(args)
+        body = "\n".join(f"  {b}" for b in body)
+        # Compute the text of the entire function.
+        txt = f" def {name}({args}){return_annotation}:\n{body}"
+        local_vars = ", ".join(locals.keys())
+        txt = f"def __create_fn__({local_vars}):\n{txt}\n return {name}"
+        ns: Dict = {}
+        exec(txt, globals, ns)  # pylint: disable=exec-used # nosec
+        res: Callable = ns["__create_fn__"](**locals)
+        return res
+
+    def _create_init_fn(  # pylint: disable=unused-argument
+        cls: Type[T], fields: Dict[str, Union[Annotation, Input, Output]]
+    ) -> Callable[..., None]:
+        """Generate the __init__ function for user-defined class.
+
+        :param cls: The class to update
+        :type cls: Type[T]
+        :param fields: The fields
+        :type fields: Dict[str, Union[Annotation, Input, Output]]
+        :return: The __init__ function
+        :rtype: Callable[..., None]
+        """
+
+        # Reference code link:
+        # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L523
+        def _get_data_type_from_annotation(anno: Any) -> Any:
+            if isinstance(anno, GroupInput):
+                return anno._group_class
+            # keep original annotation for Outputs
+            if isinstance(anno, Output):
+                return anno
+            try:
+                # convert to primitive type annotation if possible
+                return IOConstants.PRIMITIVE_STR_2_TYPE[anno.type]
+            except KeyError:
+                # otherwise, keep original annotation
+                return anno
+
+        def _get_default(key: str) -> Any:
+            # set None as the default value when no default exists, so the init params don't need to be reordered
+            val = fields[key]
+            if val is not None and hasattr(val, "default"):
+                return val.default
+            return None
+
+        locals = {f"_type_{key}": _get_data_type_from_annotation(val) for key, val in fields.items()}
+        # Collect field defaults if val is parameter and is optional
+        defaults = {f"_default_{key}": _get_default(key) for key, val in fields.items()}
+        locals.update(defaults)
+        # Ban positional init as we reordered the parameter.
+        _init_param = ["self", "*"]
+        for key in fields:
+            _init_param.append(f"{key}:_type_{key}=_default_{key}")
+
+        body_lines = [f"self.{key}={key}" for key in fields]
+        # If no body lines, use 'pass'.
+        if not body_lines:
+            body_lines = ["pass"]
+        return _create_fn("__init__", _init_param, body_lines, locals=locals, return_type=None)
+
+    def _create_repr_fn(fields: Dict[str, Union[Annotation, Input, Output]]) -> Callable[..., str]:
+        """Generate the __repr__ function for user-defined class.
+
+        :param fields: The fields
+        :type fields: Dict[str, Union[Annotation, Input, Output]]
+        :return: The __repr__ function
+        :rtype: Callable[..., str]
+        """
+        # Reference code link:
+        # https://github.com/python/cpython/blob/17b16e13bb444001534ed6fccb459084596c8bcf/Lib/dataclasses.py#L582
+        fn = _create_fn(
+            "__repr__",
+            [
+                "self",
+            ],
+            ['return self.__class__.__qualname__ + f"(' + ", ".join([f"{k}={{self.{k}!r}}" for k in fields]) + ')"'],
+            return_type=str,
+        )
+
+        # This function's logic is copied from "recursive_repr" function in
+        # reprlib module to avoid dependency.
+        def _recursive_repr(user_function: Any) -> Any:
+            # Decorator to make a repr function return "..." for a recursive
+            # call.
+            repr_running = set()
+
+            @functools.wraps(user_function)
+            def wrapper(self: Any) -> Any:
+                key = id(self), threading.get_ident()
+                if key in repr_running:
+                    return "..."
+                repr_running.add(key)
+                try:
+                    result = user_function(self)
+                finally:
+                    repr_running.discard(key)
+                return result
+
+            return wrapper
+
+        res: Callable = _recursive_repr(fn)
+        return res
+
+    def _process_class(cls: Type[T], all_fields: Dict[str, Union[Annotation, Input, Output]]) -> Type[T]:
+        setattr(cls, "__init__", _create_init_fn(cls, all_fields))
+        setattr(cls, "__repr__", _create_repr_fn(all_fields))
+        return cls
+
+    def _wrap(cls: Type[T]) -> Type[T]:
+        all_fields = _get_param_with_standard_annotation(cls)
+        # Set group info on cls
+        setattr(cls, IOConstants.GROUP_ATTR_NAME, GroupInput(all_fields, _group_class=cls))
+        # Add __init__ and __repr__ to the class with user-defined annotation types
+        return _process_class(cls, all_fields)
+
+    return _wrap(_cls)
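
Editor's note: a minimal sketch of what the decorator generates at runtime; the class and field names are hypothetical, and `group` is imported directly from this module rather than assuming it is re-exported elsewhere.

    from azure.ai.ml.dsl._group_decorator import group

    @group
    class TrainConfig:
        learning_rate: float
        epochs: int = 10

    # The generated __init__ is keyword-only; unset fields fall back to their declared defaults.
    cfg = TrainConfig(learning_rate=0.01)
    print(cfg)          # e.g. TrainConfig(learning_rate=0.01, epochs=10), via the generated __repr__
    print(cfg.epochs)   # 10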
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py
new file mode 100644
index 00000000..074c0502
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_load_import.py
@@ -0,0 +1,43 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+from typing import Any, Callable
+
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.entities._builders import Command
+from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+
+
+# pylint: disable=unused-argument
+def to_component(*, job: ComponentTranslatableMixin, **kwargs: Any) -> Callable[..., Command]:
+    """Translate a job object to a component function, provided job should be able to translate to a component.
+
+    For example:
+
+    .. code-block:: python
+
+        # Load a local command job to a component function.
+        my_job = load_job("my_job.yaml")
+        component_func = dsl.to_component(my_job)
+        # Load a remote command job component to a component function.
+        my_job = ml_client.jobs.get("my_job")
+        component_func = dsl.to_component(my_job)
+
+        # Consuming the component func
+        component = component_func(param1=xxx, param2=xxx)
+
+    :keyword job: Job load from local or remote.
+    :paramtype job: ~azure.ai.ml.entities._job.pipeline._component_translatable.ComponentTranslatableMixin
+
+    :return: Wrapped function call.
+    :rtype: typing.Callable[..., ~azure.ai.ml.entities._builders.command.Command]
+    """
+    from pathlib import Path
+
+    # Set the default base path to "./": if the code path is relative and the base path is None, an error will be
+    # raised when getting the ARM id of the Code.
+    res: Callable = job._to_component(context={BASE_PATH_CONTEXT_KEY: Path("./")})  # type: ignore[arg-type, assignment]
+    return res
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py
new file mode 100644
index 00000000..2732cd75
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/__init__.py
@@ -0,0 +1,46 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+"""This file stores functions and objects that will be used in mldesigner package.
+
+DO NOT change the module names in "all" list. If the interface has changed in source code, wrap it here and keep
+original function/module names the same as before, otherwise mldesigner will be broken by this change.
+"""
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
+
+from azure.ai.ml._internal.entities import InternalComponent
+from azure.ai.ml._internal.entities._additional_includes import InternalAdditionalIncludes
+from azure.ai.ml._utils._asset_utils import get_ignore_file
+from azure.ai.ml._utils.utils import try_enable_internal_components
+from azure.ai.ml.dsl._condition import condition
+from azure.ai.ml.dsl._do_while import do_while
+from azure.ai.ml.dsl._group_decorator import group
+from azure.ai.ml.dsl._parallel_for import ParallelFor, parallel_for
+from azure.ai.ml.entities._component.component_factory import component_factory
+from azure.ai.ml.entities._inputs_outputs import _get_param_with_standard_annotation
+from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function
+
+from ._constants import V1_COMPONENT_TO_NODE
+
+component_factory_load_from_dict = component_factory.load_from_dict
+
+
+__all__ = [
+    # to be put in main package
+    "condition",
+    "do_while",
+    "group",
+    "parallel_for",
+    "ParallelFor",
+    # must keep
+    "get_ignore_file",
+    "_get_param_with_standard_annotation",
+    "_generate_component_function",
+    "component_factory_load_from_dict",
+    "V1_COMPONENT_TO_NODE",
+    "try_enable_internal_components",
+    "InternalAdditionalIncludes",
+    "InternalComponent",
+]
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py
new file mode 100644
index 00000000..f770c97d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_mldesigner/_constants.py
@@ -0,0 +1,33 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+"""This file stores constants that will be used in mldesigner package."""
+from azure.ai.ml._internal._schema.component import NodeType as V1NodeType
+from azure.ai.ml._internal.entities import (
+    Ae365exepool,
+    AetherBridge,
+    Command as InternalCommand,
+    Parallel as InternalParallel,
+    Pipeline as InternalPipeline,
+    DataTransfer,
+    Distributed,
+    HDInsight,
+    Hemera,
+    Scope,
+    Starlite,
+)
+
+V1_COMPONENT_TO_NODE = {
+    V1NodeType.SCOPE: Scope,
+    V1NodeType.COMMAND: InternalCommand,
+    V1NodeType.PARALLEL: InternalParallel,
+    V1NodeType.PIPELINE: InternalPipeline,
+    V1NodeType.DATA_TRANSFER: DataTransfer,
+    V1NodeType.DISTRIBUTED: Distributed,
+    V1NodeType.HDI: HDInsight,
+    V1NodeType.STARLITE: Starlite,
+    V1NodeType.HEMERA: Hemera,
+    V1NodeType.AE365EXEPOOL: Ae365exepool,
+    V1NodeType.AETHER_BRIDGE: AetherBridge,
+}
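
Editor's note: for orientation, the table above lets mldesigner map a v1 node type onto the corresponding internal node class; a minimal lookup sketch, assuming the internal-components modules import cleanly.

    from azure.ai.ml._internal._schema.component import NodeType as V1NodeType
    from azure.ai.ml.dsl._mldesigner import V1_COMPONENT_TO_NODE

    node_cls = V1_COMPONENT_TO_NODE[V1NodeType.COMMAND]
    print(node_cls.__name__)   # Command (the internal v1 command node class)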
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py
new file mode 100644
index 00000000..6460ec6f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_overrides_definition.py
@@ -0,0 +1,22 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Mapping, Optional
+
+
+class OverrideDefinition(dict):
+    """Definition of a overridable field of a component job."""
+
+
+def get_override_definition_from_schema(
+    schema: str,  # pylint: disable=unused-argument
+) -> Optional[Mapping[str, OverrideDefinition]]:
+    """Ger override definition from a json schema.
+
+    :param schema: Json schema of component job.
+    :type schema: str
+    :return: A dictionary from a override definition name to a override definition.
+    """
+    # TODO: gen override definition
+    return None
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py
new file mode 100644
index 00000000..4158f744
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_parallel_for.py
@@ -0,0 +1,67 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+from typing import Any, Dict, List, Union
+
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.parallel_for import ParallelFor
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
+
+
+def parallel_for(
+    *, body: BaseNode, items: Union[List, Dict, str, PipelineInput, NodeOutput], **kwargs: Any
+) -> ParallelFor:
+    """Build a parallel for loop by specifying the loop body and input items.
+
+    .. note::
+        The following example shows how to use the parallel_for API to create a pipeline with a parallel_for node.
+
+        .. code-block:: python
+
+            from azure.ai.ml.dsl import pipeline
+            from mldesigner.dsl import parallel_for
+
+
+            @pipeline
+            def your_loop_body(input):
+                pass
+
+
+            @pipeline
+            def pipeline_with_parallel_for_node():
+                # specify fixed inputs and leave loop config inputs unprovided
+                loop_body = your_loop_body()
+
+                # link loop body & loop config to loop node
+                loop_node = parallel_for(
+                    body=loop_body,
+                    items=[
+                        {"loop_body_input": 1},
+                        {"loop_body_input": 2},
+                    ],
+                )
+
+                # collect aggregated output from loop body
+                aggregate_node = collect_aggregated_outputs_from_for_each_node(
+                    input=loop_node.outputs.output,
+                )
+
+    :keyword body: Node to execute as the loop body.
+    :paramtype body: ~azure.ai.ml.entities._builders.BaseNode
+    :keyword items: The loop body's input which will bind to the loop node.
+    :paramtype items: Union[
+        list,
+        dict,
+        str,
+        ~azure.ai.ml.entities._job.pipeline._io.PipelineInput,
+        ~azure.ai.ml.entities._job.pipeline._io.NodeOutput]
+    :return: The parallel for loop
+    :rtype: ~azure.ai.ml.entities._builders.parallel_for.ParallelFor
+    """
+    parallel_for_node = ParallelFor(
+        body=body,  # type: ignore[arg-type]
+        items=items,
+        _from_component_func=True,
+        **kwargs,
+    )
+    return parallel_for_node
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py
new file mode 100644
index 00000000..44026174
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py
@@ -0,0 +1,615 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+import copy
+import inspect
+import logging
+import typing
+from collections import OrderedDict
+from inspect import Parameter, signature
+from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union
+
+from azure.ai.ml._utils._func_utils import get_outputs_and_locals
+from azure.ai.ml._utils.utils import is_valid_node_name, parse_args_description_from_docstring
+from azure.ai.ml.constants._component import ComponentSource, IOConstants
+from azure.ai.ml.constants._job.pipeline import COMPONENT_IO_KEYWORDS
+from azure.ai.ml.dsl._utils import _sanitize_python_variable_name
+from azure.ai.ml.entities import PipelineJob
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
+from azure.ai.ml.entities._component.component import Component
+from azure.ai.ml.entities._component.pipeline_component import PipelineComponent
+from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output, _get_param_with_standard_annotation
+from azure.ai.ml.entities._inputs_outputs.utils import _get_annotation_by_value, is_group
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, PipelineOutput, _GroupAttrDict
+from azure.ai.ml.entities._util import copy_output_setting
+
+# We need to limit the pipeline depth to keep the built graph from going too deep and to prevent a potential
+# stack overflow in dsl.pipeline.
+from azure.ai.ml.exceptions import UserErrorException
+
+_BUILDER_STACK_MAX_DEPTH = 100
+
+module_logger = logging.getLogger(__name__)
+
+
+class _PipelineComponentBuilderStack:
+    def __init__(self) -> None:
+        self.items: List["PipelineComponentBuilder"] = []
+
+    def top(self) -> Optional["PipelineComponentBuilder"]:
+        if self.is_empty():
+            return None
+        return self.items[-1]
+
+    def pop(self) -> Optional["PipelineComponentBuilder"]:
+        if self.is_empty():
+            return None
+        return self.items.pop()
+
+    def push(self, item: object) -> None:
+        error_msg = f"{self.__class__.__name__} only " f"allows pushing `{PipelineComponentBuilder.__name__}` element"
+        assert isinstance(item, PipelineComponentBuilder), error_msg
+
+        # TODO: validate cycle
+        self.items.append(item)
+        if self.size() >= _BUILDER_STACK_MAX_DEPTH:
+            current_pipeline = self.items[0].name
+            # clear current pipeline stack
+            self.items = []
+            msg = "Pipeline {} depth exceeds limitation. Max depth: {}"
+            raise UserErrorException(
+                message=msg.format(current_pipeline, _BUILDER_STACK_MAX_DEPTH),
+                no_personal_data_message=msg.format("[current_pipeline]", _BUILDER_STACK_MAX_DEPTH),
+            )
+
+    def is_empty(self) -> bool:
+        return len(self.items) == 0
+
+    def size(self) -> int:
+        return len(self.items)
+
+
+# This collection is used to record pipeline component builders in current call stack
+_definition_builder_stack = _PipelineComponentBuilderStack()
+
+
+def _is_inside_dsl_pipeline_func() -> bool:
+    """Checks whether executing within a dsl pipeline func
+
+    :return: True if is inside DSL pipeline func.
+    :rtype: bool
+    """
+    return _definition_builder_stack.size() > 0
+
+
+def _add_component_to_current_definition_builder(component: Union[BaseNode, AutoMLJob]) -> None:
+    if _is_inside_dsl_pipeline_func():
+        builder = _definition_builder_stack.top()
+        if builder is not None:
+            builder.add_node(component)
+
+
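+# Editorial sketch (not part of the SDK source): how the stack above is used. While a
+# dsl.pipeline-decorated function body executes to build its component, its
+# PipelineComponentBuilder sits on top of `_definition_builder_stack`; each node created in
+# the body is handed to `_add_component_to_current_definition_builder`, which appends it to
+# that builder. The `component_func` name below is a hypothetical component factory.
+#
+#     @pipeline
+#     def sample_pipeline():
+#         component_func()                        # registered on the current (top) builder
+#
+#     sample_pipeline()                           # runs the body with a builder pushed/popped
+#     assert not _is_inside_dsl_pipeline_func()   # stack is empty again afterwards
+
+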
+class PipelineComponentBuilder:
+    # map from python built-in type to component type
+    # pylint: disable=too-many-instance-attributes
+    DEFAULT_DATA_TYPE_MAPPING = {
+        "float": "number",
+        "int": "integer",
+        "bool": "boolean",
+        "str": "string",
+    }
+    DEFAULT_OUTPUT_NAME = "output"
+
+    def __init__(
+        self,
+        func: Callable,
+        name: Optional[str] = None,
+        version: Optional[str] = None,
+        display_name: Optional[str] = None,
+        description: Optional[str] = None,
+        default_datastore: Any = None,
+        tags: Optional[Union[Dict[str, str], str]] = None,
+        source_path: Optional[str] = None,
+        non_pipeline_inputs: Optional[List] = None,
+    ):
+        self.func = func
+        name = name if name is not None else func.__name__
+        display_name = display_name if display_name else name
+        description = description if description else func.__doc__
+        self._args_description = parse_args_description_from_docstring(func.__doc__)
+        # List of nodes, ordered by their creation order in the pipeline.
+        self.nodes: List = []
+        self.non_pipeline_parameter_names = non_pipeline_inputs or []
+        # A dict mapping input names to InputDefinition.
+        # TODO: infer pipeline component input meta from assignment
+        self.inputs = self._build_inputs(func)
+        self.output_annotation = self._get_output_annotation(func)
+        self._name = name
+        self.version = version
+        self.display_name = display_name
+        self.description = description
+        self.default_datastore = default_datastore
+        self.tags = tags
+        self.source_path = source_path
+
+    @property
+    def name(self) -> str:
+        """Name of pipeline builder, it's name will be same as the pipeline definition it builds.
+
+        :return: Pipeline builder name
+        :rtype: str
+        """
+        return self._name
+
+    def add_node(self, node: Union[BaseNode, AutoMLJob]) -> None:
+        """Add node to pipeline builder.
+
+        :param node: A pipeline node.
+        :type node: Union[BaseNode, AutoMLJob]
+        """
+        self.nodes.append(node)
+
+    def build(
+        self,
+        *,
+        user_provided_kwargs: Optional[Dict] = None,
+        non_pipeline_inputs_dict: Optional[Dict] = None,
+        non_pipeline_inputs: Optional[List] = None,
+    ) -> PipelineComponent:
+        """Build a pipeline component from current pipeline builder.
+
+        :keyword user_provided_kwargs: The kwargs the user provided to the dsl pipeline function. None if not provided.
+        :keyword non_pipeline_inputs_dict: The provided non-pipeline input name-to-value mapping. None if not provided.
+        :keyword non_pipeline_inputs: The list of non-pipeline input names. None if not provided.
+        :return: The built PipelineComponent
+        :rtype: PipelineComponent
+        """
+        if user_provided_kwargs is None:
+            user_provided_kwargs = {}
+        # Clear nodes as we may call build multiple times.
+        self.nodes = []
+
+        kwargs = _build_pipeline_parameter(
+            func=self.func,
+            user_provided_kwargs=user_provided_kwargs,
+            # TODO: support result() for pipeline input inside parameter group
+            group_default_kwargs=self._get_group_parameter_defaults(),
+            non_pipeline_inputs=non_pipeline_inputs,
+        )
+        kwargs.update(non_pipeline_inputs_dict or {})
+
+        # get_outputs_and_locals below captures all local variables of self.func in a dict.
+        # We use this stack to track the dsl pipeline definition hierarchy while the function body runs.
+        _definition_builder_stack.push(self)
+
+        try:
+            outputs, _locals = get_outputs_and_locals(self.func, kwargs)
+        finally:
+            _definition_builder_stack.pop()
+
+        if outputs is None:
+            outputs = {}
+
+        jobs: Dict = self._update_nodes_variable_names(_locals)
+        pipeline_component = PipelineComponent(
+            name=self.name,
+            version=self.version,
+            display_name=self.display_name,
+            description=self.description,
+            inputs=self.inputs,
+            jobs=jobs,
+            tags=self.tags,  # type: ignore[arg-type]
+            source_path=self.source_path,
+            _source=ComponentSource.DSL,
+        )
+        # TODO: Refine here. The output can not be built first then pass into pipeline component creation,
+        # exception will be raised in component.build_validate_io().
+        pipeline_component._outputs = self._build_pipeline_outputs(outputs)
+        return pipeline_component
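+
+    # Editorial sketch (not part of the SDK source): `build` is normally driven by the
+    # @dsl.pipeline decorator rather than called directly; roughly (the function name and
+    # keyword argument below are hypothetical):
+    #
+    #     builder = PipelineComponentBuilder(func=sample_pipeline_func)
+    #     pipeline_component = builder.build(user_provided_kwargs={"learning_rate": 0.01})
+    #     # pipeline_component is a PipelineComponent with inputs, outputs and jobs resolved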
+
+    def _validate_group_annotation(self, name: str, val: GroupInput) -> None:
+        for k, v in val.values.items():
+            if isinstance(v, GroupInput):
+                self._validate_group_annotation(k, v)
+            elif isinstance(v, Output):
+                # TODO(2097468): automatically change it to Input when used in input annotation
+                raise UserErrorException("Output annotation cannot be used in @pipeline.")
+            elif isinstance(v, Input):
+                if v.type not in IOConstants.PRIMITIVE_STR_2_TYPE:
+                    # TODO(2097478): support port type groups
+                    raise UserErrorException(f"Only primitive types can be used as input of group, got {v.type}")
+            else:
+                raise UserErrorException(f"Unsupported annotation type {type(v)} for group field {name}.{k}")
+
+    def _build_inputs(self, func: Union[Callable, Type]) -> Dict:
+        inputs: Dict = _get_param_with_standard_annotation(
+            func, is_func=True, skip_params=self.non_pipeline_parameter_names
+        )
+        for k, v in inputs.items():
+            if isinstance(v, GroupInput):
+                self._validate_group_annotation(name=k, val=v)
+            # add arg description
+            if k in self._args_description:
+                v["description"] = self._args_description[k]
+        return inputs
+
+    def _build_pipeline_outputs(self, outputs: typing.Dict[str, NodeOutput]) -> Dict[str, PipelineOutput]:
+        """Validate if dsl.pipeline returns valid outputs and set output binding. Create PipelineOutput as pipeline's
+        output definition based on node outputs from return.
+
+        :param outputs: Outputs of pipeline
+        :type outputs: Mapping[str, azure.ai.ml.Output]
+        :return: The mapping of output names to PipelineOutput
+        :rtype: Dict[str, PipelineOutput]
+        """
+        error_msg = (
+            "The return type of dsl.pipeline decorated function should be a mapping from output name to "
+            "azure.ai.ml.Output with owner."
+        )
+        if is_group(outputs):
+            outputs = {key: val for key, val in outputs.__dict__.items() if val}
+        if not isinstance(outputs, dict):
+            raise UserErrorException(message=error_msg, no_personal_data_message=error_msg)
+        output_dict = {}
+        output_meta_dict = {}
+        for key, value in outputs.items():
+            if not isinstance(key, str) or not isinstance(value, NodeOutput) or value._owner is None:
+                raise UserErrorException(message=error_msg, no_personal_data_message=error_msg)
+            if value._meta is not None:
+                meta = value._meta
+            else:
+                meta = Output(type=value.type, path=value.path, mode=value.mode, description=value.description)
+
+            # Hack: map internal output type to pipeline output type
+            def _map_internal_output_type(_meta: Output) -> str:
+                """Map component output type to valid pipeline output type.
+
+                :param _meta: The output
+                :type _meta: Output
+                :return: Output type
+                :rtype: str
+                """
+                if type(_meta).__name__ != "InternalOutput":
+                    return str(_meta.type)
+                return str(_meta.map_pipeline_output_type())  # type: ignore[attr-defined]
+
+            # Note: Here we set PipelineOutput as Pipeline's output definition as we need output binding.
+            output_meta = Output(
+                type=_map_internal_output_type(meta),  # type: ignore[arg-type]
+                description=meta.description,
+                mode=meta.mode,
+            )
+            pipeline_output = PipelineOutput(
+                port_name=key,
+                data=None,
+                # meta is used to create pipeline component, store it here to make sure pipeline component and inner
+                # node output type are consistent
+                meta=output_meta,
+                owner="pipeline",
+                description=self._args_description.get(key, None),
+                # store original node output to be able to trace back to inner node from a pipeline output builder.
+                binding_output=value,
+            )
+            # copy node level output setting to pipeline output
+            copy_output_setting(
+                source=value._owner.outputs[value._port_name], target=pipeline_output  # type: ignore[arg-type]
+            )
+
+            value._owner.outputs[value._port_name]._data = pipeline_output  # type: ignore[union-attr]
+
+            output_dict[key] = pipeline_output
+            output_meta_dict[key] = output_meta._to_dict()
+
+        self._validate_inferred_outputs(output_meta_dict, output_dict)
+        return output_dict
+
+    def _get_group_parameter_defaults(self) -> Dict:
+        group_defaults = {}
+        for key, val in self.inputs.items():
+            if not isinstance(val, GroupInput):
+                continue
+            # Copy and insert top-level parameter name into group names for all items
+            group_defaults[key] = copy.deepcopy(val.default)
+            group_defaults[key].insert_group_name_for_items(key)
+        return group_defaults
+
+    def _update_nodes_variable_names(self, func_variables: dict) -> Dict[str, Union[BaseNode, AutoMLJob]]:
+        """Update nodes list to ordered dict with variable name key and component object value.
+
+        Variable naming priority:
+             1. Specified by using xxx.name.
+                 e.g.
+                 module1 = module_func()
+                 module1.name = "node1"     # final node name is "node1"
+
+             2. Variable name
+                 e.g.
+                 my_node = module_func()     # final node name is "my_node"
+
+             3. Anonymous node, but another node with the same component.name has a user-defined name
+                 e.g.
+                 my_node = module_func()     # final node name is "my_node"
+                 module_func()               # final node name is "my_node_1"
+                 module_func()               # final node name is "my_node_2"
+
+             4. Anonymous node
+                 e.g.
+                 my_node = module_func()     # final node name is "my_node"
+                 module_func_1()             # final node name is its component name
+
+        :param func_variables: The function variables
+        :type func_variables: dict
+        :return: Map of variable name to component object
+        :rtype: Dict[str, Union[BaseNode, AutoMLJob]]
+        """
+
+        def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]) -> Optional[Union[str, Component]]:
+            # TODO(1979547): refactor this
+            if isinstance(node, AutoMLJob):
+                return node.name or _sanitize_python_variable_name(node.__class__.__name__)
+            if isinstance(node, ControlFlowNode):
+                return _sanitize_python_variable_name(node.__class__.__name__)
+            return node.name or node._get_component_name()
+
+        valid_component_ids = set(item._instance_id for item in self.nodes)
+        id_name_dict = {}
+        name_count_dict = {}
+        compname_udfname_dict = {}
+        local_names = set()
+        result = OrderedDict()
+
+        for k, v in func_variables.items():
+            # TODO(1979547): refactor this
+            if not isinstance(v, (BaseNode, AutoMLJob, PipelineJob, ControlFlowNode)):
+                continue
+            instance_id = getattr(v, "_instance_id", None)
+            if instance_id not in valid_component_ids:
+                continue
+            name = getattr(v, "name", None) or k
+            # for node name _, treat it as anonymous node with name unset
+            if name == "_":
+                continue
+
+            # User defined name must be valid python identifier
+            if not is_valid_node_name(name):
+                raise UserErrorException(
+                    f"Invalid node name found: {name!r}. Node name must start with a lower letter or underscore, "
+                    "and can only contain lower letters, numbers and underscore."
+                )
+
+            # Raise an error when setting a name that already exists, likely a conflict with a variable name
+            if name in local_names and instance_id not in id_name_dict:
+                raise UserErrorException(
+                    f"Duplicate node name found in pipeline: {self.name!r}, "
+                    f"node name: {name!r}. Duplicate check is case-insensitive."
+                )
+            local_names.add(name)
+            id_name_dict[v._instance_id] = name  # type: ignore[union-attr]
+            name_count_dict[name] = 1
+
+        # Find the last user-defined name for the same type of components
+        for node in self.nodes:
+            _id = node._instance_id
+            if _id in id_name_dict:
+                compname_udfname_dict[_get_name_or_component_name(node)] = id_name_dict[_id]
+
+        # Refine and fill default name
+        # If component name is same, append '_{count}' suffix
+        for node in self.nodes:
+            _id = node._instance_id
+            if _id not in id_name_dict:
+                target_name = _get_name_or_component_name(node)
+                if node.name is None and target_name in compname_udfname_dict:
+                    target_name = compname_udfname_dict[target_name]
+                if target_name not in name_count_dict:
+                    name_count_dict[target_name] = 0
+                name_count_dict[target_name] += 1
+                suffix = "" if name_count_dict[target_name] == 1 else f"_{name_count_dict[target_name] - 1}"
+                id_name_dict[_id] = f"{_sanitize_python_variable_name(str(target_name))}{suffix}"
+            final_name = id_name_dict[_id]
+            node.name = final_name
+            result[final_name] = node
+
+            # Validate the node's IO names against reserved keywords and log a warning if any are found.
+            self._validate_keyword_in_node_io(node)
+        return result
+
+    def _update_inputs(self, pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]) -> None:
+        """Update the pipeline inputs by the dict.
+
+        :param pipeline_inputs: The pipeline inputs
+        :type pipeline_inputs: Dict[str, Union[PipelineInput, Input, NodeOutput, Any]]
+        """
+        for input_name, value in pipeline_inputs.items():
+            anno: Any = None
+            if input_name not in self.inputs:
+                if isinstance(value, PipelineInput):
+                    value = value._data
+                if isinstance(value, Input):
+                    anno = copy.copy(value)
+                elif isinstance(value, NodeOutput):
+                    anno = Input(type=value.type)
+                else:
+                    anno = _get_annotation_by_value(value)
+                anno.name = input_name
+                anno.description = self._args_description.get(input_name)
+                self.inputs[input_name] = anno
+
+    @classmethod
+    def _get_output_annotation(cls, func: Callable) -> Dict[str, Dict]:
+        """Get the output annotation of the function, validate & refine it.
+
+        :param func: The function to retrieve output annotations from
+        :type func: Callable
+        :return: A dict of output annotations
+        :rtype: Dict[str, Dict]
+        """
+        return_annotation = inspect.signature(func).return_annotation
+
+        if is_group(return_annotation):
+            outputs = _get_param_with_standard_annotation(return_annotation, is_func=False)
+        elif isinstance(return_annotation, Output):
+            outputs = {cls.DEFAULT_OUTPUT_NAME: return_annotation}
+        else:
+            # skip if return annotation is not group or output
+            return {}
+
+        output_annotations = {}
+        for key, val in outputs.items():
+            if isinstance(val, GroupInput):
+                raise UserErrorException(message="Nested group annotation is not supported in pipeline output.")
+            # normalize annotation since currently annotation in @group will be converted to Input
+            if isinstance(val, Input):
+                val = Output(type=val.type)
+            if not isinstance(val, Output):
+                raise UserErrorException(
+                    message="Invalid output annotation. "
+                    f"Only Output annotation in return annotation is supported. Got {type(val)}."
+                )
+            output_annotations[key] = val._to_dict()
+        return output_annotations
+
+    def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: Dict[str, PipelineOutput]) -> None:
+        """Validate inferred output dict against annotation.
+
+        :param output_meta_dict: The output meta dict
+        :type output_meta_dict: dict
+        :param output_dict: The output dict
+        :type output_dict: Dict[str, PipelineOutput]
+        """
+        if not self.output_annotation:
+            return
+        error_prefix = "Unmatched outputs between actual pipeline output and output in annotation"
+        if output_meta_dict.keys() != self.output_annotation.keys():
+            raise UserErrorException(
+                "{}: actual pipeline component outputs: {}, annotation outputs: {}".format(
+                    error_prefix, output_meta_dict.keys(), self.output_annotation.keys()
+                )
+            )
+
+        unmatched_outputs = []
+        for key, actual_output in output_meta_dict.items():
+            expected_output = self.output_annotation[key]
+            actual_output.pop("description", None)
+            expected_description = expected_output.pop("description", None)
+            # skip comparing mode since, when the component is from remote, the output mode is not available
+            actual_output.pop("mode", None)
+            expected_mode = expected_output.pop("mode", None)
+            if expected_output != actual_output:
+                unmatched_outputs.append(
+                    f"{key}: pipeline component output: {actual_output} != annotation output {expected_output}"
+                )
+            res = output_dict[key]._meta
+            if expected_description:
+                if res is not None:
+                    res.description = expected_description
+                # also copy the description to pipeline job
+                output_dict[key].description = expected_description
+            if expected_mode:
+                if res is not None:
+                    res.mode = expected_mode
+                # also copy the mode to pipeline job
+                output_dict[key].mode = expected_mode
+
+        if unmatched_outputs:
+            raise UserErrorException(f"{error_prefix}: {unmatched_outputs}")
+
+    @staticmethod
+    def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]) -> None:
+        if has_attr_safe(node, "inputs"):
+            for input_name in set(node.inputs) & COMPONENT_IO_KEYWORDS:  # type: ignore[arg-type]
+                module_logger.warning(
+                    'Reserved word "%s" is used as input name in node "%s", '
+                    "can only be accessed with '%s.inputs[\"%s\"]'",
+                    input_name,
+                    node.name,
+                    node.name,
+                    input_name,
+                )
+        if has_attr_safe(node, "outputs"):
+            for output_name in set(node.outputs) & COMPONENT_IO_KEYWORDS:  # type: ignore[arg-type]
+                module_logger.warning(
+                    'Reserved word "%s" is used as output name in node "%s", '
+                    "can only be accessed with '%s.outputs[\"%s\"]'",
+                    output_name,
+                    node.name,
+                    node.name,
+                    output_name,
+                )
+
+
+def _build_pipeline_parameter(
+    func: Optional[Callable],
+    *,
+    user_provided_kwargs: Dict,
+    group_default_kwargs: Optional[Dict] = None,
+    non_pipeline_inputs: Optional[List] = None,
+) -> Dict:
+    # Pass group defaults into kwargs so that group.item can be used even if the function declares no default.
+    # example:
+    # @group
+    # class Group:
+    #   key = 'val'
+    #
+    # @pipeline
+    # def pipeline_func(param: Group):
+    #   component_func(input=param.key)  <--- param.key should be val.
+
+    # transform kwargs
+    transformed_kwargs, non_pipeline_inputs = {}, non_pipeline_inputs or []
+    if group_default_kwargs:
+        transformed_kwargs.update(
+            {
+                key: _wrap_pipeline_parameter(key, default_value=value, actual_value=value)
+                for key, value in group_default_kwargs.items()
+                if key not in non_pipeline_inputs
+            }
+        )
+
+    def all_params(parameters: Any) -> Generator:
+        yield from parameters.values()
+
+    if func is None:
+        return transformed_kwargs
+
+    parameters = all_params(signature(func).parameters)
+    # transform default values
+    for left_args in parameters:
+        if (
+            left_args.name not in transformed_kwargs
+            and left_args.kind != Parameter.VAR_KEYWORD
+            and left_args.name not in non_pipeline_inputs
+        ):
+            default_value = left_args.default if left_args.default is not Parameter.empty else None
+            actual_value = user_provided_kwargs.get(left_args.name)
+            transformed_kwargs[left_args.name] = _wrap_pipeline_parameter(
+                key=left_args.name, default_value=default_value, actual_value=actual_value
+            )
+    # Add variable kwargs to transformed_kwargs.
+    for key, value in user_provided_kwargs.items():
+        if key not in transformed_kwargs:
+            transformed_kwargs[key] = _wrap_pipeline_parameter(key=key, default_value=None, actual_value=value)
+    return transformed_kwargs
+
+
+def _wrap_pipeline_parameter(
+    key: str, default_value: Any, actual_value: Any, group_names: Optional[List[str]] = None
+) -> Union[_GroupAttrDict, PipelineInput]:
+    # Append parameter path in group
+    group_names = [*group_names] if group_names else []
+    if isinstance(default_value, _GroupAttrDict):
+        group_names.append(key)
+        return _GroupAttrDict(
+            {
+                k: _wrap_pipeline_parameter(k, default_value=v, actual_value=v, group_names=group_names)
+                for k, v in default_value.items()
+            }
+        )
+    # Note: this PipelineInput object is built to mark the input as a data binding.
+    # It only exists while the dsl.pipeline function executes and is not stored in the pipeline job or component.
+    return PipelineInput(name=key, meta=None, default_data=default_value, data=actual_value, group_names=group_names)
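+
+
+# Editorial sketch (not part of the SDK source): what _build_pipeline_parameter produces.
+# Each declared parameter of the pipeline function is wrapped into a PipelineInput that
+# carries both the default and the user-provided value, so that using it inside the
+# function body creates a data binding rather than passing a plain value. `sample_func`
+# below is hypothetical.
+#
+#     def sample_func(learning_rate: float = 0.01, epochs: int = 10):
+#         ...
+#
+#     kwargs = _build_pipeline_parameter(sample_func, user_provided_kwargs={"epochs": 5})
+#     # kwargs maps "learning_rate" and "epochs" to PipelineInput objects; the one for
+#     # "epochs" carries the provided value 5, the other only the default 0.01.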
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py
new file mode 100644
index 00000000..21b5b6e3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py
@@ -0,0 +1,327 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import inspect
+import logging
+from collections import OrderedDict
+from functools import wraps
+from inspect import Parameter, signature
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Optional, TypeVar, Union, overload
+
+from typing_extensions import ParamSpec
+
+from azure.ai.ml._utils.utils import is_private_preview_enabled
+from azure.ai.ml.entities import Data, Model, PipelineJob, PipelineJobSettings
+from azure.ai.ml.entities._builders.pipeline import Pipeline
+from azure.ai.ml.entities._inputs_outputs import Input, is_group
+from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, _GroupAttrDict
+from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpression
+from azure.ai.ml.exceptions import (
+    MultipleValueError,
+    ParamValueNotExistsError,
+    TooManyPositionalArgsError,
+    UnexpectedKeywordError,
+    UnsupportedParameterKindError,
+    UserErrorException,
+)
+
+from ..entities._builders import BaseNode
+from ._pipeline_component_builder import PipelineComponentBuilder, _is_inside_dsl_pipeline_func
+from ._settings import _dsl_settings_stack
+from ._utils import _resolve_source_file
+
+SUPPORTED_INPUT_TYPES = (
+    PipelineInput,
+    NodeOutput,
+    Input,
+    Model,
+    Data,  # For the case where a Data object is used as an input, we will convert it to an Input object
+    Pipeline,  # For the case where a pipeline node is used as the input, we use its single output as the real input.
+    str,
+    bool,
+    int,
+    float,
+    PipelineExpression,
+    _GroupAttrDict,
+)
+module_logger = logging.getLogger(__name__)
+
+T = TypeVar("T")
+P = ParamSpec("P")
+
+
+# Overload: returns a decorator when func is None
+@overload
+def pipeline(
+    func: None,
+    *,
+    name: Optional[str] = None,
+    version: Optional[str] = None,
+    display_name: Optional[str] = None,
+    description: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    tags: Optional[Dict[str, str]] = None,
+    **kwargs: Any,
+) -> Callable[[Callable[P, T]], Callable[P, PipelineJob]]: ...
+
+
+# Overload: returns a decorated function when func isn't None
+@overload
+def pipeline(
+    func: Callable[P, T],
+    *,
+    name: Optional[str] = None,
+    version: Optional[str] = None,
+    display_name: Optional[str] = None,
+    description: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    tags: Optional[Dict[str, str]] = None,
+    **kwargs: Any,
+) -> Callable[P, PipelineJob]: ...
+
+
+def pipeline(
+    func: Optional[Callable[P, T]] = None,
+    *,
+    name: Optional[str] = None,
+    version: Optional[str] = None,
+    display_name: Optional[str] = None,
+    description: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    tags: Optional[Union[Dict[str, str], str]] = None,
+    **kwargs: Any,
+) -> Union[Callable[[Callable[P, T]], Callable[P, PipelineJob]], Callable[P, PipelineJob]]:
+    """Build a pipeline which contains all component nodes defined in this function.
+
+    :param func: The user pipeline function to be decorated.
+    :type func: types.FunctionType
+    :keyword name: The name of pipeline component, defaults to function name.
+    :paramtype name: str
+    :keyword version: The version of pipeline component, defaults to "1".
+    :paramtype version: str
+    :keyword display_name: The display name of pipeline component, defaults to function name.
+    :paramtype display_name: str
+    :keyword description: The description of the built pipeline.
+    :paramtype description: str
+    :keyword experiment_name: Name of the experiment the job will be created under; \
+        if None is provided, the experiment name will be set to the current directory name.
+    :paramtype experiment_name: str
+    :keyword tags: The tags of pipeline component.
+    :paramtype tags: dict[str, str]
+    :return: Either
+      * A decorator, if `func` is None
+      * The decorated `func`
+
+    :rtype: Union[
+        Callable[[Callable], Callable[..., ~azure.ai.ml.entities.PipelineJob]],
+        Callable[P, ~azure.ai.ml.entities.PipelineJob]]
+
+    .. admonition:: Example:
+
+        .. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py
+            :start-after: [START configure_pipeline]
+            :end-before: [END configure_pipeline]
+            :language: python
+            :dedent: 8
+            :caption: Shows how to create a pipeline using this decorator.
+    """
+
+    def pipeline_decorator(func: Callable[P, T]) -> Callable:
+        if not isinstance(func, Callable):  # type: ignore
+            raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.")
+
+        non_pipeline_inputs = kwargs.get("non_pipeline_inputs", []) or kwargs.get("non_pipeline_parameters", [])
+        # The compute parameter name changed over time: default_compute_target -> compute -> default_compute -> none.
+        # To support legacy usage, we accept all of them, with priority as below.
+        compute = kwargs.get("compute", None)
+        default_compute_target = kwargs.get("default_compute_target", None)
+        default_compute_target = kwargs.get("default_compute", None) or default_compute_target
+        continue_on_step_failure = kwargs.get("continue_on_step_failure", None)
+        on_init = kwargs.get("on_init", None)
+        on_finalize = kwargs.get("on_finalize", None)
+
+        default_datastore = kwargs.get("default_datastore", None)
+        force_rerun = kwargs.get("force_rerun", None)
+        job_settings = {
+            "default_datastore": default_datastore,
+            "continue_on_step_failure": continue_on_step_failure,
+            "force_rerun": force_rerun,
+            "default_compute": default_compute_target,
+            "on_init": on_init,
+            "on_finalize": on_finalize,
+        }
+        func_entry_path = _resolve_source_file()
+        if not func_entry_path:
+            func_path = Path(inspect.getfile(func))
+            # in a notebook, func_path may be a fake path, and trying to resolve it would raise an error
+            if func_path.exists():
+                func_entry_path = func_path.resolve().absolute()
+
+        job_settings = {k: v for k, v in job_settings.items() if v is not None}
+        pipeline_builder = PipelineComponentBuilder(
+            func=func,
+            name=name,
+            version=version,
+            display_name=display_name,
+            description=description,
+            default_datastore=default_datastore,
+            tags=tags,
+            source_path=str(func_entry_path),
+            non_pipeline_inputs=non_pipeline_inputs,
+        )
+
+        @wraps(func)
+        def wrapper(*args: P.args, **kwargs: P.kwargs) -> Union[Pipeline, PipelineJob]:
+            # Default args will be added here.
+            # Note: push/pop the stack here instead of inside build(),
+            # because we only want to enable dsl settings on the top-level pipeline.
+            _dsl_settings_stack.push()  # use this stack to track on_init/on_finalize settings
+            try:
+                # Convert args to kwargs
+                provided_positional_kwargs = _validate_args(func, args, kwargs, non_pipeline_inputs)
+
+                # When pipeline supports variable params, update pipeline component to support the inputs in **kwargs.
+                pipeline_parameters = {
+                    k: v for k, v in provided_positional_kwargs.items() if k not in non_pipeline_inputs
+                }
+                pipeline_builder._update_inputs(pipeline_parameters)
+
+                non_pipeline_params_dict = {
+                    k: v for k, v in provided_positional_kwargs.items() if k in non_pipeline_inputs
+                }
+
+                # TODO: cache built pipeline component
+                pipeline_component = pipeline_builder.build(
+                    user_provided_kwargs=provided_positional_kwargs,
+                    non_pipeline_inputs_dict=non_pipeline_params_dict,
+                    non_pipeline_inputs=non_pipeline_inputs,
+                )
+            finally:
+                # use `finally` to ensure pop operation from the stack
+                dsl_settings = _dsl_settings_stack.pop()
+
+            # update on_init/on_finalize settings if init/finalize job is set
+            if dsl_settings.init_job_set:
+                job_settings["on_init"] = dsl_settings.init_job_name(pipeline_component.jobs)
+            if dsl_settings.finalize_job_set:
+                job_settings["on_finalize"] = dsl_settings.finalize_job_name(pipeline_component.jobs)
+
+            # TODO: pass compute & default_compute separately?
+            common_init_args: Any = {
+                "experiment_name": experiment_name,
+                "component": pipeline_component,
+                "inputs": pipeline_parameters,
+                "tags": tags,
+            }
+            built_pipeline: Any = None
+            if _is_inside_dsl_pipeline_func():
+                # on_init/on_finalize is not supported for pipeline component
+                if job_settings.get("on_init") is not None or job_settings.get("on_finalize") is not None:
+                    raise UserErrorException("On_init/on_finalize is not supported for pipeline component.")
+                # Build pipeline node instead of pipeline job if inside dsl.
+                built_pipeline = Pipeline(_from_component_func=True, **common_init_args)
+                if job_settings:
+                    module_logger.warning(
+                        ("Job settings %s on pipeline function %r are ignored when using inside PipelineJob."),
+                        job_settings,
+                        func.__name__,
+                    )
+            else:
+                built_pipeline = PipelineJob(
+                    jobs=pipeline_component.jobs,
+                    compute=compute,
+                    settings=PipelineJobSettings(**job_settings),
+                    **common_init_args,
+                )
+
+            return built_pipeline
+
+        # Bug Item number: 2883169
+        wrapper._is_dsl_func = True  # type: ignore
+        wrapper._job_settings = job_settings  # type: ignore
+        wrapper._pipeline_builder = pipeline_builder  # type: ignore
+        return wrapper
+
+    # enable using the decorator without "()" when all arguments take their default values
+    if func is not None:
+        return pipeline_decorator(func)
+    return pipeline_decorator
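+
+
+# Editorial sketch (not part of the SDK source): typical usage of the decorator above.
+# `train_model` stands in for a component function, e.g. one loaded via load_component;
+# the file path, parameter names and compute target are hypothetical.
+#
+#     from azure.ai.ml import Input, load_component
+#     from azure.ai.ml.dsl import pipeline
+#
+#     train_model = load_component(source="./train.yml")
+#
+#     @pipeline(description="sample training pipeline", default_compute="cpu-cluster")
+#     def training_pipeline(training_data: Input):
+#         train_step = train_model(data=training_data)
+#         return {"model": train_step.outputs.model}
+#
+#     # Calling the decorated function outside another pipeline returns a PipelineJob.
+#     job = training_pipeline(training_data=Input(path="azureml:my-data:1"))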
+
+
+# pylint: disable-next=docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+def _validate_args(func: Callable, args: Any, kwargs: Dict, non_pipeline_inputs: List) -> OrderedDict:
+    """Validate customer function args and convert them to kwargs."""
+    if not isinstance(non_pipeline_inputs, List) or any(not isinstance(param, str) for param in non_pipeline_inputs):
+        msg = "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string"
+        raise UserErrorException(message=msg, no_personal_data_message=msg)
+    # Validate positional arguments
+    is_support_variable_params = is_private_preview_enabled()
+    all_parameters = signature(func).parameters
+    # Implicit parameters are *args
+    var_positional_arg = next(filter(lambda param: param.kind == param.VAR_POSITIONAL, all_parameters.values()), None)
+    if var_positional_arg:
+        raise UnsupportedParameterKindError(func.__name__, parameter_kind=f"*{var_positional_arg.name}")
+
+    non_pipeline_inputs = non_pipeline_inputs or []
+    unexpected_non_pipeline_inputs = [param for param in non_pipeline_inputs if param not in all_parameters]
+    if unexpected_non_pipeline_inputs:
+        raise ParamValueNotExistsError(func.__name__, unexpected_non_pipeline_inputs)
+
+    named_parameters = [
+        param for param in all_parameters.values() if param.kind not in [param.VAR_KEYWORD, param.VAR_POSITIONAL]
+    ]
+    empty_parameters = {param.name: param for param in named_parameters if param.default is Parameter.empty}
+    # Implicit parameters are *args and **kwargs
+    if not is_support_variable_params:
+        if len(all_parameters.values()) != len(named_parameters):
+            raise UnsupportedParameterKindError(func.__name__)
+
+        min_num = len(empty_parameters)
+        max_num = len(named_parameters)
+        if len(args) > max_num:
+            raise TooManyPositionalArgsError(func.__name__, min_num, max_num, len(args))
+
+    func_args, provided_args = list(args), OrderedDict({})
+    provided_args = OrderedDict({param.name: func_args.pop(0) for param in named_parameters if len(func_args) > 0})
+    for _k, _v in kwargs.items():
+        if not is_support_variable_params and _k not in all_parameters:
+            raise UnexpectedKeywordError(func.__name__, _k, all_parameters.keys())
+        if _k in provided_args.keys():
+            raise MultipleValueError(func.__name__, _k)
+        provided_args[_k] = _v
+
+    def _is_supported_data_type(_data: object) -> bool:
+        return isinstance(_data, SUPPORTED_INPUT_TYPES) or is_group(_data)
+
+    for pipeline_input_name in provided_args:
+        data = provided_args[pipeline_input_name]
+        if isinstance(data, BaseNode):
+            if len(data.outputs) != 1:
+                raise ValueError(
+                    "Provided input {} is not a single output node, cannot be used as a node input.".format(
+                        pipeline_input_name
+                    )
+                )
+            data = next(iter(data.outputs.values()))
+            provided_args[pipeline_input_name] = data
+
+        if data is not None and not _is_supported_data_type(data) and pipeline_input_name not in non_pipeline_inputs:
+            msg = (
+                "Pipeline input expected an azure.ai.ml.Input or primitive types (str, bool, int or float), "
+                "but got type {}."
+            )
+            raise UserErrorException(
+                message=msg.format(type(data)),
+                no_personal_data_message=msg.format("[type(pipeline_input_name)]"),
+            )
+
+    # Note: unprovided required inputs won't cause exception when calling pipeline func
+    #       the exception will be raised in pipeline's customized validate.
+    return provided_args
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py
new file mode 100644
index 00000000..784aaece
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_settings.py
@@ -0,0 +1,128 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import logging
+from collections import deque
+from typing import Any, Deque, Dict, Optional, Union
+
+from azure.ai.ml.entities._builders import BaseNode
+from azure.ai.ml.exceptions import UserErrorException
+
+module_logger = logging.getLogger(__name__)
+
+
+class _DSLSettingsStack:
+    def __init__(self) -> None:
+        self._stack: Deque["_DSLSettings"] = deque()
+
+    def push(self) -> None:
+        self._stack.append(_DSLSettings())
+
+    def pop(self) -> "_DSLSettings":
+        self._check_stack()
+        return self._stack.pop()
+
+    def top(self) -> "_DSLSettings":
+        self._check_stack()
+        return self._stack[-1]
+
+    def _check_stack(self) -> None:
+        # Stack push & pop are managed separately, so if the stack is empty here it means the user
+        # called `set_pipeline_settings` outside of a `pipeline`-decorated function;
+        # in that case, raise a user error directly.
+        if len(self._stack) == 0:
+            error_message = "Please call `set_pipeline_settings` inside a `pipeline` decorated function."
+            raise UserErrorException(
+                message=error_message,
+                no_personal_data_message=error_message,
+            )
+
+
+_dsl_settings_stack = _DSLSettingsStack()
+
+
+class _DSLSettings:
+    """Initialization & finalization job settings for DSL pipeline job.
+
+    Store settings from `dsl.set_pipeline_settings` during pipeline definition.
+    """
+
+    def __init__(self) -> None:
+        self._init_job: Any = None
+        self._finalize_job: Any = None
+
+    @property
+    def init_job(self) -> Union[BaseNode, str]:
+        return self._init_job
+
+    @init_job.setter
+    def init_job(self, value: Optional[Union[BaseNode, str]]) -> None:
+        # pylint: disable=logging-fstring-interpolation
+        if isinstance(value, (BaseNode, str)):
+            self._init_job = value
+        else:
+            module_logger.warning(f"Initialization job setting is ignored as input parameter type is {type(value)!r}.")
+
+    @property
+    def init_job_set(self) -> bool:
+        # note: need an explicit `is not None` check, as a BaseNode instance may evaluate to False in a boolean context
+        return self._init_job is not None
+
+    def init_job_name(self, jobs: Dict[str, BaseNode]) -> Optional[str]:
+        if isinstance(self._init_job, str):
+            return self._init_job
+        for name, job in jobs.items():
+            if id(self.init_job) == id(job):
+                return name
+        module_logger.warning("Initialization job setting is ignored as cannot find corresponding job node.")
+        return None
+
+    @property
+    def finalize_job(self) -> Union[BaseNode, str]:
+        return self._finalize_job
+
+    @finalize_job.setter
+    def finalize_job(self, value: Optional[Union[BaseNode, str]]) -> None:
+        # pylint: disable=logging-fstring-interpolation
+        if isinstance(value, (BaseNode, str)):
+            self._finalize_job = value
+        else:
+            module_logger.warning(f"Finalization job setting is ignored as input parameter type is {type(value)!r}.")
+
+    @property
+    def finalize_job_set(self) -> bool:
+        # note: need an explicit `is not None` check, as a BaseNode instance may evaluate to False in a boolean context
+        return self._finalize_job is not None
+
+    def finalize_job_name(self, jobs: Dict[str, BaseNode]) -> Optional[str]:
+        if isinstance(self._finalize_job, str):
+            return self._finalize_job
+        for name, job in jobs.items():
+            if id(self.finalize_job) == id(job):
+                return name
+        module_logger.warning("Finalization job setting is ignored as cannot find corresponding job node.")
+        return None
+
+
+def set_pipeline_settings(
+    *,
+    on_init: Optional[Union[BaseNode, str]] = None,
+    on_finalize: Optional[Union[BaseNode, str]] = None,
+) -> None:
+    """Set pipeline settings for current `dsl.pipeline` definition.
+
+    This function should be called inside a `dsl.pipeline` decorated function; otherwise an exception will be raised.
+
+    :keyword on_init: On_init job node or name. On init job will be executed before all other jobs, \
+                it should not have data connections to regular nodes.
+    :paramtype on_init: Union[BaseNode, str]
+    :keyword on_finalize: On_finalize job node or name. On finalize job will be executed after pipeline run \
+                finishes (completed/failed/canceled), it should not have data connections to regular nodes.
+    :paramtype on_finalize: Union[BaseNode, str]
+    :return:
+    """
+    dsl_settings = _dsl_settings_stack.top()
+    if on_init is not None:
+        dsl_settings.init_job = on_init
+    if on_finalize is not None:
+        dsl_settings.finalize_job = on_finalize
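+
+
+# Editorial sketch (not part of the SDK source): set_pipeline_settings must be called inside
+# a dsl.pipeline-decorated function; the component functions below are hypothetical.
+#
+#     @pipeline
+#     def pipeline_with_lifecycle_jobs():
+#         init = prepare_environment()       # will run before all other jobs
+#         train_model()
+#         cleanup = remove_temp_data()       # will run after the pipeline run finishes
+#         set_pipeline_settings(on_init=init, on_finalize=cleanup)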
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py
new file mode 100644
index 00000000..6faf669e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/dsl/_utils.py
@@ -0,0 +1,222 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+import contextlib
+import importlib
+import inspect
+import os
+import re
+import sys
+import types
+from pathlib import Path
+from typing import Generator, Optional, Union
+
+from azure.ai.ml.dsl._constants import VALID_NAME_CHARS
+from azure.ai.ml.exceptions import ComponentException, ErrorCategory, ErrorTarget
+
+
+def _normalize_identifier_name(name: str) -> str:
+    normalized_name = name.lower()
+    normalized_name = re.sub(r"[\W_]", " ", normalized_name)  # No non-word characters
+    normalized_name = re.sub(" +", " ", normalized_name).strip()  # No double spaces, leading or trailing spaces
+    if re.match(r"\d", normalized_name):
+        normalized_name = "n" + normalized_name  # No leading digits
+    return normalized_name
+
+
+def _sanitize_python_variable_name(name: str) -> str:
+    return _normalize_identifier_name(name).replace(" ", "_")
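+
+
+# For example (editorial note): _sanitize_python_variable_name("My Node-1!") returns
+# "my_node_1", and a leading digit gets an "n" prefix: _sanitize_python_variable_name("2x Model")
+# returns "n2x_model".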
+
+
+def is_valid_name(name: str) -> bool:
+    """Indicate whether the name is a valid component name.
+
+    :param name: The component name
+    :type name: str
+    :return: True if name is a valid name for a component, False otherwise
+    :rtype: bool
+    """
+    return all(c in VALID_NAME_CHARS for c in name)
+
+
+def _resolve_source_directory() -> Optional[Union[str, Path]]:
+    """Resolve source directory as last customer frame's module file dir position.
+
+    :return: The directory path of the last customer owner frame in the callstack
+    :rtype: Optional[Union[str, Path]]
+    """
+    source_file = _resolve_source_file()
+    # Fall back to current working directory if not found
+    return os.getcwd() if not source_file else Path(os.path.dirname(source_file)).absolute()
+
+
+def _resolve_source_file() -> Optional[Path]:
+    """Resolve source file as last customer frame's module file position.
+
+
+    :return: The filepath of the last customer owner frame in the callstack
+    :rtype: Optional[Path]
+    """
+    try:
+        frame_list = inspect.stack()
+        # We find the last frame that is in SDK code rather than customer code or dependency code
+        # by checking whether the frame's package name belongs to azure.ai.ml.
+        pattern = r"(^azure\.ai\.ml(?=\..*|$).*)"
+        for frame, last_frame in zip(frame_list, frame_list[1:]):
+            if _assert_frame_package_name(pattern, frame.frame) and not _assert_frame_package_name(
+                pattern, last_frame.frame
+            ):
+                module = inspect.getmodule(last_frame.frame)
+                return Path(str(module.__file__)).absolute() if module else None
+    except Exception:  # pylint: disable=W0718
+        pass
+    return None
+
+
+def _assert_frame_package_name(pattern: str, frame: types.FrameType) -> bool:
+    """Check the package name of frame is match pattern.
+
+    :param pattern: The pattern to match the package name of `frame` against.
+    :type pattern: str
+    :param frame: The stack frame
+    :type frame: types.FrameType
+    :return: True if the package name of the frame matches pattern, False otherwise
+    :rtype: bool
+    """
+    # f_globals records the function's module globals of the frame. And __package__ of module must be set.
+    # https://docs.python.org/3/reference/import.html#__package__
+    # Although __package__ is set when importing, it may happen __package__ does not exist in globals
+    # when using exec to execute.
+    package_name = frame.f_globals.get("__package__", "")
+    return bool(package_name and re.match(pattern, package_name))
+
+
+def _relative_to(
+    path: Union[str, os.PathLike], basedir: Union[str, os.PathLike], raises_if_impossible: bool = False
+) -> Optional[Path]:
+    """Compute the relative path under basedir.
+
+    This is a wrapper around Path.relative_to. By default, Path.relative_to raises if path is not under basedir;
+    this function instead returns None when raises_if_impossible=False, and raises otherwise.
+
+    :param path: A path
+    :type path: Union[str, os.PathLike]
+    :param basedir: The base path to compute `path` relative to
+    :type basedir: Union[str, os.PathLike]
+    :param raises_if_impossible: Whether to raise if :attr:`pathlib.Path.relative_to` throws. Defaults to False.
+    :type raises_if_impossible: bool
+    :return:
+        * None if raises_if_impossible is False and basedir is not a parent of path
+        * path.relative_to(basedir) otherwise
+    :rtype: Optional[Path]
+    """
+    # The second resolve is to expand a possible Windows short path.
+    path = Path(path).resolve().absolute().resolve()
+    basedir = Path(basedir).resolve().absolute().resolve()
+    try:
+        return path.relative_to(basedir)
+    except ValueError:
+        if raises_if_impossible:
+            raise
+        return None
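+
+
+# For example (editorial note, with POSIX-style paths):
+#     _relative_to("/data/project/src/train.py", "/data/project")   # -> Path("src/train.py")
+#     _relative_to("/somewhere/else", "/data/project")               # -> None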
+
+
+@contextlib.contextmanager
+def inject_sys_path(path: object) -> Generator:
+    original_sys_path = sys.path.copy()
+    sys.path.insert(0, str(path))
+    try:
+        yield
+    finally:
+        sys.path = original_sys_path
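+
+
+# Editorial sketch (not part of the SDK source): inject_sys_path temporarily prepends a
+# directory to sys.path so modules can be imported from it; the path and module name below
+# are hypothetical.
+#
+#     with inject_sys_path("/tmp/my_components"):
+#         module = importlib.import_module("my_component_module")  # resolvable only here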
+
+
+def _force_reload_module(module: types.ModuleType) -> types.ModuleType:
+    # Reload the module, except when module.__spec__ is None.
+    # When module.__spec__ is None (e.g. the module is __main__), reload would raise an exception.
+    if module.__spec__ is None:
+        return module
+    path = Path(module.__spec__.loader.path).parent  # type: ignore
+    with inject_sys_path(path):
+        return importlib.reload(module)
+
+
+@contextlib.contextmanager
+# pylint: disable-next=docstring-missing-return,docstring-missing-rtype
+def _change_working_dir(path: Union[str, os.PathLike], mkdir: bool = True) -> Generator:
+    """Context manager for changing the current working directory.
+
+    :param path: The path to change to
+    :type path: Union[str, os.PathLike]
+    :param mkdir: Whether to ensure `path` exists, creating it if it doesn't exist. Defaults to True.
+    :type mkdir: bool
+    """
+
+    saved_path = os.getcwd()
+    if mkdir:
+        os.makedirs(path, exist_ok=True)
+    os.chdir(str(path))
+    try:
+        yield
+    finally:
+        os.chdir(saved_path)
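+
+
+# Editorial sketch (not part of the SDK source): the directory name below is hypothetical.
+#
+#     with _change_working_dir("./component_build"):
+#         ...  # cwd is ./component_build here (created if missing); restored on exit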
+
+
+def _import_component_with_working_dir(
+    module_name: str, working_dir: Optional[str] = None, force_reload: bool = False
+) -> types.ModuleType:
+    if working_dir is None:
+        working_dir = os.getcwd()
+    working_dir = str(Path(working_dir).resolve().absolute())
+
+    with _change_working_dir(working_dir, mkdir=False), inject_sys_path(working_dir):
+        try:
+            py_module = importlib.import_module(module_name)
+        except Exception as e:
+            raise ComponentException(
+                message=str(e),
+                no_personal_data_message="Failure importing component with working directory",
+                target=ErrorTarget.COMPONENT,
+                error=e,
+                error_category=ErrorCategory.SYSTEM_ERROR,
+            ) from e
+        except BaseException as e:
+            # re-raise base exceptions like SystemExit as normal exceptions
+            raise ComponentException(
+                message=str(e),
+                no_personal_data_message="Failure importing component with working directory",
+                target=ErrorTarget.COMPONENT,
+                error=e,
+                error_category=ErrorCategory.USER_ERROR,
+            ) from e
+        loaded_module_file = Path(str(py_module.__file__)).resolve().absolute().as_posix()
+        posix_working_dir = Path(working_dir).absolute().as_posix()
+        if _relative_to(loaded_module_file, posix_working_dir) is None:
+            if force_reload:
+                # If force_reload is True, reload the module instead of raising an exception.
+                # This is used when we don't care about the previously loaded module with the same name.
+                return importlib.reload(py_module)
+            raise RuntimeError(
+                "Could not import module: '{}' because module with the same name has been loaded.\n"
+                "Path of the module: {}\n"
+                "Working dir: {}".format(module_name, loaded_module_file, posix_working_dir)
+            )
+        return py_module
+
+
+@contextlib.contextmanager
+def environment_variable_overwrite(key: str, val: str) -> Generator:
+    if key in os.environ:
+        backup_value = os.environ[key]
+    else:
+        backup_value = None
+    os.environ[key] = val
+
+    try:
+        yield
+    finally:
+        # restore the original value (even an empty string) or remove the variable again
+        if backup_value is not None:
+            os.environ[key] = backup_value
+        else:
+            os.environ.pop(key)
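+
+
+# Editorial sketch (not part of the SDK source): the variable name below is hypothetical.
+#
+#     with environment_variable_overwrite("SAMPLE_FEATURE_FLAG", "true"):
+#         assert os.environ["SAMPLE_FEATURE_FLAG"] == "true"
+#     # afterwards the previous value of the variable (or its absence) is restored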