about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two versions of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py314
1 files changed, 314 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py
new file mode 100644
index 00000000..c542f880
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/entities/_builders/command_func.py
@@ -0,0 +1,314 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import os
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+from azure.ai.ml.constants._common import AssetTypes, LegacyAssetTypes
+from azure.ai.ml.constants._component import ComponentSource
+from azure.ai.ml.entities._assets.environment import Environment
+from azure.ai.ml.entities._component.command_component import CommandComponent
+from azure.ai.ml.entities._credentials import (
+    AmlTokenConfiguration,
+    ManagedIdentityConfiguration,
+    UserIdentityConfiguration,
+)
+from azure.ai.ml.entities._inputs_outputs import Input, Output
+from azure.ai.ml.entities._job.distribution import (
+    DistributionConfiguration,
+    MpiDistribution,
+    PyTorchDistribution,
+    RayDistribution,
+    TensorFlowDistribution,
+)
+from azure.ai.ml.entities._job.job_service import (
+    JobService,
+    JupyterLabJobService,
+    SshJobService,
+    TensorBoardJobService,
+    VsCodeJobService,
+)
+from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+from azure.ai.ml.entities._job.sweep.search_space import SweepDistribution
+from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException
+
+from .command import Command
+
# Asset types that may also be bound as job-level inputs in _parse_input;
# values of any other Input type become component inputs only.
SUPPORTED_INPUTS = [
    LegacyAssetTypes.PATH,
    AssetTypes.URI_FILE,
    AssetTypes.URI_FOLDER,
    AssetTypes.CUSTOM_MODEL,
    AssetTypes.MLFLOW_MODEL,
    AssetTypes.MLTABLE,
    AssetTypes.TRITON_MODEL,
]
+
+
def _parse_input(input_value: Union[Input, Dict, SweepDistribution, str, bool, int, float]) -> Tuple:
    """Split a user-supplied input value into a component input and a job input.

    :param input_value: The raw input value provided to the ``command`` builder.
    :return: A ``(component_input, job_input)`` pair; either element may be None.
    :raises ValidationException: If the value's type is not supported.
    """
    component_input = None
    job_input: Optional[Union[Input, Dict, SweepDistribution, str, bool, int, float]] = None

    if isinstance(input_value, Input):
        serialized = input_value._to_dict()
        component_input = Input(**serialized)
        # Only path-like asset types are also bound as job-level inputs.
        if input_value.type in SUPPORTED_INPUTS:
            job_input = Input(**serialized)
    elif isinstance(input_value, dict):
        # A plain dict is parsed into an Input; it additionally becomes a
        # job input only when its declared type is a supported asset type.
        if input_value.get("type", None) in SUPPORTED_INPUTS:
            job_input = Input(**input_value)
        component_input = Input(**input_value)
    elif isinstance(input_value, (SweepDistribution, str, bool, int, float)):
        # Literal values and sweep distributions; input bindings are not supported.
        component_input = ComponentTranslatableMixin._to_input_builder_function(input_value)
        job_input = input_value
    else:
        msg = f"Unsupported input type: {type(input_value)}"
        msg += ", only Input, dict, str, bool, int and float are supported."
        raise ValidationException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.JOB,
            error_type=ValidationErrorType.INVALID_VALUE,
        )
    return component_input, job_input
+
+
def _parse_output(output_value: Optional[Union[Output, Dict, str]]) -> Tuple:
    """Split a user-supplied output value into a component output and a job output.

    :param output_value: The raw output value provided to the ``command`` builder.
    :return: A ``(component_output, job_output)`` pair; either element may be None.
    :raises ValidationException: If the value's type is not supported.
    """
    component_output = None
    job_output: Optional[Union[Output, Dict, str]] = None

    if isinstance(output_value, Output):
        serialized = output_value._to_dict()
        component_output = Output(**serialized)
        job_output = Output(**serialized)
    elif not output_value:
        # None or an empty dict: the value is later packed into a JobOutput
        # object with mode = ReadWriteMount & type = UriFolder.
        component_output = ComponentTranslatableMixin._to_output(output_value)
        job_output = output_value
    elif isinstance(output_value, dict):
        # Non-empty dictionary: parse it into Output for both sides.
        job_output = Output(**output_value)
        component_output = Output(**output_value)
    elif isinstance(output_value, str):
        # A bare string comes in from pipeline job YAML; keep it as-is
        # for the job (no component-side output is produced).
        job_output = output_value
    else:
        msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported."
        raise ValidationException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.JOB,
            error_type=ValidationErrorType.INVALID_VALUE,
        )
    return component_output, job_output
+
+
+def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]:
+    component_io_dict, job_io_dict = {}, {}
+    if io_dict:
+        for key, val in io_dict.items():
+            component_io, job_io = parse_func(val)
+            component_io_dict[key] = component_io
+            job_io_dict[key] = job_io
+    return component_io_dict, job_io_dict
+
+
def command(
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Dict] = None,
    properties: Optional[Dict] = None,
    display_name: Optional[str] = None,
    command: Optional[str] = None,  # pylint: disable=redefined-outer-name
    experiment_name: Optional[str] = None,
    environment: Optional[Union[str, Environment]] = None,
    environment_variables: Optional[Dict] = None,
    distribution: Optional[
        Union[
            Dict,
            MpiDistribution,
            TensorFlowDistribution,
            PyTorchDistribution,
            RayDistribution,
            DistributionConfiguration,
        ]
    ] = None,
    compute: Optional[str] = None,
    inputs: Optional[Dict] = None,
    outputs: Optional[Dict] = None,
    instance_count: Optional[int] = None,
    instance_type: Optional[str] = None,
    locations: Optional[List[str]] = None,
    docker_args: Optional[Union[str, List[str]]] = None,
    shm_size: Optional[str] = None,
    timeout: Optional[int] = None,
    code: Optional[Union[str, os.PathLike]] = None,
    identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None,
    is_deterministic: bool = True,
    services: Optional[
        Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]
    ] = None,
    job_tier: Optional[str] = None,
    priority: Optional[str] = None,
    parent_job_name: Optional[str] = None,
    **kwargs: Any,
) -> Command:
    """Creates a Command object which can be used inside a dsl.pipeline function or used as a standalone Command job.

    :keyword name: The name of the Command job or component.
    :paramtype name: Optional[str]
    :keyword description: The description of the Command. Defaults to None.
    :paramtype description: Optional[str]
    :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
    :paramtype tags: Optional[dict[str, str]]
    :keyword properties: The job property dictionary. Defaults to None.
    :paramtype properties: Optional[dict[str, str]]
    :keyword display_name: The display name of the job. Defaults to a randomly generated name.
    :paramtype display_name: Optional[str]
    :keyword command: The command to be executed. Defaults to None.
    :paramtype command: Optional[str]
    :keyword experiment_name: The name of the experiment that the job will be created under. Defaults to current
        directory name.
    :paramtype experiment_name: Optional[str]
    :keyword environment: The environment that the job will run in.
    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
    :keyword environment_variables: A dictionary of environment variable names and values.
        These environment variables are set on the process where user script is being executed.
        Defaults to None.
    :paramtype environment_variables: Optional[dict[str, str]]
    :keyword distribution: The configuration for distributed jobs. Defaults to None.
    :paramtype distribution: Optional[Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
        ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]]
    :keyword compute: The compute target the job will run on. Defaults to default compute.
    :paramtype compute: Optional[str]
    :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
    :paramtype inputs: Optional[dict[str, Union[~azure.ai.ml.Input, str, bool, int, float, Enum]]]
    :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
    :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
    :keyword instance_count: The number of instances or nodes to be used by the compute target. Defaults to 1.
    :paramtype instance_count: Optional[int]
    :keyword instance_type: The type of VM to be used by the compute target.
    :paramtype instance_type: Optional[str]
    :keyword locations: The list of locations where the job will run.
    :paramtype locations: Optional[List[str]]
    :keyword docker_args: Extra arguments to pass to the Docker run command. This would override any
        parameters that have already been set by the system, or in this section. This parameter is only
        supported for Azure ML compute types. Defaults to None.
    :paramtype docker_args: Optional[Union[str,List[str]]]
    :keyword shm_size: The size of the Docker container's shared memory block. This should be in the
        format of (number)(unit) where the number has to be greater than 0 and the unit can be one of
        b(bytes), k(kilobytes), m(megabytes), or g(gigabytes).
    :paramtype shm_size: Optional[str]
    :keyword timeout: The number, in seconds, after which the job will be cancelled.
    :paramtype timeout: Optional[int]
    :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url
        pointing to a remote location.
    :paramtype code: Optional[Union[str, os.PathLike]]
    :keyword identity: The identity that the command job will use while running on compute.
    :paramtype identity: Optional[Union[
        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
        ~azure.ai.ml.entities.AmlTokenConfiguration,
        ~azure.ai.ml.entities.UserIdentityConfiguration]]
    :keyword is_deterministic: Specifies whether the Command will return the same output given the same input.
        Defaults to True. When True, if a Command Component is deterministic and has been run before in the
        current workspace with the same input and settings, it will reuse results from a previously submitted
        job when used as a node or step in a pipeline. In that scenario, no compute resources will be used.
    :paramtype is_deterministic: bool
    :keyword services: The interactive services for the node. Defaults to None. This is an experimental parameter,
        and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
    :paramtype services: Optional[dict[str, Union[~azure.ai.ml.entities.JobService,
        ~azure.ai.ml.entities.JupyterLabJobService, ~azure.ai.ml.entities.SshJobService,
        ~azure.ai.ml.entities.TensorBoardJobService, ~azure.ai.ml.entities.VsCodeJobService]]]
    :keyword job_tier: The job tier. Accepted values are "Spot", "Basic", "Standard", or "Premium".
    :paramtype job_tier: Optional[str]
    :keyword priority: The priority of the job on the compute. Accepted values are "low", "medium", and "high".
        Defaults to "medium".
    :paramtype priority: Optional[str]
    :keyword parent_job_name: parent job id for command job
    :paramtype parent_job_name: Optional[str]
    :return: A Command object.
    :rtype: ~azure.ai.ml.entities.Command

    .. admonition:: Example:

        .. literalinclude:: ../samples/ml_samples_command_configurations.py
            :start-after: [START command_function]
            :end-before: [END command_function]
            :language: python
            :dedent: 8
            :caption: Creating a Command Job using the command() builder method.
    """
    # pylint: disable=too-many-locals
    inputs = inputs or {}
    outputs = outputs or {}
    # Each raw input/output is split into a component-side definition and a
    # job-side binding (see _parse_input / _parse_output).
    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
    # job inputs can not be None
    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)

    # Callers may supply a prebuilt component via kwargs; otherwise build one
    # from the parsed inputs/outputs. NOTE: the remaining **kwargs are forwarded
    # to BOTH constructors below, so CommandComponent's __init__ may consume
    # keys that Command also receives.
    component = kwargs.pop("component", None)
    if component is None:
        component = CommandComponent(
            name=name,
            tags=tags,
            code=code,
            command=command,
            environment=environment,
            display_name=display_name,
            description=description,
            inputs=component_inputs,
            outputs=component_outputs,
            distribution=distribution,
            environment_variables=environment_variables,
            _source=ComponentSource.BUILDER,
            is_deterministic=is_deterministic,
            **kwargs,
        )
    command_obj = Command(
        component=component,
        name=name,
        description=description,
        tags=tags,
        properties=properties,
        display_name=display_name,
        experiment_name=experiment_name,
        compute=compute,
        inputs=job_inputs,
        outputs=job_outputs,
        identity=identity,
        distribution=distribution,
        environment=environment,
        environment_variables=environment_variables,
        services=services,
        parent_job_name=parent_job_name,
        **kwargs,
    )

    # Resource settings are applied only when at least one was given, so the
    # Command keeps its defaults otherwise.
    if (
        locations is not None
        or instance_count is not None
        or instance_type is not None
        or docker_args is not None
        or shm_size is not None
    ):
        command_obj.set_resources(
            locations=locations,
            instance_count=instance_count,
            instance_type=instance_type,
            docker_args=docker_args,
            shm_size=shm_size,
        )

    if timeout is not None:
        command_obj.set_limits(timeout=timeout)

    # Queue settings (tier/priority) are likewise applied only on demand.
    if job_tier is not None or priority is not None:
        command_obj.set_queue_settings(job_tier=job_tier, priority=priority)

    return command_obj