Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py | 1677 |
1 file changed, 1677 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py
new file mode 100644
index 00000000..0003c8cd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py
@@ -0,0 +1,1677 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access, too-many-instance-attributes, too-many-statements, too-many-lines
+import json
+import os.path
+from os import PathLike
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Set, Union, cast
+
+import jwt
+from marshmallow import ValidationError
+
+from azure.ai.ml._artifacts._artifact_utilities import (
+    _upload_and_generate_remote_uri,
+    aml_datastore_path_exists,
+    download_artifact_from_aml_uri,
+)
+from azure.ai.ml._azure_environments import (
+    _get_aml_resource_id_from_metadata,
+    _get_base_url_from_metadata,
+    _resource_to_scopes,
+)
+from azure.ai.ml._exception_helper import log_and_raise_error
+from azure.ai.ml._restclient.dataset_dataplane import AzureMachineLearningWorkspaces as ServiceClientDatasetDataplane
+from azure.ai.ml._restclient.model_dataplane import AzureMachineLearningWorkspaces as ServiceClientModelDataplane
+from azure.ai.ml._restclient.runhistory import AzureMachineLearningWorkspaces as ServiceClientRunHistory
+from azure.ai.ml._restclient.runhistory.models import Run
+from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient022023Preview
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, ListViewType, UserIdentity
+from azure.ai.ml._restclient.v2023_08_01_preview.models import JobType as RestJobType
+from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401
+from azure.ai.ml._restclient.v2024_10_01_preview.models import JobType as RestJobType_20241001Preview
+from azure.ai.ml._scope_dependent_operations import (
+    OperationConfig,
+    OperationsContainer,
+    OperationScope,
+    _ScopeDependentOperations,
+)
+from azure.ai.ml._telemetry import ActivityType, monitor_with_activity, monitor_with_telemetry_mixin
+from azure.ai.ml._utils._http_utils import HttpPipeline
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils.utils import (
+    create_requests_pipeline_with_retry,
+    download_text_from_url,
+    is_data_binding_expression,
+    is_private_preview_enabled,
+    is_url,
+)
+from azure.ai.ml.constants._common import (
+    AZUREML_RESOURCE_PROVIDER,
+    BATCH_JOB_CHILD_RUN_OUTPUT_NAME,
+    COMMON_RUNTIME_ENV_VAR,
+    DEFAULT_ARTIFACT_STORE_OUTPUT_NAME,
+    GIT_PATH_PREFIX,
+    LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT,
+    LOCAL_COMPUTE_TARGET,
+    SERVERLESS_COMPUTE,
+    SHORT_URI_FORMAT,
+    SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME,
+    TID_FMT,
+    AssetTypes,
+    AzureMLResourceType,
+    WorkspaceDiscoveryUrlKey,
+)
+from azure.ai.ml.constants._compute import ComputeType
+from azure.ai.ml.constants._job.pipeline import PipelineConstants
+from azure.ai.ml.entities import Compute, Job, PipelineJob, ServiceInstance, ValidationResult
+from azure.ai.ml.entities._assets._artifacts.code import Code
+from azure.ai.ml.entities._builders import BaseNode, Command, Spark
+from azure.ai.ml.entities._datastore._constants import WORKSPACE_BLOB_STORE
+from azure.ai.ml.entities._inputs_outputs import Input
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.base_job import _BaseJob
+from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
+from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
+from azure.ai.ml.entities._job.import_job import ImportJob
+from azure.ai.ml.entities._job.job import _is_pipeline_child_job
+from azure.ai.ml.entities._job.parallel.parallel_job import ParallelJob
+from azure.ai.ml.entities._job.to_rest_functions import to_rest_job_object
+from azure.ai.ml.entities._validation import PathAwareSchemaValidatableMixin
+from azure.ai.ml.exceptions import (
+    ComponentException,
+    ErrorCategory,
+    ErrorTarget,
+    JobException,
+    JobParsingError,
+    MlException,
+    PipelineChildJobError,
+    UserErrorException,
+    ValidationErrorType,
+    ValidationException,
+)
+from azure.ai.ml.operations._run_history_constants import RunHistoryConstants
+from azure.ai.ml.sweep import SweepJob
+from azure.core.credentials import TokenCredential
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+from azure.core.polling import LROPoller
+from azure.core.tracing.decorator import distributed_trace
+
+from ..constants._component import ComponentSource
+from ..entities._builders.control_flow_node import ControlFlowNode
+from ..entities._job.pipeline._io import InputOutputBase, PipelineInput, _GroupAttrDict
+from ._component_operations import ComponentOperations
+from ._compute_operations import ComputeOperations
+from ._dataset_dataplane_operations import DatasetDataplaneOperations
+from ._job_ops_helper import get_git_properties, get_job_output_uris_from_dataplane, stream_logs_until_completion
+from ._local_job_invoker import is_local_run, start_run_if_local
+from ._model_dataplane_operations import ModelDataplaneOperations
+from ._operation_orchestrator import (
+    OperationOrchestrator,
+    _AssetResolver,
+    is_ARM_id_for_resource,
+    is_registry_id_for_resource,
+    is_singularity_full_name_for_resource,
+    is_singularity_id_for_resource,
+    is_singularity_short_name_for_resource,
+)
+from ._run_operations import RunOperations
+from ._virtual_cluster_operations import VirtualClusterOperations
+
+try:
+    pass
+except ImportError:
+    pass
+
+if TYPE_CHECKING:
+    from azure.ai.ml.operations import DatastoreOperations
+
+ops_logger = OpsLogger(__name__)
+module_logger = ops_logger.module_logger
+
+
+class JobOperations(_ScopeDependentOperations):
+    """Initiates an instance of JobOperations
+
+    This class should not be instantiated directly. Instead, use the `jobs` attribute of an MLClient object.
+
+    :param operation_scope: Scope variables for the operations classes of an MLClient object.
+    :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
+    :param operation_config: Common configuration for operations classes of an MLClient object.
+    :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
+    :param service_client_02_2023_preview: Service client to allow end users to operate on Azure Machine Learning
+        Workspace resources.
+    :type service_client_02_2023_preview: ~azure.ai.ml._restclient.v2023_02_01_preview.AzureMachineLearningWorkspaces
+    :param all_operations: All operations classes of an MLClient object.
+    :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
+    :param credential: Credential to use for authentication.
+    :type credential: ~azure.core.credentials.TokenCredential
+    """
+
+    def __init__(
+        self,
+        operation_scope: OperationScope,
+        operation_config: OperationConfig,
+        service_client_02_2023_preview: ServiceClient022023Preview,
+        all_operations: OperationsContainer,
+        credential: TokenCredential,
+        **kwargs: Any,
+    ) -> None:
+        super(JobOperations, self).__init__(operation_scope, operation_config)
+        ops_logger.update_filter()
+
+        self._operation_2023_02_preview = service_client_02_2023_preview.jobs
+        self._service_client = service_client_02_2023_preview
+        self._all_operations = all_operations
+        self._stream_logs_until_completion = stream_logs_until_completion
+        # Dataplane service clients are lazily created as they are needed
+        self._runs_operations_client: Optional[RunOperations] = None
+        self._dataset_dataplane_operations_client: Optional[DatasetDataplaneOperations] = None
+        self._model_dataplane_operations_client: Optional[ModelDataplaneOperations] = None
+        # Kwargs to propagate to dataplane service clients
+        self._service_client_kwargs = kwargs.pop("_service_client_kwargs", {})
+        self._api_base_url: Optional[str] = None
+        self._container = "azureml"
+        self._credential = credential
+        self._orchestrators = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)
+
+        self.service_client_01_2024_preview = kwargs.pop("service_client_01_2024_preview", None)
+        self.service_client_10_2024_preview = kwargs.pop("service_client_10_2024_preview", None)
+        self.service_client_01_2025_preview = kwargs.pop("service_client_01_2025_preview", None)
+        self._kwargs = kwargs
+
+        self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
+
+    @property
+    def _component_operations(self) -> ComponentOperations:
+        return cast(
+            ComponentOperations,
+            self._all_operations.get_operation(  # type: ignore[misc]
+                AzureMLResourceType.COMPONENT, lambda x: isinstance(x, ComponentOperations)
+            ),
+        )
+
+    @property
+    def _compute_operations(self) -> ComputeOperations:
+        return cast(
+            ComputeOperations,
+            self._all_operations.get_operation(  # type: ignore[misc]
+                AzureMLResourceType.COMPUTE, lambda x: isinstance(x, ComputeOperations)
+            ),
+        )
+
+    @property
+    def _virtual_cluster_operations(self) -> VirtualClusterOperations:
+        return cast(
+            VirtualClusterOperations,
+            self._all_operations.get_operation(  # type: ignore[misc]
+                AzureMLResourceType.VIRTUALCLUSTER,
+                lambda x: isinstance(x, VirtualClusterOperations),
+            ),
+        )
+
+    @property
+    def _datastore_operations(self) -> "DatastoreOperations":
+        from azure.ai.ml.operations import DatastoreOperations
+
+        return cast(DatastoreOperations, self._all_operations.all_operations[AzureMLResourceType.DATASTORE])
+
+    @property
+    def _runs_operations(self) -> RunOperations:
+        if not self._runs_operations_client:
+            service_client_run_history = ServiceClientRunHistory(
+                self._credential, base_url=self._api_url, **self._service_client_kwargs
+            )
+            self._runs_operations_client = RunOperations(
+                self._operation_scope, self._operation_config, service_client_run_history
+            )
+        return self._runs_operations_client
+    @property
+    def _dataset_dataplane_operations(self) -> DatasetDataplaneOperations:
+        if not self._dataset_dataplane_operations_client:
+            service_client_dataset_dataplane = ServiceClientDatasetDataplane(
+                self._credential, base_url=self._api_url, **self._service_client_kwargs
+            )
+            self._dataset_dataplane_operations_client = DatasetDataplaneOperations(
+                self._operation_scope,
+                self._operation_config,
+                service_client_dataset_dataplane,
+            )
+        return self._dataset_dataplane_operations_client
+
+    @property
+    def _model_dataplane_operations(self) -> ModelDataplaneOperations:
+        if not self._model_dataplane_operations_client:
+            service_client_model_dataplane = ServiceClientModelDataplane(
+                self._credential, base_url=self._api_url, **self._service_client_kwargs
+            )
+            self._model_dataplane_operations_client = ModelDataplaneOperations(
+                self._operation_scope,
+                self._operation_config,
+                service_client_model_dataplane,
+            )
+        return self._model_dataplane_operations_client
+
+    @property
+    def _api_url(self) -> str:
+        if not self._api_base_url:
+            self._api_base_url = self._get_workspace_url(url_key=WorkspaceDiscoveryUrlKey.API)
+        return self._api_base_url
+
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.List", ActivityType.PUBLICAPI)
+    def list(
+        self,
+        *,
+        parent_job_name: Optional[str] = None,
+        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+        **kwargs: Any,
+    ) -> Iterable[Job]:
+        """Lists jobs in the workspace.
+
+        :keyword parent_job_name: When provided, only returns jobs that are children of the named job. Defaults to
+            None, listing all jobs in the workspace.
+        :paramtype parent_job_name: Optional[str]
+        :keyword list_view_type: The view type for including/excluding archived jobs. Defaults to
+            ~azure.mgmt.machinelearningservices.models.ListViewType.ACTIVE_ONLY, excluding archived jobs.
+        :paramtype list_view_type: ~azure.mgmt.machinelearningservices.models.ListViewType
+        :return: An iterator-like instance of Job objects.
+        :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.Job]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_list]
+                :end-before: [END job_operations_list]
+                :language: python
+                :dedent: 8
+                :caption: Retrieving a list of the archived jobs in a workspace with parent job named
+                    "iris-dataset-jobs".
+        """
+
+        schedule_defined = kwargs.pop("schedule_defined", None)
+        scheduled_job_name = kwargs.pop("scheduled_job_name", None)
+        max_results = kwargs.pop("max_results", None)
+
+        if parent_job_name:
+            parent_job = self.get(parent_job_name)
+            return self._runs_operations.get_run_children(parent_job.name, max_results=max_results)
+
+        return cast(
+            Iterable[Job],
+            self.service_client_01_2024_preview.jobs.list(
+                self._operation_scope.resource_group_name,
+                self._workspace_name,
+                cls=lambda objs: [self._handle_rest_errors(obj) for obj in objs],
+                list_view_type=list_view_type,
+                scheduled=schedule_defined,
+                schedule_id=scheduled_job_name,
+                **self._kwargs,
+                **kwargs,
+            ),
+        )
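For reference, a minimal sketch of how this list API is reached through the public client; the workspace identifiers and parent job name below are hypothetical placeholders:

    from azure.identity import DefaultAzureCredential
    from azure.ai.ml import MLClient

    # ml_client.jobs is an instance of the JobOperations class defined in this file
    ml_client = MLClient(
        DefaultAzureCredential(),
        subscription_id="<subscription-id>",
        resource_group_name="<resource-group>",
        workspace_name="<workspace>",
    )
    # List the child jobs of a named parent job (omit parent_job_name to list all jobs)
    for job in ml_client.jobs.list(parent_job_name="iris-dataset-jobs"):
        print(job.name, job.status)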
+    def _handle_rest_errors(self, job_object: Union[JobBase, Run]) -> Optional[Job]:
+        """Handle errors while resolving azureml_id's during list operation.
+
+        :param job_object: The REST object to turn into a Job
+        :type job_object: Union[JobBase, Run]
+        :return: The resolved job
+        :rtype: Optional[Job]
+        """
+        try:
+            return self._resolve_azureml_id(Job._from_rest_object(job_object))
+        except JobParsingError:
+            return None
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Get", ActivityType.PUBLICAPI)
+    def get(self, name: str) -> Job:
+        """Gets a job resource.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+        :raises ~azure.ai.ml.exceptions.UserErrorException: Raised if the name parameter is not a string.
+        :return: Job object retrieved from the service.
+        :rtype: ~azure.ai.ml.entities.Job
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_get]
+                :end-before: [END job_operations_get]
+                :language: python
+                :dedent: 8
+                :caption: Retrieving a job named "iris-dataset-job-1".
+        """
+        if not isinstance(name, str):
+            raise UserErrorException(f"{name} is an invalid input for client.jobs.get().")
+        job_object = self._get_job(name)
+
+        job: Any = None
+        if not _is_pipeline_child_job(job_object):
+            job = Job._from_rest_object(job_object)
+            if job_object.properties.job_type != RestJobType.AUTO_ML:
+                # resolvers do not work with the old contract, leave the ids as is
+                job = self._resolve_azureml_id(job)
+        else:
+            # Child jobs are no longer available through MFE, fetch
+            # through run history instead
+            job = self._runs_operations._translate_from_rest_object(self._runs_operations.get_run(name))
+
+        return job
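A matching sketch for the get path, reusing the `ml_client` from the earlier sketch; the job name is a placeholder:

    # Non-string names raise UserErrorException before any service call is made
    job = ml_client.jobs.get("iris-dataset-job-1")
    print(job.status, job.experiment_name)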
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.ShowServices", ActivityType.PUBLICAPI)
+    def show_services(self, name: str, node_index: int = 0) -> Optional[Dict[str, ServiceInstance]]:
+        """Gets services associated with a job's node.
+
+        :param name: The name of the job.
+        :type name: str
+        :param node_index: The node's index (zero-based). Defaults to 0.
+        :type node_index: int
+        :return: The services associated with the job for the given node.
+        :rtype: dict[str, ~azure.ai.ml.entities.ServiceInstance]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_show_services]
+                :end-before: [END job_operations_show_services]
+                :language: python
+                :dedent: 8
+                :caption: Retrieving the services associated with a job's 1st node.
+        """
+
+        service_instances_dict = self._runs_operations._operation.get_run_service_instances(
+            self._subscription_id,
+            self._operation_scope.resource_group_name,
+            self._workspace_name,
+            name,
+            node_index,
+        )
+        if not service_instances_dict.instances:
+            return None
+
+        return {
+            k: ServiceInstance._from_rest_object(v, node_index) for k, v in service_instances_dict.instances.items()
+        }
+
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.Cancel", ActivityType.PUBLICAPI)
+    def begin_cancel(self, name: str, **kwargs: Any) -> LROPoller[None]:
+        """Cancels a job.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+        :return: A poller to track the operation status.
+        :rtype: ~azure.core.polling.LROPoller[None]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_begin_cancel]
+                :end-before: [END job_operations_begin_cancel]
+                :language: python
+                :dedent: 8
+                :caption: Canceling the job named "iris-dataset-job-1" and checking the poller for status.
+        """
+        tag = kwargs.pop("tag", None)
+
+        if not tag:
+            return self._operation_2023_02_preview.begin_cancel(
+                id=name,
+                resource_group_name=self._operation_scope.resource_group_name,
+                workspace_name=self._workspace_name,
+                **self._kwargs,
+                **kwargs,
+            )
+
+        # Note: Below batch cancel is experimental and for private usage
+        results = []
+        jobs = self.list(tag=tag)
+        # TODO: Do we need to show an error message when no jobs are returned for the given tag?
+        for job in jobs:
+            result = self._operation_2023_02_preview.begin_cancel(
+                id=job.name,
+                resource_group_name=self._operation_scope.resource_group_name,
+                workspace_name=self._workspace_name,
+                **self._kwargs,
+            )
+            results.append(result)
+        return results
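A minimal cancellation sketch, again reusing `ml_client`; blocking on the poller is optional:

    poller = ml_client.jobs.begin_cancel("iris-dataset-job-1")
    poller.result()  # wait for the cancellation to complete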
+ """ + return self._validate(job, raise_on_failure=raise_on_failure, **kwargs) + + @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.INTERNALCALL) + def _validate( + self, + job: Job, + *, + raise_on_failure: bool = False, + # pylint:disable=unused-argument + **kwargs: Any, + ) -> ValidationResult: + """Implementation of validate. + + Add this function to avoid calling validate() directly in + create_or_update(), which will impact telemetry statistics & + bring experimental warning in create_or_update(). + + :param job: The job to validate + :type job: Job + :keyword raise_on_failure: Whether to raise on validation failure + :paramtype raise_on_failure: bool + :return: The validation result + :rtype: ValidationResult + """ + git_code_validation_result = PathAwareSchemaValidatableMixin._create_empty_validation_result() + # TODO: move this check to Job._validate after validation is supported for all job types + # If private features are enable and job has code value of type str we need to check + # that it is a valid git path case. Otherwise we should throw a ValidationException + # saying that the code value is not a valid code value + if ( + hasattr(job, "code") + and job.code is not None + and isinstance(job.code, str) + and job.code.startswith(GIT_PATH_PREFIX) + and not is_private_preview_enabled() + ): + git_code_validation_result.append_error( + message=f"Invalid code value: {job.code}. Git paths are not supported.", + yaml_path="code", + ) + + if not isinstance(job, PathAwareSchemaValidatableMixin): + + def error_func(msg: str, no_personal_data_msg: str) -> ValidationException: + return ValidationException( + message=msg, + no_personal_data_message=no_personal_data_msg, + error_target=ErrorTarget.JOB, + error_category=ErrorCategory.USER_ERROR, + ) + + return git_code_validation_result.try_raise( + raise_error=raise_on_failure, + error_func=error_func, + ) + + validation_result = job._validate(raise_error=raise_on_failure) + validation_result.merge_with(git_code_validation_result) + # fast return to avoid remote call if local validation not passed + # TODO: use remote call to validate the entire job after MFE API is ready + if validation_result.passed and isinstance(job, PipelineJob): + try: + job.compute = self._try_get_compute_arm_id(job.compute) + except Exception as e: # pylint: disable=W0718 + validation_result.append_error(yaml_path="compute", message=str(e)) + + for node_name, node in job.jobs.items(): + try: + # TODO(1979547): refactor, not all nodes have compute + if not isinstance(node, ControlFlowNode): + node.compute = self._try_get_compute_arm_id(node.compute) + except Exception as e: # pylint: disable=W0718 + validation_result.append_error(yaml_path=f"jobs.{node_name}.compute", message=str(e)) + + validation_result.resolve_location_for_diagnostics(str(job._source_path)) + return job._try_raise(validation_result, raise_error=raise_on_failure) # pylint: disable=protected-access + + @distributed_trace + @monitor_with_telemetry_mixin(ops_logger, "Job.CreateOrUpdate", ActivityType.PUBLICAPI) + def create_or_update( + self, + job: Job, + *, + description: Optional[str] = None, + compute: Optional[str] = None, + tags: Optional[dict] = None, + experiment_name: Optional[str] = None, + skip_validation: bool = False, + **kwargs: Any, + ) -> Job: + """Creates or updates a job. If entities such as Environment or Code are defined inline, they'll be created + together with the job. + + :param job: The job object. 
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.INTERNALCALL)
+    def _validate(
+        self,
+        job: Job,
+        *,
+        raise_on_failure: bool = False,
+        # pylint:disable=unused-argument
+        **kwargs: Any,
+    ) -> ValidationResult:
+        """Implementation of validate.
+
+        Add this function to avoid calling validate() directly in
+        create_or_update(), which would impact telemetry statistics and
+        bring an experimental warning in create_or_update().
+
+        :param job: The job to validate
+        :type job: Job
+        :keyword raise_on_failure: Whether to raise on validation failure
+        :paramtype raise_on_failure: bool
+        :return: The validation result
+        :rtype: ValidationResult
+        """
+        git_code_validation_result = PathAwareSchemaValidatableMixin._create_empty_validation_result()
+        # TODO: move this check to Job._validate after validation is supported for all job types
+        # If private features are enabled and the job has a code value of type str, we need to check
+        # that it is a valid git path case. Otherwise we should throw a ValidationException
+        # saying that the code value is not a valid code value
+        if (
+            hasattr(job, "code")
+            and job.code is not None
+            and isinstance(job.code, str)
+            and job.code.startswith(GIT_PATH_PREFIX)
+            and not is_private_preview_enabled()
+        ):
+            git_code_validation_result.append_error(
+                message=f"Invalid code value: {job.code}. Git paths are not supported.",
+                yaml_path="code",
+            )
+
+        if not isinstance(job, PathAwareSchemaValidatableMixin):
+
+            def error_func(msg: str, no_personal_data_msg: str) -> ValidationException:
+                return ValidationException(
+                    message=msg,
+                    no_personal_data_message=no_personal_data_msg,
+                    error_target=ErrorTarget.JOB,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+
+            return git_code_validation_result.try_raise(
+                raise_error=raise_on_failure,
+                error_func=error_func,
+            )
+
+        validation_result = job._validate(raise_error=raise_on_failure)
+        validation_result.merge_with(git_code_validation_result)
+        # fast return to avoid remote call if local validation not passed
+        # TODO: use remote call to validate the entire job after MFE API is ready
+        if validation_result.passed and isinstance(job, PipelineJob):
+            try:
+                job.compute = self._try_get_compute_arm_id(job.compute)
+            except Exception as e:  # pylint: disable=W0718
+                validation_result.append_error(yaml_path="compute", message=str(e))
+
+            for node_name, node in job.jobs.items():
+                try:
+                    # TODO(1979547): refactor, not all nodes have compute
+                    if not isinstance(node, ControlFlowNode):
+                        node.compute = self._try_get_compute_arm_id(node.compute)
+                except Exception as e:  # pylint: disable=W0718
+                    validation_result.append_error(yaml_path=f"jobs.{node_name}.compute", message=str(e))
+
+        validation_result.resolve_location_for_diagnostics(str(job._source_path))
+        return job._try_raise(validation_result, raise_error=raise_on_failure)  # pylint: disable=protected-access
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.CreateOrUpdate", ActivityType.PUBLICAPI)
+    def create_or_update(
+        self,
+        job: Job,
+        *,
+        description: Optional[str] = None,
+        compute: Optional[str] = None,
+        tags: Optional[dict] = None,
+        experiment_name: Optional[str] = None,
+        skip_validation: bool = False,
+        **kwargs: Any,
+    ) -> Job:
+        """Creates or updates a job. If entities such as Environment or Code are defined inline, they'll be created
+        together with the job.
+
+        :param job: The job object.
+        :type job: ~azure.ai.ml.entities.Job
+        :keyword description: The job description.
+        :paramtype description: Optional[str]
+        :keyword compute: The compute target for the job.
+        :paramtype compute: Optional[str]
+        :keyword tags: The tags for the job.
+        :paramtype tags: Optional[dict]
+        :keyword experiment_name: The name of the experiment the job will be created under. If None is provided,
+            the job will be created under the experiment 'Default'.
+        :paramtype experiment_name: Optional[str]
+        :keyword skip_validation: Specifies whether or not to skip validation before creating or updating the job.
+            Note that validation for dependent resources such as an anonymous component will not be skipped.
+            Defaults to False.
+        :paramtype skip_validation: bool
+        :raises Union[~azure.ai.ml.exceptions.UserErrorException, ~azure.ai.ml.exceptions.ValidationException]: Raised
+            if Job cannot be successfully validated. Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.AssetException: Raised if Job assets
+            (e.g. Data, Code, Model, Environment) cannot be successfully validated.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.ModelException: Raised if Job model cannot be successfully validated.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.JobException: Raised if Job object or attributes are not correctly formatted.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty
+            directory.
+        :raises ~azure.ai.ml.exceptions.DockerEngineNotAvailableError: Raised if Docker Engine is not available for
+            a local job.
+        :return: Created or updated job.
+        :rtype: ~azure.ai.ml.entities.Job
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_create_and_update]
+                :end-before: [END job_operations_create_and_update]
+                :language: python
+                :dedent: 8
+                :caption: Creating a new job and then updating its compute.
+        """
+        if isinstance(job, BaseNode) and not (
+            isinstance(job, (Command, Spark))
+        ):  # Command/Spark objects can be used directly
+            job = job._to_job()
+
+        # Set job properties before submission
+        if description is not None:
+            job.description = description
+        if compute is not None:
+            job.compute = compute
+        if tags is not None:
+            job.tags = tags
+        if experiment_name is not None:
+            job.experiment_name = experiment_name
+
+        if job.compute == LOCAL_COMPUTE_TARGET:
+            job.environment_variables[COMMON_RUNTIME_ENV_VAR] = "true"  # type: ignore
+
+        # TODO: why do we put the log logic here instead of inside self._validate()?
+        try:
+            if not skip_validation:
+                self._validate(job, raise_on_failure=True)
+
+            # Create all dependent resources
+            self._resolve_arm_id_or_upload_dependencies(job)
+        except (ValidationException, ValidationError) as ex:
+            log_and_raise_error(ex)
+
+        git_props = get_git_properties()
+        # Do not add git props if they already exist in job properties.
+        # This is for update specifically -- if the user switches branches and tries to update
+        # their job, the request will fail since the git props will be repopulated.
+        # MFE does not allow existing properties to be updated, only new props to be added
+        if not any(prop_name in job.properties for prop_name in git_props):
+            job.properties = {**job.properties, **git_props}
+        rest_job_resource = to_rest_job_object(job)
+
+        # Make a copy of self._kwargs instead of contaminating the original one
+        kwargs = {**self._kwargs}
+        # set headers with user aml token if job is a pipeline or has a user identity setting
+        if (rest_job_resource.properties.job_type == RestJobType.PIPELINE) or (
+            hasattr(rest_job_resource.properties, "identity")
+            and (isinstance(rest_job_resource.properties.identity, UserIdentity))
+        ):
+            self._set_headers_with_user_aml_token(kwargs)
+
+        result = self._create_or_update_with_different_version_api(rest_job_resource=rest_job_resource, **kwargs)
+
+        if is_local_run(result):
+            ws_base_url = self._all_operations.all_operations[
+                AzureMLResourceType.WORKSPACE
+            ]._operation._client._base_url
+            snapshot_id = start_run_if_local(
+                result,
+                self._credential,
+                ws_base_url,
+                self._requests_pipeline,
+            )
+            # in case of local run, the first create/update call to MFE returns the
+            # request for submitting to ES. Once we request to ES and start the run, we
+            # need to put the same body to MFE to append user tags etc.
+            if rest_job_resource.properties.job_type == RestJobType.PIPELINE:
+                job_object = self._get_job_2401(rest_job_resource.name)
+            else:
+                job_object = self._get_job(rest_job_resource.name)
+            if result.properties.tags is not None:
+                for tag_name, tag_value in rest_job_resource.properties.tags.items():
+                    job_object.properties.tags[tag_name] = tag_value
+            if result.properties.properties is not None:
+                for (
+                    prop_name,
+                    prop_value,
+                ) in rest_job_resource.properties.properties.items():
+                    job_object.properties.properties[prop_name] = prop_value
+            if snapshot_id is not None:
+                job_object.properties.properties["ContentSnapshotId"] = snapshot_id
+
+            result = self._create_or_update_with_different_version_api(rest_job_resource=job_object, **kwargs)
+
+        return self._resolve_azureml_id(Job._from_rest_object(result))
+
+    def _create_or_update_with_different_version_api(self, rest_job_resource: JobBase, **kwargs: Any) -> JobBase:
+        service_client_operation = self._operation_2023_02_preview
+        if rest_job_resource.properties.job_type == RestJobType_20241001Preview.FINE_TUNING:
+            service_client_operation = self.service_client_10_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.PIPELINE:
+            service_client_operation = self.service_client_01_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.AUTO_ML:
+            service_client_operation = self.service_client_01_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.SWEEP:
+            service_client_operation = self.service_client_01_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.COMMAND:
+            service_client_operation = self.service_client_01_2025_preview.jobs
+
+        result = service_client_operation.create_or_update(
+            id=rest_job_resource.name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            body=rest_job_resource,
+            **kwargs,
+        )
+
+        return result
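A submission sketch built on the builder API; the source folder, environment, and compute names are placeholders, and `ml_client` is the client from the earlier sketch:

    from azure.ai.ml import command

    job = command(
        code="./src",  # local folder, uploaded as a Code asset by _resolve_arm_id_or_upload_dependencies
        command="python train.py",
        environment="azureml:my-env:1",
        compute="cpu-cluster",
    )
    returned_job = ml_client.jobs.create_or_update(job, experiment_name="demo-experiment")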
+    def _create_or_update_with_latest_version_api(self, rest_job_resource: JobBase, **kwargs: Any) -> JobBase:
+        service_client_operation = self.service_client_01_2024_preview.jobs
+        result = service_client_operation.create_or_update(
+            id=rest_job_resource.name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            body=rest_job_resource,
+            **kwargs,
+        )
+
+        return result
+
+    def _archive_or_restore(self, name: str, is_archived: bool) -> None:
+        job_object = self._get_job(name)
+        if job_object.properties.job_type == RestJobType.PIPELINE:
+            job_object = self._get_job_2401(name)
+        if _is_pipeline_child_job(job_object):
+            raise PipelineChildJobError(job_id=job_object.id)
+        job_object.properties.is_archived = is_archived
+
+        self._create_or_update_with_different_version_api(rest_job_resource=job_object)
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Archive", ActivityType.PUBLICAPI)
+    def archive(self, name: str) -> None:
+        """Archives a job.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_archive]
+                :end-before: [END job_operations_archive]
+                :language: python
+                :dedent: 8
+                :caption: Archiving a job.
+        """
+
+        self._archive_or_restore(name=name, is_archived=True)
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Restore", ActivityType.PUBLICAPI)
+    def restore(self, name: str) -> None:
+        """Restores an archived job.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_restore]
+                :end-before: [END job_operations_restore]
+                :language: python
+                :dedent: 8
+                :caption: Restoring an archived job.
+        """
+
+        self._archive_or_restore(name=name, is_archived=False)
+
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.Stream", ActivityType.PUBLICAPI)
+    def stream(self, name: str) -> None:
+        """Streams the logs of a running job.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_stream_logs]
+                :end-before: [END job_operations_stream_logs]
+                :language: python
+                :dedent: 8
+                :caption: Streaming a running job.
+        """
+        job_object = self._get_job(name)
+
+        if _is_pipeline_child_job(job_object):
+            raise PipelineChildJobError(job_id=job_object.id)
+
+        self._stream_logs_until_completion(
+            self._runs_operations,
+            job_object,
+            self._datastore_operations,
+            requests_pipeline=self._requests_pipeline,
+        )
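The corresponding client-side call, which blocks until the job reaches a terminal state (job name is a placeholder, `ml_client` as above):

    ml_client.jobs.stream("iris-dataset-job-1")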
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.Download", ActivityType.PUBLICAPI)
+    def download(
+        self,
+        name: str,
+        *,
+        download_path: Union[PathLike, str] = ".",
+        output_name: Optional[str] = None,
+        all: bool = False,  # pylint: disable=redefined-builtin
+    ) -> None:
+        """Downloads the logs and output of a job.
+
+        :param name: The name of a job.
+        :type name: str
+        :keyword download_path: The local path to be used as the download destination. Defaults to ".".
+        :paramtype download_path: Union[PathLike, str]
+        :keyword output_name: The name of the output to download. Defaults to None.
+        :paramtype output_name: Optional[str]
+        :keyword all: Specifies if all logs and named outputs should be downloaded. Defaults to False.
+        :paramtype all: bool
+        :raises ~azure.ai.ml.exceptions.JobException: Raised if Job is not yet in a terminal state.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.MlException: Raised if logs and outputs cannot be successfully downloaded.
+            Details will be provided in the error message.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_download]
+                :end-before: [END job_operations_download]
+                :language: python
+                :dedent: 8
+                :caption: Downloading all logs and named outputs of the job "job-1" into local directory
+                    "job-1-logs".
+        """
+        job_details = self.get(name)
+        # job is reused, get reused job to download
+        if (
+            job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) == PipelineConstants.REUSED_FLAG_TRUE
+            and PipelineConstants.REUSED_JOB_ID in job_details.properties
+        ):
+            reused_job_name = job_details.properties[PipelineConstants.REUSED_JOB_ID]
+            reused_job_detail = self.get(reused_job_name)
+            module_logger.info(
+                "job %s reuses previous job %s, download from the reused job.",
+                name,
+                reused_job_name,
+            )
+            name, job_details = reused_job_name, reused_job_detail
+        job_status = job_details.status
+        if job_status not in RunHistoryConstants.TERMINAL_STATUSES:
+            msg = "This job is in state {}. Download is allowed only in states {}".format(
+                job_status, RunHistoryConstants.TERMINAL_STATUSES
+            )
+            raise JobException(
+                message=msg,
+                target=ErrorTarget.JOB,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+        is_batch_job = (
+            job_details.tags.get("azureml.batchrun", None) == "true"
+            and job_details.tags.get("azureml.jobtype", None) != PipelineConstants.PIPELINE_JOB_TYPE
+        )
+        outputs = {}
+        download_path = Path(download_path)
+        artifact_directory_name = "artifacts"
+        output_directory_name = "named-outputs"
+
+        def log_missing_uri(what: str) -> None:
+            module_logger.debug(
+                'Could not download %s for job "%s" (job status: %s)',
+                what,
+                job_details.name,
+                job_details.status,
+            )
+
+        if isinstance(job_details, SweepJob):
+            best_child_run_id = job_details.properties.get(SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None)
+            if best_child_run_id:
+                self.download(
+                    best_child_run_id,
+                    download_path=download_path,
+                    output_name=output_name,
+                    all=all,
+                )
+            else:
+                log_missing_uri(what="from best child run")
+
+            if output_name:
+                # Don't need to download anything from the parent
+                return
+            # only download default artifacts (logs + default outputs) from parent
+            artifact_directory_name = "hd-artifacts"
+            output_name = None
+            all = False
+
+        if is_batch_job:
+            scoring_uri = self._get_batch_job_scoring_output_uri(job_details.name)
+            if scoring_uri:
+                outputs = {BATCH_JOB_CHILD_RUN_OUTPUT_NAME: scoring_uri}
+            else:
+                log_missing_uri("batch job scoring file")
+        elif output_name:
+            outputs = self._get_named_output_uri(name, output_name)
+
+            if output_name not in outputs:
+                log_missing_uri(what=f'output "{output_name}"')
+        elif all:
+            outputs = self._get_named_output_uri(name)
+
+            if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
+                log_missing_uri(what="logs")
+        else:
+            outputs = self._get_named_output_uri(name, DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
+
+            if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
+                log_missing_uri(what="logs")
+
+        # Download all requested artifacts
+        for item_name, uri in outputs.items():
+            if is_batch_job:
+                destination = download_path
+            elif item_name == DEFAULT_ARTIFACT_STORE_OUTPUT_NAME:
+                destination = download_path / artifact_directory_name
+            else:
+                destination = download_path / output_directory_name / item_name
+
+            module_logger.info("Downloading artifact %s to %s", uri, destination)
+            download_artifact_from_aml_uri(
+                uri=uri,
+                destination=destination,  # type: ignore[arg-type]
+                datastore_operation=self._datastore_operations,
+            )
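A download sketch matching the docstring's example; paths and names are placeholders, `ml_client` as above:

    ml_client.jobs.download(
        name="job-1",
        download_path="./job-1-logs",
        all=True,  # logs plus every named output
    )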
+    def _get_named_output_uri(
+        self, job_name: Optional[str], output_names: Optional[Union[Iterable[str], str]] = None
+    ) -> Dict[str, str]:
+        """Gets the URIs to the specified named outputs of a job.
+
+        :param job_name: Run ID of the job
+        :type job_name: str
+        :param output_names: Either an output name, or an iterable of output names. If omitted, all outputs are
+            returned.
+        :type output_names: Optional[Union[Iterable[str], str]]
+        :return: Map of output_names to URIs. Note that URIs that could not be found will not be present in the map.
+        :rtype: Dict[str, str]
+        """
+
+        if isinstance(output_names, str):
+            output_names = {output_names}
+        elif output_names:
+            output_names = set(output_names)
+
+        outputs = get_job_output_uris_from_dataplane(
+            job_name,
+            self._runs_operations,
+            self._dataset_dataplane_operations,
+            self._model_dataplane_operations,
+            output_names=output_names,
+        )
+
+        missing_outputs: Set = set()
+        if output_names is not None:
+            missing_outputs = set(output_names).difference(outputs.keys())
+        else:
+            missing_outputs = set().difference(outputs.keys())
+
+        # Include default artifact store in outputs
+        if (not output_names) or DEFAULT_ARTIFACT_STORE_OUTPUT_NAME in missing_outputs:
+            try:
+                job = self.get(job_name)
+                artifact_store_uri = job.outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME]
+                if artifact_store_uri is not None and artifact_store_uri.path:
+                    outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = artifact_store_uri.path
+            except (AttributeError, KeyError):
+                outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = SHORT_URI_FORMAT.format(
+                    "workspaceartifactstore", f"ExperimentRun/dcid.{job_name}/"
+                )
+            missing_outputs.discard(DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
+
+        # A job's output is not always reported in the outputs dict, but
+        # doesn't currently have a user configurable location.
+        # Perform a search of known paths to find output
+        # TODO: Remove once job output locations are reliably returned from the service
+
+        default_datastore = self._datastore_operations.get_default().name
+
+        for name in missing_outputs:
+            potential_uris = [
+                SHORT_URI_FORMAT.format(default_datastore, f"azureml/{job_name}/{name}/"),
+                SHORT_URI_FORMAT.format(default_datastore, f"dataset/{job_name}/{name}/"),
+            ]
+
+            for potential_uri in potential_uris:
+                if aml_datastore_path_exists(potential_uri, self._datastore_operations):
+                    outputs[name] = potential_uri
+                    break
+
+        return outputs
+
+    def _get_batch_job_scoring_output_uri(self, job_name: str) -> Optional[str]:
+        uri = None
+        # Download scoring output, which is the "score" output of the child job named "batchscoring"
+        # Batch Jobs are pipeline jobs with only one child, so this should terminate after an iteration
+        for child in self._runs_operations.get_run_children(job_name):
+            uri = self._get_named_output_uri(child.name, BATCH_JOB_CHILD_RUN_OUTPUT_NAME).get(
+                BATCH_JOB_CHILD_RUN_OUTPUT_NAME, None
+            )
+            # After the correct child is found, break to prevent unnecessary looping
+            if uri is not None:
+                break
+        return uri
+
+    def _get_job(self, name: str) -> JobBase:
+        job = self.service_client_01_2024_preview.jobs.get(
+            id=name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            **self._kwargs,
+        )
+
+        if (
+            hasattr(job, "properties")
+            and job.properties
+            and hasattr(job.properties, "job_type")
+            and job.properties.job_type == RestJobType_20241001Preview.FINE_TUNING
+        ):
+            return self.service_client_10_2024_preview.jobs.get(
+                id=name,
+                resource_group_name=self._operation_scope.resource_group_name,
+                workspace_name=self._workspace_name,
+                **self._kwargs,
+            )
+
+        return job
+
+    # Upgrade api from 2023-04-01-preview to 2024-01-01-preview for pipeline job
+    # We can remove this function once the `_get_job` function has also been upgraded to the same version
+    # with pipeline
+    def _get_job_2401(self, name: str) -> JobBase_2401:
+        service_client_operation = self.service_client_01_2024_preview.jobs
+        return service_client_operation.get(
+            id=name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            **self._kwargs,
+        )
+
+    def _get_workspace_url(self, url_key: WorkspaceDiscoveryUrlKey) -> str:
+        discovery_url = (
+            self._all_operations.all_operations[AzureMLResourceType.WORKSPACE]
+            .get(self._operation_scope.workspace_name)
+            .discovery_url
+        )
+        all_urls = json.loads(
+            download_text_from_url(
+                discovery_url,
+                create_requests_pipeline_with_retry(requests_pipeline=self._requests_pipeline),
+            )
+        )
+        return all_urls[url_key]
+    def _resolve_arm_id_or_upload_dependencies(self, job: Job) -> None:
+        """This method converts name or name:version to ARM id. Or it
+        registers/uploads nested dependencies.
+
+        :param job: the job resource entity
+        :type job: Job
+        :return: the job resource entity whose nested dependencies are resolved
+        :rtype: Job
+        """
+
+        self._resolve_arm_id_or_azureml_id(job, self._orchestrators.get_asset_arm_id)
+
+        if isinstance(job, PipelineJob):
+            # Resolve top-level inputs
+            self._resolve_job_inputs(self._flatten_group_inputs(job.inputs), job._base_path)
+            # inputs in sub-pipelines have been resolved in
+            # self._resolve_arm_id_or_azureml_id(job, self._orchestrators.get_asset_arm_id)
+            # as they are part of the pipeline component
+        elif isinstance(job, AutoMLJob):
+            self._resolve_automl_job_inputs(job)
+        elif isinstance(job, FineTuningJob):
+            self._resolve_finetuning_job_inputs(job)
+        elif isinstance(job, DistillationJob):
+            self._resolve_distillation_job_inputs(job)
+        elif isinstance(job, Spark):
+            self._resolve_job_inputs(job._job_inputs.values(), job._base_path)
+        elif isinstance(job, Command):
+            # TODO: switch to use inputs of Command objects, once the inputs/outputs building
+            # logic is removed from the BaseNode constructor.
+            try:
+                self._resolve_job_inputs(job._job_inputs.values(), job._base_path)
+            except AttributeError:
+                # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
+                pass
+        else:
+            try:
+                self._resolve_job_inputs(job.inputs.values(), job._base_path)  # type: ignore
+            except AttributeError:
+                # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
+                pass
+
+    def _resolve_automl_job_inputs(self, job: AutoMLJob) -> None:
+        """This method resolves the inputs for AutoML jobs.
+
+        :param job: the job resource entity
+        :type job: AutoMLJob
+        """
+        if isinstance(job, AutoMLJob):
+            self._resolve_job_input(job.training_data, job._base_path)
+            if job.validation_data is not None:
+                self._resolve_job_input(job.validation_data, job._base_path)
+            if hasattr(job, "test_data") and job.test_data is not None:
+                self._resolve_job_input(job.test_data, job._base_path)
+
+    def _resolve_finetuning_job_inputs(self, job: FineTuningJob) -> None:
+        """This method resolves the inputs for FineTuning jobs.
+
+        :param job: the job resource entity
+        :type job: FineTuningJob
+        """
+        from azure.ai.ml.entities._job.finetuning.finetuning_vertical import FineTuningVertical
+
+        if isinstance(job, FineTuningVertical):
+            # self._resolve_job_input(job.model, job._base_path)
+            self._resolve_job_input(job.training_data, job._base_path)
+            if job.validation_data is not None:
+                self._resolve_job_input(job.validation_data, job._base_path)
+
+    def _resolve_distillation_job_inputs(self, job: DistillationJob) -> None:
+        """This method resolves the inputs for Distillation jobs.
+
+        :param job: the job resource entity
+        :type job: DistillationJob
+        """
+        if isinstance(job, DistillationJob):
+            if job.training_data:
+                self._resolve_job_input(job.training_data, job._base_path)
+            if job.validation_data is not None:
+                self._resolve_job_input(job.validation_data, job._base_path)
+    def _resolve_azureml_id(self, job: Job) -> Job:
+        """This method converts ARM id to name or name:version for nested
+        entities.
+
+        :param job: the job resource entity
+        :type job: Job
+        :return: the job resource entity whose nested dependencies are resolved
+        :rtype: Job
+        """
+        self._append_tid_to_studio_url(job)
+        self._resolve_job_inputs_arm_id(job)
+        return self._resolve_arm_id_or_azureml_id(job, self._orchestrators.resolve_azureml_id)
+
+    def _resolve_compute_id(self, resolver: _AssetResolver, target: Any) -> Any:
+        # special case for local runs
+        if target is not None and target.lower() == LOCAL_COMPUTE_TARGET:
+            return LOCAL_COMPUTE_TARGET
+        try:
+            modified_target_name = target
+            if target.lower().startswith(AzureMLResourceType.VIRTUALCLUSTER + "/"):
+                # Compute target can be either a workspace-scoped compute type,
+                # or an AML-scoped VC. In the case of a VC, the resource name will be of the form
+                # azureml:virtualClusters/<name> to disambiguate from azureml:name (which is always compute)
+                modified_target_name = modified_target_name[len(AzureMLResourceType.VIRTUALCLUSTER) + 1 :]
+                modified_target_name = LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT.format(
+                    self._operation_scope.subscription_id,
+                    self._operation_scope.resource_group_name,
+                    AZUREML_RESOURCE_PROVIDER,
+                    AzureMLResourceType.VIRTUALCLUSTER,
+                    modified_target_name,
+                )
+            return resolver(
+                modified_target_name,
+                azureml_type=AzureMLResourceType.VIRTUALCLUSTER,
+                sub_workspace_resource=False,
+            )
+        except Exception:  # pylint: disable=W0718
+            return resolver(target, azureml_type=AzureMLResourceType.COMPUTE)
+
+    def _resolve_job_inputs(self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str) -> None:
+        """Resolve job inputs as ARM id or remote url.
+
+        :param entries: An iterable of job inputs
+        :type entries: Iterable[Union[Input, str, bool, int, float]]
+        :param base_path: The base path
+        :type base_path: str
+        """
+        for entry in entries:
+            self._resolve_job_input(entry, base_path)
+
+    # TODO: move it to somewhere else?
+    @classmethod
+    def _flatten_group_inputs(
+        cls, inputs: Dict[str, Union[Input, str, bool, int, float]]
+    ) -> List[Union[Input, str, bool, int, float]]:
+        """Get flattened values from an InputDict.
+
+        :param inputs: The input dict
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        :return: A list of values from the Input Dict
+        :rtype: List[Union[Input, str, bool, int, float]]
+        """
+        input_values: List = []
+        # Flatten inputs for pipeline job
+        for key, item in inputs.items():
+            if isinstance(item, _GroupAttrDict):
+                input_values.extend(item.flatten(group_parameter_name=key))
+            else:
+                if not isinstance(item, (str, bool, int, float)):
+                    # skip resolving inferred optional input without path (in do-while + dynamic input case)
+                    if isinstance(item._data, Input):  # type: ignore
+                        if not item._data.path and item._meta._is_inferred_optional:  # type: ignore
+                            continue
+                    input_values.append(item._data)  # type: ignore
+        return input_values
+
+    def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_path: str) -> None:
+        """Resolve a job input as ARM id or remote url.
+
+        :param entry: The job input
+        :type entry: Union[Input, str, bool, int, float]
+        :param base_path: The base path
+        :type base_path: str
+        """
+
+        # path can be empty if the job was created from builder functions
+        if isinstance(entry, Input) and not entry.path:
+            msg = "Input path can't be empty for jobs."
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.JOB,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.MISSING_FIELD,
+            )
+
+        if (
+            not isinstance(entry, Input)
+            or is_ARM_id_for_resource(entry.path)
+            or is_url(entry.path)
+            or is_data_binding_expression(entry.path)  # literal value but set mode in pipeline yaml
+        ):  # Literal value, ARM id or remote url. Pass through
+            return
+        try:
+            datastore_name = (
+                entry.datastore if hasattr(entry, "datastore") and entry.datastore else WORKSPACE_BLOB_STORE
+            )
+
+            # absolute local path, upload, transform to remote url
+            if os.path.isabs(entry.path):  # type: ignore
+                if entry.type == AssetTypes.URI_FOLDER and not os.path.isdir(entry.path):  # type: ignore
+                    raise ValidationException(
+                        message="There is no dir on target path: {}".format(entry.path),
+                        target=ErrorTarget.JOB,
+                        no_personal_data_message="There is no dir on target path",
+                        error_category=ErrorCategory.USER_ERROR,
+                        error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+                    )
+                if entry.type == AssetTypes.URI_FILE and not os.path.isfile(entry.path):  # type: ignore
+                    raise ValidationException(
+                        message="There is no file on target path: {}".format(entry.path),
+                        target=ErrorTarget.JOB,
+                        no_personal_data_message="There is no file on target path",
+                        error_category=ErrorCategory.USER_ERROR,
+                        error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+                    )
+                # absolute local path
+                entry.path = _upload_and_generate_remote_uri(
+                    self._operation_scope,
+                    self._datastore_operations,
+                    entry.path,
+                    datastore_name=datastore_name,
+                    show_progress=self._show_progress,
+                )
+                # TODO : Move this part to a common place
+                if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
+                    entry.path = entry.path + "/"
+            # Check for AzureML id, is there a better way?
+            elif ":" in entry.path or "@" in entry.path:  # type: ignore
+                asset_type = AzureMLResourceType.DATA
+                if entry.type in [AssetTypes.MLFLOW_MODEL, AssetTypes.CUSTOM_MODEL]:
+                    asset_type = AzureMLResourceType.MODEL
+
+                entry.path = self._orchestrators.get_asset_arm_id(entry.path, asset_type)  # type: ignore
+            else:  # relative local path, upload, transform to remote url
+                # Base path will be None for dsl pipeline component for now. We have 2 choices if the dsl pipeline
+                # function is imported from another file:
+                # 1) Use cwd as default base path;
+                # 2) Use the file path of the dsl pipeline function as default base path.
+                # Pick solution 1 for now as defining input path in the script to submit is a more common scenario.
+                local_path = Path(base_path or Path.cwd(), entry.path).resolve()  # type: ignore
+                entry.path = _upload_and_generate_remote_uri(
+                    self._operation_scope,
+                    self._datastore_operations,
+                    local_path,
+                    datastore_name=datastore_name,
+                    show_progress=self._show_progress,
+                )
+                # TODO : Move this part to a common place
+                if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
+                    entry.path = entry.path + "/"
+        except (MlException, HttpResponseError) as e:
+            raise e
+        except Exception as e:
+            raise ValidationException(
+                message=f"Supported input path values are ARM id, AzureML id, remote uri or local path.\n"
+                f"Met {type(e)}:\n{e}",
+                target=ErrorTarget.JOB,
+                no_personal_data_message=(
+                    "Supported input path values are ARM id, AzureML id, remote uri or local path."
+                ),
+                error=e,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            ) from e
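To illustrate the dispatch above, a sketch of the input flavors this resolver distinguishes; the asset names and paths are placeholders:

    from azure.ai.ml import Input
    from azure.ai.ml.constants import AssetTypes

    local_input = Input(type=AssetTypes.URI_FILE, path="./data/train.csv")       # local path: uploaded to the blob datastore
    asset_input = Input(type=AssetTypes.URI_FOLDER, path="azureml:my-data:1")    # ':' triggers ARM-id resolution
    remote_input = Input(type=AssetTypes.URI_FILE, path="https://example.com/data.csv")  # URL: passed through as-is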
+    def _resolve_job_inputs_arm_id(self, job: Job) -> None:
+        try:
+            inputs: Dict[str, Union[Input, InputOutputBase, str, bool, int, float]] = job.inputs  # type: ignore
+            for _, entry in inputs.items():
+                if isinstance(entry, InputOutputBase):
+                    # extract original input from the input builder.
+                    entry = entry._data
+                if not isinstance(entry, Input) or is_url(entry.path):  # Literal value or remote url
+                    continue
+                # ARM id
+                entry.path = self._orchestrators.resolve_azureml_id(entry.path)
+
+        except AttributeError:
+            # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
+            pass
+
+    def _resolve_arm_id_or_azureml_id(self, job: Job, resolver: Union[Callable, _AssetResolver]) -> Job:
+        """Resolve arm_id for a given job.
+
+        :param job: The job
+        :type job: Job
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided job, with fields resolved to full ARM IDs
+        :rtype: Job
+        """
+        # TODO: this will need to be parallelized when multiple tasks
+        # are required. Also consider the implications for dependencies.
+
+        if isinstance(job, _BaseJob):
+            job.compute = self._resolve_compute_id(resolver, job.compute)
+        elif isinstance(job, Command):
+            job = self._resolve_arm_id_for_command_job(job, resolver)
+        elif isinstance(job, ImportJob):
+            job = self._resolve_arm_id_for_import_job(job, resolver)
+        elif isinstance(job, Spark):
+            job = self._resolve_arm_id_for_spark_job(job, resolver)
+        elif isinstance(job, ParallelJob):
+            job = self._resolve_arm_id_for_parallel_job(job, resolver)
+        elif isinstance(job, SweepJob):
+            job = self._resolve_arm_id_for_sweep_job(job, resolver)
+        elif isinstance(job, AutoMLJob):
+            job = self._resolve_arm_id_for_automl_job(job, resolver, inside_pipeline=False)
+        elif isinstance(job, PipelineJob):
+            job = self._resolve_arm_id_for_pipeline_job(job, resolver)
+        elif isinstance(job, FineTuningJob):
+            pass
+        elif isinstance(job, DistillationJob):
+            pass
+        else:
+            msg = f"Unsupported job type: {type(job)}"
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.JOB,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+        return job
+    def _resolve_arm_id_for_command_job(self, job: Command, resolver: _AssetResolver) -> Command:
+        """Resolve arm_id for CommandJob.
+
+        :param job: The Command job
+        :type job: Command
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided Command job, with resolved fields
+        :rtype: Command
+        """
+        if job.code is not None and is_registry_id_for_resource(job.code):
+            msg = "Format not supported for code asset: {}"
+            raise ValidationException(
+                message=msg.format(job.code),
+                target=ErrorTarget.JOB,
+                no_personal_data_message=msg.format("[job.code]"),
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            )
+
+        if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE):
+            job.code = resolver(  # type: ignore
+                Code(base_path=job._base_path, path=job.code),
+                azureml_type=AzureMLResourceType.CODE,
+            )
+        job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_spark_job(self, job: Spark, resolver: _AssetResolver) -> Spark:
+        """Resolve arm_id for SparkJob.
+
+        :param job: The Spark job
+        :type job: Spark
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided SparkJob, with resolved fields
+        :rtype: Spark
+        """
+        if job.code is not None and is_registry_id_for_resource(job.code):
+            msg = "Format not supported for code asset: {}"
+            raise JobException(
+                message=msg.format(job.code),
+                target=ErrorTarget.JOB,
+                no_personal_data_message=msg.format("[job.code]"),
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+        if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE):
+            job.code = resolver(  # type: ignore
+                Code(base_path=job._base_path, path=job.code),
+                azureml_type=AzureMLResourceType.CODE,
+            )
+        job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_import_job(self, job: ImportJob, resolver: _AssetResolver) -> ImportJob:
+        """Resolve arm_id for ImportJob.
+
+        :param job: The Import job
+        :type job: ImportJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided ImportJob, with resolved fields
+        :rtype: ImportJob
+        """
+        # compute property will be no longer applicable once import job type is ready on MFE in PuP
+        # for PrP, we use command job type instead for import job where compute property is required
+        # However, MFE only validates compute resource url format. Execution service owns the real
+        # validation today but supports reserved compute names like AmlCompute, ContainerInstance and
+        # DataFactory here for 'clusterless' jobs
+        job.compute = self._resolve_compute_id(resolver, ComputeType.ADF)
+        return job
+
+        :param job: The Parallel job
+        :type job: ParallelJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided ParallelJob, with resolved fields
+        :rtype: ParallelJob
+        """
+        if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE):  # type: ignore
+            job.code = resolver(  # type: ignore
+                Code(base_path=job._base_path, path=job.code),  # type: ignore
+                azureml_type=AzureMLResourceType.CODE,
+            )
+        job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)  # type: ignore
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_sweep_job(self, job: SweepJob, resolver: _AssetResolver) -> SweepJob:
+        """Resolve arm_id for SweepJob.
+
+        :param job: The Sweep job
+        :type job: SweepJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided SweepJob, with resolved fields
+        :rtype: SweepJob
+        """
+        if (
+            job.trial is not None
+            and job.trial.code is not None
+            and not is_ARM_id_for_resource(job.trial.code, AzureMLResourceType.CODE)
+        ):
+            job.trial.code = resolver(  # type: ignore[assignment]
+                Code(base_path=job._base_path, path=job.trial.code),
+                azureml_type=AzureMLResourceType.CODE,
+            )
+        if (
+            job.trial is not None
+            and job.trial.environment is not None
+            and not is_ARM_id_for_resource(job.trial.environment, AzureMLResourceType.ENVIRONMENT)
+        ):
+            job.trial.environment = resolver(  # type: ignore[assignment]
+                job.trial.environment, azureml_type=AzureMLResourceType.ENVIRONMENT
+            )
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_automl_job(
+        self, job: AutoMLJob, resolver: _AssetResolver, inside_pipeline: bool
+    ) -> AutoMLJob:
+        """Resolve arm_id for AutoMLJob.
+
+        :param job: The AutoML job
+        :type job: AutoMLJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :param inside_pipeline: Whether the job is within a pipeline
+        :type inside_pipeline: bool
+        :return: The provided AutoMLJob, with resolved fields
+        :rtype: AutoMLJob
+        """
+        # AutoML jobs have no dependency uploads; only references need to be resolved to ARM IDs.
+
+        # An AutoML node in a pipeline has optional compute.
+        if inside_pipeline and job.compute is None:
+            return job
+        job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)
+        return job
+
+    def _resolve_arm_id_for_pipeline_job(self, pipeline_job: PipelineJob, resolver: _AssetResolver) -> PipelineJob:
+        """Resolve arm_id for pipeline_job.
+
+        :param pipeline_job: The pipeline job
+        :type pipeline_job: PipelineJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided PipelineJob, with resolved fields
+        :rtype: PipelineJob
+        """
+        # Get top-level job compute
+        _get_job_compute_id(pipeline_job, resolver)
+
+        # Process job defaults:
+        if pipeline_job.settings:
+            pipeline_job.settings.default_datastore = resolver(
+                pipeline_job.settings.default_datastore,
+                azureml_type=AzureMLResourceType.DATASTORE,
+            )
+            pipeline_job.settings.default_compute = resolver(
+                pipeline_job.settings.default_compute,
+                azureml_type=AzureMLResourceType.COMPUTE,
+            )
+
+        # Process each component job
+        try:
+            self._component_operations._resolve_dependencies_for_pipeline_component_jobs(
+                pipeline_job.component, resolver
+            )
+        except ComponentException as e:
+            raise JobException(
+                message=e.message,
+                target=ErrorTarget.JOB,
+                no_personal_data_message=e.no_personal_data_message,
+                error_category=e.error_category,
+            ) from e
+
+        # Create a pipeline component for the pipeline job if the user specified a component in the job YAML.
+        if (
+            not isinstance(pipeline_job.component, str)
+            and getattr(pipeline_job.component, "_source", None) == ComponentSource.YAML_COMPONENT
+        ):
+            pipeline_job.component = resolver(  # type: ignore
+                pipeline_job.component,
+                azureml_type=AzureMLResourceType.COMPONENT,
+            )
+
+        return pipeline_job
+
+    def _append_tid_to_studio_url(self, job: Job) -> None:
+        """Appends the user's tenant ID to the end of the studio URL.
+
+        Allows the UI to authenticate against the correct tenant.
+
+        :param job: The job
+        :type job: Job
+        """
+        try:
+            if job.services is not None:
+                studio_endpoint = job.services.get("Studio", None)
+                studio_url = studio_endpoint.endpoint
+                default_scopes = _resource_to_scopes(_get_base_url_from_metadata())
+                module_logger.debug("default_scopes used: `%s`\n", default_scopes)
+                # Extract the tenant id from the credential using PyJWT
+                decode = jwt.decode(
+                    self._credential.get_token(*default_scopes).token,
+                    options={"verify_signature": False, "verify_aud": False},
+                )
+                tid = decode["tid"]
+                formatted_tid = TID_FMT.format(tid)
+                studio_endpoint.endpoint = studio_url + formatted_tid
+        except Exception:  # pylint: disable=W0718
+            module_logger.info("Proceeding with no tenant id appended to studio URL\n")
+
+    def _set_headers_with_user_aml_token(self, kwargs: Any) -> None:
+        aml_resource_id = _get_aml_resource_id_from_metadata()
+        azure_ml_scopes = _resource_to_scopes(aml_resource_id)
+        module_logger.debug("azure_ml_scopes used: `%s`\n", azure_ml_scopes)
+        aml_token = self._credential.get_token(*azure_ml_scopes).token
+        # Validate that the token has the AML audience
+        decoded_token = jwt.decode(
+            aml_token,
+            options={"verify_signature": False, "verify_aud": False},
+        )
+        if decoded_token.get("aud") != aml_resource_id:
+            msg = """An AAD token with the AML scope could not be fetched using the provided credentials.
+            Please validate that a token with the {0} scope can be fetched using the credentials provided to MLClient.
+            A token with the {0} scope can be fetched using credentials.get_token({0}).
+            """
+            raise ValidationException(
+                message=msg.format(*azure_ml_scopes),
+                target=ErrorTarget.JOB,
+                error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+                no_personal_data_message=msg.format("[azure_ml_scopes]"),
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+        headers = kwargs.pop("headers", {})
+        headers["x-azureml-token"] = aml_token
+        kwargs["headers"] = headers
+
+
+def _get_job_compute_id(job: Union[Job, Command], resolver: _AssetResolver) -> None:
+    job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)
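
The resolution helpers above all rely on the same resolver contract: a callable that takes a short asset reference plus an azureml_type keyword and returns a fully qualified ARM ID, passing through values that are already resolved or literal. A minimal sketch of that contract, not part of this file, with hypothetical subscription, resource group, and workspace values:

# Illustrative stand-in for the _AssetResolver callables passed to
# _resolve_arm_id_or_azureml_id; not part of the SDK. All IDs below are fake.
from typing import Optional

ARM_ID_FMT = (
    "/subscriptions/{sub}/resourceGroups/{rg}/providers/"
    "Microsoft.MachineLearningServices/workspaces/{ws}/{asset_type}/{name}"
)

def toy_resolver(ref: Optional[str], *, azureml_type: str) -> Optional[str]:
    # Pass through None and values that already look like full ARM IDs,
    # mirroring the is_ARM_id_for_resource checks in the methods above.
    if ref is None or ref.startswith("/subscriptions/"):
        return ref
    return ARM_ID_FMT.format(
        sub="00000000-0000-0000-0000-000000000000",  # hypothetical subscription
        rg="my-rg",  # hypothetical resource group
        ws="my-ws",  # hypothetical workspace
        asset_type=azureml_type,
        name=ref,
    )

print(toy_resolver("cpu-cluster", azureml_type="computes"))
# -> /subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/my-rg/
#    providers/Microsoft.MachineLearningServices/workspaces/my-ws/computes/cpu-cluster

A real resolver also versions assets and uploads local code before returning the ID; this sketch only captures the pass-through-or-qualify behavior that the dispatch in _resolve_arm_id_or_azureml_id depends on.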