about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py')
-rw-r--r-- .venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py 1677
1 files changed, 1677 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py
new file mode 100644
index 00000000..0003c8cd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/operations/_job_operations.py
@@ -0,0 +1,1677 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access, too-many-instance-attributes, too-many-statements, too-many-lines
+import json
+import os.path
+from os import PathLike
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Set, Union, cast
+
+import jwt
+from marshmallow import ValidationError
+
+from azure.ai.ml._artifacts._artifact_utilities import (
+    _upload_and_generate_remote_uri,
+    aml_datastore_path_exists,
+    download_artifact_from_aml_uri,
+)
+from azure.ai.ml._azure_environments import (
+    _get_aml_resource_id_from_metadata,
+    _get_base_url_from_metadata,
+    _resource_to_scopes,
+)
+from azure.ai.ml._exception_helper import log_and_raise_error
+from azure.ai.ml._restclient.dataset_dataplane import AzureMachineLearningWorkspaces as ServiceClientDatasetDataplane
+from azure.ai.ml._restclient.model_dataplane import AzureMachineLearningWorkspaces as ServiceClientModelDataplane
+from azure.ai.ml._restclient.runhistory import AzureMachineLearningWorkspaces as ServiceClientRunHistory
+from azure.ai.ml._restclient.runhistory.models import Run
+from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient022023Preview
+from azure.ai.ml._restclient.v2023_04_01_preview.models import JobBase, ListViewType, UserIdentity
+from azure.ai.ml._restclient.v2023_08_01_preview.models import JobType as RestJobType
+from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase as JobBase_2401
+from azure.ai.ml._restclient.v2024_10_01_preview.models import JobType as RestJobType_20241001Preview
+from azure.ai.ml._scope_dependent_operations import (
+    OperationConfig,
+    OperationsContainer,
+    OperationScope,
+    _ScopeDependentOperations,
+)
+from azure.ai.ml._telemetry import ActivityType, monitor_with_activity, monitor_with_telemetry_mixin
+from azure.ai.ml._utils._http_utils import HttpPipeline
+from azure.ai.ml._utils._logger_utils import OpsLogger
+from azure.ai.ml._utils.utils import (
+    create_requests_pipeline_with_retry,
+    download_text_from_url,
+    is_data_binding_expression,
+    is_private_preview_enabled,
+    is_url,
+)
+from azure.ai.ml.constants._common import (
+    AZUREML_RESOURCE_PROVIDER,
+    BATCH_JOB_CHILD_RUN_OUTPUT_NAME,
+    COMMON_RUNTIME_ENV_VAR,
+    DEFAULT_ARTIFACT_STORE_OUTPUT_NAME,
+    GIT_PATH_PREFIX,
+    LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT,
+    LOCAL_COMPUTE_TARGET,
+    SERVERLESS_COMPUTE,
+    SHORT_URI_FORMAT,
+    SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME,
+    TID_FMT,
+    AssetTypes,
+    AzureMLResourceType,
+    WorkspaceDiscoveryUrlKey,
+)
+from azure.ai.ml.constants._compute import ComputeType
+from azure.ai.ml.constants._job.pipeline import PipelineConstants
+from azure.ai.ml.entities import Compute, Job, PipelineJob, ServiceInstance, ValidationResult
+from azure.ai.ml.entities._assets._artifacts.code import Code
+from azure.ai.ml.entities._builders import BaseNode, Command, Spark
+from azure.ai.ml.entities._datastore._constants import WORKSPACE_BLOB_STORE
+from azure.ai.ml.entities._inputs_outputs import Input
+from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
+from azure.ai.ml.entities._job.base_job import _BaseJob
+from azure.ai.ml.entities._job.distillation.distillation_job import DistillationJob
+from azure.ai.ml.entities._job.finetuning.finetuning_job import FineTuningJob
+from azure.ai.ml.entities._job.import_job import ImportJob
+from azure.ai.ml.entities._job.job import _is_pipeline_child_job
+from azure.ai.ml.entities._job.parallel.parallel_job import ParallelJob
+from azure.ai.ml.entities._job.to_rest_functions import to_rest_job_object
+from azure.ai.ml.entities._validation import PathAwareSchemaValidatableMixin
+from azure.ai.ml.exceptions import (
+    ComponentException,
+    ErrorCategory,
+    ErrorTarget,
+    JobException,
+    JobParsingError,
+    MlException,
+    PipelineChildJobError,
+    UserErrorException,
+    ValidationErrorType,
+    ValidationException,
+)
+from azure.ai.ml.operations._run_history_constants import RunHistoryConstants
+from azure.ai.ml.sweep import SweepJob
+from azure.core.credentials import TokenCredential
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+from azure.core.polling import LROPoller
+from azure.core.tracing.decorator import distributed_trace
+
+from ..constants._component import ComponentSource
+from ..entities._builders.control_flow_node import ControlFlowNode
+from ..entities._job.pipeline._io import InputOutputBase, PipelineInput, _GroupAttrDict
+from ._component_operations import ComponentOperations
+from ._compute_operations import ComputeOperations
+from ._dataset_dataplane_operations import DatasetDataplaneOperations
+from ._job_ops_helper import get_git_properties, get_job_output_uris_from_dataplane, stream_logs_until_completion
+from ._local_job_invoker import is_local_run, start_run_if_local
+from ._model_dataplane_operations import ModelDataplaneOperations
+from ._operation_orchestrator import (
+    OperationOrchestrator,
+    _AssetResolver,
+    is_ARM_id_for_resource,
+    is_registry_id_for_resource,
+    is_singularity_full_name_for_resource,
+    is_singularity_id_for_resource,
+    is_singularity_short_name_for_resource,
+)
+from ._run_operations import RunOperations
+from ._virtual_cluster_operations import VirtualClusterOperations
+
+try:
+    pass
+except ImportError:
+    pass
+
+if TYPE_CHECKING:
+    from azure.ai.ml.operations import DatastoreOperations
+
+# Operations logger for this module; it is handed to the telemetry decorators on the JobOperations methods.
+ops_logger = OpsLogger(__name__)
+# Module-level logger obtained from the operations logger.
+module_logger = ops_logger.module_logger
+
+
+class JobOperations(_ScopeDependentOperations):
+    """Initiates an instance of JobOperations
+
+    This class should not be instantiated directly. Instead, use the `jobs` attribute of an MLClient object.
+
+    :param operation_scope: Scope variables for the operations classes of an MLClient object.
+    :type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
+    :param operation_config: Common configuration for operations classes of an MLClient object.
+    :type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
+    :param service_client_02_2023_preview: Service client to allow end users to operate on Azure Machine Learning
+        Workspace resources.
+    :type service_client_02_2023_preview: ~azure.ai.ml._restclient.v2023_02_01_preview.AzureMachineLearningWorkspaces
+    :param all_operations: All operations classes of an MLClient object.
+    :type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
+    :param credential: Credential to use for authentication.
+    :type credential: ~azure.core.credentials.TokenCredential
+    """
+
+    def __init__(
+        self,
+        operation_scope: OperationScope,
+        operation_config: OperationConfig,
+        service_client_02_2023_preview: ServiceClient022023Preview,
+        all_operations: OperationsContainer,
+        credential: TokenCredential,
+        **kwargs: Any,
+    ) -> None:
+        """Store the service clients, credential and helper objects used by the job operations.
+
+        Recognised keyword arguments are ``_service_client_kwargs``, ``service_client_01_2024_preview``,
+        ``service_client_10_2024_preview``, ``service_client_01_2025_preview`` and ``requests_pipeline``.
+        Whatever remains in ``kwargs`` afterwards is kept on ``self._kwargs``.
+
+        :raises KeyError: Raised if ``requests_pipeline`` is not supplied as a keyword argument.
+        """
+        super().__init__(operation_scope, operation_config)
+        ops_logger.update_filter()
+
+        # Service client passed in by the caller and the job operation group it exposes.
+        self._service_client = service_client_02_2023_preview
+        self._operation_2023_02_preview = service_client_02_2023_preview.jobs
+
+        self._credential = credential
+        self._container = "azureml"
+        self._api_base_url: Optional[str] = None
+        self._stream_logs_until_completion = stream_logs_until_completion
+
+        # Dataplane operation clients start out unset; the matching properties build them on first access.
+        self._runs_operations_client: Optional[RunOperations] = None
+        self._dataset_dataplane_operations_client: Optional[DatasetDataplaneOperations] = None
+        self._model_dataplane_operations_client: Optional[ModelDataplaneOperations] = None
+        # Extra keyword arguments that are forwarded to those dataplane service clients.
+        self._service_client_kwargs = kwargs.pop("_service_client_kwargs", {})
+
+        self._all_operations = all_operations
+        self._orchestrators = OperationOrchestrator(self._all_operations, self._operation_scope, self._operation_config)
+
+        # Optional additional service clients; each one is None when the caller did not provide it.
+        self.service_client_01_2024_preview = kwargs.pop("service_client_01_2024_preview", None)
+        self.service_client_10_2024_preview = kwargs.pop("service_client_10_2024_preview", None)
+        self.service_client_01_2025_preview = kwargs.pop("service_client_01_2025_preview", None)
+
+        # self._kwargs refers to the very same dict object as kwargs, so the pop below also
+        # removes "requests_pipeline" from self._kwargs.
+        self._kwargs = kwargs
+        self._requests_pipeline: HttpPipeline = kwargs.pop("requests_pipeline")
+
+    @property
+    def _component_operations(self) -> ComponentOperations:
+        """Look up the component operations object in the shared operations container.
+
+        :return: The operation registered for the COMPONENT resource type.
+        :rtype: ComponentOperations
+        """
+        component_ops = self._all_operations.get_operation(  # type: ignore[misc]
+            AzureMLResourceType.COMPONENT, lambda x: isinstance(x, ComponentOperations)
+        )
+        return cast(ComponentOperations, component_ops)
+
+    @property
+    def _compute_operations(self) -> ComputeOperations:
+        """Look up the compute operations object in the shared operations container.
+
+        :return: The operation registered for the COMPUTE resource type.
+        :rtype: ComputeOperations
+        """
+        compute_ops = self._all_operations.get_operation(  # type: ignore[misc]
+            AzureMLResourceType.COMPUTE, lambda x: isinstance(x, ComputeOperations)
+        )
+        return cast(ComputeOperations, compute_ops)
+
+    @property
+    def _virtual_cluster_operations(self) -> VirtualClusterOperations:
+        """Look up the virtual cluster operations object in the shared operations container.
+
+        :return: The operation registered for the VIRTUALCLUSTER resource type.
+        :rtype: VirtualClusterOperations
+        """
+        virtual_cluster_ops = self._all_operations.get_operation(  # type: ignore[misc]
+            AzureMLResourceType.VIRTUALCLUSTER,
+            lambda x: isinstance(x, VirtualClusterOperations),
+        )
+        return cast(VirtualClusterOperations, virtual_cluster_ops)
+
+    @property
+    def _datastore_operations(self) -> "DatastoreOperations":
+        """Fetch the datastore operations object from the shared operations container.
+
+        :return: The entry stored under the DATASTORE resource type.
+        :rtype: DatastoreOperations
+        """
+        # Imported inside the property; at module level this name is only imported under TYPE_CHECKING.
+        from azure.ai.ml.operations import DatastoreOperations
+
+        datastore_ops = self._all_operations.all_operations[AzureMLResourceType.DATASTORE]
+        return cast(DatastoreOperations, datastore_ops)
+
+    @property
+    def _runs_operations(self) -> RunOperations:
+        """Return the run history operations client, building and caching it on first access.
+
+        :return: The cached RunOperations instance.
+        :rtype: RunOperations
+        """
+        if self._runs_operations_client:
+            return self._runs_operations_client
+
+        run_history_client = ServiceClientRunHistory(
+            self._credential, base_url=self._api_url, **self._service_client_kwargs
+        )
+        self._runs_operations_client = RunOperations(
+            self._operation_scope, self._operation_config, run_history_client
+        )
+        return self._runs_operations_client
+
+    @property
+    def _dataset_dataplane_operations(self) -> DatasetDataplaneOperations:
+        """Return the dataset dataplane operations client, building and caching it on first access.
+
+        :return: The cached DatasetDataplaneOperations instance.
+        :rtype: DatasetDataplaneOperations
+        """
+        if self._dataset_dataplane_operations_client:
+            return self._dataset_dataplane_operations_client
+
+        dataset_client = ServiceClientDatasetDataplane(
+            self._credential, base_url=self._api_url, **self._service_client_kwargs
+        )
+        self._dataset_dataplane_operations_client = DatasetDataplaneOperations(
+            self._operation_scope,
+            self._operation_config,
+            dataset_client,
+        )
+        return self._dataset_dataplane_operations_client
+
+    @property
+    def _model_dataplane_operations(self) -> ModelDataplaneOperations:
+        """Return the model dataplane operations client, building and caching it on first access.
+
+        :return: The cached ModelDataplaneOperations instance.
+        :rtype: ModelDataplaneOperations
+        """
+        if self._model_dataplane_operations_client:
+            return self._model_dataplane_operations_client
+
+        model_client = ServiceClientModelDataplane(
+            self._credential, base_url=self._api_url, **self._service_client_kwargs
+        )
+        self._model_dataplane_operations_client = ModelDataplaneOperations(
+            self._operation_scope,
+            self._operation_config,
+            model_client,
+        )
+        return self._model_dataplane_operations_client
+
+    @property
+    def _api_url(self) -> str:
+        """Return the workspace API URL, resolving it once and caching the result.
+
+        :return: The URL obtained from ``_get_workspace_url`` for the API discovery key.
+        :rtype: str
+        """
+        if self._api_base_url:
+            return self._api_base_url
+
+        self._api_base_url = self._get_workspace_url(url_key=WorkspaceDiscoveryUrlKey.API)
+        return self._api_base_url
+
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.List", ActivityType.PUBLICAPI)
+    def list(
+        self,
+        *,
+        parent_job_name: Optional[str] = None,
+        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+        **kwargs: Any,
+    ) -> Iterable[Job]:
+        """Lists jobs in the workspace.
+
+        The optional ``schedule_defined``, ``scheduled_job_name`` and ``max_results`` entries are taken out of
+        ``kwargs`` before the remaining keyword arguments are forwarded to the service call. ``max_results`` is
+        only used when ``parent_job_name`` is given.
+
+        :keyword parent_job_name: When provided, only returns jobs that are children of the named job. Defaults to None,
+            listing all jobs in the workspace.
+        :paramtype parent_job_name: Optional[str]
+        :keyword list_view_type: The view type for including/excluding archived jobs. Defaults to
+            ~azure.mgmt.machinelearningservices.models.ListViewType.ACTIVE_ONLY, excluding archived jobs.
+        :paramtype list_view_type: ~azure.mgmt.machinelearningservices.models.ListViewType
+        :return: An iterator-like instance of Job objects. When listing the workspace, an entry is None if the
+            corresponding REST object raised a JobParsingError during conversion.
+        :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.Job]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_list]
+                :end-before: [END job_operations_list]
+                :language: python
+                :dedent: 8
+                :caption: Retrieving a list of the archived jobs in a workspace with parent job named
+                    "iris-dataset-jobs".
+        """
+        scheduled = kwargs.pop("schedule_defined", None)
+        schedule_id = kwargs.pop("scheduled_job_name", None)
+        max_results = kwargs.pop("max_results", None)
+
+        # Children of a specific job are served by run history rather than by the jobs list API.
+        if parent_job_name:
+            parent = self.get(parent_job_name)
+            return self._runs_operations.get_run_children(parent.name, max_results=max_results)
+
+        def _convert_page(rest_objects: Any) -> List[Optional[Job]]:
+            # Converts one batch of REST objects handed over by the service client.
+            return [self._handle_rest_errors(rest_object) for rest_object in rest_objects]
+
+        job_pager = self.service_client_01_2024_preview.jobs.list(
+            self._operation_scope.resource_group_name,
+            self._workspace_name,
+            cls=_convert_page,
+            list_view_type=list_view_type,
+            scheduled=scheduled,
+            schedule_id=schedule_id,
+            **self._kwargs,
+            **kwargs,
+        )
+        return cast(Iterable[Job], job_pager)
+
+    def _handle_rest_errors(self, job_object: Union[JobBase, Run]) -> Optional[Job]:
+        """Convert a REST object into a Job and resolve its azureml ids, tolerating parsing failures.
+
+        Used while listing jobs so that a JobParsingError on one entry does not abort the listing.
+
+        :param job_object: The REST object to turn into a Job
+        :type job_object: Union[JobBase, Run]
+        :return: The resolved job, or None if a JobParsingError was raised
+        :rtype: Optional[Job]
+        """
+        try:
+            parsed_job = Job._from_rest_object(job_object)
+            return self._resolve_azureml_id(parsed_job)
+        except JobParsingError:
+            return None
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Get", ActivityType.PUBLICAPI)
+    def get(self, name: str) -> Job:
+        """Gets a job resource.
+
+        Pipeline child jobs are fetched through run history; every other job is built from the REST object
+        returned by ``_get_job``.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+        :raises ~azure.ai.ml.exceptions.UserErrorException: Raised if the name parameter is not a string.
+        :return: Job object retrieved from the service.
+        :rtype: ~azure.ai.ml.entities.Job
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_get]
+                :end-before: [END job_operations_get]
+                :language: python
+                :dedent: 8
+                :caption: Retrieving a job named "iris-dataset-job-1".
+        """
+        if not isinstance(name, str):
+            raise UserErrorException(f"{name} is a invalid input for client.jobs.get().")
+
+        rest_job = self._get_job(name)
+
+        if _is_pipeline_child_job(rest_job):
+            # Child jobs are no longer available through MFE, fetch
+            # through run history instead
+            child_run = self._runs_operations.get_run(name)
+            return self._runs_operations._translate_from_rest_object(child_run)
+
+        resolved: Any = Job._from_rest_object(rest_job)
+        if rest_job.properties.job_type != RestJobType.AUTO_ML:
+            # resolvers do not work with the old contract, so AutoML jobs keep their ids as is
+            resolved = self._resolve_azureml_id(resolved)
+        return resolved
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.ShowServices", ActivityType.PUBLICAPI)
+    def show_services(self, name: str, node_index: int = 0) -> Optional[Dict[str, ServiceInstance]]:
+        """Gets services associated with a job's node.
+
+        :param name: The name of the job.
+        :type name: str
+        :param node_index: The node's index (zero-based). Defaults to 0.
+        :type node_index: int
+        :return: The services associated with the job for the given node, or None when the service reports
+            no instances.
+        :rtype: dict[str, ~azure.ai.ml.entities.ServiceInstance]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_show_services]
+                :end-before: [END job_operations_show_services]
+                :language: python
+                :dedent: 8
+                :caption: Retrieving the services associated with a job's 1st node.
+        """
+        run_services = self._runs_operations._operation.get_run_service_instances(
+            self._subscription_id,
+            self._operation_scope.resource_group_name,
+            self._workspace_name,
+            name,
+            node_index,
+        )
+        rest_instances = run_services.instances
+        if not rest_instances:
+            return None
+
+        return {
+            service_name: ServiceInstance._from_rest_object(rest_instance, node_index)
+            for service_name, rest_instance in rest_instances.items()
+        }
+
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.Cancel", ActivityType.PUBLICAPI)
+    def begin_cancel(self, name: str, **kwargs: Any) -> LROPoller[None]:
+        """Cancels a job.
+
+        If a ``tag`` keyword argument is supplied, ``name`` is not used: a cancel is started for every job
+        returned by ``self.list(tag=tag)`` and a list with the individual results is returned instead of a
+        single poller. That batch path is experimental and for private usage.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+        :return: A poller to track the operation status.
+        :rtype: ~azure.core.polling.LROPoller[None]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_begin_cancel]
+                :end-before: [END job_operations_begin_cancel]
+                :language: python
+                :dedent: 8
+                :caption: Canceling the job named "iris-dataset-job-1" and checking the poller for status.
+        """
+        tag = kwargs.pop("tag", None)
+
+        if tag:
+            # Note: Below batch cancel is experimental and for private usage
+            # TODO: Do we need to show error message when no jobs is returned for the given tag?
+            return [
+                self._operation_2023_02_preview.begin_cancel(
+                    id=tagged_job.name,
+                    resource_group_name=self._operation_scope.resource_group_name,
+                    workspace_name=self._workspace_name,
+                    **self._kwargs,
+                )
+                for tagged_job in self.list(tag=tag)
+            ]
+
+        return self._operation_2023_02_preview.begin_cancel(
+            id=name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            **self._kwargs,
+            **kwargs,
+        )
+
+    def _try_get_compute_arm_id(self, compute: Union[Compute, str]) -> Optional[Union[Compute, str]]:
+        """Resolve a compute reference to the identifier that should be stored on the job.
+
+        :param compute: An ARM id of a compute, a Compute object, a compute name, a singularity reference or a
+            pipeline input.
+        :type compute: Union[Compute, str]
+        :raises ValueError: Raised if ``compute`` is of an unsupported type.
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if the compute lookup reports not found.
+        :return: The input itself for the ADF compute type, the compute name for data binding expressions and
+            serverless compute, the looked-up id otherwise, or None when ``compute`` is None.
+        :rtype: Optional[Union[Compute, str]]
+        """
+        # pylint: disable=too-many-return-statements
+        # TODO: Remove in PuP with native import job/component type support in MFE/Designer
+        # DataFactory 'clusterless' job
+        if str(compute) == ComputeType.ADF:
+            return compute
+
+        if compute is None:
+            return None
+
+        # Singularity
+        if isinstance(compute, str):
+            if is_singularity_id_for_resource(compute):
+                return self._virtual_cluster_operations.get(compute)["id"]
+            if is_singularity_full_name_for_resource(compute):
+                return self._orchestrators._get_singularity_arm_id_from_full_name(compute)
+            if is_singularity_short_name_for_resource(compute):
+                return self._orchestrators._get_singularity_arm_id_from_short_name(compute)
+
+        # other compute
+        if is_ARM_id_for_resource(compute, resource_type=AzureMLResourceType.COMPUTE):
+            # compute is not a sub-workspace resource
+            compute_name = compute.split("/")[-1]  # type: ignore
+        elif isinstance(compute, Compute):
+            compute_name = compute.name
+        elif isinstance(compute, str):
+            compute_name = compute
+        elif isinstance(compute, PipelineInput):
+            compute_name = str(compute)
+        else:
+            raise ValueError(
+                "compute must be either an arm id of Compute, a Compute object or a compute name but"
+                f" got {type(compute)}"
+            )
+
+        # Data binding expressions and serverless compute are returned without a lookup.
+        if is_data_binding_expression(compute_name) or compute_name == SERVERLESS_COMPUTE:
+            return compute_name
+
+        try:
+            return self._compute_operations.get(compute_name).id
+        except ResourceNotFoundError as not_found:
+            # the original error is not helpful (Operation returned an invalid status 'Not Found'),
+            # so we raise a more helpful one
+            response = not_found.response
+            response.reason = f"Not found compute with name {compute_name}"
+            raise ResourceNotFoundError(response=response) from not_found
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.PUBLICAPI)
+    def validate(self, job: Job, *, raise_on_failure: bool = False, **kwargs: Any) -> ValidationResult:
+        """Validates a Job object before submitting to the service. Anonymous assets may be created if there are inline
+        defined entities such as Component, Environment, and Code. Only pipeline jobs are supported for validation
+        currently.
+
+        This public entry point delegates to ``_validate``.
+
+        :param job: The job object to be validated.
+        :type job: ~azure.ai.ml.entities.Job
+        :keyword raise_on_failure: Specifies if an error should be raised if validation fails. Defaults to False.
+        :paramtype raise_on_failure: bool
+        :return: A ValidationResult object containing all found errors.
+        :rtype: ~azure.ai.ml.entities.ValidationResult
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_validate]
+                :end-before: [END job_operations_validate]
+                :language: python
+                :dedent: 8
+                :caption: Validating a PipelineJob object and printing out the found errors.
+        """
+        outcome = self._validate(job, raise_on_failure=raise_on_failure, **kwargs)
+        return outcome
+
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Validate", ActivityType.INTERNALCALL)
+    def _validate(
+        self,
+        job: Job,
+        *,
+        raise_on_failure: bool = False,
+        # pylint:disable=unused-argument
+        **kwargs: Any,
+    ) -> ValidationResult:
+        """Implementation of validate.
+
+        Kept separate so that create_or_update() does not have to call validate() directly, which would
+        impact telemetry statistics and bring the experimental warning into create_or_update().
+
+        :param job: The job to validate
+        :type job: Job
+        :keyword raise_on_failure: Whether to raise on validation failure
+        :paramtype raise_on_failure: bool
+        :return: The validation result
+        :rtype: ValidationResult
+        """
+        git_check = PathAwareSchemaValidatableMixin._create_empty_validation_result()
+        # TODO: move this check to Job._validate after validation is supported for all job types
+        # A string code value that starts with the git path prefix is only accepted while private
+        # preview features are enabled; otherwise it is recorded as an invalid code value.
+        code_value = getattr(job, "code", None)
+        if (
+            isinstance(code_value, str)
+            and code_value.startswith(GIT_PATH_PREFIX)
+            and not is_private_preview_enabled()
+        ):
+            git_check.append_error(
+                message=f"Invalid code value: {code_value}. Git paths are not supported.",
+                yaml_path="code",
+            )
+
+        if not isinstance(job, PathAwareSchemaValidatableMixin):
+            # Jobs without schema validation support only get the git code check.
+
+            def _as_validation_exception(msg: str, no_personal_data_msg: str) -> ValidationException:
+                return ValidationException(
+                    message=msg,
+                    no_personal_data_message=no_personal_data_msg,
+                    error_target=ErrorTarget.JOB,
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+
+            return git_check.try_raise(
+                raise_error=raise_on_failure,
+                error_func=_as_validation_exception,
+            )
+
+        validation_result = job._validate(raise_error=raise_on_failure)
+        validation_result.merge_with(git_check)
+        # fast return to avoid remote call if local validation not passed
+        # TODO: use remote call to validate the entire job after MFE API is ready
+        if validation_result.passed and isinstance(job, PipelineJob):
+            try:
+                job.compute = self._try_get_compute_arm_id(job.compute)
+            except Exception as err:  # pylint: disable=W0718
+                validation_result.append_error(yaml_path="compute", message=str(err))
+
+            for node_name, node in job.jobs.items():
+                # TODO(1979547): refactor, not all nodes have compute
+                if isinstance(node, ControlFlowNode):
+                    continue
+                try:
+                    node.compute = self._try_get_compute_arm_id(node.compute)
+                except Exception as err:  # pylint: disable=W0718
+                    validation_result.append_error(yaml_path=f"jobs.{node_name}.compute", message=str(err))
+
+        validation_result.resolve_location_for_diagnostics(str(job._source_path))
+        return job._try_raise(validation_result, raise_error=raise_on_failure)  # pylint: disable=protected-access
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.CreateOrUpdate", ActivityType.PUBLICAPI)
+    def create_or_update(
+        self,
+        job: Job,
+        *,
+        description: Optional[str] = None,
+        compute: Optional[str] = None,
+        tags: Optional[dict] = None,
+        experiment_name: Optional[str] = None,
+        skip_validation: bool = False,
+        **kwargs: Any,
+    ) -> Job:
+        """Creates or updates a job. If entities such as Environment or Code are defined inline, they'll be created
+        together with the job.
+
+        :param job: The job object.
+        :type job: ~azure.ai.ml.entities.Job
+        :keyword description: The job description.
+        :paramtype description: Optional[str]
+        :keyword compute: The compute target for the job.
+        :paramtype compute: Optional[str]
+        :keyword tags: The tags for the job.
+        :paramtype tags: Optional[dict]
+        :keyword experiment_name: The name of the experiment the job will be created under. If None is provided,
+            job will be created under experiment 'Default'.
+        :paramtype experiment_name: Optional[str]
+        :keyword skip_validation: Specifies whether or not to skip validation before creating or updating the job. Note
+            that validation for dependent resources such as an anonymous component will not be skipped. Defaults to
+            False.
+        :paramtype skip_validation: bool
+        :raises Union[~azure.ai.ml.exceptions.UserErrorException, ~azure.ai.ml.exceptions.ValidationException]: Raised
+            if Job cannot be successfully validated. Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.AssetException: Raised if Job assets
+            (e.g. Data, Code, Model, Environment) cannot be successfully validated.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.ModelException: Raised if Job model cannot be successfully validated.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.JobException: Raised if Job object or attributes correctly formatted.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty
+            directory.
+        :raises ~azure.ai.ml.exceptions.DockerEngineNotAvailableError: Raised if Docker Engine is not available for
+            local job.
+        :return: Created or updated job.
+        :rtype: ~azure.ai.ml.entities.Job
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_create_and_update]
+                :end-before: [END job_operations_create_and_update]
+                :language: python
+                :dedent: 8
+                :caption: Creating a new job and then updating its compute.
+        """
+        if isinstance(job, BaseNode) and not (
+            isinstance(job, (Command, Spark))
+        ):  # Command/Spark objects can be used directly
+            job = job._to_job()
+
+        # Set job properties before submission
+        if description is not None:
+            job.description = description
+        if compute is not None:
+            job.compute = compute
+        if tags is not None:
+            job.tags = tags
+        if experiment_name is not None:
+            job.experiment_name = experiment_name
+
+        if job.compute == LOCAL_COMPUTE_TARGET:
+            job.environment_variables[COMMON_RUNTIME_ENV_VAR] = "true"  # type: ignore
+
+        # TODO: why we put log logic here instead of inside self._validate()?
+        try:
+            if not skip_validation:
+                self._validate(job, raise_on_failure=True)
+
+            # Create all dependent resources
+            self._resolve_arm_id_or_upload_dependencies(job)
+        except (ValidationException, ValidationError) as ex:
+            log_and_raise_error(ex)
+
+        git_props = get_git_properties()
+        # Do not add git props if they already exist in job properties.
+        # This is for update specifically-- if the user switches branches and tries to update
+        # their job, the request will fail since the git props will be repopulated.
+        # MFE does not allow existing properties to be updated, only for new props to be added
+        if not any(prop_name in job.properties for prop_name in git_props):
+            job.properties = {**job.properties, **git_props}
+        rest_job_resource = to_rest_job_object(job)
+
+        # Make a copy of self._kwargs instead of contaminate the original one
+        kwargs = {**self._kwargs}
+        # set headers with user aml token if job is a pipeline or has a user identity setting
+        if (rest_job_resource.properties.job_type == RestJobType.PIPELINE) or (
+            hasattr(rest_job_resource.properties, "identity")
+            and (isinstance(rest_job_resource.properties.identity, UserIdentity))
+        ):
+            self._set_headers_with_user_aml_token(kwargs)
+
+        result = self._create_or_update_with_different_version_api(rest_job_resource=rest_job_resource, **kwargs)
+
+        if is_local_run(result):
+            ws_base_url = self._all_operations.all_operations[
+                AzureMLResourceType.WORKSPACE
+            ]._operation._client._base_url
+            snapshot_id = start_run_if_local(
+                result,
+                self._credential,
+                ws_base_url,
+                self._requests_pipeline,
+            )
+            # in case of local run, the first create/update call to MFE returns the
+            # request for submitting to ES. Once we request to ES and start the run, we
+            # need to put the same body to MFE to append user tags etc.
+            if rest_job_resource.properties.job_type == RestJobType.PIPELINE:
+                job_object = self._get_job_2401(rest_job_resource.name)
+            else:
+                job_object = self._get_job(rest_job_resource.name)
+            if result.properties.tags is not None:
+                for tag_name, tag_value in rest_job_resource.properties.tags.items():
+                    job_object.properties.tags[tag_name] = tag_value
+            if result.properties.properties is not None:
+                for (
+                    prop_name,
+                    prop_value,
+                ) in rest_job_resource.properties.properties.items():
+                    job_object.properties.properties[prop_name] = prop_value
+            if snapshot_id is not None:
+                job_object.properties.properties["ContentSnapshotId"] = snapshot_id
+
+            result = self._create_or_update_with_different_version_api(rest_job_resource=job_object, **kwargs)
+
+        return self._resolve_azureml_id(Job._from_rest_object(result))
+
+    def _create_or_update_with_different_version_api(self, rest_job_resource: JobBase, **kwargs: Any) -> JobBase:
+        service_client_operation = self._operation_2023_02_preview
+        if rest_job_resource.properties.job_type == RestJobType_20241001Preview.FINE_TUNING:
+            service_client_operation = self.service_client_10_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.PIPELINE:
+            service_client_operation = self.service_client_01_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.AUTO_ML:
+            service_client_operation = self.service_client_01_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.SWEEP:
+            service_client_operation = self.service_client_01_2024_preview.jobs
+        if rest_job_resource.properties.job_type == RestJobType.COMMAND:
+            service_client_operation = self.service_client_01_2025_preview.jobs
+
+        result = service_client_operation.create_or_update(
+            id=rest_job_resource.name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            body=rest_job_resource,
+            **kwargs,
+        )
+
+        return result
+
+    def _create_or_update_with_latest_version_api(self, rest_job_resource: JobBase, **kwargs: Any) -> JobBase:
+        service_client_operation = self.service_client_01_2024_preview.jobs
+        result = service_client_operation.create_or_update(
+            id=rest_job_resource.name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            body=rest_job_resource,
+            **kwargs,
+        )
+
+        return result
+
+    def _archive_or_restore(self, name: str, is_archived: bool) -> None:
+        job_object = self._get_job(name)
+        if job_object.properties.job_type == RestJobType.PIPELINE:
+            job_object = self._get_job_2401(name)
+        if _is_pipeline_child_job(job_object):
+            raise PipelineChildJobError(job_id=job_object.id)
+        job_object.properties.is_archived = is_archived
+
+        self._create_or_update_with_different_version_api(rest_job_resource=job_object)
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Archive", ActivityType.PUBLICAPI)
+    def archive(self, name: str) -> None:
+        """Archives a job.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_archive]
+                :end-before: [END job_operations_archive]
+                :language: python
+                :dedent: 8
+                :caption: Archiving a job.
+        """
+
+        self._archive_or_restore(name=name, is_archived=True)
+
+    @distributed_trace
+    @monitor_with_telemetry_mixin(ops_logger, "Job.Restore", ActivityType.PUBLICAPI)
+    def restore(self, name: str) -> None:
+        """Restores an archived job.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_restore]
+                :end-before: [END job_operations_restore]
+                :language: python
+                :dedent: 8
+                :caption: Restoring an archived job.
+        """
+
+        self._archive_or_restore(name=name, is_archived=False)
+
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.Stream", ActivityType.PUBLICAPI)
+    def stream(self, name: str) -> None:
+        """Streams the logs of a running job.
+
+        :param name: The name of the job.
+        :type name: str
+        :raises azure.core.exceptions.ResourceNotFoundError: Raised if no job with the given name can be found.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_stream_logs]
+                :end-before: [END job_operations_stream_logs]
+                :language: python
+                :dedent: 8
+                :caption: Streaming a running job.
+        """
+        job_object = self._get_job(name)
+
+        if _is_pipeline_child_job(job_object):
+            raise PipelineChildJobError(job_id=job_object.id)
+
+        self._stream_logs_until_completion(
+            self._runs_operations,
+            job_object,
+            self._datastore_operations,
+            requests_pipeline=self._requests_pipeline,
+        )
+
+    @distributed_trace
+    @monitor_with_activity(ops_logger, "Job.Download", ActivityType.PUBLICAPI)
+    def download(
+        self,
+        name: str,
+        *,
+        download_path: Union[PathLike, str] = ".",
+        output_name: Optional[str] = None,
+        all: bool = False,  # pylint: disable=redefined-builtin
+    ) -> None:
+        """Downloads the logs and output of a job.
+
+        :param name: The name of a job.
+        :type name: str
+        :keyword download_path: The local path to be used as the download destination. Defaults to ".".
+        :paramtype download_path: Union[PathLike, str]
+        :keyword output_name: The name of the output to download. Defaults to None.
+        :paramtype output_name: Optional[str]
+        :keyword all: Specifies if all logs and named outputs should be downloaded. Defaults to False.
+        :paramtype all: bool
+        :raises ~azure.ai.ml.exceptions.JobException: Raised if Job is not yet in a terminal state.
+            Details will be provided in the error message.
+        :raises ~azure.ai.ml.exceptions.MlException: Raised if logs and outputs cannot be successfully downloaded.
+            Details will be provided in the error message.
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/ml_samples_misc.py
+                :start-after: [START job_operations_download]
+                :end-before: [END job_operations_download]
+                :language: python
+                :dedent: 8
+                :caption: Downloading all logs and named outputs of the job "job-1" into local directory "job-1-logs".
+        """
+        job_details = self.get(name)
+        # job is reused, get reused job to download
+        if (
+            job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) == PipelineConstants.REUSED_FLAG_TRUE
+            and PipelineConstants.REUSED_JOB_ID in job_details.properties
+        ):
+            reused_job_name = job_details.properties[PipelineConstants.REUSED_JOB_ID]
+            reused_job_detail = self.get(reused_job_name)
+            module_logger.info(
+                "job %s reuses previous job %s, download from the reused job.",
+                name,
+                reused_job_name,
+            )
+            # From here on both the name and the details refer to the reused job.
+            name, job_details = reused_job_name, reused_job_detail
+        job_status = job_details.status
+        # Refuse to download while the job has not reached a terminal status.
+        if job_status not in RunHistoryConstants.TERMINAL_STATUSES:
+            msg = "This job is in state {}. Download is allowed only in states {}".format(
+                job_status, RunHistoryConstants.TERMINAL_STATUSES
+            )
+            raise JobException(
+                message=msg,
+                target=ErrorTarget.JOB,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+        # A batch job is identified purely by its tags: flagged as a batch run and not tagged as a pipeline job.
+        is_batch_job = (
+            job_details.tags.get("azureml.batchrun", None) == "true"
+            and job_details.tags.get("azureml.jobtype", None) != PipelineConstants.PIPELINE_JOB_TYPE
+        )
+        outputs = {}
+        download_path = Path(download_path)
+        # Sub-directory names used below for the default artifact store and for named outputs.
+        artifact_directory_name = "artifacts"
+        output_directory_name = "named-outputs"
+
+        def log_missing_uri(what: str) -> None:
+            # A missing URI is only reported at debug level; it does not raise.
+            module_logger.debug(
+                'Could not download %s for job "%s" (job status: %s)',
+                what,
+                job_details.name,
+                job_details.status,
+            )
+
+        if isinstance(job_details, SweepJob):
+            # For sweep jobs, first download from the best child run (recursive call with the same options).
+            best_child_run_id = job_details.properties.get(SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None)
+            if best_child_run_id:
+                self.download(
+                    best_child_run_id,
+                    download_path=download_path,
+                    output_name=output_name,
+                    all=all,
+                )
+            else:
+                log_missing_uri(what="from best child run")
+
+            if output_name:
+                # Don't need to download anything from the parent
+                return
+            # only download default artifacts (logs + default outputs) from parent
+            # The parent's artifacts go to their own directory so they do not mix with the child's "artifacts".
+            artifact_directory_name = "hd-artifacts"
+            output_name = None
+            all = False
+
+        # Decide which URIs to download; exactly one of the four branches below runs.
+        if is_batch_job:
+            scoring_uri = self._get_batch_job_scoring_output_uri(job_details.name)
+            if scoring_uri:
+                outputs = {BATCH_JOB_CHILD_RUN_OUTPUT_NAME: scoring_uri}
+            else:
+                log_missing_uri("batch job scoring file")
+        elif output_name:
+            outputs = self._get_named_output_uri(name, output_name)
+
+            if output_name not in outputs:
+                log_missing_uri(what=f'output "{output_name}"')
+        elif all:
+            outputs = self._get_named_output_uri(name)
+
+            if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
+                log_missing_uri(what="logs")
+        else:
+            # Default: only the default artifact store output is requested.
+            outputs = self._get_named_output_uri(name, DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
+
+            if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
+                log_missing_uri(what="logs")
+
+        # Download all requested artifacts
+        for item_name, uri in outputs.items():
+            # Batch output goes straight into download_path; the default artifact store and named outputs
+            # each get their own sub-directory.
+            if is_batch_job:
+                destination = download_path
+            elif item_name == DEFAULT_ARTIFACT_STORE_OUTPUT_NAME:
+                destination = download_path / artifact_directory_name
+            else:
+                destination = download_path / output_directory_name / item_name
+
+            module_logger.info("Downloading artifact %s to %s", uri, destination)
+            download_artifact_from_aml_uri(
+                uri=uri,
+                destination=destination,  # type: ignore[arg-type]
+                datastore_operation=self._datastore_operations,
+            )
+
+    def _get_named_output_uri(
+        self, job_name: Optional[str], output_names: Optional[Union[Iterable[str], str]] = None
+    ) -> Dict[str, str]:
+        """Gets the URIs to the specified named outputs of job.
+
+        :param job_name: Run ID of the job
+        :type job_name: str
+        :param output_names: Either an output name, or an iterable of output names. If omitted, all outputs are
+            returned.
+        :type output_names: Optional[Union[Iterable[str], str]]
+        :return: Map of output_names to URIs. Note that URIs that could not be found will not be present in the map.
+        :rtype: Dict[str, str]
+        """
+
+        if isinstance(output_names, str):
+            output_names = {output_names}
+        elif output_names:
+            output_names = set(output_names)
+
+        outputs = get_job_output_uris_from_dataplane(
+            job_name,
+            self._runs_operations,
+            self._dataset_dataplane_operations,
+            self._model_dataplane_operations,
+            output_names=output_names,
+        )
+
+        missing_outputs: Set = set()
+        if output_names is not None:
+            missing_outputs = set(output_names).difference(outputs.keys())
+        else:
+            missing_outputs = set().difference(outputs.keys())
+
+        # Include default artifact store in outputs
+        if (not output_names) or DEFAULT_ARTIFACT_STORE_OUTPUT_NAME in missing_outputs:
+            try:
+                job = self.get(job_name)
+                artifact_store_uri = job.outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME]
+                if artifact_store_uri is not None and artifact_store_uri.path:
+                    outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = artifact_store_uri.path
+            except (AttributeError, KeyError):
+                outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = SHORT_URI_FORMAT.format(
+                    "workspaceartifactstore", f"ExperimentRun/dcid.{job_name}/"
+                )
+            missing_outputs.discard(DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
+
+        # A job's output is not always reported in the outputs dict, but
+        # doesn't currently have a user configurable location.
+        # Perform a search of known paths to find output
+        # TODO: Remove once job output locations are reliably returned from the service
+
+        default_datastore = self._datastore_operations.get_default().name
+
+        for name in missing_outputs:
+            potential_uris = [
+                SHORT_URI_FORMAT.format(default_datastore, f"azureml/{job_name}/{name}/"),
+                SHORT_URI_FORMAT.format(default_datastore, f"dataset/{job_name}/{name}/"),
+            ]
+
+            for potential_uri in potential_uris:
+                if aml_datastore_path_exists(potential_uri, self._datastore_operations):
+                    outputs[name] = potential_uri
+                    break
+
+        return outputs
+
+    def _get_batch_job_scoring_output_uri(self, job_name: str) -> Optional[str]:
+        uri = None
+        # Download scoring output, which is the "score" output of the child job named "batchscoring"
+        # Batch Jobs are pipeline jobs with only one child, so this should terminate after an iteration
+        for child in self._runs_operations.get_run_children(job_name):
+            uri = self._get_named_output_uri(child.name, BATCH_JOB_CHILD_RUN_OUTPUT_NAME).get(
+                BATCH_JOB_CHILD_RUN_OUTPUT_NAME, None
+            )
+            # After the correct child is found, break to prevent unnecessary looping
+            if uri is not None:
+                break
+        return uri
+
+    def _get_job(self, name: str) -> JobBase:
+        job = self.service_client_01_2024_preview.jobs.get(
+            id=name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            **self._kwargs,
+        )
+
+        if (
+            hasattr(job, "properties")
+            and job.properties
+            and hasattr(job.properties, "job_type")
+            and job.properties.job_type == RestJobType_20241001Preview.FINE_TUNING
+        ):
+            return self.service_client_10_2024_preview.jobs.get(
+                id=name,
+                resource_group_name=self._operation_scope.resource_group_name,
+                workspace_name=self._workspace_name,
+                **self._kwargs,
+            )
+
+        return job
+
+    # Upgrade api from 2023-04-01-preview to 2024-01-01-preview for pipeline job
+    # We can remove this function once `_get_job` function has also been upgraded to the same version with pipeline
+    def _get_job_2401(self, name: str) -> JobBase_2401:
+        service_client_operation = self.service_client_01_2024_preview.jobs
+        return service_client_operation.get(
+            id=name,
+            resource_group_name=self._operation_scope.resource_group_name,
+            workspace_name=self._workspace_name,
+            **self._kwargs,
+        )
+
+    def _get_workspace_url(self, url_key: WorkspaceDiscoveryUrlKey) -> str:
+        discovery_url = (
+            self._all_operations.all_operations[AzureMLResourceType.WORKSPACE]
+            .get(self._operation_scope.workspace_name)
+            .discovery_url
+        )
+        all_urls = json.loads(
+            download_text_from_url(
+                discovery_url,
+                create_requests_pipeline_with_retry(requests_pipeline=self._requests_pipeline),
+            )
+        )
+        return all_urls[url_key]
+
+    def _resolve_arm_id_or_upload_dependencies(self, job: Job) -> None:
+        """This method converts name or name:version to ARM id. Or it
+        registers/uploads nested dependencies.
+
+        The job is modified in place; nothing is returned.
+
+        :param job: the job resource entity
+        :type job: Job
+        """
+
+        self._resolve_arm_id_or_azureml_id(job, self._orchestrators.get_asset_arm_id)
+
+        # Dispatch on the job type to resolve its inputs.
+        if isinstance(job, PipelineJob):
+            # Resolve top-level inputs
+            self._resolve_job_inputs(self._flatten_group_inputs(job.inputs), job._base_path)
+            # inputs in sub-pipelines has been resolved in
+            # self._resolve_arm_id_or_azureml_id(job, self._orchestrators.get_asset_arm_id)
+            # as they are part of the pipeline component
+        elif isinstance(job, AutoMLJob):
+            self._resolve_automl_job_inputs(job)
+        elif isinstance(job, FineTuningJob):
+            self._resolve_finetuning_job_inputs(job)
+        elif isinstance(job, DistillationJob):
+            self._resolve_distillation_job_inputs(job)
+        elif isinstance(job, Spark):
+            self._resolve_job_inputs(job._job_inputs.values(), job._base_path)
+        elif isinstance(job, Command):
+            # TODO: switch to use inputs of Command objects, once the inputs/outputs building
+            # logic is removed from the BaseNode constructor.
+            try:
+                self._resolve_job_inputs(job._job_inputs.values(), job._base_path)
+            except AttributeError:
+                # Nothing to resolve when the "_job_inputs" lookup fails.
+                # NOTE(review): this also silences any AttributeError raised inside _resolve_job_inputs.
+                pass
+        else:
+            try:
+                self._resolve_job_inputs(job.inputs.values(), job._base_path)  # type: ignore
+            except AttributeError:
+                # If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
+                # NOTE(review): this also silences any AttributeError raised inside _resolve_job_inputs.
+                pass
+
+    def _resolve_automl_job_inputs(self, job: AutoMLJob) -> None:
+        """This method resolves the inputs for AutoML jobs.
+
+        :param job: the job resource entity
+        :type job: AutoMLJob
+        """
+        if isinstance(job, AutoMLJob):
+            self._resolve_job_input(job.training_data, job._base_path)
+            if job.validation_data is not None:
+                self._resolve_job_input(job.validation_data, job._base_path)
+            if hasattr(job, "test_data") and job.test_data is not None:
+                self._resolve_job_input(job.test_data, job._base_path)
+
+    def _resolve_finetuning_job_inputs(self, job: FineTuningJob) -> None:
+        """This method resolves the inputs for FineTuning jobs.
+
+        :param job: the job resource entity
+        :type job: FineTuningJob
+        """
+        from azure.ai.ml.entities._job.finetuning.finetuning_vertical import FineTuningVertical
+
+        if isinstance(job, FineTuningVertical):
+            # self._resolve_job_input(job.model, job._base_path)
+            self._resolve_job_input(job.training_data, job._base_path)
+            if job.validation_data is not None:
+                self._resolve_job_input(job.validation_data, job._base_path)
+
+    def _resolve_distillation_job_inputs(self, job: DistillationJob) -> None:
+        """This method resolves the inputs for Distillation jobs.
+
+        :param job: the job resource entity
+        :type job: DistillationJob
+        """
+        if isinstance(job, DistillationJob):
+            if job.training_data:
+                self._resolve_job_input(job.training_data, job._base_path)
+            if job.validation_data is not None:
+                self._resolve_job_input(job.validation_data, job._base_path)
+
+    def _resolve_azureml_id(self, job: Job) -> Job:
+        """This method converts ARM id to name or name:version for nested
+        entities.
+
+        :param job: the job resource entity
+        :type job: Job
+        :return: the job resource entity that nested dependencies are resolved
+        :rtype: Job
+        """
+        self._append_tid_to_studio_url(job)
+        self._resolve_job_inputs_arm_id(job)
+        return self._resolve_arm_id_or_azureml_id(job, self._orchestrators.resolve_azureml_id)
+
+    def _resolve_compute_id(self, resolver: _AssetResolver, target: Any) -> Any:
+        """Resolve a compute target through ``resolver``.
+
+        The target is first resolved as a virtual cluster; if that raises, it is resolved as a compute.
+
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :param target: The compute target to resolve
+        :type target: Any
+        :return: ``LOCAL_COMPUTE_TARGET`` for a local target, otherwise whatever ``resolver`` returns
+        :rtype: Any
+        """
+        # special case for local runs
+        if target is not None and target.lower() == LOCAL_COMPUTE_TARGET:
+            return LOCAL_COMPUTE_TARGET
+        try:
+            modified_target_name = target
+            # NOTE: a None target raises AttributeError on .lower() here and ends up in the except branch below.
+            if target.lower().startswith(AzureMLResourceType.VIRTUALCLUSTER + "/"):
+                # Compute target can be either workspace-scoped compute type,
+                # or AML scoped VC. In the case of VC, resource name will be of form
+                # azureml:virtualClusters/<name> to disambiguate from azureml:name (which is always compute)
+                # Strip the "<virtual cluster type>/" prefix, then build the full resource id from the bare name.
+                modified_target_name = modified_target_name[len(AzureMLResourceType.VIRTUALCLUSTER) + 1 :]
+                modified_target_name = LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT.format(
+                    self._operation_scope.subscription_id,
+                    self._operation_scope.resource_group_name,
+                    AZUREML_RESOURCE_PROVIDER,
+                    AzureMLResourceType.VIRTUALCLUSTER,
+                    modified_target_name,
+                )
+            return resolver(
+                modified_target_name,
+                azureml_type=AzureMLResourceType.VIRTUALCLUSTER,
+                sub_workspace_resource=False,
+            )
+        except Exception:  # pylint: disable=W0718
+            # Any failure above falls back to resolving the original target as a compute.
+            return resolver(target, azureml_type=AzureMLResourceType.COMPUTE)
+
+    def _resolve_job_inputs(self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str) -> None:
+        """resolve job inputs as ARM id or remote url.
+
+        :param entries: An iterable of job inputs
+        :type entries: Iterable[Union[Input, str, bool, int, float]]
+        :param base_path: The base path
+        :type base_path: str
+        """
+        for entry in entries:
+            self._resolve_job_input(entry, base_path)
+
+    # TODO: move it to somewhere else?
+    @classmethod
+    def _flatten_group_inputs(
+        cls, inputs: Dict[str, Union[Input, str, bool, int, float]]
+    ) -> List[Union[Input, str, bool, int, float]]:
+        """Get flatten values from an InputDict.
+
+        :param inputs: The input dict
+        :type inputs: Dict[str, Union[Input, str, bool, int, float]]
+        :return: A list of values from the Input Dict
+        :rtype: List[Union[Input, str, bool, int, float]]
+        """
+        input_values: List = []
+        # Flatten inputs for pipeline job
+        for key, item in inputs.items():
+            if isinstance(item, _GroupAttrDict):
+                input_values.extend(item.flatten(group_parameter_name=key))
+            else:
+                if not isinstance(item, (str, bool, int, float)):
+                    # skip resolving inferred optional input without path (in do-while + dynamic input case)
+                    if isinstance(item._data, Input):  # type: ignore
+                        if not item._data.path and item._meta._is_inferred_optional:  # type: ignore
+                            continue
+                    input_values.append(item._data)  # type: ignore
+        return input_values
+
+    def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_path: str) -> None:
+        """resolve job input as ARM id or remote url.
+
+        ``entry.path`` is rewritten in place. Literal values, ARM ids, urls and data binding expressions are
+        left untouched; local paths are uploaded and replaced by the resulting remote uri.
+
+        :param entry: The job input
+        :type entry: Union[Input, str, bool, int, float]
+        :param base_path: The base path
+        :type base_path: str
+        :raises ValidationException: Raised if the input path is empty, if a local file/folder does not
+            exist, or if resolving the path fails with an unexpected error.
+        """
+
+        # path can be empty if the job was created from builder functions
+        if isinstance(entry, Input) and not entry.path:
+            msg = "Input path can't be empty for jobs."
+            raise ValidationException(
+                message=msg,
+                target=ErrorTarget.JOB,
+                no_personal_data_message=msg,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.MISSING_FIELD,
+            )
+
+        if (
+            not isinstance(entry, Input)
+            or is_ARM_id_for_resource(entry.path)
+            or is_url(entry.path)
+            or is_data_binding_expression(entry.path)  # literal value but set mode in pipeline yaml
+        ):  # Literal value, ARM id or remote url. Pass through
+            return
+        try:
+            # Upload target: the datastore set on the input, otherwise the workspace blob store.
+            datastore_name = (
+                entry.datastore if hasattr(entry, "datastore") and entry.datastore else WORKSPACE_BLOB_STORE
+            )
+
+            # absolute local path, upload, transform to remote url
+            if os.path.isabs(entry.path):  # type: ignore
+                # Check that the local path matches the declared asset type before uploading.
+                if entry.type == AssetTypes.URI_FOLDER and not os.path.isdir(entry.path):  # type: ignore
+                    raise ValidationException(
+                        message="There is no dir on target path: {}".format(entry.path),
+                        target=ErrorTarget.JOB,
+                        no_personal_data_message="There is no dir on target path",
+                        error_category=ErrorCategory.USER_ERROR,
+                        error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+                    )
+                if entry.type == AssetTypes.URI_FILE and not os.path.isfile(entry.path):  # type: ignore
+                    raise ValidationException(
+                        message="There is no file on target path: {}".format(entry.path),
+                        target=ErrorTarget.JOB,
+                        no_personal_data_message="There is no file on target path",
+                        error_category=ErrorCategory.USER_ERROR,
+                        error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+                    )
+                # absolute local path
+                entry.path = _upload_and_generate_remote_uri(
+                    self._operation_scope,
+                    self._datastore_operations,
+                    entry.path,
+                    datastore_name=datastore_name,
+                    show_progress=self._show_progress,
+                )
+                # TODO : Move this part to a common place
+                # Folder uris always end with a trailing slash.
+                if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
+                    entry.path = entry.path + "/"
+            # Check for AzureML id, is there a better way?
+            elif ":" in entry.path or "@" in entry.path:  # type: ignore
+                # Model-typed inputs resolve against models, everything else against data assets.
+                asset_type = AzureMLResourceType.DATA
+                if entry.type in [AssetTypes.MLFLOW_MODEL, AssetTypes.CUSTOM_MODEL]:
+                    asset_type = AzureMLResourceType.MODEL
+
+                entry.path = self._orchestrators.get_asset_arm_id(entry.path, asset_type)  # type: ignore
+            else:  # relative local path, upload, transform to remote url
+                # Base path will be None for dsl pipeline component for now. We have 2 choices if the dsl pipeline
+                # function is imported from another file:
+                # 1) Use cwd as default base path;
+                # 2) Use the file path of the dsl pipeline function as default base path.
+                # Pick solution 1 for now as defining input path in the script to submit is a more common scenario.
+                local_path = Path(base_path or Path.cwd(), entry.path).resolve()  # type: ignore
+                entry.path = _upload_and_generate_remote_uri(
+                    self._operation_scope,
+                    self._datastore_operations,
+                    local_path,
+                    datastore_name=datastore_name,
+                    show_progress=self._show_progress,
+                )
+                # TODO : Move this part to a common place
+                # Folder uris always end with a trailing slash.
+                if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
+                    entry.path = entry.path + "/"
+        except (MlException, HttpResponseError) as e:
+            # SDK and HTTP errors (including the ValidationExceptions raised above, if ValidationException
+            # derives from MlException -- TODO confirm) propagate unchanged.
+            raise e
+        except Exception as e:
+            # Anything else is reported as an invalid input path, with the original error chained.
+            raise ValidationException(
+                message=f"Supported input path value are ARM id, AzureML id, remote uri or local path.\n"
+                f"Met {type(e)}:\n{e}",
+                target=ErrorTarget.JOB,
+                no_personal_data_message=(
+                    "Supported input path value are ARM id, AzureML id, " "remote uri or local path."
+                ),
+                error=e,
+                error_category=ErrorCategory.USER_ERROR,
+                error_type=ValidationErrorType.INVALID_VALUE,
+            ) from e
+
+    def _resolve_job_inputs_arm_id(self, job: Job) -> None:
+        """Resolve the path of every non-URL ``Input`` on the job, in place.
+
+        Entries wrapped in an input builder are unwrapped first. Literal values and
+        inputs whose path is a URL are left untouched. Any ``AttributeError`` raised
+        while walking the inputs ends the resolution silently.
+
+        :param job: The job whose inputs should be resolved
+        :type job: Job
+        """
+        try:
+            job_inputs: Dict[str, Union[Input, InputOutputBase, str, bool, int, float]] = job.inputs  # type: ignore
+            for _name, raw_entry in job_inputs.items():
+                # Input builders keep the original input in ``_data``.
+                candidate = raw_entry._data if isinstance(raw_entry, InputOutputBase) else raw_entry
+                # Only real Input objects with a non-URL path go through the orchestrator.
+                if isinstance(candidate, Input) and not is_url(candidate.path):
+                    candidate.path = self._orchestrators.resolve_azureml_id(candidate.path)
+        except AttributeError:
+            # A job object without an "inputs" attribute (e.g. AutoML jobs) has nothing to resolve.
+            return
+
+    def _resolve_arm_id_or_azureml_id(self, job: Job, resolver: Union[Callable, _AssetResolver]) -> Job:
+        """Dispatch id resolution to the routine that matches the job's type.
+
+        The type checks run in a fixed order and the first match wins.
+
+        :param job: The job
+        :type job: Job
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided job, with fields resolved to full ARM IDs
+        :rtype: Job
+        :raises ValidationException: If the job's type matches none of the handled job types
+        """
+        # TODO: this will need to be parallelized when multiple tasks
+        # are required. Also consider the implications for dependencies.
+
+        if isinstance(job, _BaseJob):
+            # For this job kind only the compute reference is resolved.
+            job.compute = self._resolve_compute_id(resolver, job.compute)
+            return job
+        if isinstance(job, Command):
+            return self._resolve_arm_id_for_command_job(job, resolver)
+        if isinstance(job, ImportJob):
+            return self._resolve_arm_id_for_import_job(job, resolver)
+        if isinstance(job, Spark):
+            return self._resolve_arm_id_for_spark_job(job, resolver)
+        if isinstance(job, ParallelJob):
+            return self._resolve_arm_id_for_parallel_job(job, resolver)
+        if isinstance(job, SweepJob):
+            return self._resolve_arm_id_for_sweep_job(job, resolver)
+        if isinstance(job, AutoMLJob):
+            return self._resolve_arm_id_for_automl_job(job, resolver, inside_pipeline=False)
+        if isinstance(job, PipelineJob):
+            return self._resolve_arm_id_for_pipeline_job(job, resolver)
+        if isinstance(job, (FineTuningJob, DistillationJob)):
+            # These job types are returned unchanged.
+            return job
+
+        unsupported_msg = f"Non supported job type: {type(job)}"
+        raise ValidationException(
+            message=unsupported_msg,
+            target=ErrorTarget.JOB,
+            no_personal_data_message=unsupported_msg,
+            error_category=ErrorCategory.USER_ERROR,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+
+    def _resolve_arm_id_for_command_job(self, job: Command, resolver: _AssetResolver) -> Command:
+        """Resolve the code, environment and compute references of a Command job.
+
+        :param job: The Command job
+        :type job: Command
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided Command job, with resolved fields
+        :rtype: Command
+        :raises ValidationException: If ``is_registry_id_for_resource`` accepts the job's code value
+        """
+        code = job.code
+        if code is not None:
+            # Registry ids are rejected for code assets.
+            if is_registry_id_for_resource(code):
+                template = "Format not supported for code asset: {}"
+                raise ValidationException(
+                    message=template.format(code),
+                    target=ErrorTarget.JOB,
+                    no_personal_data_message=template.format("[job.code]"),
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
+            # Anything that is not already a code ARM id is wrapped in a Code asset and resolved.
+            if not is_ARM_id_for_resource(code, AzureMLResourceType.CODE):
+                code_asset = Code(base_path=job._base_path, path=code)
+                job.code = resolver(code_asset, azureml_type=AzureMLResourceType.CODE)  # type: ignore
+
+        job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_spark_job(self, job: Spark, resolver: _AssetResolver) -> Spark:
+        """Resolve the code, environment and compute references of a Spark job.
+
+        :param job: The Spark job
+        :type job: Spark
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided SparkJob, with resolved fields
+        :rtype: Spark
+        :raises JobException: If ``is_registry_id_for_resource`` accepts the job's code value
+        """
+        code = job.code
+        if code is not None:
+            # Registry ids are rejected for code assets.
+            if is_registry_id_for_resource(code):
+                template = "Format not supported for code asset: {}"
+                raise JobException(
+                    message=template.format(code),
+                    target=ErrorTarget.JOB,
+                    no_personal_data_message=template.format("[job.code]"),
+                    error_category=ErrorCategory.USER_ERROR,
+                )
+            # Anything that is not already a code ARM id is wrapped in a Code asset and resolved.
+            if not is_ARM_id_for_resource(code, AzureMLResourceType.CODE):
+                code_asset = Code(base_path=job._base_path, path=code)
+                job.code = resolver(code_asset, azureml_type=AzureMLResourceType.CODE)  # type: ignore
+
+        job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_import_job(self, job: ImportJob, resolver: _AssetResolver) -> ImportJob:
+        """Set the compute of an ImportJob to the resolved ``ComputeType.ADF`` id.
+
+        :param job: The Import job
+        :type job: ImportJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided ImportJob, with resolved fields
+        :rtype: ImportJob
+        """
+        # compute property will be no longer applicable once import job type is ready on MFE in PuP
+        # for PrP, we use command job type instead for import job where compute property is required
+        # However, MFE only validates compute resource url format. Execution service owns the real
+        # validation today but supports reserved compute names like AmlCompute, ContainerInstance and
+        # DataFactory here for 'clusterless' jobs
+        adf_compute_id = self._resolve_compute_id(resolver, ComputeType.ADF)
+        job.compute = adf_compute_id
+        return job
+
+    def _resolve_arm_id_for_parallel_job(self, job: ParallelJob, resolver: _AssetResolver) -> ParallelJob:
+        """Resolve the code, environment and compute references of a Parallel job.
+
+        :param job: The Parallel job
+        :type job: ParallelJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided ParallelJob, with resolved fields
+        :rtype: ParallelJob
+        """
+        code = job.code  # type: ignore
+        if code is not None:
+            # Code that is not already a code ARM id is wrapped in a Code asset and resolved.
+            if not is_ARM_id_for_resource(code, AzureMLResourceType.CODE):
+                code_asset = Code(base_path=job._base_path, path=code)  # type: ignore
+                job.code = resolver(code_asset, azureml_type=AzureMLResourceType.CODE)  # type: ignore
+
+        resolved_environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)  # type: ignore
+        job.environment = resolved_environment  # type: ignore
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_sweep_job(self, job: SweepJob, resolver: _AssetResolver) -> SweepJob:
+        """Resolve the trial's code and environment and the compute of a Sweep job.
+
+        Trial fields that are ``None`` or already ARM ids are left as they are.
+
+        :param job: The Sweep job
+        :type job: SweepJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided SweepJob, with resolved fields
+        :rtype: SweepJob
+        """
+        trial = job.trial
+        if trial is not None:
+            if trial.code is not None and not is_ARM_id_for_resource(trial.code, AzureMLResourceType.CODE):
+                code_asset = Code(base_path=job._base_path, path=trial.code)
+                trial.code = resolver(  # type: ignore[assignment]
+                    code_asset,
+                    azureml_type=AzureMLResourceType.CODE,
+                )
+            if trial.environment is not None and not is_ARM_id_for_resource(
+                trial.environment, AzureMLResourceType.ENVIRONMENT
+            ):
+                trial.environment = resolver(  # type: ignore[assignment]
+                    trial.environment, azureml_type=AzureMLResourceType.ENVIRONMENT
+                )
+
+        job.compute = self._resolve_compute_id(resolver, job.compute)
+        return job
+
+    def _resolve_arm_id_for_automl_job(
+        self, job: AutoMLJob, resolver: _AssetResolver, inside_pipeline: bool
+    ) -> AutoMLJob:
+        """Resolve the compute reference of an AutoML job.
+
+        :param job: The AutoML job
+        :type job: AutoMLJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :param inside_pipeline: Whether the job is within a pipeline
+        :type inside_pipeline: bool
+        :return: The provided AutoMLJob, with resolved fields
+        :rtype: AutoMLJob
+        """
+        # AutoML does not have dependency uploads. Only need to resolve reference to arm id.
+
+        # An automl node in a pipeline may omit compute; in that case nothing is resolved.
+        if not inside_pipeline or job.compute is not None:
+            job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)
+        return job
+
+    def _resolve_arm_id_for_pipeline_job(self, pipeline_job: PipelineJob, resolver: _AssetResolver) -> PipelineJob:
+        """Resolve compute, setting defaults, component jobs and component of a pipeline job.
+
+        :param pipeline_job: The pipeline job
+        :type pipeline_job: PipelineJob
+        :param resolver: The asset resolver function
+        :type resolver: _AssetResolver
+        :return: The provided PipelineJob, with resolved fields
+        :rtype: PipelineJob
+        :raises JobException: If resolving the component jobs raises a ComponentException
+        """
+        # Top-level compute of the pipeline job.
+        _get_job_compute_id(pipeline_job, resolver)
+
+        # Defaults declared in the pipeline settings.
+        settings = pipeline_job.settings
+        if settings:
+            settings.default_datastore = resolver(
+                settings.default_datastore, azureml_type=AzureMLResourceType.DATASTORE
+            )
+            settings.default_compute = resolver(settings.default_compute, azureml_type=AzureMLResourceType.COMPUTE)
+
+        # Dependencies of each component job; component errors are re-raised as job errors.
+        try:
+            self._component_operations._resolve_dependencies_for_pipeline_component_jobs(
+                pipeline_job.component, resolver
+            )
+        except ComponentException as component_error:
+            raise JobException(
+                message=component_error.message,
+                target=ErrorTarget.JOB,
+                no_personal_data_message=component_error.no_personal_data_message,
+                error_category=component_error.error_category,
+            ) from component_error
+
+        # Create a pipeline component for pipeline job if user specified component in job yaml.
+        component = pipeline_job.component
+        is_yaml_component = (
+            not isinstance(component, str) and getattr(component, "_source", None) == ComponentSource.YAML_COMPONENT
+        )
+        if is_yaml_component:
+            pipeline_job.component = resolver(  # type: ignore
+                component,
+                azureml_type=AzureMLResourceType.COMPONENT,
+            )
+
+        return pipeline_job
+
+    def _append_tid_to_studio_url(self, job: Job) -> None:
+        """Appends the user's tenant ID to the end of the studio URL.
+
+        Allows the UI to authenticate against the correct tenant. This is best
+        effort: any exception raised along the way is logged at info level and
+        the job is left as it was at that point.
+
+        :param job: The job
+        :type job: Job
+        """
+        try:
+            services = job.services
+            if services is None:
+                return
+
+            studio_endpoint = services.get("Studio", None)
+            base_studio_url = studio_endpoint.endpoint
+
+            scopes = _resource_to_scopes(_get_base_url_from_metadata())
+            module_logger.debug("default_scopes used: `%s`\n", scopes)
+
+            # Extract the tenant id from the credential using PyJWT; the token is only
+            # read for its "tid" claim, so signature and audience checks are disabled.
+            access_token = self._credential.get_token(*scopes).token
+            claims = jwt.decode(
+                access_token,
+                options={"verify_signature": False, "verify_aud": False},
+            )
+            studio_endpoint.endpoint = base_studio_url + TID_FMT.format(claims["tid"])
+        except Exception:  # pylint: disable=W0718
+            module_logger.info("Proceeding with no tenant id appended to studio URL\n")
+
+    def _set_headers_with_user_aml_token(self, kwargs: Any) -> None:
+        """Fetch a token for the AzureML scope and store it in the "headers" entry of ``kwargs``.
+
+        The token's ``aud`` claim is read without signature verification and must
+        equal the AzureML resource id; otherwise no header is set and an error is raised.
+
+        :param kwargs: Keyword-argument mapping whose "headers" entry is created or updated in place
+        :type kwargs: Any
+        :raises ValidationException: If the fetched token's audience is not the AzureML resource id
+        """
+        aml_resource_id = _get_aml_resource_id_from_metadata()
+        azure_ml_scopes = _resource_to_scopes(aml_resource_id)
+        module_logger.debug("azure_ml_scopes used: `%s`\n", azure_ml_scopes)
+        aml_token = self._credential.get_token(*azure_ml_scopes).token
+        # validate token has aml audience
+        decoded_token = jwt.decode(
+            aml_token,
+            options={"verify_signature": False, "verify_aud": False},
+        )
+        if decoded_token.get("aud") != aml_resource_id:
+            msg = """AAD token with aml scope could not be fetched using the credentials being used.
+            Please validate if token with {0} scope can be fetched using credentials provided to MLClient.
+            Token with {0} scope can be fetched using credentials.get_token({0})
+            """
+            raise ValidationException(
+                message=msg.format(*azure_ml_scopes),
+                target=ErrorTarget.JOB,
+                error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+                # The placeholder stands in for the scope value; it has nothing to do with job code.
+                no_personal_data_message=msg.format("[scopes]"),
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+        headers = kwargs.pop("headers", {})
+        headers["x-azureml-token"] = aml_token
+        kwargs["headers"] = headers
+
+
+def _get_job_compute_id(job: Union[Job, Command], resolver: _AssetResolver) -> None:
+    """Replace ``job.compute`` with the value the resolver returns for it.
+
+    :param job: The job whose compute reference is resolved in place
+    :type job: Union[Job, Command]
+    :param resolver: The asset resolver function
+    :type resolver: _AssetResolver
+    """
+    resolved_compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)
+    job.compute = resolved_compute