| field | value | date |
|---|---|---|
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts')
6 files changed, 1697 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/__init__.py new file mode 100644 index 00000000..d540fd20 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/__init__.py @@ -0,0 +1,3 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py new file mode 100644 index 00000000..c225a73e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_artifact_utilities.py @@ -0,0 +1,598 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access + +import logging +import os +import uuid +from datetime import datetime, timedelta +from pathlib import Path +from typing import TYPE_CHECKING, Dict, Optional, Tuple, TypeVar, Union + +from typing_extensions import Literal + +from azure.ai.ml._artifacts._blob_storage_helper import BlobStorageClient +from azure.ai.ml._artifacts._gen2_storage_helper import Gen2StorageClient +from azure.ai.ml._azure_environments import _get_storage_endpoint_from_metadata +from azure.ai.ml._restclient.v2022_10_01.models import DatastoreType +from azure.ai.ml._scope_dependent_operations import OperationScope +from azure.ai.ml._utils._arm_id_utils import ( + AMLNamedArmId, + get_datastore_arm_id, + get_resource_name_from_arm_id, + is_ARM_id_for_resource, + remove_aml_prefix, +) +from azure.ai.ml._utils._asset_utils import ( + IgnoreFile, + _build_metadata_dict, + _validate_path, + get_content_hash, + get_ignore_file, + get_object_hash, +) +from azure.ai.ml._utils._storage_utils import ( + AzureMLDatastorePathUri, + get_artifact_path_from_storage_url, + get_storage_client, +) +from azure.ai.ml._utils.utils import is_mlflow_uri, is_url +from azure.ai.ml.constants._common import SHORT_URI_FORMAT, STORAGE_ACCOUNT_URLS +from azure.ai.ml.entities import Environment +from azure.ai.ml.entities._assets._artifacts.artifact import Artifact, ArtifactStorageInfo +from azure.ai.ml.entities._datastore._constants import WORKSPACE_BLOB_STORE +from azure.ai.ml.exceptions import ErrorTarget, MlException, ValidationException +from azure.ai.ml.operations._datastore_operations import DatastoreOperations +from azure.core.exceptions import HttpResponseError +from azure.storage.blob import BlobSasPermissions, generate_blob_sas +from azure.storage.filedatalake import FileSasPermissions, generate_file_sas + +if TYPE_CHECKING: + from azure.ai.ml.operations import ( + DataOperations, + EnvironmentOperations, + EvaluatorOperations, + FeatureSetOperations, + IndexOperations, + ModelOperations, + ) + from azure.ai.ml.operations._code_operations import CodeOperations + +module_logger = logging.getLogger(__name__) + + +def _get_datastore_name(*, datastore_name: Optional[str] = WORKSPACE_BLOB_STORE) -> str: + datastore_name = WORKSPACE_BLOB_STORE if not datastore_name else datastore_name + try: + datastore_name = get_resource_name_from_arm_id(datastore_name) + except (ValueError, AttributeError, ValidationException): + module_logger.debug("datastore_name %s is not a full arm id. 
Proceed with a shortened name.\n", datastore_name) + datastore_name = remove_aml_prefix(datastore_name) + if is_ARM_id_for_resource(datastore_name): + datastore_name = get_resource_name_from_arm_id(datastore_name) + return str(datastore_name) + + +def get_datastore_info( + operations: DatastoreOperations, + name: str, + *, + credential=None, + **kwargs, +) -> Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str]: + """Get datastore account, type, and auth information. + + :param operations: DatastoreOperations object + :type operations: DatastoreOperations + :param name: Name of the datastore. If not provided, the default datastore will be used. + :type name: str + :keyword credential: Local credential to use for authentication. This argument is no longer used as of 1.18.0. + Instead, a SAS token will be requested from the datastore, and the MLClient credential will be used as backup, + if necessary. + :paramtype credential: str + :return: The dictionary with datastore info + :rtype: Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str] + """ + datastore_info: Dict = {} + datastore = operations.get(name) if name else operations.get_default() + + storage_endpoint = _get_storage_endpoint_from_metadata() + datastore_info["storage_type"] = datastore.type + datastore_info["storage_account"] = datastore.account_name + datastore_info["account_url"] = STORAGE_ACCOUNT_URLS[datastore.type].format( + datastore.account_name, storage_endpoint + ) + + try: + credential = operations._list_secrets(name=name, expirable_secret=True) + datastore_info["credential"] = credential.sas_token + except HttpResponseError: + datastore_info["credential"] = operations._credential + + if datastore.type == DatastoreType.AZURE_BLOB: + datastore_info["container_name"] = str(datastore.container_name) + elif datastore.type == DatastoreType.AZURE_DATA_LAKE_GEN2: + datastore_info["container_name"] = str(datastore.filesystem) + else: + msg = ( + f"Datastore type {datastore.type} is not supported for uploads. " + f"Supported types are {DatastoreType.AZURE_BLOB} and {DatastoreType.AZURE_DATA_LAKE_GEN2}." + ) + raise MlException(message=msg, no_personal_data_message=msg) + + for override_param_name, value in kwargs.items(): + if override_param_name in datastore_info: + datastore_info[override_param_name] = value + + return datastore_info + + +def list_logs_in_datastore( + ds_info: Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str], + prefix: str, + legacy_log_folder_name: str, +) -> Dict[str, str]: + """Returns a dictionary of file name to blob or data lake uri with SAS token, matching the structure of + RunDetails.logFiles. 
+ + :param ds_info: The datastore info + :type ds_info: Dict[Literal["storage_type", "storage_account", "account_url", "container_name", "credential"], str] + :param prefix: A prefix used to filter logs by path + :type prefix: str + :param legacy_log_folder_name: the name of the folder in the datastore that contains the logs + * /azureml-logs/*.txt is the legacy log structure for commandJob and sweepJob + * /logs/azureml/*.txt is the legacy log structure for pipeline parent Job + :type legacy_log_folder_name: str + :return: A mapping of log file name to the remote URI + :rtype: Dict[str, str] + """ + if ds_info["storage_type"] not in [ + DatastoreType.AZURE_BLOB, + DatastoreType.AZURE_DATA_LAKE_GEN2, + ]: + msg = "Only Blob and Azure DataLake Storage Gen2 datastores are supported." + raise MlException(message=msg, no_personal_data_message=msg) + + storage_client = get_storage_client( + credential=ds_info["credential"], + container_name=ds_info["container_name"], + storage_account=ds_info["storage_account"], + storage_type=ds_info["storage_type"], + ) + + items = storage_client.list(starts_with=prefix + "/user_logs/") + # Append legacy log files if present + items.extend(storage_client.list(starts_with=prefix + legacy_log_folder_name)) + + log_dict = {} + for item_name in items: + sub_name = item_name.split(prefix + "/")[1] + if isinstance(storage_client, BlobStorageClient): + token = generate_blob_sas( + account_name=ds_info["storage_account"], + container_name=ds_info["container_name"], + blob_name=item_name, + account_key=ds_info["credential"], + permission=BlobSasPermissions(read=True), + expiry=datetime.utcnow() + timedelta(minutes=30), + ) + elif isinstance(storage_client, Gen2StorageClient): + token = generate_file_sas( # pylint: disable=no-value-for-parameter + account_name=ds_info["storage_account"], + file_system_name=ds_info["container_name"], + file_name=item_name, + credential=ds_info["credential"], + permission=FileSasPermissions(read=True), + expiry=datetime.utcnow() + timedelta(minutes=30), + ) + + log_dict[sub_name] = "{}/{}/{}?{}".format( + ds_info["account_url"], + ds_info["container_name"], + item_name, + token, # pylint: disable=possibly-used-before-assignment + ) + return log_dict + + +def _get_default_datastore_info(datastore_operation): + return get_datastore_info(datastore_operation, None) + + +def upload_artifact( + local_path: str, + datastore_operation: DatastoreOperations, + operation_scope: OperationScope, + datastore_name: Optional[str], + asset_hash: Optional[str] = None, + show_progress: bool = True, + asset_name: Optional[str] = None, + asset_version: Optional[str] = None, + ignore_file: IgnoreFile = IgnoreFile(None), + sas_uri: Optional[str] = None, +) -> ArtifactStorageInfo: + """Upload local file or directory to datastore. + + :param local_path: The local file or directory to upload + :type local_path: str + :param datastore_operation: The datastore operation + :type datastore_operation: DatastoreOperations + :param operation_scope: The operation scope + :type operation_scope: OperationScope + :param datastore_name: The datastore name + :type datastore_name: Optional[str] + :param asset_hash: The asset hash + :type asset_hash: Optional[str] + :param show_progress: Whether to show progress on the console. Defaults to True. 
+ :type show_progress: bool + :param asset_name: The asset name + :type asset_name: Optional[str] + :param asset_version: The asset version + :type asset_version: Optional[str] + :param ignore_file: The IgnoreFile determining which, if any, files to ignore when uploading + :type ignore_file: IgnoreFile + :param sas_uri: The sas uri to use for uploading + :type sas_uri: Optional[str] + :return: The artifact storage info + :rtype: ArtifactStorageInfo + """ + if sas_uri: + storage_client = get_storage_client(credential=None, storage_account=None, account_url=sas_uri) + else: + datastore_name = _get_datastore_name(datastore_name=datastore_name) + datastore_info = get_datastore_info(datastore_operation, datastore_name) + storage_client = get_storage_client(**datastore_info) + + artifact_info = storage_client.upload( + local_path, + asset_hash=asset_hash, + show_progress=show_progress, + name=asset_name, + version=asset_version, + ignore_file=ignore_file, + ) + + artifact = ArtifactStorageInfo( + name=artifact_info["name"], + version=artifact_info["version"], + relative_path=artifact_info["remote path"], + datastore_arm_id=get_datastore_arm_id(datastore_name, operation_scope) if not sas_uri else None, + container_name=( + storage_client.container if isinstance(storage_client, BlobStorageClient) else storage_client.file_system + ), + storage_account_url=datastore_info.get("account_url") if not sas_uri else sas_uri, + indicator_file=artifact_info["indicator file"], + is_file=Path(local_path).is_file(), + ) + return artifact + + +def download_artifact( + starts_with: Union[str, os.PathLike], + destination: str, + datastore_operation: DatastoreOperations, + datastore_name: Optional[str], + datastore_info: Optional[Dict] = None, +) -> str: + """Download datastore path to local file or directory. + + :param starts_with: Prefix of blobs to download + :type starts_with: Union[str, os.PathLike] + :param destination: Path that files will be written to + :type destination: str + :param datastore_operation: Datastore operations + :type datastore_operation: DatastoreOperations + :param datastore_name: name of datastore + :type datastore_name: Optional[str] + :param datastore_info: the return value of invoking get_datastore_info + :type datastore_info: Optional[Dict] + :return: Path that files were written to + :rtype: str + """ + starts_with = starts_with.as_posix() if isinstance(starts_with, Path) else starts_with + datastore_name = _get_datastore_name(datastore_name=datastore_name) + if datastore_info is None: + datastore_info = get_datastore_info(datastore_operation, datastore_name) + storage_client = get_storage_client(**datastore_info) + storage_client.download(starts_with=starts_with, destination=destination) + return destination + + +def download_artifact_from_storage_url( + blob_url: str, + destination: str, + datastore_operation: DatastoreOperations, + datastore_name: Optional[str], +) -> str: + """Download datastore blob URL to local file or directory. 
+ + :param blob_url: The blob url to download + :type blob_url: str + :param destination: Path that the artifact will be written to + :type destination: str + :param datastore_operation: The datastore operations + :type datastore_operation: DatastoreOperations + :param datastore_name: The datastore name + :type datastore_name: Optional[str] + :return: Path that files were written to + :rtype: str + """ + datastore_name = _get_datastore_name(datastore_name=datastore_name) + datastore_info = get_datastore_info(datastore_operation, datastore_name) + starts_with = get_artifact_path_from_storage_url( + blob_url=str(blob_url), container_name=datastore_info.get("container_name") + ) + return download_artifact( + starts_with=starts_with, + destination=destination, + datastore_operation=datastore_operation, + datastore_name=datastore_name, + datastore_info=datastore_info, + ) + + +def download_artifact_from_aml_uri(uri: str, destination: str, datastore_operation: DatastoreOperations) -> str: + """Downloads artifact pointed to by URI of the form `azureml://...` to destination. + + :param str uri: AzureML uri of artifact to download + :param str destination: Path to download artifact to + :param DatastoreOperations datastore_operation: datastore operations + :return: Path that files were downloaded to + :rtype: str + """ + parsed_uri = AzureMLDatastorePathUri(uri) + return download_artifact( + starts_with=parsed_uri.path, + destination=destination, + datastore_operation=datastore_operation, + datastore_name=parsed_uri.datastore, + ) + + +def aml_datastore_path_exists( + uri: str, datastore_operation: DatastoreOperations, datastore_info: Optional[dict] = None +) -> bool: + """Checks whether `uri` of the form "azureml://" points to either a directory or a file. 
+ + :param str uri: azure ml datastore uri + :param DatastoreOperations datastore_operation: Datastore operation + :param dict datastore_info: return value of get_datastore_info + :return: True if uri exists False otherwise + :rtype: bool + """ + parsed_uri = AzureMLDatastorePathUri(uri) + datastore_info = datastore_info or get_datastore_info(datastore_operation, parsed_uri.datastore) + return get_storage_client(**datastore_info).exists(parsed_uri.path) + + +def _upload_to_datastore( + operation_scope: OperationScope, + datastore_operation: DatastoreOperations, + path: Union[str, Path, os.PathLike], + artifact_type: str, + datastore_name: Optional[str] = None, + show_progress: bool = True, + asset_name: Optional[str] = None, + asset_version: Optional[str] = None, + asset_hash: Optional[str] = None, + ignore_file: Optional[IgnoreFile] = None, + sas_uri: Optional[str] = None, + blob_uri: Optional[str] = None, +) -> ArtifactStorageInfo: + _validate_path(path, _type=artifact_type) + if not ignore_file: + ignore_file = get_ignore_file(path) + if not asset_hash: + asset_hash = get_object_hash(path, ignore_file) + artifact = upload_artifact( + str(path), + datastore_operation, + operation_scope, + datastore_name, + show_progress=show_progress, + asset_hash=asset_hash, + asset_name=asset_name, + asset_version=asset_version, + ignore_file=ignore_file, + sas_uri=sas_uri, + ) + if blob_uri: + artifact.storage_account_url = blob_uri + + return artifact + + +def _upload_and_generate_remote_uri( + operation_scope: OperationScope, + datastore_operation: DatastoreOperations, + path: Union[str, Path, os.PathLike], + artifact_type: str = ErrorTarget.ARTIFACT, + datastore_name: str = WORKSPACE_BLOB_STORE, + show_progress: bool = True, +) -> str: + # Asset name is required for uploading to a datastore + asset_name = str(uuid.uuid4()) + artifact_info = _upload_to_datastore( + operation_scope=operation_scope, + datastore_operation=datastore_operation, + path=path, + datastore_name=datastore_name, + asset_name=asset_name, + artifact_type=artifact_type, + show_progress=show_progress, + ) + + path = artifact_info.relative_path + datastore = AMLNamedArmId(artifact_info.datastore_arm_id).asset_name + return SHORT_URI_FORMAT.format(datastore, path) + + +def _update_metadata(name, version, indicator_file, datastore_info) -> None: + storage_client = get_storage_client(**datastore_info) + + if isinstance(storage_client, BlobStorageClient): + _update_blob_metadata(name, version, indicator_file, storage_client) + elif isinstance(storage_client, Gen2StorageClient): + _update_gen2_metadata(name, version, indicator_file, storage_client) + + +def _update_blob_metadata(name, version, indicator_file, storage_client) -> None: + container_client = storage_client.container_client + if indicator_file.startswith(storage_client.container): + indicator_file = indicator_file.split(storage_client.container)[1] + blob = container_client.get_blob_client(blob=indicator_file) + blob.set_blob_metadata(_build_metadata_dict(name=name, version=version)) + + +def _update_gen2_metadata(name, version, indicator_file, storage_client) -> None: + artifact_directory_client = storage_client.file_system_client.get_directory_client(indicator_file) + artifact_directory_client.set_metadata(_build_metadata_dict(name=name, version=version)) + + +T = TypeVar("T", bound=Artifact) + + +def _check_and_upload_path( + artifact: T, + asset_operations: Union[ + "DataOperations", + "ModelOperations", + "EvaluatorOperations", + "CodeOperations", + 
"FeatureSetOperations", + "IndexOperations", + ], + artifact_type: str, + datastore_name: Optional[str] = None, + sas_uri: Optional[str] = None, + show_progress: bool = True, + blob_uri: Optional[str] = None, +) -> Tuple[T, Optional[str]]: + """Checks whether `artifact` is a path or a uri and uploads it to the datastore if necessary. + + :param artifact: artifact to check and upload param + :type artifact: T + :param asset_operations: The asset operations to use for uploading + :type asset_operations: Union["DataOperations", "ModelOperations", "CodeOperations", "IndexOperations"] + :param artifact_type: The artifact type + :type artifact_type: str + :param datastore_name: the name of the datastore to upload to + :type datastore_name: Optional[str] + :param sas_uri: the sas uri to use for uploading + :type sas_uri: Optional[str] + :param show_progress: Whether to show progress on the console. Defaults to True. + :type show_progress: bool + :param blob_uri: The storage account uri + :type blob_uri: Optional[str] + :return: A 2-tuple of the uploaded artifact, and the indicator file. + :rtype: Tuple[T, Optional[str]] + """ + + datastore_name = artifact.datastore + indicator_file = None + if ( + hasattr(artifact, "local_path") + and artifact.local_path is not None + or ( + hasattr(artifact, "path") + and artifact.path is not None + and not (is_url(artifact.path) or is_mlflow_uri(artifact.path)) + ) + ): + path = ( + Path(artifact.path) + if hasattr(artifact, "path") and artifact.path is not None + else Path(artifact.local_path) + ) + if not path.is_absolute(): + path = Path(artifact.base_path, path).resolve() + uploaded_artifact = _upload_to_datastore( + asset_operations._operation_scope, + asset_operations._datastore_operation, + path, + datastore_name=datastore_name, + asset_name=artifact.name, + asset_version=str(artifact.version), + asset_hash=getattr(artifact, "_upload_hash", None), + sas_uri=sas_uri, + artifact_type=artifact_type, + show_progress=show_progress, + ignore_file=getattr(artifact, "_ignore_file", None), + blob_uri=blob_uri, + ) + indicator_file = uploaded_artifact.indicator_file # reference to storage contents + if artifact._is_anonymous: + artifact.name, artifact.version = ( + uploaded_artifact.name, + uploaded_artifact.version, + ) + # Pass all of the upload information to the assets, and they will each construct the URLs that they support + artifact._update_path(uploaded_artifact) + return artifact, indicator_file + + +def _check_and_upload_env_build_context( + environment: Environment, + operations: "EnvironmentOperations", + sas_uri=None, + show_progress: bool = True, +) -> Environment: + if environment.path: + uploaded_artifact = _upload_to_datastore( + operations._operation_scope, + operations._datastore_operation, + environment.path, + asset_name=environment.name, + asset_version=str(environment.version), + asset_hash=environment._upload_hash, + sas_uri=sas_uri, + artifact_type=ErrorTarget.ENVIRONMENT, + datastore_name=environment.datastore, + show_progress=show_progress, + ) + if environment.build is not None: + # TODO: Depending on decision trailing "/" needs to stay or not. EMS requires it to be present + environment.build.path = str(uploaded_artifact.full_storage_path) + "/" + return environment + + +def _get_snapshot_path_info(artifact) -> Optional[Tuple[Path, IgnoreFile, str]]: + """ + Validate an Artifact's local path and get its resolved path, ignore file, and hash. If no local path, return None. 
+ :param artifact: Artifact object + :type artifact: azure.ai.ml.entities._assets._artifacts.artifact.Artifact + :return: Artifact's path, ignorefile, and hash + :rtype: Tuple[os.PathLike, IgnoreFile, str] + """ + if ( + hasattr(artifact, "local_path") + and artifact.local_path is not None + or ( + hasattr(artifact, "path") + and artifact.path is not None + and not (is_url(artifact.path) or is_mlflow_uri(artifact.path)) + ) + ): + path = ( + Path(artifact.path) + if hasattr(artifact, "path") and artifact.path is not None + else Path(artifact.local_path) + ) + if not path.is_absolute(): + path = Path(artifact.base_path, path).resolve() + else: + return None + + _validate_path(path, _type=ErrorTarget.CODE) + + # to align with _check_and_upload_path, we need to try getting the ignore file from the artifact first + ignore_file = getattr(artifact, "_ignore_file", get_ignore_file(path)) + # Note that we haven't used getattr(artifact, "_upload_hash", get_content_hash(path, ignore_file)) here, which + # is aligned with _check_and_upload_path. Current guess is that content_hash is what we used in blob, so we must + # use it to retrieve the artifact. + # TODO: Core SDK team to provide more information on this + asset_hash = get_content_hash(path, ignore_file) + + return path, ignore_file, asset_hash diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_blob_storage_helper.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_blob_storage_helper.py new file mode 100644 index 00000000..4cd8ced1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_blob_storage_helper.py @@ -0,0 +1,344 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=client-accepts-api-version-keyword,too-many-instance-attributes,client-method-missing-type-annotations,missing-client-constructor-parameter-kwargs,logging-format-interpolation + +import logging +import os +import sys +import time +import uuid +from pathlib import Path, PurePosixPath +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + +from colorama import Fore +from typing_extensions import Literal + +from azure.ai.ml._artifacts._constants import ( + ARTIFACT_ORIGIN, + BLOB_DATASTORE_IS_HDI_FOLDER_KEY, + FILE_SIZE_WARNING, + KEY_AUTHENTICATION_ERROR_CODE, + LEGACY_ARTIFACT_DIRECTORY, + MAX_CONCURRENCY, + SAS_KEY_AUTHENTICATION_ERROR_MSG, + UPLOAD_CONFIRMATION, +) +from azure.ai.ml._azure_environments import _get_cloud_details +from azure.ai.ml._utils._asset_utils import ( + AssetNotChangedError, + IgnoreFile, + _build_metadata_dict, + generate_asset_id, + get_directory_size, + upload_directory, + upload_file, +) +from azure.ai.ml.constants._common import STORAGE_AUTH_MISMATCH_ERROR +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, MlException, ValidationException +from azure.core.exceptions import HttpResponseError, ResourceNotFoundError +from azure.storage.blob import BlobServiceClient, ContainerClient + +if TYPE_CHECKING: + from azure.storage.blob import BlobProperties + +module_logger = logging.getLogger(__name__) + + +class BlobStorageClient: + def __init__(self, credential: str, account_url: str, container_name: Optional[str] = None): + self.account_name = account_url.split(".")[0].split("//")[1] + self.service_client = BlobServiceClient(account_url=account_url, credential=credential) + self.upload_to_root_container = None + if 
container_name: + self.container_client = self.service_client.get_container_client(container=container_name) + else: + self.container_client = ContainerClient.from_container_url(account_url) + self.upload_to_root_container = True + self.container = container_name if container_name else self.container_client.container_name + self.total_file_count = 1 + self.uploaded_file_count = 0 + self.overwrite = False + self.indicator_file: Any = None + self.legacy = False + self.name = None + self.version = None + + def upload( + self, + source: str, + name: str, + version: str, + ignore_file: IgnoreFile = IgnoreFile(None), + asset_hash: Optional[str] = None, + show_progress: bool = True, + ) -> Dict[Literal["remote path", "name", "version", "indicator file"], str]: + """Upload a file or directory to a path inside the container. + + :param source: The path to either a file or directory to upload + :type source: str + :param name: The asset name + :type name: str + :param version: The asset version + :type version: str + :param ignore_file: The IgnoreFile that specifies which files, if any, to ignore when uploading files + :type ignore_file: IgnoreFile + :param asset_hash: The asset hash + :type asset_hash: Optional[str] + :param show_progress: Whether to show progress on the console. Defaults to True. + :type show_progress: bool + :return: A dictionary containing info of the uploaded artifact + :rtype: Dict[Literal["remote path", "name", "version", "indicator file"], str] + """ + if name and version is None: + version = str(uuid.uuid4()) # placeholder for auto-increment artifacts + + asset_id = generate_asset_id(asset_hash, include_directory=True) if not self.upload_to_root_container else "" + source_name = Path(source).name + dest = str(PurePosixPath(asset_id, source_name)) + + try: + # truncate path longer than 50 chars for terminal display + if show_progress and len(source_name) >= 50: + formatted_path = "{:.47}".format(source_name) + "..." 
+ else: + formatted_path = source_name + + # configure progress bar description + msg = Fore.GREEN + f"Uploading {formatted_path}" + + # warn if large file (> 100 MB) + file_size, _ = get_directory_size(source, ignore_file=ignore_file) + file_size_in_mb = file_size / 10**6 + cloud = _get_cloud_details() + cloud_endpoint = cloud["storage_endpoint"] # make sure proper cloud endpoint is used + full_storage_url = f"https://{self.account_name}.blob.{cloud_endpoint}/{self.container}/{dest}" + if file_size_in_mb > 100: + module_logger.warning(FILE_SIZE_WARNING.format(source=source, destination=full_storage_url)) + + # start upload + if os.path.isdir(source): + upload_directory( + storage_client=self, + source=source, + dest=asset_id, + msg=msg, + show_progress=show_progress, + ignore_file=ignore_file, + ) + else: + self.indicator_file = dest + self.check_blob_exists() + upload_file( + storage_client=self, + source=source, + dest=dest, + msg=msg, + show_progress=show_progress, + ) + print(Fore.RESET + "\n", file=sys.stderr) + + # upload must be completed before we try to generate confirmation file + while self.uploaded_file_count < self.total_file_count: + time.sleep(0.5) + self._set_confirmation_metadata(name, version) + except AssetNotChangedError: + name = str(self.name) + version = str(self.version) + if self.legacy: + dest = dest.replace(ARTIFACT_ORIGIN, LEGACY_ARTIFACT_DIRECTORY) + + artifact_info: Dict = { + "remote path": dest, + "name": name, + "version": version, + "indicator file": self.indicator_file, + } + + return artifact_info + + def check_blob_exists(self) -> None: + """Throw error if blob already exists. + + Check if blob already exists in container by checking the metadata for existence and confirmation data. If + confirmation data is missing, blob does not exist or was only partially uploaded and the partial upload will be + overwritten with a complete upload. 
+ """ + + try: + legacy_indicator_file = self.indicator_file.replace(ARTIFACT_ORIGIN, LEGACY_ARTIFACT_DIRECTORY) + blob_client = self.container_client.get_blob_client(blob=self.indicator_file) + legacy_blob_client = self.container_client.get_blob_client(blob=legacy_indicator_file) + + try: + properties = blob_client.get_blob_properties() + except HttpResponseError as e: + if e.error_code == KEY_AUTHENTICATION_ERROR_CODE: # pylint: disable=no-member + formatted_msg = SAS_KEY_AUTHENTICATION_ERROR_MSG.format(e.error_code, e.exc_value) + exception_with_documentation = Exception(formatted_msg) + exception_with_documentation.__traceback__ = e.exc_traceback + raise exception_with_documentation from e + raise e + + metadata = properties.get("metadata") + + # first check legacy folder's metadata to see if artifact is stored there + try: + legacy_properties = legacy_blob_client.get_blob_properties() + legacy_metadata = legacy_properties.get("metadata") + + if ( + legacy_metadata and UPLOAD_CONFIRMATION.items() <= legacy_metadata.items() + ): # checks if metadata dictionary includes confirmation key and value + self.name = legacy_metadata.get("name") + self.version = legacy_metadata.get("version") + self.legacy = True + + raise AssetNotChangedError + except ResourceNotFoundError: + pass + + # check LocalUpload folder's metadata if not found in legacy metadata + if metadata and UPLOAD_CONFIRMATION.items() <= metadata.items(): + self.name = metadata.get("name") + self.version = metadata.get("version") + raise AssetNotChangedError + self.overwrite = True # if upload never confirmed, approve overriding the partial upload + except ResourceNotFoundError: + pass + except Exception as e: + # pylint: disable=no-member + if hasattr(e, "error_code") and e.error_code == STORAGE_AUTH_MISMATCH_ERROR: + msg = ( + "You don't have permission to alter this storage account. " + "Ensure that you have been assigned both Storage Blob Data Reader " + "and Storage Blob Data Contributor roles." + ) + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.ARTIFACT, + error_category=ErrorCategory.USER_ERROR, + ) from e + raise e + + def _set_confirmation_metadata(self, name: str, version: str) -> None: + blob_client = self.container_client.get_blob_client(blob=self.indicator_file) + metadata_dict = _build_metadata_dict(name, version) + blob_client.set_blob_metadata(metadata_dict) + + def download( + self, + starts_with: str, + destination: Union[str, os.PathLike] = Path.home(), + max_concurrency: int = MAX_CONCURRENCY, + ) -> None: + """Downloads all blobs inside a specified container to the destination folder. + + :param starts_with: Indicates the blob name starts with to search. + :type starts_with: str + :param destination: Indicates path to download in local + :type destination: Union[str, os.PathLike[str]] + :param max_concurrency: Indicates concurrent connections to download a blob. 
+ :type max_concurrency: int + """ + try: + my_list = list(self.container_client.list_blobs(name_starts_with=starts_with, include="metadata")) + download_size_in_mb = 0 + for item in my_list: + blob_name = item.name[len(starts_with) :].lstrip("/") or Path(starts_with).name + target_path = Path(destination, blob_name).resolve() + + if _blob_is_hdi_folder(item): + target_path.mkdir(parents=True, exist_ok=True) + continue + + blob_content = self.container_client.download_blob(item) + + # check if total size of download has exceeded 100 MB + # make sure proper cloud endpoint is used + cloud = _get_cloud_details() + cloud_endpoint = cloud["storage_endpoint"] + full_storage_url = f"https://{self.account_name}.blob.{cloud_endpoint}/{self.container}/{starts_with}" + download_size_in_mb += blob_content.size / 10**6 + if download_size_in_mb > 100: + module_logger.warning(FILE_SIZE_WARNING.format(source=full_storage_url, destination=destination)) + + blob_content = blob_content.content_as_bytes(max_concurrency) + target_path.parent.mkdir(parents=True, exist_ok=True) + with target_path.open("wb") as file: + file.write(blob_content) + except OSError as ex: + raise ex + except Exception as e: + msg = "Saving blob with prefix {} was unsuccessful. exception={}" + raise MlException( + message=msg.format(starts_with, e), + no_personal_data_message=msg.format("[starts_with]", "[exception]"), + target=ErrorTarget.ARTIFACT, + error_category=ErrorCategory.USER_ERROR, + error=e, + ) from e + + def list(self, starts_with: str) -> List[str]: + """Lists all blob names in the specified container. + + :param starts_with: Indicates the blob name starts with to search. + :type starts_with: str + :return: the list of blob paths in container + :rtype: List[str] + """ + blobs = self.container_client.list_blobs(name_starts_with=starts_with) + return [blob.name for blob in blobs] + + def exists(self, blobpath: str, delimiter: str = "/") -> bool: + """Returns whether there exists a blob named `blobpath`, or if there exists a virtual directory given path + delimeter `delimeter` + + e.g: + Given blob store with blobs + foobar/baz.txt + foobar/baz.txt + + self.exists("foobar") -> True + self.exists("foobar/baz.txt") -> True + self.exists("foobar/blah.txt") -> False + self.exists("foo") -> False + + + :param str blobpath: prefix matched against blob names + :param str delimiter: The path delimeter (defaults to /) + :return: True if file or virtual directory exists, False otherwise + :rtype: bool + """ + if self.container_client.get_blob_client(blobpath).exists(): + return True + + ensure_delimeter = delimiter if not blobpath.endswith(delimiter) else "" + + # Virtual directory only exists if there is atleast one blob with it + result = next( + self.container_client.walk_blobs(name_starts_with=blobpath + ensure_delimeter, delimiter=delimiter), + None, + ) + return result is not None + + +def _blob_is_hdi_folder(blob: "BlobProperties") -> bool: + """Checks if a given blob actually represents a folder. + + Blob datastores do not natively have any conception of a folder. Instead, + empty blobs with the same name as a "folder" can have additional metadata + specifying that it is actually a folder. 
+ + :param BlobProperties blob: Blob to check + :return: True if blob represents a folder, False otherwise + :rtype: bool + """ + + # Metadata isn't always a populated field, and may need to be explicitly + # requested from whatever function generates the blobproperties object + # + # e.g self.container_client.list_blobs(..., include='metadata') + return bool(blob.metadata and blob.metadata.get(BLOB_DATASTORE_IS_HDI_FOLDER_KEY, None)) diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_constants.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_constants.py new file mode 100644 index 00000000..8c9e97ea --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_constants.py @@ -0,0 +1,60 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + + +CHUNK_SIZE = 1024 +PROCESSES_PER_CORE = 2 +# number of parallel connections to be used for uploads > 64MB and downloads +# pylint: disable=line-too-long +# (Azure Storage param: https://learn.microsoft.com/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#upload-blob-data--blob-type--blobtype-blockblob---blockblob----length-none--metadata-none----kwargs-) +MAX_CONCURRENCY = 16 + +ARTIFACT_ORIGIN = "LocalUpload" +LEGACY_ARTIFACT_DIRECTORY = "az-ml-artifacts" +UPLOAD_CONFIRMATION = {"upload_status": "completed"} + +HASH_ALGORITHM_NAME = "md5" +AML_IGNORE_FILE_NAME = ".amlignore" +GIT_IGNORE_FILE_NAME = ".gitignore" + +ASSET_PATH_ERROR = "(UserError) Asset paths cannot be updated." +CHANGED_ASSET_PATH_MSG = ( + "The code asset {name}:{version} is already linked to an asset " + "in your datastore that does not match the content from your `directory` param " + "and cannot be overwritten. Please provide a unique name or version " + "to successfully create a new code asset." +) +CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA = "The code asset is already linked to an asset." +EMPTY_DIRECTORY_ERROR = "Directory {0} is empty. path or local_path must be a non-empty directory." +FILE_SIZE_WARNING = ( + "Your file exceeds 100 MB. If you experience low speeds, latency, or broken connections, we recommend using " + "the AzCopyv10 tool for this file transfer.\n\nExample: azcopy copy '{source}' '{destination}' " # cspell:disable-line + "\n\nSee https://learn.microsoft.com/azure/storage/common/storage-use-azcopy-v10 for more information." +) +INVALID_MLTABLE_METADATA_SCHEMA_MSG = "Invalid MLTable metadata schema" +INVALID_MLTABLE_METADATA_SCHEMA_ERROR = ( + "{jsonSchemaErrorPath}{jsonSchemaMessage}\n{invalidMLTableMsg}:\n{invalidSchemaSnippet}" +) +BLOB_DATASTORE_IS_HDI_FOLDER_KEY = "hdi_isfolder" +BLOB_STORAGE_CLIENT_NAME = "BlobStorageClient" +GEN2_STORAGE_CLIENT_NAME = "Gen2StorageClient" +DEFAULT_CONNECTION_TIMEOUT = 14400 +STORAGE_URI_REGEX = ( + r"(https:\/\/([a-zA-Z0-9@:%_\\\-+~#?&=]+)[a-zA-Z0-9@:%._\\\-+~#?&=]+\.?)\/([a-zA-Z0-9@:%._\\\-+~#?&=]+)\/?(.*)" +) + +WORKSPACE_MANAGED_DATASTORE_WITH_SLASH = "azureml://datastores/workspacemanageddatastore/" +WORKSPACE_MANAGED_DATASTORE = "azureml://datastores/workspacemanageddatastore" +AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA = ( + "Auto delete setting cannot be specified in JobOutput now. Please remove it and try again." +) +INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA = f'Cannot specify a sub-path for workspace managed datastore. Please set "{WORKSPACE_MANAGED_DATASTORE}" as the path.' 
+SAS_KEY_AUTHENTICATION_ERROR_MSG = ( + "{0}\n{1}\n" + "This SAS token is derived from an account key, but key-based authentication is not permitted " + "for this storage account. To update workspace properties, please see the documentation: " + "https://review.learn.microsoft.com/azure/machine-learning/how-to-disable-local-auth-storage?view=" + "azureml-api-2&branch=pr-en-us-278974&tabs=cli#update-an-existing-workspace" +) +KEY_AUTHENTICATION_ERROR_CODE = "KeyBasedAuthenticationNotPermitted" diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_fileshare_storage_helper.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_fileshare_storage_helper.py new file mode 100644 index 00000000..01da568c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_fileshare_storage_helper.py @@ -0,0 +1,426 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=client-accepts-api-version-keyword,client-method-missing-type-annotations,missing-client-constructor-parameter-kwargs + +import logging +import os +import sys +import time +from pathlib import Path, PurePosixPath +from typing import Callable, Dict, Optional, Tuple, Union + +from typing_extensions import Literal + +from azure.ai.ml._artifacts._constants import ( + ARTIFACT_ORIGIN, + FILE_SIZE_WARNING, + LEGACY_ARTIFACT_DIRECTORY, + UPLOAD_CONFIRMATION, +) +from azure.ai.ml._utils._asset_utils import ( + DirectoryUploadProgressBar, + FileUploadProgressBar, + IgnoreFile, + _build_metadata_dict, + generate_asset_id, + get_directory_size, + get_upload_files_from_folder, +) +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, MlException +from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError +from azure.storage.fileshare import ShareDirectoryClient, ShareFileClient + +module_logger = logging.getLogger(__name__) + + +class FileStorageClient: + def __init__(self, credential: str, file_share_name: str, account_url: str): + self.directory_client = ShareDirectoryClient( + account_url=account_url, + credential=credential, + share_name=file_share_name, + directory_path=ARTIFACT_ORIGIN, + ) + self.legacy_directory_client = ShareDirectoryClient( + account_url=account_url, + credential=credential, + share_name=file_share_name, + directory_path=LEGACY_ARTIFACT_DIRECTORY, + ) + self.file_share = file_share_name + self.total_file_count = 1 + self.uploaded_file_count = 0 + self.name = None + self.version = None + self.legacy = False + + try: + self.directory_client.create_directory() + except ResourceExistsError: + pass + + self.subdirectory_client = None + + def upload( + self, + source: str, + name: str, + version: str, + ignore_file: IgnoreFile = IgnoreFile(None), + asset_hash: Optional[str] = None, + show_progress: bool = True, + ) -> Dict[Literal["remote path", "name", "version"], str]: + """Upload a file or directory to a path inside the file system. + + :param source: The path to either a file or directory to upload + :type source: str + :param name: The asset name + :type name: str + :param version: The asset version + :type version: str + :param ignore_file: The IgnoreFile that specifies which files, if any, to ignore when uploading files + :type ignore_file: IgnoreFile + :param asset_hash: The asset hash + :type asset_hash: Optional[str] + :param show_progress: Whether to show progress on the console. Defaults to True. 
+ :type show_progress: bool + :return: A dictionary containing info of the uploaded artifact + :rtype: Dict[Literal["remote path", "name", "version"], str] + """ + asset_id = generate_asset_id(asset_hash, include_directory=False) + source_name = Path(source).name + dest = str(PurePosixPath(asset_id, source_name)) + + if not self.exists(asset_id): + # truncate path longer than 50 chars for terminal display + if show_progress and len(source_name) >= 50: + formatted_path = "{:.47}".format(source_name) + "..." + else: + formatted_path = source_name + msg = f"Uploading {formatted_path}" + + # warn if large file (> 100 MB) + file_size, _ = get_directory_size(source) + file_size_in_mb = file_size / 10**6 + if file_size_in_mb > 100: + module_logger.warning(FILE_SIZE_WARNING) + + # start upload + if os.path.isdir(source): + self.upload_dir( + source, + asset_id, + msg=msg, + show_progress=show_progress, + ignore_file=ignore_file, + ) + else: + self.upload_file(source, asset_id, msg=msg, show_progress=show_progress) + + # upload must be completed before we try to generate confirmation file + while self.uploaded_file_count < self.total_file_count: + time.sleep(0.5) + self._set_confirmation_metadata(source, asset_id, name, version) + else: + name = str(self.name) + version = str(self.version) + if self.legacy: + dest = dest.replace(ARTIFACT_ORIGIN, LEGACY_ARTIFACT_DIRECTORY) + artifact_info: Dict = {"remote path": dest, "name": name, "version": version} + + return artifact_info + + def upload_file( + self, + source: Union[str, os.PathLike], + dest: str, + show_progress: bool = False, + msg: Optional[str] = None, + in_directory: bool = False, + subdirectory_client: Optional[ShareDirectoryClient] = None, + callback: Optional[Callable[[Dict], None]] = None, + ) -> None: + """Upload a single file to a path inside the file system directory. + + :param source: The file to upload + :type source: Union[str, os.PathLike] + :param dest: The destination in the fileshare to upload to + :type dest: str + :param show_progress: Whether to show progress on the console. Defaults to False. + :type show_progress: bool + :param msg: Message to display on progress bar. Defaults to None. + :type msg: Optional[str] + :param in_directory: Whether this function is being called by :attr:`FileStorageClient.upload_dir`. Defaults + to False. + :type in_directory: bool + :param subdirectory_client: The subdirectory client. + :type subdirectory_client: Optional[ShareDirectoryClient] + :param callback: A callback that receives the raw requests returned by the service during the upload process. + Only used if `in_directory` and `show_progress` are True. 
+ :type callback: Optional[Callable[[Dict], None]] + """ + validate_content = os.stat(source).st_size > 0 # don't do checksum for empty files + + with open(source, "rb") as data: + if in_directory: + file_name = dest.rsplit("/")[-1] + if subdirectory_client is not None: + if show_progress: + subdirectory_client.upload_file( + file_name=file_name, + data=data, + validate_content=validate_content, + raw_response_hook=callback, + ) + else: + subdirectory_client.upload_file( + file_name=file_name, + data=data, + validate_content=validate_content, + ) + else: + if show_progress: + with FileUploadProgressBar(msg=msg) as pbar: + self.directory_client.upload_file( + file_name=dest, + data=data, + validate_content=validate_content, + raw_response_hook=pbar.update_to, + ) + else: + self.directory_client.upload_file(file_name=dest, data=data, validate_content=validate_content) + self.uploaded_file_count = self.uploaded_file_count + 1 + + def upload_dir( + self, + source: Union[str, os.PathLike], + dest: str, + msg: str, + show_progress: bool, + ignore_file: IgnoreFile, + ) -> None: + """Upload a directory to a path inside the fileshare directory. + + :param source: The directory to upload + :type source: Union[str, os.PathLike] + :param dest: The destination in the fileshare to upload to + :type dest: str + :param msg: Message to display on progress bar + :type msg: str + :param show_progress: Whether to show progress on the console. + :type show_progress: bool + :param ignore_file: The IgnoreFile that specifies which files, if any, to ignore when uploading files + :type ignore_file: IgnoreFile + """ + subdir = self.directory_client.create_subdirectory(dest) + source_path = Path(source).resolve() + prefix = "" if dest == "" else dest + "/" + prefix += os.path.basename(source) + "/" + + upload_paths = sorted(get_upload_files_from_folder(source_path, prefix=prefix, ignore_file=ignore_file)) + self.total_file_count = len(upload_paths) + + for root, *_ in os.walk(source): # type: ignore[type-var] + if sys.platform.startswith(("win32", "cygwin")): + split_char = "\\" + else: + split_char = "/" + trunc_root = root.rsplit(split_char)[-1] # type: ignore[union-attr] + subdir = subdir.create_subdirectory(trunc_root) + + if show_progress: + with DirectoryUploadProgressBar(dir_size=get_directory_size(source_path), msg=msg) as pbar: + for src, destination in upload_paths: + self.upload_file( + src, + destination, + in_directory=True, + subdirectory_client=subdir, + show_progress=show_progress, + callback=pbar.update_to, + ) + else: + for src, destination in upload_paths: + self.upload_file( + src, + destination, + in_directory=True, + subdirectory_client=subdir, + show_progress=show_progress, + ) + + def exists(self, asset_id: str) -> bool: + """Check if file or directory already exists in fileshare directory. + + :param asset_id: The file or directory + :type asset_id: str + :return: True if the file or directory exists, False otherwise + :rtype: bool + """ + # get dictionary of asset ids and if each asset is a file or directory (e.g. {"ijd930j23d8": True}) + default_directory_items = { + item["name"]: item["is_directory"] for item in self.directory_client.list_directories_and_files() + } + try: + legacy_directory_items = { + item["name"]: item["is_directory"] for item in self.legacy_directory_client.list_directories_and_files() + } + except ResourceNotFoundError: + # if the legacy directory does not exist, a ResourceNotFoundError is thrown. 
For a new file share + # without this directory, this will fail an upload into this file share. We catch the error here + # so that uploading a file into a file share that does not have this directory will not fail. + # We don't have this issue with the default directory since the constructor of this class creates + # the default directory if it does not already exist. + legacy_directory_items = {} + existing_items = {**default_directory_items, **legacy_directory_items} + + if asset_id in existing_items: + client, properties = self._get_asset_metadata(asset_id, default_directory_items, legacy_directory_items) + metadata = properties.get("metadata") + if metadata and UPLOAD_CONFIRMATION.items() <= metadata.items(): + self.name = metadata.get("name") + self.version = metadata.get("version") + return True + if not self.legacy: + delete(client) # If past upload never reached upload confirmation, delete and proceed to upload + return False + + def download( + self, + starts_with: str = "", + destination: str = str(Path.home()), + max_concurrency: int = 4, + ) -> None: + """Downloads all contents inside a specified fileshare directory. + + :param starts_with: The prefix used to filter files to download + :type starts_with: str + :param destination: The destination to download to. Default to user's home directory. + :type destination: str + :param max_concurrency: The maximum number of concurrent downloads. Defaults to 4. + :type max_concurrency: int + """ + recursive_download( + client=self.directory_client, + starts_with=starts_with, + destination=destination, + max_concurrency=max_concurrency, + ) + + def _set_confirmation_metadata(self, source: str, dest: str, name: str, version: str) -> None: + metadata_dict = _build_metadata_dict(name, version) + if os.path.isdir(source): + properties = self.directory_client.get_subdirectory_client(dest) + properties.set_directory_metadata(metadata_dict) + else: + properties = self.directory_client.get_file_client(dest) + properties.set_file_metadata(metadata_dict) + + def _get_asset_metadata( + self, + asset_id: str, + default_items: Dict[str, bool], + legacy_items: Dict[str, bool], + ) -> Tuple: + # if asset_id key's value doesn't match either bool, + # it's not in the dictionary and we check "LocalUpload" dictionary below. + + client, properties = None, None + + if legacy_items.get(asset_id) is True: + self.legacy = True + client = self.legacy_directory_client.get_subdirectory_client(asset_id) + properties = client.get_directory_properties() + elif legacy_items.get(asset_id) is False: + self.legacy = True + client = self.legacy_directory_client.get_file_client(asset_id) + properties = client.get_file_properties() + if client and properties: + return ( + client, + properties, + ) # if found in legacy, no need to look in "LocalUpload" + + if default_items.get(asset_id) is True: + client = self.directory_client.get_subdirectory_client(asset_id) + properties = client.get_directory_properties() + elif default_items.get(asset_id) is False: + client = self.directory_client.get_file_client(asset_id) + properties = client.get_file_properties() + + return client, properties + + +def delete(root_client: Union[ShareDirectoryClient, ShareFileClient]) -> None: + """Deletes a file or directory recursively. + + Azure File Share SDK does not allow overwriting, so if an upload is + interrupted before it can finish, the files from that upload must be + deleted before the upload can be re-attempted. 
+ + :param root_client: The client used to delete the file or directory + :type root_client: Union[ShareDirectoryClient, ShareFileClient] + """ + if isinstance(root_client, ShareFileClient): + root_client.delete_file() + return + + all_contents = list(root_client.list_directories_and_files()) + len_contents = sum(1 for _ in all_contents) + if len_contents > 0: + for f in all_contents: + if f["is_directory"]: + f_client = root_client.get_subdirectory_client(f["name"]) + delete(f_client) + else: + root_client.delete_file(f["name"]) + root_client.delete_directory() + + +def recursive_download( + client: ShareDirectoryClient, + destination: str, + max_concurrency: int, + starts_with: str = "", +) -> None: + """Helper function for `download`. + + Recursively downloads remote fileshare directory locally + + :param client: The share directory client + :type client: ShareDirectoryClient + :param destination: The destination path to download to + :type destination: str + :param max_concurrency: The maximum number of concurrent downloads + :type max_concurrency: int + :param starts_with: The prefix used to filter files to download. Defaults to "" + :type starts_with: str + """ + try: + items = list(client.list_directories_and_files(name_starts_with=starts_with)) + files = [item for item in items if not item["is_directory"]] + folders = [item for item in items if item["is_directory"]] + + for f in files: + Path(destination).mkdir(parents=True, exist_ok=True) + file_name = f["name"] + file_client = client.get_file_client(file_name) + file_content = file_client.download_file(max_concurrency=max_concurrency) + local_path = Path(destination, file_name) + with open(local_path, "wb") as file_data: + file_data.write(file_content.readall()) + + for f in folders: + sub_client = client.get_subdirectory_client(f["name"]) + destination = "/".join((destination, f["name"])) + recursive_download(sub_client, destination=destination, max_concurrency=max_concurrency) + except Exception as e: + msg = f"Saving fileshare directory with prefix {starts_with} was unsuccessful." + raise MlException( + message=msg.format(starts_with), + no_personal_data_message=msg.format("[prefix]"), + target=ErrorTarget.ARTIFACT, + error_category=ErrorCategory.SYSTEM_ERROR, + ) from e diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_gen2_storage_helper.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_gen2_storage_helper.py new file mode 100644 index 00000000..7cd3b6c6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_artifacts/_gen2_storage_helper.py @@ -0,0 +1,266 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# --------------------------------------------------------- + +# pylint: disable=client-accepts-api-version-keyword,too-many-instance-attributes,client-method-missing-type-annotations,missing-client-constructor-parameter-kwargs,logging-format-interpolation + +import logging +import os +import sys +import time +import uuid +from pathlib import Path, PurePosixPath +from typing import Dict, List, Optional, Union + +from colorama import Fore +from typing_extensions import Literal + +from azure.ai.ml._artifacts._constants import FILE_SIZE_WARNING, UPLOAD_CONFIRMATION +from azure.ai.ml._azure_environments import _get_cloud_details +from azure.ai.ml._utils._asset_utils import ( + AssetNotChangedError, + IgnoreFile, + _build_metadata_dict, + generate_asset_id, + get_directory_size, + upload_directory, + upload_file, +) +from azure.ai.ml.constants._common import STORAGE_AUTH_MISMATCH_ERROR +from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, MlException, ValidationException +from azure.core.exceptions import ResourceExistsError +from azure.storage.filedatalake import DataLakeServiceClient + +module_logger = logging.getLogger(__name__) + + +class Gen2StorageClient: + def __init__(self, credential: str, file_system: str, account_url: str): + self.account_name = account_url.split(".")[0].split("//")[1] + self.file_system = file_system + + try: + service_client = DataLakeServiceClient(account_url=account_url, credential=credential) + self.file_system_client = service_client.get_file_system_client(file_system=file_system) + except ValueError as e: + api_version = e.args[0].split("\n")[-1] + service_client = DataLakeServiceClient( + account_url=account_url, credential=credential, api_version=api_version + ) + self.file_system_client = service_client.get_file_system_client(file_system=file_system) + + try: + self.file_system_client.create_file_system() + except ResourceExistsError: + pass + + self.directory_client = None + self.sub_directory_client = None + self.temp_sub_directory_client = None + self.file_client = None + self.total_file_count = 1 + self.uploaded_file_count = 0 + self.name = None + self.version = None + + def upload( + self, + source: str, + name: str, + version: str, + ignore_file: IgnoreFile = IgnoreFile(None), + asset_hash: Optional[str] = None, + show_progress: bool = True, + ) -> Dict[Literal["remote path", "name", "version", "indicator file"], str]: + """Upload a file or directory to a path inside the filesystem. + + :param source: The path to either a file or directory to upload + :type source: str + :param name: The asset name + :type name: str + :param version: The asset version + :type version: str + :param ignore_file: The IgnoreFile that specifies which files, if any, to ignore when uploading files + :type ignore_file: IgnoreFile + :param asset_hash: The asset hash + :type asset_hash: Optional[str] + :param show_progress: Whether to show progress on the console. Defaults to True. 
+ :type show_progress: bool + :return: A dictionary containing info of the uploaded artifact + :rtype: Dict[Literal["remote path", "name", "version", "indicator file"], str] + """ + if name and version is None: + version = str(uuid.uuid4()) # placeholder for auto-increment artifacts + + asset_id = generate_asset_id(asset_hash, include_directory=True) + source_name = Path(source).name + dest = str(PurePosixPath(asset_id, source_name)) + + try: + # truncate path longer than 50 chars for terminal display + if show_progress and len(source_name) >= 50: + formatted_path = "{:.47}".format(source_name) + "..." + else: + formatted_path = source_name + + # configure progress bar description + msg = Fore.GREEN + f"Uploading {formatted_path}" + + # warn if large file (> 100 MB) + file_size, _ = get_directory_size(source) + file_size_in_mb = file_size / 10**6 + + cloud = _get_cloud_details() + cloud_endpoint = cloud["storage_endpoint"] # make sure proper cloud endpoint is used + full_storage_url = f"https://{self.account_name}.dfs.{cloud_endpoint}/{self.file_system}/{dest}" + if file_size_in_mb > 100: + module_logger.warning(FILE_SIZE_WARNING.format(source=source, destination=full_storage_url)) + + # start upload + self.directory_client = self.file_system_client.get_directory_client(asset_id) + self.check_blob_exists() + + if os.path.isdir(source): + upload_directory( + storage_client=self, + source=source, + dest=asset_id, + msg=msg, + show_progress=show_progress, + ignore_file=ignore_file, + ) + else: + upload_file( + storage_client=self, + source=source, + msg=msg, + show_progress=show_progress, + ) + print(Fore.RESET + "\n", file=sys.stderr) + + # upload must be completed before we try to generate confirmation file + while self.uploaded_file_count < self.total_file_count: + time.sleep(0.5) + self._set_confirmation_metadata(name, version) + except AssetNotChangedError: + name = str(self.name) + version = str(self.version) + + artifact_info: Dict = { + "remote path": dest, + "name": name, + "version": version, + "indicator file": asset_id, + } + + return artifact_info + + def check_blob_exists(self) -> None: + """Throw error if file or directory already exists. + + Check if file or directory already exists in filesystem by + checking the metadata for existence and confirmation data. If + confirmation data is missing, file or directory does not exist + or was only partially uploaded and the partial upload will be + overwritten with a complete upload. + """ + try: + if self.directory_client is not None and self.directory_client.exists(): + metadata = self.directory_client.get_directory_properties().metadata + + if ( + metadata and UPLOAD_CONFIRMATION.items() <= metadata.items() + ): # checks if metadata dictionary includes confirmation key and value + self.name = metadata.get("name") + self.version = metadata.get("version") + raise AssetNotChangedError + except Exception as e: + # pylint: disable=no-member + if hasattr(e, "error_code") and e.error_code == STORAGE_AUTH_MISMATCH_ERROR: + msg = ( + "You don't have permission to alter this storage account. " + "Ensure that you have been assigned both " + "Storage Blob Data Reader and Storage Blob Data Contributor roles." 
+                )
+                raise ValidationException(
+                    message=msg,
+                    no_personal_data_message=msg,
+                    target=ErrorTarget.ARTIFACT,
+                    error_category=ErrorCategory.USER_ERROR,
+                ) from e
+            raise e
+
+    def _set_confirmation_metadata(self, name: str, version: str) -> None:
+        if self.directory_client is not None:
+            self.directory_client.set_metadata(_build_metadata_dict(name, version))
+
+    def download(self, starts_with: str, destination: Union[str, os.PathLike] = Path.home()) -> None:
+        """Downloads all items inside a specified filesystem directory with the prefix `starts_with` to the destination
+        folder.
+
+        :param starts_with: The prefix used to filter items to download
+        :type starts_with: str
+        :param destination: The path to download items to
+        :type destination: Union[str, os.PathLike]
+        """
+        try:
+            mylist = self.file_system_client.get_paths(path=starts_with)
+            download_size_in_mb = 0
+            for item in mylist:
+                file_name = item.name[len(starts_with) :].lstrip("/") or Path(starts_with).name
+                target_path = Path(destination, file_name)
+
+                if item.is_directory:
+                    target_path.mkdir(parents=True, exist_ok=True)
+                    continue
+
+                file_client = self.file_system_client.get_file_client(item.name)
+
+                # check if total size of download has exceeded 100 MB
+                cloud = _get_cloud_details()
+                cloud_endpoint = cloud["storage_endpoint"]  # make sure proper cloud endpoint is used
+                full_storage_url = f"https://{self.account_name}.dfs.{cloud_endpoint}/{self.file_system}/{starts_with}"
+                download_size_in_mb += file_client.get_file_properties().size / 10**6
+                if download_size_in_mb > 100:
+                    module_logger.warning(FILE_SIZE_WARNING.format(source=full_storage_url, destination=destination))
+
+                file_content = file_client.download_file().readall()
+                try:
+                    os.makedirs(str(target_path.parent), exist_ok=True)
+                except FileExistsError:
+                    pass
+                with target_path.open("wb") as f:
+                    f.write(file_content)
+        except OSError as ex:
+            raise ex
+        except Exception as e:
+            msg = "Saving output with prefix {} was unsuccessful. exception={}"
+            raise MlException(
+                message=msg.format(starts_with, e),
+                no_personal_data_message=msg.format("[starts_with]", "[exception]"),
+                target=ErrorTarget.ARTIFACT,
+                error_category=ErrorCategory.USER_ERROR,
+                error=e,
+            ) from e
+
+    def list(self, starts_with: str) -> List[str]:
+        """Lists all file names in the specified filesystem with the prefix
+        `starts_with`.
+
+        :param starts_with: The prefix used to filter results
+        :type starts_with: str
+        :return: The list of filenames that start with the prefix
+        :rtype: List[str]
+        """
+        return [f.get("name") for f in self.file_system_client.get_paths(path=starts_with)]
+
+    def exists(self, path: str) -> bool:
+        """Returns whether there exists a file named `path`.
+
+        :param path: The path to check
+        :type path: str
+        :return: True if `path` exists, False otherwise
+        :rtype: bool
+        """
+        file_client = self.file_system_client.get_file_client(path)
+        return file_client.exists()
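For orientation, here is a minimal sketch of how the `Gen2StorageClient` defined above might be driven. It is illustrative only: the class lives in a private module of azure-ai-ml, and the account URL, filesystem name, asset hash, and local paths are placeholder assumptions, with `DefaultAzureCredential` from the separate azure-identity package standing in for whatever credential the SDK would normally pass.

```python
# Illustrative sketch only -- Gen2StorageClient is internal to azure-ai-ml, and every
# concrete value below (account URL, filesystem, hash, paths) is a placeholder assumption.
from azure.identity import DefaultAzureCredential

from azure.ai.ml._artifacts._gen2_storage_helper import Gen2StorageClient
from azure.ai.ml._utils._asset_utils import IgnoreFile

client = Gen2StorageClient(
    credential=DefaultAzureCredential(),
    file_system="my-filesystem",  # placeholder ADLS Gen2 filesystem name
    account_url="https://myaccount.dfs.core.windows.net",  # placeholder account endpoint
)

# Upload a local directory; the returned dict reports the remote path, the resolved
# name/version, and the asset directory whose metadata acts as the "indicator file".
info = client.upload(
    source="./model_artifacts",      # placeholder local directory
    name="my-model",
    version="1",
    ignore_file=IgnoreFile(None),
    asset_hash="1234567890abcdef",   # normally a content hash computed by the SDK
    show_progress=False,
)

# Download everything stored under the remote prefix, then list what is there.
client.download(starts_with=info["remote path"], destination="./downloaded")
print(client.list(starts_with=info["remote path"]))
```

Because `check_blob_exists` inspects the confirmation metadata written by `_set_confirmation_metadata`, re-uploading unchanged content short-circuits via `AssetNotChangedError` and returns the previously recorded name and version instead of transferring the files again.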
