| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py')
| -rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py | 285 |
1 file changed, 285 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py
new file mode 100644
index 00000000..a8f08d1e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/parallel_func.py
@@ -0,0 +1,285 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import os
+from typing import Any, Dict, List, Optional, Union
+
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities._component.parallel_component import ParallelComponent
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+)
+from azure.ai.ml.entities._deployment.deployment_settings import BatchRetrySettings
+from azure.ai.ml.entities._job.parallel.run_function import RunFunction
+
+from .command_func import _parse_input, _parse_inputs_outputs, _parse_output
+from .parallel import Parallel
+
+
+def parallel_run_function(
+    *,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    tags: Optional[Dict] = None,
+    properties: Optional[Dict] = None,
+    display_name: Optional[str] = None,
+    experiment_name: Optional[str] = None,
+    compute: Optional[str] = None,
+    retry_settings: Optional[BatchRetrySettings] = None,
+    environment_variables: Optional[Dict] = None,
+    logging_level: Optional[str] = None,
+    max_concurrency_per_instance: Optional[int] = None,
+    error_threshold: Optional[int] = None,
+    mini_batch_error_threshold: Optional[int] = None,
+    task: Optional[RunFunction] = None,
+    mini_batch_size: Optional[str] = None,
+    partition_keys: Optional[List] = None,
+    input_data: Optional[str] = None,
+    inputs: Optional[Dict] = None,
+    outputs: Optional[Dict] = None,
+    instance_count: Optional[int] = None,
+    instance_type: Optional[str] = None,
+    docker_args: Optional[str] = None,
+    shm_size: Optional[str] = None,
+    identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None,
+    is_deterministic: bool = True,
+    **kwargs: Any,
+) -> Parallel:
+    """Create a Parallel object which can be used inside dsl.pipeline as a function and can also be created as a
+    standalone parallel job.
+
+    For an example of using ParallelRunStep, see the notebook
+    https://aka.ms/parallel-example-notebook
+
+    .. note::
+
+        To use parallel_run_function:
+
+        * Create a :class:`azure.ai.ml.entities._builders.Parallel` object to specify how the parallel run is
+          performed, with parameters to control the batch size, the number of nodes per compute target, and a
+          reference to your custom Python script.
+
+        * Build the pipeline with the parallel object as a function, defining inputs and
+          outputs for the step.
+
+        * Submit the pipeline to run.
+
+    .. code:: python
+
+        from azure.ai.ml import Input, Output, parallel
+
+        parallel_run = parallel_run_function(
+            name="batch_score_with_tabular_input",
+            display_name="Batch Score with Tabular Dataset",
+            description="parallel component for batch score",
+            inputs=dict(
+                job_data_path=Input(
+                    type=AssetTypes.MLTABLE,
+                    description="The data to be split and scored in parallel",
+                ),
+                score_model=Input(
+                    type=AssetTypes.URI_FOLDER, description="The model for batch score."
+                ),
+            ),
+            outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
+            input_data="${{inputs.job_data_path}}",
+            max_concurrency_per_instance=2,  # Optional, default is 1
+            mini_batch_size="100",  # optional
+            mini_batch_error_threshold=5,  # Optional, allowed failed count on mini batch items, default is -1
+            logging_level="DEBUG",  # Optional, default is INFO
+            error_threshold=5,  # Optional, allowed failed count totally, default is -1
+            retry_settings=dict(max_retries=2, timeout=60),  # Optional
+            task=RunFunction(
+                code="./src",
+                entry_script="tabular_batch_inference.py",
+                environment=Environment(
+                    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
+                    conda_file="./src/environment_parallel.yml",
+                ),
+                program_arguments="--model ${{inputs.score_model}}",
+                append_row_to="${{outputs.job_output_path}}",  # Optional, if not set, summary_only
+            ),
+        )
+
+    :keyword name: Name of the parallel job or component created.
+    :paramtype name: str
+    :keyword description: A friendly description of the parallel.
+    :paramtype description: str
+    :keyword tags: Tags to be attached to this parallel.
+    :paramtype tags: Dict
+    :keyword properties: The asset property dictionary.
+    :paramtype properties: Dict
+    :keyword display_name: A friendly name.
+    :paramtype display_name: str
+    :keyword experiment_name: Name of the experiment the job will be created under.
+        If None is provided, the default will be set to the current directory name.
+        Will be ignored as a pipeline step.
+    :paramtype experiment_name: str
+    :keyword compute: The name of the compute where the parallel job is executed (will not be used
+        if the parallel is used as a component/function).
+    :paramtype compute: str
+    :keyword retry_settings: Settings for retrying a failed parallel component run.
+    :paramtype retry_settings: ~azure.ai.ml.entities._deployment.deployment_settings.BatchRetrySettings
+    :keyword environment_variables: A dictionary of environment variable names and values.
+        These environment variables are set on the process
+        where the user script is being executed.
+    :paramtype environment_variables: Dict[str, str]
+    :keyword logging_level: A string of the logging level name, which is defined in 'logging'.
+        Possible values are 'WARNING', 'INFO', and 'DEBUG'. (optional, default value is 'INFO'.)
+        This value could be set through PipelineParameter.
+    :paramtype logging_level: str
+    :keyword max_concurrency_per_instance: The max parallelism that each compute instance has.
+    :paramtype max_concurrency_per_instance: int
+    :keyword error_threshold: The number of record failures for Tabular Dataset and file failures for File Dataset
+        that should be ignored during processing.
+        If the error count goes above this value, then the job will be aborted.
+        The error threshold is for the entire input rather
+        than for the individual mini-batches sent to the run() method.
+        The range is [-1, int.max]. -1 indicates ignoring all failures during processing.
+    :paramtype error_threshold: int
+    :keyword mini_batch_error_threshold: The number of mini-batch processing failures that should be ignored.
+    :paramtype mini_batch_error_threshold: int
+    :keyword task: The parallel task.
+    :paramtype task: ~azure.ai.ml.entities._job.parallel.run_function.RunFunction
+    :keyword mini_batch_size: For FileDataset input,
+        this field is the number of files a user script can process in one run() call.
+        For TabularDataset input, this field is the approximate size of data
+        the user script can process in one run() call.
+        Example values are 1024, 1024KB, 10MB, and 1GB.
+        (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.)
+        This value could be set through PipelineParameter.
+    :paramtype mini_batch_size: str
+    :keyword partition_keys: The keys used to partition the dataset into mini-batches. If specified,
+        the data with the same key will be partitioned into the same mini-batch.
+        If both partition_keys and mini_batch_size are specified,
+        the partition keys take effect.
+        The input(s) must be partitioned dataset(s),
+        and the partition_keys must be a subset of the keys of every input dataset for this to work.
+    :paramtype partition_keys: List
+    :keyword input_data: The input data.
+    :paramtype input_data: str
+    :keyword inputs: A dict of inputs used by this parallel.
+    :paramtype inputs: Dict
+    :keyword outputs: The outputs of this parallel.
+    :paramtype outputs: Dict
+    :keyword instance_count: Optional number of instances or nodes used by the compute target.
+        Defaults to 1.
+    :paramtype instance_count: int
+    :keyword instance_type: Optional type of VM used as supported by the compute target.
+    :paramtype instance_type: str
+    :keyword docker_args: Extra arguments to pass to the Docker run command.
+        This would override any parameters that have already been set by the system,
+        or in this section.
+        This parameter is only supported for Azure ML compute types.
+    :paramtype docker_args: str
+    :keyword shm_size: Size of the Docker container's shared memory block.
+        This should be in the format of (number)(unit), where the number has to be greater than 0
+        and the unit can be one of b (bytes), k (kilobytes), m (megabytes), or g (gigabytes).
+    :paramtype shm_size: str
+    :keyword identity: The identity that the PRS job will use while running on compute.
+    :paramtype identity: Optional[Union[
+        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
+        ~azure.ai.ml.entities.AmlTokenConfiguration,
+        ~azure.ai.ml.entities.UserIdentityConfiguration]]
+    :keyword is_deterministic: Specify whether the parallel will return the same output given the same input.
+        If a parallel (component) is deterministic, then when it is used as a node/step in a pipeline,
+        it will reuse results from a previously submitted job in the current workspace
+        that has the same inputs and settings.
+        In this case, the step will not use any compute resource. Defaults to True;
+        specify is_deterministic=False if you would like to avoid such reuse behavior.
+    :paramtype is_deterministic: bool
+    :return: The parallel node.
+    :rtype: ~azure.ai.ml._builders.parallel.Parallel
+    """
+    # pylint: disable=too-many-locals
+    inputs = inputs or {}
+    outputs = outputs or {}
+    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
+    # job inputs can not be None
+    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
+    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)
+
+    component = kwargs.pop("component", None)
+
+    if component is None:
+        if task is None:
+            component = ParallelComponent(
+                base_path=os.getcwd(),  # base path should be current folder
+                name=name,
+                tags=tags,
+                code=None,
+                display_name=display_name,
+                description=description,
+                inputs=component_inputs,
+                outputs=component_outputs,
+                retry_settings=retry_settings,  # type: ignore[arg-type]
+                logging_level=logging_level,
+                max_concurrency_per_instance=max_concurrency_per_instance,
+                error_threshold=error_threshold,
+                mini_batch_error_threshold=mini_batch_error_threshold,
+                task=task,
+                mini_batch_size=mini_batch_size,
+                partition_keys=partition_keys,
+                input_data=input_data,
+                _source=ComponentSource.BUILDER,
+                is_deterministic=is_deterministic,
+                **kwargs,
+            )
+        else:
+            component = ParallelComponent(
+                base_path=os.getcwd(),  # base path should be current folder
+                name=name,
+                tags=tags,
+                code=task.code,
+                display_name=display_name,
+                description=description,
+                inputs=component_inputs,
+                outputs=component_outputs,
+                retry_settings=retry_settings,  # type: ignore[arg-type]
+                logging_level=logging_level,
+                max_concurrency_per_instance=max_concurrency_per_instance,
+                error_threshold=error_threshold,
+                mini_batch_error_threshold=mini_batch_error_threshold,
+                task=task,
+                mini_batch_size=mini_batch_size,
+                partition_keys=partition_keys,
+                input_data=input_data,
+                _source=ComponentSource.BUILDER,
+                is_deterministic=is_deterministic,
+                **kwargs,
+            )
+
+    parallel_obj = Parallel(
+        component=component,
+        name=name,
+        description=description,
+        tags=tags,
+        properties=properties,
+        display_name=display_name,
+        experiment_name=experiment_name,
+        compute=compute,
+        inputs=job_inputs,
+        outputs=job_outputs,
+        identity=identity,
+        environment_variables=environment_variables,
+        retry_settings=retry_settings,  # type: ignore[arg-type]
+        logging_level=logging_level,
+        max_concurrency_per_instance=max_concurrency_per_instance,
+        error_threshold=error_threshold,
+        mini_batch_error_threshold=mini_batch_error_threshold,
+        task=task,
+        mini_batch_size=mini_batch_size,
+        partition_keys=partition_keys,
+        input_data=input_data,
+        **kwargs,
+    )
+
+    if instance_count is not None or instance_type is not None or docker_args is not None or shm_size is not None:
+        parallel_obj.set_resources(
+            instance_count=instance_count, instance_type=instance_type, docker_args=docker_args, shm_size=shm_size
+        )
+
+    return parallel_obj
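For context on how the builder added in this diff is typically consumed, here is a minimal sketch of wiring the returned Parallel object into a `dsl.pipeline`. It mirrors the example in the docstring above; the compute name, data path, entry script, and pipeline/function names are illustrative assumptions and not part of this commit.

```python
# Minimal usage sketch (assumptions: compute target "cpu-cluster", local ./src code,
# a local MLTable folder ./data/, and an entry script tabular_batch_inference.py).
from azure.ai.ml import Input, Output, dsl
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.entities import Environment
from azure.ai.ml.parallel import RunFunction, parallel_run_function

# Build the parallel "component as a function" once; it can then be called inside a pipeline.
batch_score = parallel_run_function(
    name="batch_score_with_tabular_input",
    inputs=dict(
        job_data_path=Input(type=AssetTypes.MLTABLE, description="Data to split and score"),
    ),
    outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
    input_data="${{inputs.job_data_path}}",
    instance_count=2,
    max_concurrency_per_instance=2,
    mini_batch_size="100",
    task=RunFunction(
        code="./src",
        entry_script="tabular_batch_inference.py",
        environment=Environment(
            image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
            conda_file="./src/environment_parallel.yml",
        ),
        append_row_to="${{outputs.job_output_path}}",
    ),
)


@dsl.pipeline(description="Sketch: batch scoring pipeline using the parallel builder")
def scoring_pipeline(pipeline_job_data_path):
    # Calling the Parallel object as a function creates a pipeline step (node).
    score_step = batch_score(job_data_path=pipeline_job_data_path)
    return {"scored_data": score_step.outputs.job_output_path}


# Assemble the pipeline job; submission via MLClient is omitted here.
pipeline_job = scoring_pipeline(
    pipeline_job_data_path=Input(type=AssetTypes.MLTABLE, path="./data/"),
)
pipeline_job.settings.default_compute = "cpu-cluster"
```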
