Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py | 598
1 file changed, 598 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py
new file mode 100644
index 00000000..c225a73e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py
@@ -0,0 +1,598 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access
+
+import logging
+import os
+import uuid
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import TYPE_CHECKING, Dict, Optional, Tuple, TypeVar, Union
+
+from typing_extensions import Literal
+
+from azure.ai.ml._artifacts._blob_storage_helper import BlobStorageClient
+from azure.ai.ml._artifacts._gen2_storage_helper import Gen2StorageClient
+from azure.ai.ml._azure_environments import _get_storage_endpoint_from_metadata
+from azure.ai.ml._restclient.v2022_10_01.models import DatastoreType
+from azure.ai.ml._scope_dependent_operations import OperationScope
+from azure.ai.ml._utils._arm_id_utils import (
+    AMLNamedArmId,
+    get_datastore_arm_id,
+    get_resource_name_from_arm_id,
+    is_ARM_id_for_resource,
+    remove_aml_prefix,
+)
+from azure.ai.ml._utils._asset_utils import (
+    IgnoreFile,
+    _build_metadata_dict,
+    _validate_path,
+    get_content_hash,
+    get_ignore_file,
+    get_object_hash,
+)
+from azure.ai.ml._utils._storage_utils import (
+    AzureMLDatastorePathUri,
+    get_artifact_path_from_storage_url,
+    get_storage_client,
+)
+from azure.ai.ml._utils.utils import is_mlflow_uri, is_url
+from azure.ai.ml.constants._common import SHORT_URI_FORMAT, STORAGE_ACCOUNT_URLS
+from azure.ai.ml.entities import Environment
+from azure.ai.ml.entities._assets._artifacts.artifact import Artifact, ArtifactStorageInfo
+from azure.ai.ml.entities._datastore._constants import WORKSPACE_BLOB_STORE
+from azure.ai.ml.exceptions import ErrorTarget, MlException, ValidationException
+from azure.ai.ml.operations._datastore_operations import DatastoreOperations
+from azure.core.exceptions import HttpResponseError
+from azure.storage.blob import BlobSasPermissions, generate_blob_sas
+from azure.storage.filedatalake import FileSasPermissions, generate_file_sas
+
+if TYPE_CHECKING:
+    from azure.ai.ml.operations import (
+        DataOperations,
+        EnvironmentOperations,
+        EvaluatorOperations,
+        FeatureSetOperations,
+        IndexOperations,
+        ModelOperations,
+    )
+    from azure.ai.ml.operations._code_operations import CodeOperations
+
+module_logger = logging.getLogger(__name__)
+
+
+def _get_datastore_name(*, datastore_name: Optional[str] = WORKSPACE_BLOB_STORE) -> str:
+    datastore_name = WORKSPACE_BLOB_STORE if not datastore_name else datastore_name
+    try:
+        datastore_name = get_resource_name_from_arm_id(datastore_name)
+    except (ValueError, AttributeError, ValidationException):
+        module_logger.debug("datastore_name %s is not a full arm id. Proceed with a shortened name.\n", datastore_name)
+        datastore_name = remove_aml_prefix(datastore_name)
+    if is_ARM_id_for_resource(datastore_name):
+        datastore_name = get_resource_name_from_arm_id(datastore_name)
+    return str(datastore_name)
+
+
+def get_datastore_info(
+    operations: DatastoreOperations,
+    name: str,
+    *,
+    credential=None,
+    **kwargs,
+) -> Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str]:
+    """Get datastore account, type, and auth information.
+
+    :param operations: DatastoreOperations object
+    :type operations: DatastoreOperations
+    :param name: Name of the datastore. If not provided, the default datastore will be used.
+    :type name: str
+    :keyword credential: Local credential to use for authentication. This argument is no longer used as of 1.18.0.
+        Instead, a SAS token will be requested from the datastore, and the MLClient credential will be used as backup,
+        if necessary.
+    :paramtype credential: str
+    :return: The dictionary with datastore info
+    :rtype: Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str]
+    """
+    datastore_info: Dict = {}
+    datastore = operations.get(name) if name else operations.get_default()
+
+    storage_endpoint = _get_storage_endpoint_from_metadata()
+    datastore_info["storage_type"] = datastore.type
+    datastore_info["storage_account"] = datastore.account_name
+    datastore_info["account_url"] = STORAGE_ACCOUNT_URLS[datastore.type].format(
+        datastore.account_name, storage_endpoint
+    )
+
+    try:
+        credential = operations._list_secrets(name=name, expirable_secret=True)
+        datastore_info["credential"] = credential.sas_token
+    except HttpResponseError:
+        datastore_info["credential"] = operations._credential
+
+    if datastore.type == DatastoreType.AZURE_BLOB:
+        datastore_info["container_name"] = str(datastore.container_name)
+    elif datastore.type == DatastoreType.AZURE_DATA_LAKE_GEN2:
+        datastore_info["container_name"] = str(datastore.filesystem)
+    else:
+        msg = (
+            f"Datastore type {datastore.type} is not supported for uploads. "
+            f"Supported types are {DatastoreType.AZURE_BLOB} and {DatastoreType.AZURE_DATA_LAKE_GEN2}."
+        )
+        raise MlException(message=msg, no_personal_data_message=msg)
+
+    for override_param_name, value in kwargs.items():
+        if override_param_name in datastore_info:
+            datastore_info[override_param_name] = value
+
+    return datastore_info
+
+
+def list_logs_in_datastore(
+    ds_info: Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str],
+    prefix: str,
+    legacy_log_folder_name: str,
+) -> Dict[str, str]:
+    """Returns a dictionary of file name to blob or data lake uri with SAS token, matching the structure of
+    RunDetails.logFiles.
+
+    :param ds_info: The datastore info
+    :type ds_info: Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str]
+    :param prefix: A prefix used to filter logs by path
+    :type prefix: str
+    :param legacy_log_folder_name: the name of the folder in the datastore that contains the logs
+        * /azureml-logs/*.txt is the legacy log structure for commandJob and sweepJob
+        * /logs/azureml/*.txt is the legacy log structure for pipeline parent Job
+    :type legacy_log_folder_name: str
+    :return: A mapping of log file name to the remote URI
+    :rtype: Dict[str, str]
+    """
+    if ds_info["storage_type"] not in [
+        DatastoreType.AZURE_BLOB,
+        DatastoreType.AZURE_DATA_LAKE_GEN2,
+    ]:
+        msg = "Only Blob and Azure DataLake Storage Gen2 datastores are supported."
+        raise MlException(message=msg, no_personal_data_message=msg)
+
+    storage_client = get_storage_client(
+        credential=ds_info["credential"],
+        container_name=ds_info["container_name"],
+        storage_account=ds_info["storage_account"],
+        storage_type=ds_info["storage_type"],
+    )
+
+    items = storage_client.list(starts_with=prefix + "/user_logs/")
+    # Append legacy log files if present
+    items.extend(storage_client.list(starts_with=prefix + legacy_log_folder_name))
+
+    log_dict = {}
+    for item_name in items:
+        sub_name = item_name.split(prefix + "/")[1]
+        if isinstance(storage_client, BlobStorageClient):
+            token = generate_blob_sas(
+                account_name=ds_info["storage_account"],
+                container_name=ds_info["container_name"],
+                blob_name=item_name,
+                account_key=ds_info["credential"],
+                permission=BlobSasPermissions(read=True),
+                expiry=datetime.utcnow() + timedelta(minutes=30),
+            )
+        elif isinstance(storage_client, Gen2StorageClient):
+            token = generate_file_sas(  # pylint: disable=no-value-for-parameter
+                account_name=ds_info["storage_account"],
+                file_system_name=ds_info["container_name"],
+                file_name=item_name,
+                credential=ds_info["credential"],
+                permission=FileSasPermissions(read=True),
+                expiry=datetime.utcnow() + timedelta(minutes=30),
+            )
+
+        log_dict[sub_name] = "{}/{}/{}?{}".format(
+            ds_info["account_url"],
+            ds_info["container_name"],
+            item_name,
+            token,  # pylint: disable=possibly-used-before-assignment
+        )
+    return log_dict
+
+
+def _get_default_datastore_info(datastore_operation):
+    return get_datastore_info(datastore_operation, None)
+
+
+def upload_artifact(
+    local_path: str,
+    datastore_operation: DatastoreOperations,
+    operation_scope: OperationScope,
+    datastore_name: Optional[str],
+    asset_hash: Optional[str] = None,
+    show_progress: bool = True,
+    asset_name: Optional[str] = None,
+    asset_version: Optional[str] = None,
+    ignore_file: IgnoreFile = IgnoreFile(None),
+    sas_uri: Optional[str] = None,
+) -> ArtifactStorageInfo:
+    """Upload local file or directory to datastore.
+
+    :param local_path: The local file or directory to upload
+    :type local_path: str
+    :param datastore_operation: The datastore operation
+    :type datastore_operation: DatastoreOperations
+    :param operation_scope: The operation scope
+    :type operation_scope: OperationScope
+    :param datastore_name: The datastore name
+    :type datastore_name: Optional[str]
+    :param asset_hash: The asset hash
+    :type asset_hash: Optional[str]
+    :param show_progress: Whether to show progress on the console. Defaults to True.
+    :type show_progress: bool
+    :param asset_name: The asset name
+    :type asset_name: Optional[str]
+    :param asset_version: The asset version
+    :type asset_version: Optional[str]
+    :param ignore_file: The IgnoreFile determining which, if any, files to ignore when uploading
+    :type ignore_file: IgnoreFile
+    :param sas_uri: The sas uri to use for uploading
+    :type sas_uri: Optional[str]
+    :return: The artifact storage info
+    :rtype: ArtifactStorageInfo
+    """
+    if sas_uri:
+        storage_client = get_storage_client(credential=None, storage_account=None, account_url=sas_uri)
+    else:
+        datastore_name = _get_datastore_name(datastore_name=datastore_name)
+        datastore_info = get_datastore_info(datastore_operation, datastore_name)
+        storage_client = get_storage_client(**datastore_info)
+
+    artifact_info = storage_client.upload(
+        local_path,
+        asset_hash=asset_hash,
+        show_progress=show_progress,
+        name=asset_name,
+        version=asset_version,
+        ignore_file=ignore_file,
+    )
+
+    artifact = ArtifactStorageInfo(
+        name=artifact_info["name"],
+        version=artifact_info["version"],
+        relative_path=artifact_info["remote path"],
+        datastore_arm_id=get_datastore_arm_id(datastore_name, operation_scope) if not sas_uri else None,
+        container_name=(
+            storage_client.container if isinstance(storage_client, BlobStorageClient) else storage_client.file_system
+        ),
+        storage_account_url=datastore_info.get("account_url") if not sas_uri else sas_uri,
+        indicator_file=artifact_info["indicator file"],
+        is_file=Path(local_path).is_file(),
+    )
+    return artifact
+
+
+def download_artifact(
+    starts_with: Union[str, os.PathLike],
+    destination: str,
+    datastore_operation: DatastoreOperations,
+    datastore_name: Optional[str],
+    datastore_info: Optional[Dict] = None,
+) -> str:
+    """Download datastore path to local file or directory.
+
+    :param starts_with: Prefix of blobs to download
+    :type starts_with: Union[str, os.PathLike]
+    :param destination: Path that files will be written to
+    :type destination: str
+    :param datastore_operation: Datastore operations
+    :type datastore_operation: DatastoreOperations
+    :param datastore_name: name of datastore
+    :type datastore_name: Optional[str]
+    :param datastore_info: the return value of invoking get_datastore_info
+    :type datastore_info: Optional[Dict]
+    :return: Path that files were written to
+    :rtype: str
+    """
+    starts_with = starts_with.as_posix() if isinstance(starts_with, Path) else starts_with
+    datastore_name = _get_datastore_name(datastore_name=datastore_name)
+    if datastore_info is None:
+        datastore_info = get_datastore_info(datastore_operation, datastore_name)
+    storage_client = get_storage_client(**datastore_info)
+    storage_client.download(starts_with=starts_with, destination=destination)
+    return destination
+
+
+def download_artifact_from_storage_url(
+    blob_url: str,
+    destination: str,
+    datastore_operation: DatastoreOperations,
+    datastore_name: Optional[str],
+) -> str:
+    """Download datastore blob URL to local file or directory.
+
+    :param blob_url: The blob url to download
+    :type blob_url: str
+    :param destination: Path that the artifact will be written to
+    :type destination: str
+    :param datastore_operation: The datastore operations
+    :type datastore_operation: DatastoreOperations
+    :param datastore_name: The datastore name
+    :type datastore_name: Optional[str]
+    :return: Path that files were written to
+    :rtype: str
+    """
+    datastore_name = _get_datastore_name(datastore_name=datastore_name)
+    datastore_info = get_datastore_info(datastore_operation, datastore_name)
+    starts_with = get_artifact_path_from_storage_url(
+        blob_url=str(blob_url), container_name=datastore_info.get("container_name")
+    )
+    return download_artifact(
+        starts_with=starts_with,
+        destination=destination,
+        datastore_operation=datastore_operation,
+        datastore_name=datastore_name,
+        datastore_info=datastore_info,
+    )
+
+
+def download_artifact_from_aml_uri(uri: str, destination: str, datastore_operation: DatastoreOperations) -> str:
+    """Downloads artifact pointed to by URI of the form `azureml://...` to destination.
+
+    :param str uri: AzureML uri of artifact to download
+    :param str destination: Path to download artifact to
+    :param DatastoreOperations datastore_operation: datastore operations
+    :return: Path that files were downloaded to
+    :rtype: str
+    """
+    parsed_uri = AzureMLDatastorePathUri(uri)
+    return download_artifact(
+        starts_with=parsed_uri.path,
+        destination=destination,
+        datastore_operation=datastore_operation,
+        datastore_name=parsed_uri.datastore,
+    )
+
+
+def aml_datastore_path_exists(
+    uri: str, datastore_operation: DatastoreOperations, datastore_info: Optional[dict] = None
+) -> bool:
+    """Checks whether `uri` of the form "azureml://" points to either a directory or a file.
+
+    :param str uri: azure ml datastore uri
+    :param DatastoreOperations datastore_operation: Datastore operation
+    :param dict datastore_info: return value of get_datastore_info
+    :return: True if uri exists False otherwise
+    :rtype: bool
+    """
+    parsed_uri = AzureMLDatastorePathUri(uri)
+    datastore_info = datastore_info or get_datastore_info(datastore_operation, parsed_uri.datastore)
+    return get_storage_client(**datastore_info).exists(parsed_uri.path)
+
+
+def _upload_to_datastore(
+    operation_scope: OperationScope,
+    datastore_operation: DatastoreOperations,
+    path: Union[str, Path, os.PathLike],
+    artifact_type: str,
+    datastore_name: Optional[str] = None,
+    show_progress: bool = True,
+    asset_name: Optional[str] = None,
+    asset_version: Optional[str] = None,
+    asset_hash: Optional[str] = None,
+    ignore_file: Optional[IgnoreFile] = None,
+    sas_uri: Optional[str] = None,
+    blob_uri: Optional[str] = None,
+) -> ArtifactStorageInfo:
+    _validate_path(path, _type=artifact_type)
+    if not ignore_file:
+        ignore_file = get_ignore_file(path)
+    if not asset_hash:
+        asset_hash = get_object_hash(path, ignore_file)
+    artifact = upload_artifact(
+        str(path),
+        datastore_operation,
+        operation_scope,
+        datastore_name,
+        show_progress=show_progress,
+        asset_hash=asset_hash,
+        asset_name=asset_name,
+        asset_version=asset_version,
+        ignore_file=ignore_file,
+        sas_uri=sas_uri,
+    )
+    if blob_uri:
+        artifact.storage_account_url = blob_uri
+
+    return artifact
+
+
+def _upload_and_generate_remote_uri(
+    operation_scope: OperationScope,
+    datastore_operation: DatastoreOperations,
+    path: Union[str, Path, os.PathLike],
+    artifact_type: str = ErrorTarget.ARTIFACT,
+    datastore_name: str = WORKSPACE_BLOB_STORE,
+    show_progress: bool = True,
+) -> str:
+    # Asset name is required for uploading to a datastore
+    asset_name = str(uuid.uuid4())
+    artifact_info = _upload_to_datastore(
+        operation_scope=operation_scope,
+        datastore_operation=datastore_operation,
+        path=path,
+        datastore_name=datastore_name,
+        asset_name=asset_name,
+        artifact_type=artifact_type,
+        show_progress=show_progress,
+    )
+
+    path = artifact_info.relative_path
+    datastore = AMLNamedArmId(artifact_info.datastore_arm_id).asset_name
+    return SHORT_URI_FORMAT.format(datastore, path)
+
+
+def _update_metadata(name, version, indicator_file, datastore_info) -> None:
+    storage_client = get_storage_client(**datastore_info)
+
+    if isinstance(storage_client, BlobStorageClient):
+        _update_blob_metadata(name, version, indicator_file, storage_client)
+    elif isinstance(storage_client, Gen2StorageClient):
+        _update_gen2_metadata(name, version, indicator_file, storage_client)
+
+
+def _update_blob_metadata(name, version, indicator_file, storage_client) -> None:
+    container_client = storage_client.container_client
+    if indicator_file.startswith(storage_client.container):
+        indicator_file = indicator_file.split(storage_client.container)[1]
+    blob = container_client.get_blob_client(blob=indicator_file)
+    blob.set_blob_metadata(_build_metadata_dict(name=name, version=version))
+
+
+def _update_gen2_metadata(name, version, indicator_file, storage_client) -> None:
+    artifact_directory_client = storage_client.file_system_client.get_directory_client(indicator_file)
+    artifact_directory_client.set_metadata(_build_metadata_dict(name=name, version=version))
+
+
+T = TypeVar("T", bound=Artifact)
+
+
+def _check_and_upload_path(
+    artifact: T,
+    asset_operations: Union[
+        "DataOperations",
+        "ModelOperations",
+        "EvaluatorOperations",
+        "CodeOperations",
+        "FeatureSetOperations",
+        "IndexOperations",
+    ],
+    artifact_type: str,
+    datastore_name: Optional[str] = None,
+    sas_uri: Optional[str] = None,
+    show_progress: bool = True,
+    blob_uri: Optional[str] = None,
+) -> Tuple[T, Optional[str]]:
+    """Checks whether `artifact` is a path or a uri and uploads it to the datastore if necessary.
+
+    :param artifact: artifact to check and upload param
+    :type artifact: T
+    :param asset_operations: The asset operations to use for uploading
+    :type asset_operations: Union["DataOperations", "ModelOperations", "CodeOperations", "IndexOperations"]
+    :param artifact_type: The artifact type
+    :type artifact_type: str
+    :param datastore_name: the name of the datastore to upload to
+    :type datastore_name: Optional[str]
+    :param sas_uri: the sas uri to use for uploading
+    :type sas_uri: Optional[str]
+    :param show_progress: Whether to show progress on the console. Defaults to True.
+    :type show_progress: bool
+    :param blob_uri: The storage account uri
+    :type blob_uri: Optional[str]
+    :return: A 2-tuple of the uploaded artifact, and the indicator file.
+    :rtype: Tuple[T, Optional[str]]
+    """
+
+    datastore_name = artifact.datastore
+    indicator_file = None
+    if (
+        hasattr(artifact, "local_path")
+        and artifact.local_path is not None
+        or (
+            hasattr(artifact, "path")
+            and artifact.path is not None
+            and not (is_url(artifact.path) or is_mlflow_uri(artifact.path))
+        )
+    ):
+        path = (
+            Path(artifact.path)
+            if hasattr(artifact, "path") and artifact.path is not None
+            else Path(artifact.local_path)
+        )
+        if not path.is_absolute():
+            path = Path(artifact.base_path, path).resolve()
+        uploaded_artifact = _upload_to_datastore(
+            asset_operations._operation_scope,
+            asset_operations._datastore_operation,
+            path,
+            datastore_name=datastore_name,
+            asset_name=artifact.name,
+            asset_version=str(artifact.version),
+            asset_hash=getattr(artifact, "_upload_hash", None),
+            sas_uri=sas_uri,
+            artifact_type=artifact_type,
+            show_progress=show_progress,
+            ignore_file=getattr(artifact, "_ignore_file", None),
+            blob_uri=blob_uri,
+        )
+        indicator_file = uploaded_artifact.indicator_file  # reference to storage contents
+        if artifact._is_anonymous:
+            artifact.name, artifact.version = (
+                uploaded_artifact.name,
+                uploaded_artifact.version,
+            )
+        # Pass all of the upload information to the assets, and they will each construct the URLs that they support
+        artifact._update_path(uploaded_artifact)
+    return artifact, indicator_file
+
+
+def _check_and_upload_env_build_context(
+    environment: Environment,
+    operations: "EnvironmentOperations",
+    sas_uri=None,
+    show_progress: bool = True,
+) -> Environment:
+    if environment.path:
+        uploaded_artifact = _upload_to_datastore(
+            operations._operation_scope,
+            operations._datastore_operation,
+            environment.path,
+            asset_name=environment.name,
+            asset_version=str(environment.version),
+            asset_hash=environment._upload_hash,
+            sas_uri=sas_uri,
+            artifact_type=ErrorTarget.ENVIRONMENT,
+            datastore_name=environment.datastore,
+            show_progress=show_progress,
+        )
+        if environment.build is not None:
+            # TODO: Depending on decision trailing "/" needs to stay or not. EMS requires it to be present
+            environment.build.path = str(uploaded_artifact.full_storage_path) + "/"
+    return environment
+
+
+def _get_snapshot_path_info(artifact) -> Optional[Tuple[Path, IgnoreFile, str]]:
+    """
+    Validate an Artifact's local path and get its resolved path, ignore file, and hash. If no local path, return None.
+
+    :param artifact: Artifact object
+    :type artifact: azure.ai.ml.entities._assets._artifacts.artifact.Artifact
+    :return: Artifact's path, ignorefile, and hash
+    :rtype: Tuple[os.PathLike, IgnoreFile, str]
+    """
+    if (
+        hasattr(artifact, "local_path")
+        and artifact.local_path is not None
+        or (
+            hasattr(artifact, "path")
+            and artifact.path is not None
+            and not (is_url(artifact.path) or is_mlflow_uri(artifact.path))
+        )
+    ):
+        path = (
+            Path(artifact.path)
+            if hasattr(artifact, "path") and artifact.path is not None
+            else Path(artifact.local_path)
+        )
+        if not path.is_absolute():
+            path = Path(artifact.base_path, path).resolve()
+    else:
+        return None
+
+    _validate_path(path, _type=ErrorTarget.CODE)
+
+    # to align with _check_and_upload_path, we need to try getting the ignore file from the artifact first
+    ignore_file = getattr(artifact, "_ignore_file", get_ignore_file(path))
+    # Note that we haven't used getattr(artifact, "_upload_hash", get_content_hash(path, ignore_file)) here, which
+    # is aligned with _check_and_upload_path. Current guess is that content_hash is what we used in blob, so we must
+    # use it to retrieve the artifact.
+    # TODO: Core SDK team to provide more information on this
+    asset_hash = get_content_hash(path, ignore_file)

+    return path, ignore_file, asset_hash
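Since the diff above only vendors the module itself, a minimal usage sketch may help show how these helpers fit together. This sketch is not part of the committed file: it assumes an azure-ai-ml install matching this vendored copy, placeholder subscription/resource-group/workspace values, and that MLClient still exposes the internal _operation_scope attribute these private helpers expect; attribute names may differ across releases.

from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

from azure.ai.ml._artifacts._artifact_utilities import (
    _get_datastore_name,
    aml_datastore_path_exists,
    download_artifact_from_aml_uri,
    upload_artifact,
)

# Placeholder workspace coordinates -- substitute real values.
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

# Resolve the target datastore name (falls back to the workspace blob store).
datastore_name = _get_datastore_name(datastore_name=None)

# Upload a local folder; _operation_scope is an internal MLClient attribute (assumption).
artifact = upload_artifact(
    local_path="./outputs",
    datastore_operation=ml_client.datastores,
    operation_scope=ml_client._operation_scope,
    datastore_name=datastore_name,
    show_progress=True,
)
print(artifact.relative_path, artifact.storage_account_url)

# Round-trip the same content through its azureml:// datastore URI.
uri = f"azureml://datastores/{datastore_name}/paths/{artifact.relative_path}"
if aml_datastore_path_exists(uri, ml_client.datastores):
    download_artifact_from_aml_uri(
        uri=uri,
        destination="./downloaded",
        datastore_operation=ml_client.datastores,
    )

Everything imported from _artifact_utilities here is private API; for production code the public asset operations on MLClient (data, models, environments) wrap these same upload and download paths.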