Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py  1677
1 file changed, 1677 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py
new file mode 100644
index 00000000..0003c8cd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py
@@ -0,0 +1,1677 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access, too-many-instance-attributes, too-many-statements, too-many-lines
+import json
+import os.path
+from os import PathLike
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Set, Union, cast
+
+import jwt
+from marshmallow import ValidationError
+
+from azure.ai.ml._artifacts._artifact_utilities import (
+ _upload_and_generate_remote_uri,
+ aml_datastore_path_exists,
+ download_artifact_from_aml_uri,
+)
+from azure.ai.ml._azure_environments import (
+ _get_aml_resource_id_from_metadata,
+ _get_base_url_from_metadata,
+ _resource_to_scopes,
+)
+from azure.ai.ml._exception_helper import log_and_raise_error
+from azure.ai.ml._restclient.dataset_dataplane import AzureMachineLearningWorkspaces as ServiceClientDatasetDataplane
+from azure.ai.ml._restclient.model_dataplane import AzureMachineLearningWorkspaces as ServiceClientModelDataplane
+from azure.ai.ml._restclient.runhistory import AzureMachineLearningWorkspaces as ServiceClientRunHistory
+from azure.ai.ml._restclient.runhistory.models import Run
+from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient022023Preview
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, ListViewType, UserIdentity
+from azure.ai.ml._restclient.v2023_08_01_preview.models import JobType as RestJobType
+from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401
+from azure.ai.ml._restclient.v2024_10_01_preview.models import JobType as RestJobType_20241001Preview
+from azure.ai.ml._scope_dependent_operations import (
+ OperationConfig,
+ OperationsContainer,
+ OperationScope,
+ _ScopeDependentOperations,
+)
+from azure.ai.ml._telemetry import ActivityType, monitor_with_activity, monitor_with_telemetry_mixin
+from azure.ai.ml._utils._http_utils import HttpPipeline
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils.utils import (
+ create_requests_pipeline_with_retry,
+ download_text_from_url,
+ is_data_binding_expression,
+ is_private_preview_enabled,
+ is_url,
+)
+from azure.ai.ml.constants._common import (
+ AZUREML_RESOURCE_PROVIDER,
+ BATCH_JOB_CHILD_RUN_OUTPUT_NAME,
+ COMMON_RUNTIME_ENV_VAR,
+ DEFAULT_ARTIFACT_STORE_OUTPUT_NAME,
+ GIT_PATH_PREFIX,
+ LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT,
+ LOCAL_COMPUTE_TARGET,
+ SERVERLESS_COMPUTE,
+ SHORT_URI_FORMAT,
+ SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME,
+ TID_FMT,
+ AssetTypes,
+ AzureMLResourceType,
+ WorkspaceDiscoveryUrlKey,
+)
+from azure.ai.ml.constants._compute import ComputeType
+from azure.ai.ml.constants._job.pipeline import PipelineConstants
+from azure.ai.ml.entities import Compute, Job, PipelineJob, ServiceInstance, ValidationResult
+from azure.ai.ml.entities._assets._artifacts.code import Code
+from azure.ai.ml.entities._builders import BaseNode, Command, Spark
+from azure.ai.ml.entities._datastore._constants import WORKSPACE_BLOB_STORE
+from azure.ai.ml.entities._inputs_outputs import Input
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.base_job import _BaseJob
+from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
+from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
+from azure.ai.ml.entities._job.import_job import ImportJob
+from azure.ai.ml.entities._job.job import _is_pipeline_child_job
+from azure.ai.ml.entities._job.parallel.parallel_job import ParallelJob
+from azure.ai.ml.entities._job.to_rest_functions import to_rest_job_object
+from azure.ai.ml.entities._validation import PathAwareSchemaValidatableMixin
+from azure.ai.ml.exceptions import (
+ ComponentException,
+ ErrorCategory,
+ ErrorTarget,
+ JobException,
+ JobParsingError,
+ MlException,
+ PipelineChildJobError,
+ UserErrorException,
+ ValidationErrorType,
+ ValidationException,
+)
+from azure.ai.ml.operations._run_history_constants import RunHistoryConstants
+from azure.ai.ml.sweep import SweepJob
+from azure.core.credentials import TokenCredential
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+from azure.core.polling import LROPoller
+from azure.core.tracing.decorator import distributed_trace
+
+from ..constants._component import ComponentSource
+from ..entities._builders.control_flow_node import ControlFlowNode
+from ..entities._job.pipeline._io import InputOutputBase, PipelineInput, _GroupAttrDict
+from ._component_operations import ComponentOperations
+from ._compute_operations import ComputeOperations
+from ._dataset_dataplane_operations import DatasetDataplaneOperations
+from ._job_ops_helper import get_git_properties, get_job_output_uris_from_dataplane, stream_logs_until_completion
+from ._local_job_invoker import is_local_run, start_run_if_local
+from ._model_dataplane_operations import ModelDataplaneOperations
+from ._operation_orchestrator import (
+ OperationOrchestrator,
+ _AssetResolver,
+ is_ARM_id_for_resource,
+ is_registry_id_for_resource,
+ is_singularity_full_name_for_resource,
+ is_singularity_id_for_resource,
+ is_singularity_short_name_for_resource,
+)
+from ._run_operations import RunOperations
+from ._virtual_cluster_operations import VirtualClusterOperations
+
+try:
+ pass
+except ImportError:
+ pass
+
+if TYPE_CHECKING:
+ from azure.ai.ml.operations import DatastoreOperations
+
+ops_logger = OpsLogger(__name__)
+module_logger = ops_logger.module_logger
+
+
+class JobOperations(_ScopeDependentOperations):
+ """Initiates an instance of JobOperations
+
+ This class should not be instantiated directly. Instead, use the `jobs` attribute of an MLClient object.
+
+ :param operation_scope: Scope variables for the operations classes of an MLClient object.
+ :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
+ :param operation_config: Common configuration for operations classes of an MLClient object.
+ :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
+ :param service_client_02_2023_preview: Service client to allow end users to operate on Azure Machine Learning
+ Workspace resources.
+    :type service_client_02_2023_preview: ~azure.ai.ml._restclient.v2023_04_01_preview.AzureMachineLearningWorkspaces
+ :param all_operations: All operations classes of an MLClient object.
+ :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
+ :param credential: Credential to use for authentication.
+ :type credential: ~azure.core.credentials.TokenCredential
+ """
+
+ def __init__(
+ self,
+ operation_scope: OperationScope,
+ operation_config: OperationConfig,
+ service_client_02_2023_preview: ServiceClient022023Preview,
+ all_operations: OperationsContainer,
+ credential: TokenCredential,
+ **kwargs: Any,
+ ) -> None:
+ super(JobOperations, self).__init__(operation_scope, operation_config)
+ ops_logger.update_filter()
+
+ self._operation_2023_02_preview = service_client_02_2023_preview.jobs
+ self._service_client = service_client_02_2023_preview
+ self._all_operations = all_operations
+ self._stream_logs_until_completion = stream_logs_until_completion
+ # Dataplane service clients are lazily created as they are needed
+ self._runs_operations_client: Optional[RunOperations] = None
+ self._dataset_dataplane_operations_client: Optional[DatasetDataplaneOperations] = None
+ self._model_dataplane_operations_client: Optional[ModelDataplaneOperations] = None
+ # Kwargs to propagate to dataplane service clients
+ self._service_client_kwargs = kwargs.pop("_service_client_kwargs", {})
+ self._api_base_url: Optional[str] = None
+ self._container = "azureml"
+ self._credential = credential
+ self._orchestrators = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)
+
+ self.service_client_01_2024_preview = kwargs.pop("service_client_01_2024_preview", None)
+ self.service_client_10_2024_preview = kwargs.pop("service_client_10_2024_preview", None)
+ self.service_client_01_2025_preview = kwargs.pop("service_client_01_2025_preview", None)
+ self._kwargs = kwargs
+
+ self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
+
+ @property
+ def _component_operations(self) -> ComponentOperations:
+ return cast(
+ ComponentOperations,
+ self._all_operations.get_operation( # type: ignore[misc]
+ AzureMLResourceType.COMPONENT, lambda x: isinstance(x, ComponentOperations)
+ ),
+ )
+
+ @property
+ def _compute_operations(self) -> ComputeOperations:
+ return cast(
+ ComputeOperations,
+ self._all_operations.get_operation( # type: ignore[misc]
+ AzureMLResourceType.COMPUTE, lambda x: isinstance(x, ComputeOperations)
+ ),
+ )
+
+ @property
+ def _virtual_cluster_operations(self) -> VirtualClusterOperations:
+ return cast(
+ VirtualClusterOperations,
+ self._all_operations.get_operation( # type: ignore[misc]
+ AzureMLResourceType.VIRTUALCLUSTER,
+ lambda x: isinstance(x, VirtualClusterOperations),
+ ),
+ )
+
+ @property
+ def _datastore_operations(self) -> "DatastoreOperations":
+ from azure.ai.ml.operations import DatastoreOperations
+
+ return cast(DatastoreOperations, self._all_operations.all_operations[AzureMLResourceType.DATASTORE])
+
+ @property
+ def _runs_operations(self) -> RunOperations:
+ if not self._runs_operations_client:
+ service_client_run_history = ServiceClientRunHistory(
+ self._credential, base_url=self._api_url, **self._service_client_kwargs
+ )
+ self._runs_operations_client = RunOperations(
+ self._operation_scope, self._operation_config, service_client_run_history
+ )
+ return self._runs_operations_client
+
+ @property
+ def _dataset_dataplane_operations(self) -> DatasetDataplaneOperations:
+ if not self._dataset_dataplane_operations_client:
+ service_client_dataset_dataplane = ServiceClientDatasetDataplane(
+ self._credential, base_url=self._api_url, **self._service_client_kwargs
+ )
+ self._dataset_dataplane_operations_client = DatasetDataplaneOperations(
+ self._operation_scope,
+ self._operation_config,
+ service_client_dataset_dataplane,
+ )
+ return self._dataset_dataplane_operations_client
+
+ @property
+ def _model_dataplane_operations(self) -> ModelDataplaneOperations:
+ if not self._model_dataplane_operations_client:
+ service_client_model_dataplane = ServiceClientModelDataplane(
+ self._credential, base_url=self._api_url, **self._service_client_kwargs
+ )
+ self._model_dataplane_operations_client = ModelDataplaneOperations(
+ self._operation_scope,
+ self._operation_config,
+ service_client_model_dataplane,
+ )
+ return self._model_dataplane_operations_client
+
+ @property
+ def _api_url(self) -> str:
+ if not self._api_base_url:
+ self._api_base_url = self._get_workspace_url(url_key=WorkspaceDiscoveryUrlKey.API)
+ return self._api_base_url
+
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "Job.List", ActivityType.PUBLICAPI)
+ def list(
+ self,
+ *,
+ parent_job_name: Optional[str] = None,
+ list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+ **kwargs: Any,
+ ) -> Iterable[Job]:
+ """Lists jobs in the workspace.
+
+ :keyword parent_job_name: When provided, only returns jobs that are children of the named job. Defaults to None,
+ listing all jobs in the workspace.
+ :paramtype parent_job_name: Optional[str]
+ :keyword list_view_type: The view type for including/excluding archived jobs. Defaults to
+            ~azure.mgmt.machinelearningservices.models.ListViewType.ACTIVE_ONLY, excluding archived jobs.
+ :paramtype list_view_type: ~azure.mgmt.machinelearningservices.models.ListViewType
+ :return: An iterator-like instance of Job objects.
+ :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.Job]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_list]
+ :end-before: [END job_operations_list]
+ :language: python
+ :dedent: 8
+ :caption: Retrieving a list of the archived jobs in a workspace with parent job named
+ "iris-dataset-jobs".
+ """
+
+ schedule_defined = kwargs.pop("schedule_defined", None)
+ scheduled_job_name = kwargs.pop("scheduled_job_name", None)
+ max_results = kwargs.pop("max_results", None)
+
+ if parent_job_name:
+ parent_job = self.get(parent_job_name)
+ return self._runs_operations.get_run_children(parent_job.name, max_results=max_results)
+
+ return cast(
+ Iterable[Job],
+ self.service_client_01_2024_preview.jobs.list(
+ self._operation_scope.resource_group_name,
+ self._workspace_name,
+ cls=lambda objs: [self._handle_rest_errors(obj) for obj in objs],
+ list_view_type=list_view_type,
+ scheduled=schedule_defined,
+ schedule_id=scheduled_job_name,
+ **self._kwargs,
+ **kwargs,
+ ),
+ )
+
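+    # Editor's note: an illustrative usage sketch, not part of the upstream SDK file; the
+    # client and job names are hypothetical. ListViewType is the enum imported at the top
+    # of this module:
+    #
+    #   archived = ml_client.jobs.list(list_view_type=ListViewType.ARCHIVED_ONLY)
+    #   children = ml_client.jobs.list(parent_job_name="iris-dataset-jobs")
+    #   for job in children:
+    #       print(job.name, job.status)
+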
+ def _handle_rest_errors(self, job_object: Union[JobBase, Run]) -> Optional[Job]:
+ """Handle errors while resolving azureml_id's during list operation.
+
+ :param job_object: The REST object to turn into a Job
+ :type job_object: Union[JobBase, Run]
+ :return: The resolved job
+ :rtype: Optional[Job]
+ """
+ try:
+ return self._resolve_azureml_id(Job._from_rest_object(job_object))
+ except JobParsingError:
+ return None
+
+ @distributed_trace
+ @monitor_with_telemetry_mixin(ops_logger, "Job.Get", ActivityType.PUBLICAPI)
+ def get(self, name: str) -> Job:
+ """Gets a job resource.
+
+ :param name: The name of the job.
+ :type name: str
+ :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+ :raises ~azure.ai.ml.exceptions.UserErrorException: Raised if the name parameter is not a string.
+ :return: Job object retrieved from the service.
+ :rtype: ~azure.ai.ml.entities.Job
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_get]
+ :end-before: [END job_operations_get]
+ :language: python
+ :dedent: 8
+ :caption: Retrieving a job named "iris-dataset-job-1".
+ """
+ if not isinstance(name, str):
+            raise UserErrorException(f"{name} is an invalid input for client.jobs.get().")
+ job_object = self._get_job(name)
+
+ job: Any = None
+ if not _is_pipeline_child_job(job_object):
+ job = Job._from_rest_object(job_object)
+ if job_object.properties.job_type != RestJobType.AUTO_ML:
+ # resolvers do not work with the old contract, leave the ids as is
+ job = self._resolve_azureml_id(job)
+ else:
+ # Child jobs are no longer available through MFE, fetch
+ # through run history instead
+ job = self._runs_operations._translate_from_rest_object(self._runs_operations.get_run(name))
+
+ return job
+
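+    # Editor's note: an illustrative usage sketch, not part of the upstream SDK file; the
+    # client and job names are hypothetical:
+    #
+    #   job = ml_client.jobs.get("iris-dataset-job-1")
+    #   print(job.experiment_name, job.status, job.studio_url)
+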
+ @distributed_trace
+ @monitor_with_telemetry_mixin(ops_logger, "Job.ShowServices", ActivityType.PUBLICAPI)
+ def show_services(self, name: str, node_index: int = 0) -> Optional[Dict[str, ServiceInstance]]:
+ """Gets services associated with a job's node.
+
+ :param name: The name of the job.
+ :type name: str
+ :param node_index: The node's index (zero-based). Defaults to 0.
+ :type node_index: int
+ :return: The services associated with the job for the given node.
+ :rtype: dict[str, ~azure.ai.ml.entities.ServiceInstance]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_show_services]
+ :end-before: [END job_operations_show_services]
+ :language: python
+ :dedent: 8
+ :caption: Retrieving the services associated with a job's 1st node.
+ """
+
+ service_instances_dict = self._runs_operations._operation.get_run_service_instances(
+ self._subscription_id,
+ self._operation_scope.resource_group_name,
+ self._workspace_name,
+ name,
+ node_index,
+ )
+ if not service_instances_dict.instances:
+ return None
+
+ return {
+ k: ServiceInstance._from_rest_object(v, node_index) for k, v in service_instances_dict.instances.items()
+ }
+
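+    # Editor's note: an illustrative sketch, not part of the upstream SDK file. With a
+    # hypothetical authenticated MLClient named `ml_client`, the returned mapping (or None)
+    # can be inspected per service on the requested node:
+    #
+    #   services = ml_client.jobs.show_services("iris-dataset-job-1", node_index=0)
+    #   for service_name, instance in (services or {}).items():
+    #       print(service_name, instance)
+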
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "Job.Cancel", ActivityType.PUBLICAPI)
+ def begin_cancel(self, name: str, **kwargs: Any) -> LROPoller[None]:
+ """Cancels a job.
+
+ :param name: The name of the job.
+ :type name: str
+ :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+ :return: A poller to track the operation status.
+ :rtype: ~azure.core.polling.LROPoller[None]
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_begin_cancel]
+ :end-before: [END job_operations_begin_cancel]
+ :language: python
+ :dedent: 8
+ :caption: Canceling the job named "iris-dataset-job-1" and checking the poller for status.
+ """
+ tag = kwargs.pop("tag", None)
+
+ if not tag:
+ return self._operation_2023_02_preview.begin_cancel(
+ id=name,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._kwargs,
+ **kwargs,
+ )
+
+        # Note: The batch cancel below is experimental and intended for private use
+ results = []
+ jobs = self.list(tag=tag)
+        # TODO: Do we need to show an error message when no jobs are returned for the given tag?
+ for job in jobs:
+ result = self._operation_2023_02_preview.begin_cancel(
+ id=job.name,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._kwargs,
+ )
+ results.append(result)
+ return results
+
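+    # Editor's note: an illustrative usage sketch, not part of the upstream SDK file; the
+    # client and job names are hypothetical. The returned LROPoller can be waited on when
+    # the caller needs confirmation that cancellation has completed:
+    #
+    #   poller = ml_client.jobs.begin_cancel("iris-dataset-job-1")
+    #   poller.wait()
+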
+ def _try_get_compute_arm_id(self, compute: Union[Compute, str]) -> Optional[Union[Compute, str]]:
+ # pylint: disable=too-many-return-statements
+ # TODO: Remove in PuP with native import job/component type support in MFE/Designer
+ # DataFactory 'clusterless' job
+ if str(compute) == ComputeType.ADF:
+ return compute
+
+ if compute is not None:
+ # Singularity
+ if isinstance(compute, str) and is_singularity_id_for_resource(compute):
+ return self._virtual_cluster_operations.get(compute)["id"]
+ if isinstance(compute, str) and is_singularity_full_name_for_resource(compute):
+ return self._orchestrators._get_singularity_arm_id_from_full_name(compute)
+ if isinstance(compute, str) and is_singularity_short_name_for_resource(compute):
+ return self._orchestrators._get_singularity_arm_id_from_short_name(compute)
+ # other compute
+ if is_ARM_id_for_resource(compute, resource_type=AzureMLResourceType.COMPUTE):
+ # compute is not a sub-workspace resource
+ compute_name = compute.split("/")[-1] # type: ignore
+ elif isinstance(compute, Compute):
+ compute_name = compute.name
+ elif isinstance(compute, str):
+ compute_name = compute
+ elif isinstance(compute, PipelineInput):
+ compute_name = str(compute)
+ else:
+ raise ValueError(
+ "compute must be either an arm id of Compute, a Compute object or a compute name but"
+ f" got {type(compute)}"
+ )
+
+ if is_data_binding_expression(compute_name):
+ return compute_name
+ if compute_name == SERVERLESS_COMPUTE:
+ return compute_name
+ try:
+ return self._compute_operations.get(compute_name).id
+ except ResourceNotFoundError as e:
+ # the original error is not helpful (Operation returned an invalid status 'Not Found'),
+ # so we raise a more helpful one
+ response = e.response
+ response.reason = "Not found compute with name {}".format(compute_name)
+ raise ResourceNotFoundError(response=response) from e
+ return None
+
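+    # Editor's note: an illustrative sketch, not part of the upstream SDK file; the values
+    # below are hypothetical. _try_get_compute_arm_id accepts several spellings of a
+    # compute target:
+    #
+    #   self._try_get_compute_arm_id("cpu-cluster")                  # name, resolved to a full ARM id
+    #   self._try_get_compute_arm_id("serverless")                   # SERVERLESS_COMPUTE, passed through
+    #   self._try_get_compute_arm_id("${{parent.inputs.compute}}")   # data-binding expression, passed through
+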
+ @distributed_trace
+ @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.PUBLICAPI)
+ def validate(self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any) -> ValidationResult:
+ """Validates a Job object before submitting to the service. Anonymous assets may be created if there are inline
+ defined entities such as Component, Environment, and Code. Only pipeline jobs are supported for validation
+ currently.
+
+ :param job: The job object to be validated.
+ :type job: ~azure.ai.ml.entities.Job
+ :keyword raise_on_failure: Specifies if an error should be raised if validation fails. Defaults to False.
+ :paramtype raise_on_failure: bool
+ :return: A ValidationResult object containing all found errors.
+ :rtype: ~azure.ai.ml.entities.ValidationResult
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_validate]
+ :end-before: [END job_operations_validate]
+ :language: python
+ :dedent: 8
+ :caption: Validating a PipelineJob object and printing out the found errors.
+ """
+ return self._validate(job, raise_on_failure=raise_on_failure, **kwargs)
+
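+    # Editor's note: an illustrative usage sketch, not part of the upstream SDK file; the
+    # client and pipeline job objects are hypothetical:
+    #
+    #   result = ml_client.jobs.validate(pipeline_job)
+    #   if not result.passed:
+    #       print(result.error_messages)
+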
+ @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.INTERNALCALL)
+ def _validate(
+ self,
+ job: Job,
+ *,
+ raise_on_failure: bool = False,
+ # pylint:disable=unused-argument
+ **kwargs: Any,
+ ) -> ValidationResult:
+ """Implementation of validate.
+
+ Add this function to avoid calling validate() directly in
+ create_or_update(), which will impact telemetry statistics &
+ bring experimental warning in create_or_update().
+
+ :param job: The job to validate
+ :type job: Job
+ :keyword raise_on_failure: Whether to raise on validation failure
+ :paramtype raise_on_failure: bool
+ :return: The validation result
+ :rtype: ValidationResult
+ """
+ git_code_validation_result = PathAwareSchemaValidatableMixin._create_empty_validation_result()
+ # TODO: move this check to Job._validate after validation is supported for all job types
+        # If private preview features are enabled and the job has a code value of type str, we need to
+        # check that it is a valid git path. Otherwise we should raise a ValidationException
+        # saying that the code value is not valid.
+ if (
+ hasattr(job, "code")
+ and job.code is not None
+ and isinstance(job.code, str)
+ and job.code.startswith(GIT_PATH_PREFIX)
+ and not is_private_preview_enabled()
+ ):
+ git_code_validation_result.append_error(
+ message=f"Invalid code value: {job.code}. Git paths are not supported.",
+ yaml_path="code",
+ )
+
+ if not isinstance(job, PathAwareSchemaValidatableMixin):
+
+ def error_func(msg: str, no_personal_data_msg: str) -> ValidationException:
+ return ValidationException(
+ message=msg,
+ no_personal_data_message=no_personal_data_msg,
+ error_target=ErrorTarget.JOB,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ return git_code_validation_result.try_raise(
+ raise_error=raise_on_failure,
+ error_func=error_func,
+ )
+
+ validation_result = job._validate(raise_error=raise_on_failure)
+ validation_result.merge_with(git_code_validation_result)
+ # fast return to avoid remote call if local validation not passed
+ # TODO: use remote call to validate the entire job after MFE API is ready
+ if validation_result.passed and isinstance(job, PipelineJob):
+ try:
+ job.compute = self._try_get_compute_arm_id(job.compute)
+ except Exception as e: # pylint: disable=W0718
+ validation_result.append_error(yaml_path="compute", message=str(e))
+
+ for node_name, node in job.jobs.items():
+ try:
+ # TODO(1979547): refactor, not all nodes have compute
+ if not isinstance(node, ControlFlowNode):
+ node.compute = self._try_get_compute_arm_id(node.compute)
+ except Exception as e: # pylint: disable=W0718
+ validation_result.append_error(yaml_path=f"jobs.{node_name}.compute", message=str(e))
+
+ validation_result.resolve_location_for_diagnostics(str(job._source_path))
+ return job._try_raise(validation_result, raise_error=raise_on_failure) # pylint: disable=protected-access
+
+ @distributed_trace
+ @monitor_with_telemetry_mixin(ops_logger, "Job.CreateOrUpdate", ActivityType.PUBLICAPI)
+ def create_or_update(
+ self,
+ job: Job,
+ *,
+ description: Optional[str] = None,
+ compute: Optional[str] = None,
+ tags: Optional[dict] = None,
+ experiment_name: Optional[str] = None,
+ skip_validation: bool = False,
+ **kwargs: Any,
+ ) -> Job:
+ """Creates or updates a job. If entities such as Environment or Code are defined inline, they'll be created
+ together with the job.
+
+ :param job: The job object.
+ :type job: ~azure.ai.ml.entities.Job
+ :keyword description: The job description.
+ :paramtype description: Optional[str]
+ :keyword compute: The compute target for the job.
+ :paramtype compute: Optional[str]
+ :keyword tags: The tags for the job.
+ :paramtype tags: Optional[dict]
+ :keyword experiment_name: The name of the experiment the job will be created under. If None is provided,
+            the job will be created under experiment 'Default'.
+ :paramtype experiment_name: Optional[str]
+ :keyword skip_validation: Specifies whether or not to skip validation before creating or updating the job. Note
+ that validation for dependent resources such as an anonymous component will not be skipped. Defaults to
+ False.
+ :paramtype skip_validation: bool
+ :raises Union[~azure.ai.ml.exceptions.UserErrorException, ~azure.ai.ml.exceptions.ValidationException]: Raised
+ if Job cannot be successfully validated. Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.AssetException: Raised if Job assets
+ (e.g. Data, Code, Model, Environment) cannot be successfully validated.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.ModelException: Raised if Job model cannot be successfully validated.
+ Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.JobException: Raised if Job object or attributes are not correctly formatted.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty
+ directory.
+ :raises ~azure.ai.ml.exceptions.DockerEngineNotAvailableError: Raised if Docker Engine is not available for
+ local job.
+ :return: Created or updated job.
+ :rtype: ~azure.ai.ml.entities.Job
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_create_and_update]
+ :end-before: [END job_operations_create_and_update]
+ :language: python
+ :dedent: 8
+ :caption: Creating a new job and then updating its compute.
+ """
+ if isinstance(job, BaseNode) and not (
+ isinstance(job, (Command, Spark))
+ ): # Command/Spark objects can be used directly
+ job = job._to_job()
+
+ # Set job properties before submission
+ if description is not None:
+ job.description = description
+ if compute is not None:
+ job.compute = compute
+ if tags is not None:
+ job.tags = tags
+ if experiment_name is not None:
+ job.experiment_name = experiment_name
+
+ if job.compute == LOCAL_COMPUTE_TARGET:
+ job.environment_variables[COMMON_RUNTIME_ENV_VAR] = "true" # type: ignore
+
+        # TODO: why do we put log logic here instead of inside self._validate()?
+ try:
+ if not skip_validation:
+ self._validate(job, raise_on_failure=True)
+
+ # Create all dependent resources
+ self._resolve_arm_id_or_upload_dependencies(job)
+ except (ValidationException, ValidationError) as ex:
+ log_and_raise_error(ex)
+
+ git_props = get_git_properties()
+ # Do not add git props if they already exist in job properties.
+        # This is specifically for update: if the user switches branches and tries to update
+        # their job, the request will fail since the git props would be repopulated.
+        # MFE does not allow existing properties to be updated, only new props to be added.
+ if not any(prop_name in job.properties for prop_name in git_props):
+ job.properties = {**job.properties, **git_props}
+ rest_job_resource = to_rest_job_object(job)
+
+        # Make a copy of self._kwargs instead of contaminating the original one
+ kwargs = {**self._kwargs}
+ # set headers with user aml token if job is a pipeline or has a user identity setting
+ if (rest_job_resource.properties.job_type == RestJobType.PIPELINE) or (
+ hasattr(rest_job_resource.properties, "identity")
+ and (isinstance(rest_job_resource.properties.identity, UserIdentity))
+ ):
+ self._set_headers_with_user_aml_token(kwargs)
+
+ result = self._create_or_update_with_different_version_api(rest_job_resource=rest_job_resource, **kwargs)
+
+ if is_local_run(result):
+ ws_base_url = self._all_operations.all_operations[
+ AzureMLResourceType.WORKSPACE
+ ]._operation._client._base_url
+ snapshot_id = start_run_if_local(
+ result,
+ self._credential,
+ ws_base_url,
+ self._requests_pipeline,
+ )
+ # in case of local run, the first create/update call to MFE returns the
+ # request for submitting to ES. Once we request to ES and start the run, we
+ # need to put the same body to MFE to append user tags etc.
+ if rest_job_resource.properties.job_type == RestJobType.PIPELINE:
+ job_object = self._get_job_2401(rest_job_resource.name)
+ else:
+ job_object = self._get_job(rest_job_resource.name)
+ if result.properties.tags is not None:
+ for tag_name, tag_value in rest_job_resource.properties.tags.items():
+ job_object.properties.tags[tag_name] = tag_value
+ if result.properties.properties is not None:
+ for (
+ prop_name,
+ prop_value,
+ ) in rest_job_resource.properties.properties.items():
+ job_object.properties.properties[prop_name] = prop_value
+ if snapshot_id is not None:
+ job_object.properties.properties["ContentSnapshotId"] = snapshot_id
+
+ result = self._create_or_update_with_different_version_api(rest_job_resource=job_object, **kwargs)
+
+ return self._resolve_azureml_id(Job._from_rest_object(result))
+
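+    # Editor's note: an illustrative submission sketch, not part of the upstream SDK file;
+    # the client, compute, environment, and code path are hypothetical:
+    #
+    #   from azure.ai.ml import command
+    #
+    #   job = command(
+    #       command="python train.py",
+    #       code="./src",
+    #       environment="my-env:1",
+    #       compute="cpu-cluster",
+    #   )
+    #   returned_job = ml_client.jobs.create_or_update(job, experiment_name="demo")
+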
+ def _create_or_update_with_different_version_api(self, rest_job_resource: JobBase, **kwargs: Any) -> JobBase:
+ service_client_operation = self._operation_2023_02_preview
+ if rest_job_resource.properties.job_type == RestJobType_20241001Preview.FINE_TUNING:
+ service_client_operation = self.service_client_10_2024_preview.jobs
+ if rest_job_resource.properties.job_type == RestJobType.PIPELINE:
+ service_client_operation = self.service_client_01_2024_preview.jobs
+ if rest_job_resource.properties.job_type == RestJobType.AUTO_ML:
+ service_client_operation = self.service_client_01_2024_preview.jobs
+ if rest_job_resource.properties.job_type == RestJobType.SWEEP:
+ service_client_operation = self.service_client_01_2024_preview.jobs
+ if rest_job_resource.properties.job_type == RestJobType.COMMAND:
+ service_client_operation = self.service_client_01_2025_preview.jobs
+
+ result = service_client_operation.create_or_update(
+ id=rest_job_resource.name,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ body=rest_job_resource,
+ **kwargs,
+ )
+
+ return result
+
+ def _create_or_update_with_latest_version_api(self, rest_job_resource: JobBase, **kwargs: Any) -> JobBase:
+ service_client_operation = self.service_client_01_2024_preview.jobs
+ result = service_client_operation.create_or_update(
+ id=rest_job_resource.name,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ body=rest_job_resource,
+ **kwargs,
+ )
+
+ return result
+
+ def _archive_or_restore(self, name: str, is_archived: bool) -> None:
+ job_object = self._get_job(name)
+ if job_object.properties.job_type == RestJobType.PIPELINE:
+ job_object = self._get_job_2401(name)
+ if _is_pipeline_child_job(job_object):
+ raise PipelineChildJobError(job_id=job_object.id)
+ job_object.properties.is_archived = is_archived
+
+ self._create_or_update_with_different_version_api(rest_job_resource=job_object)
+
+ @distributed_trace
+ @monitor_with_telemetry_mixin(ops_logger, "Job.Archive", ActivityType.PUBLICAPI)
+ def archive(self, name: str) -> None:
+ """Archives a job.
+
+ :param name: The name of the job.
+ :type name: str
+ :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_archive]
+ :end-before: [END job_operations_archive]
+ :language: python
+ :dedent: 8
+ :caption: Archiving a job.
+ """
+
+ self._archive_or_restore(name=name, is_archived=True)
+
+ @distributed_trace
+ @monitor_with_telemetry_mixin(ops_logger, "Job.Restore", ActivityType.PUBLICAPI)
+ def restore(self, name: str) -> None:
+ """Restores an archived job.
+
+ :param name: The name of the job.
+ :type name: str
+ :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_restore]
+ :end-before: [END job_operations_restore]
+ :language: python
+ :dedent: 8
+ :caption: Restoring an archived job.
+ """
+
+ self._archive_or_restore(name=name, is_archived=False)
+
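+    # Editor's note: an illustrative sketch, not part of the upstream SDK file; the client
+    # and job names are hypothetical. Archive and restore are symmetric:
+    #
+    #   ml_client.jobs.archive("iris-dataset-job-1")   # hide the job from default listings
+    #   ml_client.jobs.restore("iris-dataset-job-1")   # make it visible again
+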
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "Job.Stream", ActivityType.PUBLICAPI)
+ def stream(self, name: str) -> None:
+ """Streams the logs of a running job.
+
+ :param name: The name of the job.
+ :type name: str
+ :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_stream_logs]
+ :end-before: [END job_operations_stream_logs]
+ :language: python
+ :dedent: 8
+ :caption: Streaming a running job.
+ """
+ job_object = self._get_job(name)
+
+ if _is_pipeline_child_job(job_object):
+ raise PipelineChildJobError(job_id=job_object.id)
+
+ self._stream_logs_until_completion(
+ self._runs_operations,
+ job_object,
+ self._datastore_operations,
+ requests_pipeline=self._requests_pipeline,
+ )
+
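+    # Editor's note: an illustrative sketch, not part of the upstream SDK file; the client
+    # and job objects are hypothetical. stream() blocks until the job reaches a terminal
+    # state, so it is typically called right after submission:
+    #
+    #   returned_job = ml_client.jobs.create_or_update(job)
+    #   ml_client.jobs.stream(returned_job.name)
+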
+ @distributed_trace
+ @monitor_with_activity(ops_logger, "Job.Download", ActivityType.PUBLICAPI)
+ def download(
+ self,
+ name: str,
+ *,
+ download_path: Union[PathLike, str] = ".",
+ output_name: Optional[str] = None,
+ all: bool = False, # pylint: disable=redefined-builtin
+ ) -> None:
+ """Downloads the logs and output of a job.
+
+ :param name: The name of a job.
+ :type name: str
+ :keyword download_path: The local path to be used as the download destination. Defaults to ".".
+ :paramtype download_path: Union[PathLike, str]
+ :keyword output_name: The name of the output to download. Defaults to None.
+ :paramtype output_name: Optional[str]
+ :keyword all: Specifies if all logs and named outputs should be downloaded. Defaults to False.
+ :paramtype all: bool
+ :raises ~azure.ai.ml.exceptions.JobException: Raised if Job is not yet in a terminal state.
+ Details will be provided in the error message.
+ :raises ~azure.ai.ml.exceptions.MlException: Raised if logs and outputs cannot be successfully downloaded.
+ Details will be provided in the error message.
+
+ .. admonition:: Example:
+
+ .. literalinclude:: ../samples/ml_samples_misc.py
+ :start-after: [START job_operations_download]
+ :end-before: [END job_operations_download]
+ :language: python
+ :dedent: 8
+ :caption: Downloading all logs and named outputs of the job "job-1" into local directory "job-1-logs".
+ """
+ job_details = self.get(name)
+ # job is reused, get reused job to download
+ if (
+ job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) == PipelineConstants.REUSED_FLAG_TRUE
+ and PipelineConstants.REUSED_JOB_ID in job_details.properties
+ ):
+ reused_job_name = job_details.properties[PipelineConstants.REUSED_JOB_ID]
+ reused_job_detail = self.get(reused_job_name)
+ module_logger.info(
+ "job %s reuses previous job %s, download from the reused job.",
+ name,
+ reused_job_name,
+ )
+ name, job_details = reused_job_name, reused_job_detail
+ job_status = job_details.status
+ if job_status not in RunHistoryConstants.TERMINAL_STATUSES:
+ msg = "This job is in state {}. Download is allowed only in states {}".format(
+ job_status, RunHistoryConstants.TERMINAL_STATUSES
+ )
+ raise JobException(
+ message=msg,
+ target=ErrorTarget.JOB,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ is_batch_job = (
+ job_details.tags.get("azureml.batchrun", None) == "true"
+ and job_details.tags.get("azureml.jobtype", None) != PipelineConstants.PIPELINE_JOB_TYPE
+ )
+ outputs = {}
+ download_path = Path(download_path)
+ artifact_directory_name = "artifacts"
+ output_directory_name = "named-outputs"
+
+ def log_missing_uri(what: str) -> None:
+ module_logger.debug(
+ 'Could not download %s for job "%s" (job status: %s)',
+ what,
+ job_details.name,
+ job_details.status,
+ )
+
+ if isinstance(job_details, SweepJob):
+ best_child_run_id = job_details.properties.get(SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None)
+ if best_child_run_id:
+ self.download(
+ best_child_run_id,
+ download_path=download_path,
+ output_name=output_name,
+ all=all,
+ )
+ else:
+ log_missing_uri(what="from best child run")
+
+ if output_name:
+ # Don't need to download anything from the parent
+ return
+ # only download default artifacts (logs + default outputs) from parent
+ artifact_directory_name = "hd-artifacts"
+ output_name = None
+ all = False
+
+ if is_batch_job:
+ scoring_uri = self._get_batch_job_scoring_output_uri(job_details.name)
+ if scoring_uri:
+ outputs = {BATCH_JOB_CHILD_RUN_OUTPUT_NAME: scoring_uri}
+ else:
+ log_missing_uri("batch job scoring file")
+ elif output_name:
+ outputs = self._get_named_output_uri(name, output_name)
+
+ if output_name not in outputs:
+ log_missing_uri(what=f'output "{output_name}"')
+ elif all:
+ outputs = self._get_named_output_uri(name)
+
+ if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
+ log_missing_uri(what="logs")
+ else:
+ outputs = self._get_named_output_uri(name, DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
+
+ if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
+ log_missing_uri(what="logs")
+
+ # Download all requested artifacts
+ for item_name, uri in outputs.items():
+ if is_batch_job:
+ destination = download_path
+ elif item_name == DEFAULT_ARTIFACT_STORE_OUTPUT_NAME:
+ destination = download_path / artifact_directory_name
+ else:
+ destination = download_path / output_directory_name / item_name
+
+ module_logger.info("Downloading artifact %s to %s", uri, destination)
+ download_artifact_from_aml_uri(
+ uri=uri,
+ destination=destination, # type: ignore[arg-type]
+ datastore_operation=self._datastore_operations,
+ )
+
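+    # Editor's note: an illustrative sketch, not part of the upstream SDK file; the client,
+    # job name, and local path are hypothetical. Logs land under "artifacts/" and named
+    # outputs under "named-outputs/" inside the download path:
+    #
+    #   ml_client.jobs.download(
+    #       name="iris-dataset-job-1",
+    #       download_path="./job-1-logs",
+    #       all=True,
+    #   )
+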
+ def _get_named_output_uri(
+ self, job_name: Optional[str], output_names: Optional[Union[Iterable[str], str]] = None
+ ) -> Dict[str, str]:
+ """Gets the URIs to the specified named outputs of job.
+
+ :param job_name: Run ID of the job
+ :type job_name: str
+ :param output_names: Either an output name, or an iterable of output names. If omitted, all outputs are
+ returned.
+ :type output_names: Optional[Union[Iterable[str], str]]
+ :return: Map of output_names to URIs. Note that URIs that could not be found will not be present in the map.
+ :rtype: Dict[str, str]
+ """
+
+ if isinstance(output_names, str):
+ output_names = {output_names}
+ elif output_names:
+ output_names = set(output_names)
+
+ outputs = get_job_output_uris_from_dataplane(
+ job_name,
+ self._runs_operations,
+ self._dataset_dataplane_operations,
+ self._model_dataplane_operations,
+ output_names=output_names,
+ )
+
+ missing_outputs: Set = set()
+ if output_names is not None:
+ missing_outputs = set(output_names).difference(outputs.keys())
+ else:
+            missing_outputs = set()
+
+ # Include default artifact store in outputs
+ if (not output_names) or DEFAULT_ARTIFACT_STORE_OUTPUT_NAME in missing_outputs:
+ try:
+ job = self.get(job_name)
+ artifact_store_uri = job.outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME]
+ if artifact_store_uri is not None and artifact_store_uri.path:
+ outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = artifact_store_uri.path
+ except (AttributeError, KeyError):
+ outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = SHORT_URI_FORMAT.format(
+ "workspaceartifactstore", f"ExperimentRun/dcid.{job_name}/"
+ )
+ missing_outputs.discard(DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
+
+        # A job's output is not always reported in the outputs dict, but it
+        # doesn't currently have a user-configurable location.
+ # Perform a search of known paths to find output
+ # TODO: Remove once job output locations are reliably returned from the service
+
+ default_datastore = self._datastore_operations.get_default().name
+
+ for name in missing_outputs:
+ potential_uris = [
+ SHORT_URI_FORMAT.format(default_datastore, f"azureml/{job_name}/{name}/"),
+ SHORT_URI_FORMAT.format(default_datastore, f"dataset/{job_name}/{name}/"),
+ ]
+
+ for potential_uri in potential_uris:
+ if aml_datastore_path_exists(potential_uri, self._datastore_operations):
+ outputs[name] = potential_uri
+ break
+
+ return outputs
+
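+    # Editor's note: an illustrative sketch, not part of the upstream SDK file. Assuming
+    # SHORT_URI_FORMAT expands to "azureml://datastores/<datastore>/paths/<path>", the
+    # returned mapping looks roughly like this (output names and paths are made up):
+    #
+    #   {
+    #       "default": "azureml://datastores/workspaceartifactstore/paths/ExperimentRun/dcid.<job-name>/",
+    #       "score": "azureml://datastores/workspaceblobstore/paths/azureml/<job-name>/score/",
+    #   }
+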
+ def _get_batch_job_scoring_output_uri(self, job_name: str) -> Optional[str]:
+ uri = None
+ # Download scoring output, which is the "score" output of the child job named "batchscoring"
+        # Batch jobs are pipeline jobs with only one child, so this should terminate after one iteration
+ for child in self._runs_operations.get_run_children(job_name):
+ uri = self._get_named_output_uri(child.name, BATCH_JOB_CHILD_RUN_OUTPUT_NAME).get(
+ BATCH_JOB_CHILD_RUN_OUTPUT_NAME, None
+ )
+ # After the correct child is found, break to prevent unnecessary looping
+ if uri is not None:
+ break
+ return uri
+
+ def _get_job(self, name: str) -> JobBase:
+ job = self.service_client_01_2024_preview.jobs.get(
+ id=name,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._kwargs,
+ )
+
+ if (
+ hasattr(job, "properties")
+ and job.properties
+ and hasattr(job.properties, "job_type")
+ and job.properties.job_type == RestJobType_20241001Preview.FINE_TUNING
+ ):
+ return self.service_client_10_2024_preview.jobs.get(
+ id=name,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._kwargs,
+ )
+
+ return job
+
+ # Upgrade api from 2023-04-01-preview to 2024-01-01-preview for pipeline job
+ # We can remove this function once `_get_job` function has also been upgraded to the same version with pipeline
+ def _get_job_2401(self, name: str) -> JobBase_2401:
+ service_client_operation = self.service_client_01_2024_preview.jobs
+ return service_client_operation.get(
+ id=name,
+ resource_group_name=self._operation_scope.resource_group_name,
+ workspace_name=self._workspace_name,
+ **self._kwargs,
+ )
+
+ def _get_workspace_url(self, url_key: WorkspaceDiscoveryUrlKey) -> str:
+ discovery_url = (
+ self._all_operations.all_operations[AzureMLResourceType.WORKSPACE]
+ .get(self._operation_scope.workspace_name)
+ .discovery_url
+ )
+ all_urls = json.loads(
+ download_text_from_url(
+ discovery_url,
+ create_requests_pipeline_with_retry(requests_pipeline=self._requests_pipeline),
+ )
+ )
+ return all_urls[url_key]
+
+ def _resolve_arm_id_or_upload_dependencies(self, job: Job) -> None:
+ """This method converts name or name:version to ARM id. Or it
+ registers/uploads nested dependencies.
+
+ :param job: the job resource entity
+ :type job: Job
+        :return: the job resource entity with nested dependencies resolved
+ :rtype: Job
+ """
+
+ self._resolve_arm_id_or_azureml_id(job, self._orchestrators.get_asset_arm_id)
+
+ if isinstance(job, PipelineJob):
+ # Resolve top-level inputs
+ self._resolve_job_inputs(self._flatten_group_inputs(job.inputs), job._base_path)
+ # inputs in sub-pipelines has been resolved in
+ # self._resolve_arm_id_or_azureml_id(job, self._orchestrators.get_asset_arm_id)
+ # as they are part of the pipeline component
+ elif isinstance(job, AutoMLJob):
+ self._resolve_automl_job_inputs(job)
+ elif isinstance(job, FineTuningJob):
+ self._resolve_finetuning_job_inputs(job)
+ elif isinstance(job, DistillationJob):
+ self._resolve_distillation_job_inputs(job)
+ elif isinstance(job, Spark):
+ self._resolve_job_inputs(job._job_inputs.values(), job._base_path)
+ elif isinstance(job, Command):
+ # TODO: switch to use inputs of Command objects, once the inputs/outputs building
+ # logic is removed from the BaseNode constructor.
+ try:
+ self._resolve_job_inputs(job._job_inputs.values(), job._base_path)
+ except AttributeError:
+ # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
+ pass
+ else:
+ try:
+ self._resolve_job_inputs(job.inputs.values(), job._base_path) # type: ignore
+ except AttributeError:
+ # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
+ pass
+
+ def _resolve_automl_job_inputs(self, job: AutoMLJob) -> None:
+ """This method resolves the inputs for AutoML jobs.
+
+ :param job: the job resource entity
+ :type job: AutoMLJob
+ """
+ if isinstance(job, AutoMLJob):
+ self._resolve_job_input(job.training_data, job._base_path)
+ if job.validation_data is not None:
+ self._resolve_job_input(job.validation_data, job._base_path)
+ if hasattr(job, "test_data") and job.test_data is not None:
+ self._resolve_job_input(job.test_data, job._base_path)
+
+ def _resolve_finetuning_job_inputs(self, job: FineTuningJob) -> None:
+ """This method resolves the inputs for FineTuning jobs.
+
+ :param job: the job resource entity
+ :type job: FineTuningJob
+ """
+ from azure.ai.ml.entities._job.finetuning.finetuning_vertical import FineTuningVertical
+
+ if isinstance(job, FineTuningVertical):
+ # self._resolve_job_input(job.model, job._base_path)
+ self._resolve_job_input(job.training_data, job._base_path)
+ if job.validation_data is not None:
+ self._resolve_job_input(job.validation_data, job._base_path)
+
+ def _resolve_distillation_job_inputs(self, job: DistillationJob) -> None:
+ """This method resolves the inputs for Distillation jobs.
+
+ :param job: the job resource entity
+ :type job: DistillationJob
+ """
+ if isinstance(job, DistillationJob):
+ if job.training_data:
+ self._resolve_job_input(job.training_data, job._base_path)
+ if job.validation_data is not None:
+ self._resolve_job_input(job.validation_data, job._base_path)
+
+ def _resolve_azureml_id(self, job: Job) -> Job:
+ """This method converts ARM id to name or name:version for nested
+ entities.
+
+ :param job: the job resource entity
+ :type job: Job
+        :return: the job resource entity with nested dependencies resolved
+ :rtype: Job
+ """
+ self._append_tid_to_studio_url(job)
+ self._resolve_job_inputs_arm_id(job)
+ return self._resolve_arm_id_or_azureml_id(job, self._orchestrators.resolve_azureml_id)
+
+ def _resolve_compute_id(self, resolver: _AssetResolver, target: Any) -> Any:
+ # special case for local runs
+ if target is not None and target.lower() == LOCAL_COMPUTE_TARGET:
+ return LOCAL_COMPUTE_TARGET
+ try:
+ modified_target_name = target
+ if target.lower().startswith(AzureMLResourceType.VIRTUALCLUSTER + "/"):
+ # Compute target can be either workspace-scoped compute type,
+ # or AML scoped VC. In the case of VC, resource name will be of form
+ # azureml:virtualClusters/<name> to disambiguate from azureml:name (which is always compute)
+ modified_target_name = modified_target_name[len(AzureMLResourceType.VIRTUALCLUSTER) + 1 :]
+ modified_target_name = LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT.format(
+ self._operation_scope.subscription_id,
+ self._operation_scope.resource_group_name,
+ AZUREML_RESOURCE_PROVIDER,
+ AzureMLResourceType.VIRTUALCLUSTER,
+ modified_target_name,
+ )
+ return resolver(
+ modified_target_name,
+ azureml_type=AzureMLResourceType.VIRTUALCLUSTER,
+ sub_workspace_resource=False,
+ )
+ except Exception: # pylint: disable=W0718
+ return resolver(target, azureml_type=AzureMLResourceType.COMPUTE)
+
+ def _resolve_job_inputs(self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str) -> None:
+ """resolve job inputs as ARM id or remote url.
+
+ :param entries: An iterable of job inputs
+ :type entries: Iterable[Union[Input, str, bool, int, float]]
+ :param base_path: The base path
+ :type base_path: str
+ """
+ for entry in entries:
+ self._resolve_job_input(entry, base_path)
+
+ # TODO: move it to somewhere else?
+ @classmethod
+ def _flatten_group_inputs(
+ cls, inputs: Dict[str, Union[Input, str, bool, int, float]]
+ ) -> List[Union[Input, str, bool, int, float]]:
+ """Get flatten values from an InputDict.
+
+ :param inputs: The input dict
+ :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+ :return: A list of values from the Input Dict
+ :rtype: List[Union[Input, str, bool, int, float]]
+ """
+ input_values: List = []
+ # Flatten inputs for pipeline job
+ for key, item in inputs.items():
+ if isinstance(item, _GroupAttrDict):
+ input_values.extend(item.flatten(group_parameter_name=key))
+ else:
+ if not isinstance(item, (str, bool, int, float)):
+ # skip resolving inferred optional input without path (in do-while + dynamic input case)
+ if isinstance(item._data, Input): # type: ignore
+ if not item._data.path and item._meta._is_inferred_optional: # type: ignore
+ continue
+ input_values.append(item._data) # type: ignore
+ return input_values
+
+ def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_path: str) -> None:
+ """resolve job input as ARM id or remote url.
+
+ :param entry: The job input
+ :type entry: Union[Input, str, bool, int, float]
+ :param base_path: The base path
+ :type base_path: str
+ """
+
+ # path can be empty if the job was created from builder functions
+ if isinstance(entry, Input) and not entry.path:
+ msg = "Input path can't be empty for jobs."
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.JOB,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.MISSING_FIELD,
+ )
+
+ if (
+ not isinstance(entry, Input)
+ or is_ARM_id_for_resource(entry.path)
+ or is_url(entry.path)
+ or is_data_binding_expression(entry.path) # literal value but set mode in pipeline yaml
+ ): # Literal value, ARM id or remote url. Pass through
+ return
+ try:
+ datastore_name = (
+ entry.datastore if hasattr(entry, "datastore") and entry.datastore else WORKSPACE_BLOB_STORE
+ )
+
+ # absolute local path, upload, transform to remote url
+ if os.path.isabs(entry.path): # type: ignore
+ if entry.type == AssetTypes.URI_FOLDER and not os.path.isdir(entry.path): # type: ignore
+ raise ValidationException(
+ message="There is no dir on target path: {}".format(entry.path),
+ target=ErrorTarget.JOB,
+ no_personal_data_message="There is no dir on target path",
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+ )
+ if entry.type == AssetTypes.URI_FILE and not os.path.isfile(entry.path): # type: ignore
+ raise ValidationException(
+ message="There is no file on target path: {}".format(entry.path),
+ target=ErrorTarget.JOB,
+ no_personal_data_message="There is no file on target path",
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+ )
+ # absolute local path
+ entry.path = _upload_and_generate_remote_uri(
+ self._operation_scope,
+ self._datastore_operations,
+ entry.path,
+ datastore_name=datastore_name,
+ show_progress=self._show_progress,
+ )
+ # TODO : Move this part to a common place
+ if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
+ entry.path = entry.path + "/"
+ # Check for AzureML id, is there a better way?
+ elif ":" in entry.path or "@" in entry.path: # type: ignore
+ asset_type = AzureMLResourceType.DATA
+ if entry.type in [AssetTypes.MLFLOW_MODEL, AssetTypes.CUSTOM_MODEL]:
+ asset_type = AzureMLResourceType.MODEL
+
+ entry.path = self._orchestrators.get_asset_arm_id(entry.path, asset_type) # type: ignore
+ else: # relative local path, upload, transform to remote url
+ # Base path will be None for dsl pipeline component for now. We have 2 choices if the dsl pipeline
+ # function is imported from another file:
+ # 1) Use cwd as default base path;
+ # 2) Use the file path of the dsl pipeline function as default base path.
+ # Pick solution 1 for now as defining input path in the script to submit is a more common scenario.
+ local_path = Path(base_path or Path.cwd(), entry.path).resolve() # type: ignore
+ entry.path = _upload_and_generate_remote_uri(
+ self._operation_scope,
+ self._datastore_operations,
+ local_path,
+ datastore_name=datastore_name,
+ show_progress=self._show_progress,
+ )
+ # TODO : Move this part to a common place
+ if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
+ entry.path = entry.path + "/"
+ except (MlException, HttpResponseError) as e:
+ raise e
+ except Exception as e:
+ raise ValidationException(
+ message=f"Supported input path value are ARM id, AzureML id, remote uri or local path.\n"
+ f"Met {type(e)}:\n{e}",
+ target=ErrorTarget.JOB,
+ no_personal_data_message=(
+ "Supported input path value are ARM id, AzureML id, " "remote uri or local path."
+ ),
+ error=e,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.INVALID_VALUE,
+ ) from e
+
+ def _resolve_job_inputs_arm_id(self, job: Job) -> None:
+ try:
+ inputs: Dict[str, Union[Input, InputOutputBase, str, bool, int, float]] = job.inputs # type: ignore
+ for _, entry in inputs.items():
+ if isinstance(entry, InputOutputBase):
+ # extract original input form input builder.
+ entry = entry._data
+ if not isinstance(entry, Input) or is_url(entry.path): # Literal value or remote url
+ continue
+ # ARM id
+ entry.path = self._orchestrators.resolve_azureml_id(entry.path)
+
+ except AttributeError:
+ # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
+ pass
+
+ def _resolve_arm_id_or_azureml_id(self, job: Job, resolver: Union[Callable, _AssetResolver]) -> Job:
+ """Resolve arm_id for a given job.
+
+
+ :param job: The job
+ :type job: Job
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :return: The provided job, with fields resolved to full ARM IDs
+ :rtype: Job
+ """
+ # TODO: this will need to be parallelized when multiple tasks
+ # are required. Also consider the implications for dependencies.
+
+ if isinstance(job, _BaseJob):
+ job.compute = self._resolve_compute_id(resolver, job.compute)
+ elif isinstance(job, Command):
+ job = self._resolve_arm_id_for_command_job(job, resolver)
+ elif isinstance(job, ImportJob):
+ job = self._resolve_arm_id_for_import_job(job, resolver)
+ elif isinstance(job, Spark):
+ job = self._resolve_arm_id_for_spark_job(job, resolver)
+ elif isinstance(job, ParallelJob):
+ job = self._resolve_arm_id_for_parallel_job(job, resolver)
+ elif isinstance(job, SweepJob):
+ job = self._resolve_arm_id_for_sweep_job(job, resolver)
+ elif isinstance(job, AutoMLJob):
+ job = self._resolve_arm_id_for_automl_job(job, resolver, inside_pipeline=False)
+ elif isinstance(job, PipelineJob):
+ job = self._resolve_arm_id_for_pipeline_job(job, resolver)
+ elif isinstance(job, FineTuningJob):
+ pass
+ elif isinstance(job, DistillationJob):
+ pass
+ else:
+ msg = f"Non supported job type: {type(job)}"
+ raise ValidationException(
+ message=msg,
+ target=ErrorTarget.JOB,
+ no_personal_data_message=msg,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.INVALID_VALUE,
+ )
+ return job
+
+ def _resolve_arm_id_for_command_job(self, job: Command, resolver: _AssetResolver) -> Command:
+ """Resolve arm_id for CommandJob.
+
+
+ :param job: The Command job
+ :type job: Command
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :return: The provided Command job, with resolved fields
+ :rtype: Command
+ """
+ if job.code is not None and is_registry_id_for_resource(job.code):
+ msg = "Format not supported for code asset: {}"
+ raise ValidationException(
+ message=msg.format(job.code),
+ target=ErrorTarget.JOB,
+ no_personal_data_message=msg.format("[job.code]"),
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.INVALID_VALUE,
+ )
+
+ if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE):
+ job.code = resolver( # type: ignore
+ Code(base_path=job._base_path, path=job.code),
+ azureml_type=AzureMLResourceType.CODE,
+ )
+ job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+ job.compute = self._resolve_compute_id(resolver, job.compute)
+ return job
+
+ def _resolve_arm_id_for_spark_job(self, job: Spark, resolver: _AssetResolver) -> Spark:
+ """Resolve arm_id for SparkJob.
+
+ :param job: The Spark job
+ :type job: Spark
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :return: The provided SparkJob, with resolved fields
+ :rtype: Spark
+ """
+ if job.code is not None and is_registry_id_for_resource(job.code):
+ msg = "Format not supported for code asset: {}"
+ raise JobException(
+ message=msg.format(job.code),
+ target=ErrorTarget.JOB,
+ no_personal_data_message=msg.format("[job.code]"),
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE):
+ job.code = resolver( # type: ignore
+ Code(base_path=job._base_path, path=job.code),
+ azureml_type=AzureMLResourceType.CODE,
+ )
+ job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+ job.compute = self._resolve_compute_id(resolver, job.compute)
+ return job
+
+ def _resolve_arm_id_for_import_job(self, job: ImportJob, resolver: _AssetResolver) -> ImportJob:
+ """Resolve arm_id for ImportJob.
+
+ :param job: The Import job
+ :type job: ImportJob
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :return: The provided ImportJob, with resolved fields
+ :rtype: ImportJob
+ """
+ # The compute property will no longer be applicable once the import job type is ready on MFE in PuP.
+ # For PrP, we use the command job type for import jobs instead, where the compute property is required.
+ # However, MFE only validates the compute resource URL format. The execution service owns the real
+ # validation today, but supports reserved compute names such as AmlCompute, ContainerInstance and
+ # DataFactory here for 'clusterless' jobs.
+ job.compute = self._resolve_compute_id(resolver, ComputeType.ADF)
+ return job
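+
+ # Hedged illustration: the user-supplied compute on an ImportJob is effectively ignored here and
+ # replaced with the reserved DataFactory compute (ComputeType.ADF) for 'clusterless' execution.
+ #
+ #     job.compute = "cpu-cluster"   # hypothetical user value; overwritten by the resolution above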
+
+ def _resolve_arm_id_for_parallel_job(self, job: ParallelJob, resolver: _AssetResolver) -> ParallelJob:
+ """Resolve arm_id for ParallelJob.
+
+ :param job: The Parallel job
+ :type job: ParallelJob
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :return: The provided ParallelJob, with resolved fields
+ :rtype: ParallelJob
+ """
+ if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE): # type: ignore
+ job.code = resolver( # type: ignore
+ Code(base_path=job._base_path, path=job.code), # type: ignore
+ azureml_type=AzureMLResourceType.CODE,
+ )
+ job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT) # type: ignore
+ job.compute = self._resolve_compute_id(resolver, job.compute)
+ return job
+
+ def _resolve_arm_id_for_sweep_job(self, job: SweepJob, resolver: _AssetResolver) -> SweepJob:
+ """Resolve arm_id for SweepJob.
+
+ :param job: The Sweep job
+ :type job: SweepJob
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :return: The provided SweepJob, with resolved fields
+ :rtype: SweepJob
+ """
+ if (
+ job.trial is not None
+ and job.trial.code is not None
+ and not is_ARM_id_for_resource(job.trial.code, AzureMLResourceType.CODE)
+ ):
+ job.trial.code = resolver( # type: ignore[assignment]
+ Code(base_path=job._base_path, path=job.trial.code),
+ azureml_type=AzureMLResourceType.CODE,
+ )
+ if (
+ job.trial is not None
+ and job.trial.environment is not None
+ and not is_ARM_id_for_resource(job.trial.environment, AzureMLResourceType.ENVIRONMENT)
+ ):
+ job.trial.environment = resolver( # type: ignore[assignment]
+ job.trial.environment, azureml_type=AzureMLResourceType.ENVIRONMENT
+ )
+ job.compute = self._resolve_compute_id(resolver, job.compute)
+ return job
+
+ def _resolve_arm_id_for_automl_job(
+ self, job: AutoMLJob, resolver: _AssetResolver, inside_pipeline: bool
+ ) -> AutoMLJob:
+ """Resolve arm_id for AutoMLJob.
+
+ :param job: The AutoML job
+ :type job: AutoMLJob
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :param inside_pipeline: Whether the job is within a pipeline
+ :type inside_pipeline: bool
+ :return: The provided AutoMLJob, with resolved fields
+ :rtype: AutoMLJob
+ """
+ # AutoML jobs have no dependency uploads; only the compute reference needs to be resolved to an ARM id.
+
+ # An AutoML node inside a pipeline has an optional compute.
+ if inside_pipeline and job.compute is None:
+ return job
+ job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)
+ return job
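+
+ # Hedged illustration of the two paths above (hypothetical values):
+ #
+ #     standalone AutoML job:   job.compute = "cpu-cluster" -> resolved to a compute ARM id
+ #     AutoML node in pipeline: job.compute is None         -> returned unchanged (presumably the
+ #                                                             pipeline's default compute applies later)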
+
+ def _resolve_arm_id_for_pipeline_job(self, pipeline_job: PipelineJob, resolver: _AssetResolver) -> PipelineJob:
+ """Resolve arm_id for pipeline_job.
+
+ :param pipeline_job: The pipeline job
+ :type pipeline_job: PipelineJob
+ :param resolver: The asset resolver function
+ :type resolver: _AssetResolver
+ :return: The provided PipelineJob, with resolved fields
+ :rtype: PipelineJob
+ """
+ # Get top-level job compute
+ _get_job_compute_id(pipeline_job, resolver)
+
+ # Process job defaults:
+ if pipeline_job.settings:
+ pipeline_job.settings.default_datastore = resolver(
+ pipeline_job.settings.default_datastore,
+ azureml_type=AzureMLResourceType.DATASTORE,
+ )
+ pipeline_job.settings.default_compute = resolver(
+ pipeline_job.settings.default_compute,
+ azureml_type=AzureMLResourceType.COMPUTE,
+ )
+
+ # Process each component job
+ try:
+ self._component_operations._resolve_dependencies_for_pipeline_component_jobs(
+ pipeline_job.component, resolver
+ )
+ except ComponentException as e:
+ raise JobException(
+ message=e.message,
+ target=ErrorTarget.JOB,
+ no_personal_data_message=e.no_personal_data_message,
+ error_category=e.error_category,
+ ) from e
+
+ # Create a pipeline component for the pipeline job if the user specified a component in the job YAML.
+ if (
+ not isinstance(pipeline_job.component, str)
+ and getattr(pipeline_job.component, "_source", None) == ComponentSource.YAML_COMPONENT
+ ):
+ pipeline_job.component = resolver( # type: ignore
+ pipeline_job.component,
+ azureml_type=AzureMLResourceType.COMPONENT,
+ )
+
+ return pipeline_job
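+
+ # Hedged sketch of what the resolution above changes on a pipeline job; values are hypothetical:
+ #
+ #     pipeline_job.settings.default_datastore = "workspaceblobstore"  # -> resolved datastore id
+ #     pipeline_job.settings.default_compute = "cpu-cluster"           # -> resolved compute id
+ #     # each component job is resolved via ComponentOperations, and a YAML-defined pipeline
+ #     # component is additionally resolved/registered as a component asset before submission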
+
+ def _append_tid_to_studio_url(self, job: Job) -> None:
+ """Appends the user's tenant ID to the end of the studio URL.
+
+ Allows the UI to authenticate against the correct tenant.
+
+ :param job: The job
+ :type job: Job
+ """
+ try:
+ if job.services is not None:
+ studio_endpoint = job.services.get("Studio", None)
+ studio_url = studio_endpoint.endpoint
+ default_scopes = _resource_to_scopes(_get_base_url_from_metadata())
+ module_logger.debug("default_scopes used: `%s`\n", default_scopes)
+ # Extract the tenant id from the credential using PyJWT
+ decode = jwt.decode(
+ self._credential.get_token(*default_scopes).token,
+ options={"verify_signature": False, "verify_aud": False},
+ )
+ tid = decode["tid"]
+ formatted_tid = TID_FMT.format(tid)
+ studio_endpoint.endpoint = studio_url + formatted_tid
+ except Exception: # pylint: disable=W0718
+ module_logger.info("Proceeding with no tenant id appended to studio URL\n")
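+
+ # Standalone sketch of the tenant-id extraction used above. The token payload and URL below are
+ # fabricated for illustration; only the jwt.decode call mirrors the real logic.
+ #
+ #     import jwt
+ #     token = jwt.encode({"tid": "00000000-0000-0000-0000-000000000000"}, "secret", algorithm="HS256")
+ #     claims = jwt.decode(token, options={"verify_signature": False, "verify_aud": False})
+ #     studio_url = "https://ml.azure.com/runs/my-run" + TID_FMT.format(claims["tid"])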
+
+ def _set_headers_with_user_aml_token(self, kwargs: Any) -> None:
+ aml_resource_id = _get_aml_resource_id_from_metadata()
+ azure_ml_scopes = _resource_to_scopes(aml_resource_id)
+ module_logger.debug("azure_ml_scopes used: `%s`\n", azure_ml_scopes)
+ aml_token = self._credential.get_token(*azure_ml_scopes).token
+ # Validate that the token has the AML audience.
+ decoded_token = jwt.decode(
+ aml_token,
+ options={"verify_signature": False, "verify_aud": False},
+ )
+ if decoded_token.get("aud") != aml_resource_id:
+ msg = """AAD token with aml scope could not be fetched using the credentials being used.
+ Please validate if token with {0} scope can be fetched using credentials provided to MLClient.
+ Token with {0} scope can be fetched using credentials.get_token({0})
+ """
+ raise ValidationException(
+ message=msg.format(*azure_ml_scopes),
+ target=ErrorTarget.JOB,
+ error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+ no_personal_data_message=msg.format("[job.code]"),
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ headers = kwargs.pop("headers", {})
+ headers["x-azureml-token"] = aml_token
+ kwargs["headers"] = headers
+
+
+def _get_job_compute_id(job: Union[Job, Command], resolver: _AssetResolver) -> None:
+ job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)