author    S. Solomon Darnell 2025-03-28 21:52:21 -0500
committer S. Solomon Darnell 2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py | 285
1 file changed, 285 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py
new file mode 100644
index 00000000..a8f08d1e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py
@@ -0,0 +1,285 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from typing import Any, Dict, List, Optional, Union
+
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities._component.parallel_component import ParallelComponent
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+)
+from azure.ai.ml.entities._deployment.deployment_settings import BatchRetrySettings
+from azure.ai.ml.entities._job.parallel.run_function import RunFunction
+
+from .command_func import _parse_input, _parse_inputs_outputs, _parse_output
+from .parallel import Parallel
+
+
+def parallel_run_function(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    properties: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    compute: Optional[str] = None,
+    retry_settings: Optional[BatchRetrySettings] = None,
+    environment_variables: Optional[Dict] = None,
+    logging_level: Optional[str] = None,
+    max_concurrency_per_instance: Optional[int] = None,
+    error_threshold: Optional[int] = None,
+    mini_batch_error_threshold: Optional[int] = None,
+    task: Optional[RunFunction] = None,
+    mini_batch_size: Optional[str] = None,
+    partition_keys: Optional[List] = None,
+    input_data: Optional[str] = None,
+    inputs: Optional[Dict] = None,
+    outputs: Optional[Dict] = None,
+    instance_count: Optional[int] = None,
+    instance_type: Optional[str] = None,
+    docker_args: Optional[str] = None,
+    shm_size: Optional[str] = None,
+    identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None,
+    is_deterministic: bool = True,
+    **kwargs: Any,
+) -> Parallel:
+    """Create a Parallel object which can be used inside dsl.pipeline as a function and can also be created as a
+    standalone parallel job.
+
+    For an example of using ParallelRunStep, see the notebook
+    https://aka.ms/parallel-example-notebook
+
+    .. note::
+
+        To use parallel_run_function:
+
+        * Create a :class:`azure.ai.ml.entities._builders.Parallel` object to specify how the parallel run is
+          performed, with parameters to control batch size, the number of nodes per compute target, and a
+          reference to your custom Python script.
+
+        * Build a pipeline with the parallel object as a function, defining inputs and
+          outputs for the step (see the pipeline sketch after the example below).
+
+        * Submit the pipeline to run.
+
+    .. code:: python
+
+        from azure.ai.ml import Input, Output
+        from azure.ai.ml.constants import AssetTypes
+        from azure.ai.ml.entities import Environment
+        from azure.ai.ml.parallel import RunFunction, parallel_run_function
+
+        parallel_run = parallel_run_function(
+            name="batch_score_with_tabular_input",
+            display_name="Batch Score with Tabular Dataset",
+            description="parallel component for batch score",
+            inputs=dict(
+                job_data_path=Input(
+                    type=AssetTypes.MLTABLE,
+                    description="The data to be split and scored in parallel",
+                ),
+                score_model=Input(
+                    type=AssetTypes.URI_FOLDER, description="The model for batch score."
+                ),
+            ),
+            outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
+            input_data="${{inputs.job_data_path}}",
+            max_concurrency_per_instance=2,  # Optional, default is 1
+            mini_batch_size="100",  # Optional
+            mini_batch_error_threshold=5,  # Optional, allowed failed count of mini-batch items, default is -1
+            logging_level="DEBUG",  # Optional, default is INFO
+            error_threshold=5,  # Optional, total allowed failed count, default is -1
+            retry_settings=dict(max_retries=2, timeout=60),  # Optional
+            task=RunFunction(
+                code="./src",
+                entry_script="tabular_batch_inference.py",
+                environment=Environment(
+                    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
+                    conda_file="./src/environment_parallel.yml",
+                ),
+                program_arguments="--model ${{inputs.score_model}}",
+                append_row_to="${{outputs.job_output_path}}",  # Optional, if not set, summary_only
+            ),
+        )
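+
+    A parallel node created this way is typically consumed inside a pipeline. The following
+    is a minimal sketch (the pipeline and input names are hypothetical, and it assumes the
+    parallel_run object from the example above):
+
+    .. code:: python
+
+        from azure.ai.ml.dsl import pipeline
+
+        @pipeline()
+        def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
+            # call the parallel object as a function to create a step, binding
+            # pipeline-level inputs to the inputs declared on the node
+            parallel_node = parallel_run(
+                job_data_path=pipeline_job_data_path,
+                score_model=pipeline_score_model,
+            )
+            # expose the step's output as a pipeline-level output
+            return {"pipeline_job_out": parallel_node.outputs.job_output_path}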
+
+    :keyword name: Name of the parallel job or component created.
+    :paramtype name: str
+    :keyword description: A friendly description of the parallel.
+    :paramtype description: str
+    :keyword tags: Tags to be attached to this parallel.
+    :paramtype tags: Dict
+    :keyword properties: The asset property dictionary.
+    :paramtype properties: Dict
+    :keyword display_name: A friendly name.
+    :paramtype display_name: str
+    :keyword experiment_name: Name of the experiment under which the job will be created.
+                            If None is provided, it defaults to the current directory name.
+                            Ignored when used as a pipeline step.
+    :paramtype experiment_name: str
+    :keyword compute: The name of the compute where the parallel job is executed (will not be used
+                    if the parallel is used as a component/function).
+    :paramtype compute: str
+    :keyword retry_settings: Retry settings for a failed parallel component run.
+    :paramtype retry_settings: ~azure.ai.ml.entities._deployment.deployment_settings.BatchRetrySettings
+    :keyword environment_variables: A dictionary of environment variable names and values.
+                                  These environment variables are set on the process
+                                  where the user script is being executed.
+    :paramtype environment_variables: Dict[str, str]
+    :keyword logging_level: A string of the logging level name, as defined in 'logging'.
+                          Possible values are 'WARNING', 'INFO', and 'DEBUG'. (optional, default value is 'INFO'.)
+                          This value can be set through a PipelineParameter.
+    :paramtype logging_level: str
+    :keyword max_concurrency_per_instance: The maximum parallelism that each compute instance has.
+    :paramtype max_concurrency_per_instance: int
+    :keyword error_threshold: The number of record failures for Tabular Dataset and file failures for File Dataset
+                            that should be ignored during processing.
+                            If the error count goes above this value, the job will be aborted.
+                            The error threshold applies to the entire input rather
+                            than to the individual mini-batches sent to the run() method.
+                            The range is [-1, int.max]; -1 indicates that all failures during processing are ignored.
+    :paramtype error_threshold: int
+    :keyword mini_batch_error_threshold: The number of mini-batch processing failures that should be ignored.
+    :paramtype mini_batch_error_threshold: int
+    :keyword task: The parallel task.
+    :paramtype task: ~azure.ai.ml.entities._job.parallel.run_function.RunFunction
+    :keyword mini_batch_size: For FileDataset input,
+                            this field is the number of files a user script can process in one run() call.
+                            For TabularDataset input, this field is the approximate size of data
+                            the user script can process in one run() call.
+                            Example values are 1024, 1024KB, 10MB, and 1GB.
+                            (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.)
+                            This value can be set through a PipelineParameter.
+    :paramtype mini_batch_size: str
+    :keyword partition_keys: The keys used to partition the dataset into mini-batches. If specified,
+                           data with the same key will be partitioned into the same mini-batch.
+                           If both partition_keys and mini_batch_size are specified,
+                           the partition keys take effect.
+                           The input(s) must be partitioned dataset(s),
+                           and the partition_keys must be a subset of the keys of every input dataset for this to work.
+    :paramtype partition_keys: List
+    :keyword input_data: The input data.
+    :paramtype input_data: str
+    :keyword inputs: A dict of inputs used by this parallel.
+    :paramtype inputs: Dict
+    :keyword outputs: The outputs of this parallel.
+    :paramtype outputs: Dict
+    :keyword instance_count: Optional number of instances or nodes used by the compute target.
+                           Defaults to 1.
+    :paramtype instance_count: int
+    :keyword instance_type: Optional type of VM used, as supported by the compute target.
+    :paramtype instance_type: str
+    :keyword docker_args: Extra arguments to pass to the Docker run command.
+                        These would override any parameters that have already been set by the system
+                        or in this section.
+                        This parameter is only supported for Azure ML compute types.
+    :paramtype docker_args: str
+    :keyword shm_size: Size of the Docker container's shared memory block.
+                     This should be in the format of (number)(unit), where the number has to be greater than 0
+                     and the unit can be one of b (bytes), k (kilobytes), m (megabytes), or g (gigabytes).
+    :paramtype shm_size: str
+    :keyword identity: The identity that the PRS job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :keyword is_deterministic: Specify whether the parallel will return the same output given the same input.
+                             If a parallel (component) is deterministic, when used as a node/step in a pipeline,
+                             it will reuse results from a previously submitted job in the current workspace
+                             that has the same inputs and settings.
+                             In this case, the step will not use any compute resources.
+                             Defaults to True; specify is_deterministic=False if you would like to
+                             avoid such reuse behavior.
+    :paramtype is_deterministic: bool
+    :return: The parallel node.
+    :rtype: ~azure.ai.ml.entities._builders.parallel.Parallel
+    """
+    # pylint: disable=too-many-locals
+    inputs = inputs or {}
+    outputs = outputs or {}
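+    # split each given input/output into its component-level definition and its job-level value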
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs cannot be None, so drop entries that were left unset
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+
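+    # an already-built component may be passed in through kwargs; construct one only when absent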
+    component = kwargs.pop("component", None)
+
+    if component is None:
+        # the code attribute comes from the task when one is provided
+        component = ParallelComponent(
+            base_path=os.getcwd(),  # base path should be current folder
+            name=name,
+            tags=tags,
+            code=task.code if task is not None else None,
+            display_name=display_name,
+            description=description,
+            inputs=component_inputs,
+            outputs=component_outputs,
+            retry_settings=retry_settings,  # type: ignore[arg-type]
+            logging_level=logging_level,
+            max_concurrency_per_instance=max_concurrency_per_instance,
+            error_threshold=error_threshold,
+            mini_batch_error_threshold=mini_batch_error_threshold,
+            task=task,
+            mini_batch_size=mini_batch_size,
+            partition_keys=partition_keys,
+            input_data=input_data,
+            _source=ComponentSource.BUILDER,
+            is_deterministic=is_deterministic,
+            **kwargs,
+        )
+
+    parallel_obj = Parallel(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        properties=properties,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        compute=compute,
+        inputs=job_inputs,
+        outputs=job_outputs,
+        identity=identity,
+        environment_variables=environment_variables,
+        retry_settings=retry_settings,  # type: ignore[arg-type]
+        logging_level=logging_level,
+        max_concurrency_per_instance=max_concurrency_per_instance,
+        error_threshold=error_threshold,
+        mini_batch_error_threshold=mini_batch_error_threshold,
+        task=task,
+        mini_batch_size=mini_batch_size,
+        partition_keys=partition_keys,
+        input_data=input_data,
+        **kwargs,
+    )
+
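+    # resource settings are node-level rather than component-level; apply them
+    # only when at least one was explicitly provided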
+    if instance_count is not None or instance_type is not None or docker_args is not None or shm_size is not None:
+        parallel_obj.set_resources(
+            instance_count=instance_count, instance_type=instance_type, docker_args=docker_args, shm_size=shm_size
+        )
+
+    return parallel_obj