# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,too-many-lines

import hashlib
import logging
import os
import json
import uuid
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import suppress
from multiprocessing import cpu_count
from os import PathLike
from pathlib import Path
from platform import system
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Union,
    cast,
)

from colorama import Fore
from tqdm import TqdmWarning, tqdm
from typing_extensions import Literal

from azure.ai.ml._artifacts._constants import (
    AML_IGNORE_FILE_NAME,
    ARTIFACT_ORIGIN,
    AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
    BLOB_STORAGE_CLIENT_NAME,
    CHUNK_SIZE,
    DEFAULT_CONNECTION_TIMEOUT,
    EMPTY_DIRECTORY_ERROR,
    GEN2_STORAGE_CLIENT_NAME,
    GIT_IGNORE_FILE_NAME,
    INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
    MAX_CONCURRENCY,
    PROCESSES_PER_CORE,
    UPLOAD_CONFIRMATION,
    WORKSPACE_MANAGED_DATASTORE,
    WORKSPACE_MANAGED_DATASTORE_WITH_SLASH,
)
from azure.ai.ml._restclient.v2022_02_01_preview.operations import (
    ComponentContainersOperations,
    ComponentVersionsOperations,
    DataContainersOperations,
    DataVersionsOperations,
    EnvironmentContainersOperations,
    EnvironmentVersionsOperations,
    ModelContainersOperations,
    ModelVersionsOperations,
)
from azure.ai.ml._restclient.v2022_05_01.models import (
    DataVersionBaseData,
    ModelVersionData,
    ModelVersionResourceArmPaginatedResult,
)
from azure.ai.ml._restclient.v2023_04_01.models import PendingUploadRequestDto
from azure.ai.ml._utils._pathspec import GitWildMatchPattern, normalize_file
from azure.ai.ml._utils.utils import convert_windows_path_to_unix, retry, snake_to_camel
from azure.ai.ml.constants._common import (
    MAX_AUTOINCREMENT_ATTEMPTS,
    DefaultOpenEncoding,
    OrderString,
)
from azure.ai.ml.entities._assets.asset import Asset
from azure.ai.ml.exceptions import (
    AssetPathException,
    EmptyDirectoryError,
    ErrorCategory,
    ErrorTarget,
    ValidationErrorType,
    ValidationException,
)
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError

if TYPE_CHECKING:
    from azure.ai.ml.operations import (
        ComponentOperations,
        DataOperations,
        EnvironmentOperations,
        IndexOperations,
        ModelOperations,
    )

hash_type = type(hashlib.md5())  # nosec

module_logger = logging.getLogger(__name__)


class AssetNotChangedError(Exception):
    pass


class IgnoreFile(object):
    def __init__(self, file_path: Optional[Union[str, os.PathLike]] = None):
        """Base class for handling .gitignore and .amlignore files.

        :param file_path: Relative path, or absolute path to the ignore file.
        """
        path = Path(file_path).resolve() if file_path else None
        self._path = path
        self._path_spec = None

    def exists(self) -> bool:
        """Checks if ignore file exists.
        :return: True if file exists. False Otherwise
        :rtype: bool
        """
        return self._file_exists()

    def _file_exists(self) -> bool:
        return bool(self._path and self._path.exists())

    @property
    def base_path(self) -> Path:
        return self._path.parent

    def _get_ignore_list(self) -> List[str]:
        """Get ignore list from ignore file contents.

        :return: The lines of the ignore file
        :rtype: List[str]
        """
        if not self.exists():
            return []
        if self._file_exists():
            with open(self._path, "r", encoding=DefaultOpenEncoding.READ) as fh:
                return [line.rstrip() for line in fh if line]
        return []

    def _create_pathspec(self) -> List[GitWildMatchPattern]:
        """Creates path specification based on ignore list.

        :return: Path specification
        :rtype: List[GitWildMatchPattern]
        """
        return [GitWildMatchPattern(ignore) for ignore in self._get_ignore_list()]

    def _get_rel_path(self, file_path: Union[str, os.PathLike]) -> Optional[str]:
        """Get relative path of given file_path.

        :param file_path: A file path
        :type file_path: Union[str, os.PathLike]
        :return: file_path relative to base_path, if computable. None otherwise
        :rtype: Optional[str]
        """
        file_path = Path(file_path).absolute()
        try:
            # use os.path.relpath instead of Path.relative_to in case file_path is not a child of self.base_path
            return os.path.relpath(file_path, self.base_path)
        except ValueError:
            # 2 paths are on different drives
            return None

    def is_file_excluded(self, file_path: Union[str, os.PathLike]) -> bool:
        """Checks if given file_path is excluded.

        :param file_path: File path to be checked against ignore file specifications
        :type file_path: Union[str, os.PathLike]
        :return: Whether the file is excluded by ignore file
        :rtype: bool
        """
        # TODO: current design of ignore file can't distinguish between files and directories of the same name
        if self._path_spec is None:
            self._path_spec = self._create_pathspec()
        if not self._path_spec:
            return False
        file_path = self._get_rel_path(file_path)
        if file_path is None:
            return True

        norm_file = normalize_file(file_path)
        matched = False
        for pattern in self._path_spec:
            if pattern.include is not None:
                if pattern.match_file(norm_file) is not None:
                    matched = pattern.include

        return matched

    @property
    def path(self) -> Union[Path, str]:
        return self._path


class AmlIgnoreFile(IgnoreFile):
    def __init__(self, directory_path: Union[Path, str]):
        file_path = Path(directory_path).joinpath(AML_IGNORE_FILE_NAME)
        super(AmlIgnoreFile, self).__init__(file_path)


class GitIgnoreFile(IgnoreFile):
    def __init__(self, directory_path: Union[Path, str]):
        file_path = Path(directory_path).joinpath(GIT_IGNORE_FILE_NAME)
        super(GitIgnoreFile, self).__init__(file_path)


def get_ignore_file(directory_path: Union[Path, str]) -> IgnoreFile:
    """Finds and returns IgnoreFile object based on ignore file found in directory_path.

    .amlignore takes precedence over .gitignore and if no file is found, an empty
    IgnoreFile object will be returned.

    The ignore file must be in the root directory.

    :param directory_path: Path to the (root) directory where ignore file is located
    :type directory_path: Union[Path, str]
    :return: The IgnoreFile found in the directory
    :rtype: IgnoreFile
    """
    aml_ignore = AmlIgnoreFile(directory_path)
    git_ignore = GitIgnoreFile(directory_path)

    if aml_ignore.exists():
        return aml_ignore
    if git_ignore.exists():
        return git_ignore
    return IgnoreFile()
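

# Illustrative sketch (not used by the SDK itself): resolving a project's ignore file and
# filtering local paths with it before an upload. The directory name below is hypothetical.
def _example_get_ignore_file_usage() -> None:
    project_dir = Path("./my_project")  # hypothetical local project directory
    ignore_file = get_ignore_file(project_dir)  # .amlignore wins over .gitignore; may be empty
    included = [
        path
        for path in project_dir.rglob("*")
        if path.is_file() and not ignore_file.is_file_excluded(path)
    ]
    module_logger.debug("%d files would be uploaded", len(included))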


def _validate_path(path: Union[str, os.PathLike], _type: str) -> None:
    path = Path(path)  # Okay to do this since Path is idempotent
    if not path.is_file() and not path.is_dir():
        raise ValidationException(
            message=f"No such file or directory: {path}",
            target=_type,
            error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
            no_personal_data_message="No such file or directory",
            error_category=ErrorCategory.USER_ERROR,
        )


def _parse_name_version(
    name: Optional[str] = None, version_as_int: bool = True
) -> Tuple[Optional[str], Optional[Union[str, int]]]:
    if not name:
        return None, None

    token_list = name.split(":")
    if len(token_list) == 1:
        return name, None
    *name, version = token_list
    if version_as_int:
        version = int(version)
    return ":".join(name), version


def _get_file_hash(filename: Union[str, os.PathLike], _hash: hash_type) -> hash_type:
    with open(str(filename), "rb") as f:
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
            _hash.update(chunk)
    return _hash


def delete_two_catalog_files(path):
    """
    Function that deletes the "catalog.json" and "catalog.json.sig" files located at 'path', if they exist

    :param path: Path to the folder for signing
    :type path: Union[Path, str]
    :return: None
    """
    # catalog.json
    file_path_json = os.path.join(path, "catalog.json")
    if os.path.exists(file_path_json):
        module_logger.warning("%s already exists. Deleting it", file_path_json)
        os.remove(file_path_json)
    # catalog.json.sig
    file_path_json_sig = os.path.join(path, "catalog.json.sig")
    if os.path.exists(file_path_json_sig):
        module_logger.warning("%s already exists. Deleting it", file_path_json_sig)
        os.remove(file_path_json_sig)


def create_catalog_files(path, json_stub):
    with open(os.path.join(path, "catalog.json"), "w", encoding=DefaultOpenEncoding.WRITE) as catalog_file:
        json.dump(json_stub, catalog_file)
    with open(os.path.join(path, "catalog.json.sig"), "w", encoding=DefaultOpenEncoding.WRITE) as catalog_sig_file:
        json.dump(json_stub, catalog_sig_file)


def _get_dir_hash(directory: Union[str, os.PathLike], _hash: hash_type, ignore_file: IgnoreFile) -> hash_type:
    dir_contents = Path(directory).iterdir()
    sorted_contents = sorted(dir_contents, key=lambda path: str(path).lower())
    for path in sorted_contents:
        if ignore_file.is_file_excluded(path):
            continue
        _hash.update(path.name.encode())
        if os.path.islink(path):  # ensure we're hashing the contents of the linked file
            path = _resolve_path(path)
        if path.is_file():
            _hash = _get_file_hash(path, _hash)
        elif path.is_dir():
            _hash = _get_dir_hash(path, _hash, ignore_file)
    return _hash


def _build_metadata_dict(name: str, version: str) -> Dict[str, str]:
    """Build metadata dictionary to attach to uploaded data.

    Metadata includes an upload confirmation field, and for code uploads only, the name and version of the code asset
    being created for that data.

    :param name: The name of the uploaded data
    :type name: str
    :param version: The version of the uploaded data
    :type version: str
    :return: Metadata dict
    :rtype: Dict[str, str]
    """
    if name:
        linked_asset_arm_id = {"name": name, "version": version}
    else:
        msg = "'name' cannot be NoneType for asset artifact upload."
        raise ValidationException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.INVALID_VALUE,
        )

    metadata_dict = {**UPLOAD_CONFIRMATION, **linked_asset_arm_id}
    return metadata_dict


def get_object_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> str:
    _hash = hashlib.md5(b"Initialize for october 2021 AML CLI version")  # nosec
    if Path(path).is_dir():
        object_hash = _get_dir_hash(directory=path, _hash=_hash, ignore_file=ignore_file)
    else:
        if os.path.islink(path):  # ensure we're hashing the contents of the linked file
            path = _resolve_path(Path(path))
        object_hash = _get_file_hash(filename=path, _hash=_hash)
    return str(object_hash.hexdigest())


def get_content_hash_version():
    return 202208


def get_content_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> Optional[str]:
    """Generating sha256 hash for file/folder,

    e.g. Code snapshot fingerprints to prevent tampering.

    The process of hashing is:
    1. If it's a link, get the actual path of the link.
    2. If it's a file, append file content.
    3. If it's a folder:
        1. list all files under the folder
        2. convert file count to str and append to hash
        3. sort the files by lower case of relative path
        4. for each file append '#'+relative path+'#' and file size to hash
        5. do another iteration on file list to append each files content to hash.
        The example of absolute path to relative path mapping is:
        [
            ('/mnt/c/codehash/code/file1.txt', 'file1.txt'),
            ('/mnt/c/codehash/code/folder1/file1.txt', 'folder1/file1.txt'),
            ('/mnt/c/codehash/code/Folder2/file1.txt', 'Folder2/file1.txt'),
            ('/mnt/c/codehash/code/Folder2/folder1/file1.txt', 'Folder2/folder1/file1.txt')
        ]
    4. Hash the content and convert to hex digest string.

    :param path: The file or folder to hash
    :type path: Union[str, os.PathLike]
    :param ignore_file: An ignore file that specifies files to ignore when computing the hash
    :type ignore_file: IgnoreFile
    :return: The content hash if the content is a link, directory, or file. None otherwise
    :rtype: Optional[str]
    """
    # DO NOT change this function unless you change the verification logic together
    actual_path = path
    if os.path.islink(path):
        actual_path = _resolve_path(Path(path)).as_posix()
    if os.path.isdir(actual_path):
        return _get_file_list_content_hash(get_upload_files_from_folder(actual_path, ignore_file=ignore_file))
    if os.path.isfile(actual_path):
        return _get_file_list_content_hash([(actual_path, Path(actual_path).name)])
    return None
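

# Illustrative sketch (hypothetical directory): the two fingerprints this module computes
# for a local asset folder, both honoring its ignore file.
def _example_asset_hashes() -> None:
    snapshot_dir = Path("./src")  # hypothetical code snapshot directory
    ignore_file = get_ignore_file(snapshot_dir)
    module_logger.debug("object hash: %s", get_object_hash(snapshot_dir, ignore_file))
    module_logger.debug("content hash: %s", get_content_hash(snapshot_dir, ignore_file))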


def get_upload_files_from_folder(
    path: Union[str, os.PathLike],
    *,
    prefix: str = "",
    ignore_file: IgnoreFile = IgnoreFile(),
) -> List[Tuple[str, str]]:
    path = Path(path)
    upload_paths = []
    for root, _, files in os.walk(path, followlinks=True):
        upload_paths += list(
            traverse_directory(
                root,
                files,
                prefix=Path(prefix).joinpath(Path(root).relative_to(path)).as_posix(),
                ignore_file=ignore_file,
            )
        )
    return upload_paths


def _get_file_list_content_hash(file_list) -> str:
    # file_list is a list of tuples, (absolute_path, relative_path)

    _hash = hashlib.sha256()
    # Add the file count to the hash, wrap each file name in '#', and add each file's size to avoid collisions like:
    # Case 1:
    # 'a.txt' with contents 'a'
    # 'b.txt' with contents 'b'
    #
    # Case 2:
    # cspell:disable-next-line
    # 'a.txt' with contents 'ab.txtb'
    _hash.update(str(len(file_list)).encode())
    # Sort by "destination" path, since in this function destination prefix is empty and keep the link name in path.
    for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
        _hash.update(("#" + file_name + "#").encode())
        _hash.update(str(os.path.getsize(file_path)).encode())
    for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
        _hash = _get_file_hash(file_path, _hash)
    return str(_hash.hexdigest())
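

# Illustrative sketch (hypothetical file names): why the file count, '#'-delimited names,
# and per-file sizes are folded into the hash. Two layouts whose naive byte concatenation
# would look identical still hash differently.
def _example_content_hash_collision() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        case1 = Path(tmp, "case1")
        case1.mkdir()
        (case1 / "a.txt").write_text("a", encoding="utf-8")
        (case1 / "b.txt").write_text("b", encoding="utf-8")

        case2 = Path(tmp, "case2")
        case2.mkdir()
        # cspell:disable-next-line
        (case2 / "a.txt").write_text("ab.txtb", encoding="utf-8")

        assert get_content_hash(case1) != get_content_hash(case2)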


def traverse_directory(  # pylint: disable=unused-argument
    root: str,
    files: List[str],
    *,
    prefix: str,
    ignore_file: IgnoreFile = IgnoreFile(),
    # keep this for backward compatibility
    **kwargs: Any,
) -> Iterable[Tuple[str, Union[str, Any]]]:
    """Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage.
    e.g.

    [/mnt/c/Users/dipeck/upload_files/my_file1.txt,
    /mnt/c/Users/dipeck/upload_files/my_file2.txt] -->

        [(/mnt/c/Users/dipeck/upload_files/my_file1.txt, LocalUpload/<guid>/upload_files/my_file1.txt),
        (/mnt/c/Users/dipeck/upload_files/my_file2.txt, LocalUpload/<guid>/upload_files/my_file2.txt))]

    :param root: Root directory path
    :type root: str
    :param files: List of all file paths in the directory
    :type files: List[str]
    :keyword prefix: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
    :paramtype prefix: str
    :keyword ignore_file: The .amlignore or .gitignore file in the project directory
    :paramtype ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
    :return: Zipped list of tuples representing the local path and remote destination path for each file
    :rtype: Iterable[Tuple[str, Union[str, Any]]]
    """
    # Normalize Windows paths. Note that the path must be resolved first, because long path components
    # may be converted to 8.3 short names on Windows. For example, C:\Users\too-long-user-name\test may
    # appear as C:\Users\too-lo~1\test by default.
    # Refer to https://en.wikipedia.org/wiki/8.3_filename for more details.
    root = Path(root).resolve().absolute()

    # filter out files excluded by the ignore file
    # TODO: inner ignore file won't take effect. A merged IgnoreFile need to be generated in code resolution.
    origin_file_paths = [
        root.joinpath(filename)
        for filename in files
        if not ignore_file.is_file_excluded(root.joinpath(filename).as_posix())
    ]

    result = []
    for origin_file_path in origin_file_paths:
        relative_path = origin_file_path.relative_to(root)
        result.append(
            (
                _resolve_path(origin_file_path).as_posix(),
                Path(prefix).joinpath(relative_path).as_posix(),
            )
        )
    return result
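

# Illustrative sketch (hypothetical directory): composing the local -> remote path pairs
# used for an upload, mirroring the mapping described in the docstring above.
def _example_upload_path_mapping() -> None:
    project_dir = Path("./upload_files")  # hypothetical local directory
    remote_prefix = f"LocalUpload/{uuid.uuid4()}/{project_dir.name}"
    for local_path, remote_path in get_upload_files_from_folder(project_dir, prefix=remote_prefix):
        module_logger.debug("%s -> %s", local_path, remote_path)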


def _resolve_path(path: Path) -> Path:
    if not path.is_symlink():
        return path

    link_path = path.resolve()
    if not link_path.is_absolute():
        link_path = path.parent.joinpath(link_path).resolve()
    return _resolve_path(link_path)


def generate_asset_id(asset_hash: str, include_directory=True) -> str:
    asset_id = asset_hash or str(uuid.uuid4())
    if include_directory:
        asset_id = "/".join((ARTIFACT_ORIGIN, asset_id))
    return asset_id


def get_directory_size(
    root: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile(None)
) -> Tuple[int, Dict[str, int]]:
    """Returns total size of a directory and a dictionary itemizing each sub- path and its size.

    If an optional ignore_file argument is provided, then files specified in the ignore file are not included in the
    directory size calculation.

    :param root: The directory to calculate the size of
    :type root: Union[str, os.PathLike]
    :param ignore_file: An ignore file that specifies files to ignore when computing the size
    :type ignore_file: IgnoreFile
    :return: The computed size of the directory, and the sizes of the child paths
    :rtype: Tuple[int, Dict[str, int]]
    """
    total_size = 0
    size_list = {}
    for dirpath, _, filenames in os.walk(root, followlinks=True):
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            # Don't count files that are excluded by an ignore file
            if ignore_file.is_file_excluded(full_path):
                continue
            if not os.path.islink(full_path):
                path_size = os.path.getsize(full_path)
            else:
                # ensure we're counting the size of the linked file
                # os.readlink returns a file path relative to dirpath, and must be
                # re-joined to get a workable result
                path_size = os.path.getsize(os.path.join(dirpath, os.readlink(convert_windows_path_to_unix(full_path))))
            size_list[full_path] = path_size
            total_size += path_size
    return total_size, size_list
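

# Illustrative sketch (hypothetical directory): sizing an upload ahead of time while
# honoring the project's ignore file, e.g. to configure a progress bar.
def _example_directory_size() -> None:
    project_dir = Path("./my_project")
    total_bytes, per_file_sizes = get_directory_size(project_dir, ignore_file=get_ignore_file(project_dir))
    module_logger.debug("%d files, %.2f MB to upload", len(per_file_sizes), total_bytes / 10**6)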


def upload_file(
    storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
    source: str,
    dest: Optional[str] = None,
    msg: Optional[str] = None,
    size: int = 0,
    show_progress: Optional[bool] = None,
    in_directory: bool = False,
    callback: Optional[Any] = None,
) -> None:
    """Upload a single file to remote storage.

    :param storage_client: Storage client object
    :type storage_client: Union[
        azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
        azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
    :param source: Local path to project directory
    :type source: str
    :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
    :type dest: str
    :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
    :type msg: str
    :param size: Size of the file in bytes
    :type size: int
    :param show_progress: Whether to show progress bar or not
    :type show_progress: bool
    :param in_directory: Whether the file is part of a directory of files
    :type in_directory: bool
    :param callback: Callback to progress bar
    :type callback: Any
    :return: None
    """
    validate_content = size > 0  # don't do checksum for empty files

    if (
        type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
    ):  # Only for Gen2StorageClient, Blob Storage doesn't have true directories
        if in_directory:
            storage_client.temp_sub_directory_client = None
            file_name_tail = dest.split(os.path.sep)[-1]
            # Indexing from 2 because the first two parts of the remote path will always be LocalUpload/<asset_id>
            all_sub_folders = dest.split(os.path.sep)[2:-1]

            # Create remote directories for each nested directory if file is in a nested directory
            for sub_folder in all_sub_folders:
                if storage_client.temp_sub_directory_client:
                    storage_client.temp_sub_directory_client = (
                        storage_client.temp_sub_directory_client.create_sub_directory(sub_folder)
                    )
                else:
                    storage_client.temp_sub_directory_client = storage_client.directory_client.create_sub_directory(
                        sub_folder
                    )

            storage_client.file_client = storage_client.temp_sub_directory_client.create_file(file_name_tail)
        else:
            storage_client.file_client = storage_client.directory_client.create_file(source.split("/")[-1])

    with open(source, "rb") as data:
        if show_progress and not in_directory:
            file_size, _ = get_directory_size(source)
            file_size_in_mb = file_size / 10**6
            if file_size_in_mb < 1:
                msg += Fore.GREEN + " (< 1 MB)"
            else:
                msg += Fore.GREEN + f" ({round(file_size_in_mb, 2)} MBs)"
            cntx_manager = FileUploadProgressBar(msg=msg)
        else:
            cntx_manager = suppress()

        with cntx_manager as c:
            callback = c.update_to if (show_progress and not in_directory) else None
            if type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME:
                storage_client.file_client.upload_data(
                    data=data.read(),
                    overwrite=True,
                    validate_content=validate_content,
                    raw_response_hook=callback,
                    max_concurrency=MAX_CONCURRENCY,
                )
            elif type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME:
                storage_client.container_client.upload_blob(
                    name=dest,
                    data=data,
                    validate_content=validate_content,
                    overwrite=storage_client.overwrite,
                    raw_response_hook=callback,
                    max_concurrency=MAX_CONCURRENCY,
                    connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
                )

    storage_client.uploaded_file_count += 1


def upload_directory(
    storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
    source: Union[str, os.PathLike],
    dest: str,
    msg: str,
    show_progress: bool,
    ignore_file: IgnoreFile,
) -> None:
    """Upload directory to remote storage.

    :param storage_client: Storage client object
    :type storage_client: Union[
        azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
        azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
    :param source: Local path to project directory
    :type source: Union[str, os.PathLike]
    :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
    :type dest: str
    :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
    :type msg: str
    :param show_progress: Whether to show progress bar or not
    :type show_progress: bool
    :param ignore_file: The .amlignore or .gitignore file in the project directory
    :type ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
    :return: None
    """
    source_path = Path(source).resolve()
    prefix = "" if dest == "" else dest + "/"
    prefix += os.path.basename(source_path) + "/"

    if (
        type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
    ):  # Only for Gen2StorageClient, Blob Storage doesn't have true directories
        storage_client.sub_directory_client = storage_client.directory_client.create_sub_directory(
            prefix.strip("/").split("/")[-1]
        )

    # Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage
    upload_paths = get_upload_files_from_folder(
        source_path,
        prefix=prefix,
        ignore_file=ignore_file,
    )
    size_dict = {}
    total_size = 0

    # Get each file's size for progress bar tracking
    for path, _ in upload_paths:
        # TODO: symbolic links are already resolved
        if os.path.islink(path):
            path_size = os.path.getsize(
                os.readlink(convert_windows_path_to_unix(path))
            )  # ensure we're counting the size of the linked file
        else:
            path_size = os.path.getsize(path)
        size_dict[path] = path_size
        total_size += path_size

    upload_paths = sorted(upload_paths)
    if len(upload_paths) == 0:
        raise EmptyDirectoryError(
            message=EMPTY_DIRECTORY_ERROR.format(source),
            no_personal_data_message=msg.format("[source]"),
            target=ErrorTarget.ARTIFACT,
            error_category=ErrorCategory.USER_ERROR,
        )
    storage_client.total_file_count = len(upload_paths)

    if type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME:
        # Only for BlobStorageClient:
        # Azure Blob doesn't allow metadata setting at the directory level, so the first
        # file in the directory is designated as the file where the confirmation metadata
        # will be added at the end of the upload.
        storage_client.indicator_file = upload_paths[0][1]
        storage_client.check_blob_exists()

    # Submit paths to workers for upload
    num_cores = int(cpu_count()) * PROCESSES_PER_CORE
    with ThreadPoolExecutor(max_workers=num_cores) as ex:
        futures_dict = {
            ex.submit(
                upload_file,
                storage_client=storage_client,
                source=src,
                dest=dest,
                size=size_dict.get(src),
                in_directory=True,
                show_progress=show_progress,
            ): (src, dest)
            for (src, dest) in upload_paths
        }
        if show_progress:
            warnings.simplefilter("ignore", category=TqdmWarning)
            msg += f" ({round(total_size/10**6, 2)} MBs)"
            is_windows = system() == "Windows"  # Default unicode progress bar doesn't display well on Windows
            with tqdm(total=total_size, desc=msg, ascii=is_windows) as pbar:
                for future in as_completed(futures_dict):
                    future.result()  # access result to propagate any exceptions
                    file_path_name = futures_dict[future][0]
                    pbar.update(size_dict.get(file_path_name) or 0)


@retry(
    exceptions=ResourceExistsError,
    failure_msg="Asset creation exceeded maximum retries.",
    logger=module_logger,
    max_attempts=MAX_AUTOINCREMENT_ATTEMPTS,
)
def _create_or_update_autoincrement(
    name: str,
    body: Any,
    version_operation: Any,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: str,
    **kwargs,
) -> Any:
    try:
        container = container_operation.get(
            name=name,
            resource_group_name=resource_group_name,
            workspace_name=workspace_name,
            **kwargs,
        )
        version = container.properties.next_version

    except ResourceNotFoundError:
        version = "1"

    result = version_operation.create_or_update(
        name=name,
        version=version,
        resource_group_name=resource_group_name,
        workspace_name=workspace_name,
        body=body,
        **kwargs,
    )
    return result


def _get_next_version_from_container(
    name: str,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: str,
    registry_name: Optional[str] = None,
    **kwargs,
) -> str:
    try:
        container = (
            container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                **kwargs,
            )
            if registry_name
            else container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                **kwargs,
            )
        )
        version = container.properties.next_version

    except ResourceNotFoundError:
        version = "1"
    return version


def _get_next_latest_versions_from_container(
    name: str,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: str,
    registry_name: Optional[str] = None,
    **kwargs,
) -> Tuple[str, str]:
    try:
        container = (
            container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                **kwargs,
            )
            if registry_name
            else container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                **kwargs,
            )
        )
        next_version = container.properties.next_version
        latest_version = container.properties.latest_version

    except ResourceNotFoundError:
        next_version = "1"
        latest_version = "1"
    return next_version, latest_version


def _get_latest_version_from_container(
    asset_name: str,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: Optional[str] = None,
    registry_name: Optional[str] = None,
    **kwargs,
) -> str:
    try:
        container = (
            container_operation.get(
                name=asset_name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                **kwargs,
            )
            if registry_name
            else container_operation.get(
                name=asset_name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                **kwargs,
            )
        )
        version = container.properties.latest_version

    except ResourceNotFoundError as e:
        message = (
            f"Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else f"Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        no_personal_data_message = (
            "Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else "Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        raise ValidationException(
            message=message,
            no_personal_data_message=no_personal_data_message,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        ) from e
    return version


def _get_latest(
    asset_name: str,
    version_operation: Any,
    resource_group_name: str,
    workspace_name: Optional[str] = None,
    registry_name: Optional[str] = None,
    order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC] = OrderString.CREATED_AT_DESC,
    **kwargs,
) -> Union[ModelVersionData, DataVersionBaseData]:
    """Retrieve the latest version of the asset with the given name.

    Latest is defined as the most recently created, not the most recently updated.

    :param asset_name: The asset name
    :type asset_name: str
    :param version_operation: Any
    :type version_operation: Any
    :param resource_group_name: The resource group name
    :type resource_group_name: str
    :param workspace_name: The workspace name
    :type workspace_name: Optional[str]
    :param registry_name: The registry name
    :type registry_name: Optional[str]
    :param order_by: Specifies how to order the results. Defaults to :attr:`OrderString.CREATED_AT_DESC`
    :type order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC]
    :return: The latest version of the requested asset
    :rtype: Union[ModelVersionData, DataVersionBaseData]
    """
    result = (
        version_operation.list(
            name=asset_name,
            resource_group_name=resource_group_name,
            registry_name=registry_name,
            order_by=order_by,
            top=1,
            **kwargs,
        )
        if registry_name
        else version_operation.list(
            name=asset_name,
            resource_group_name=resource_group_name,
            workspace_name=workspace_name,
            order_by=order_by,
            top=1,
            **kwargs,
        )
    )
    try:
        latest = result.next()
    except StopIteration:
        latest = None

    if latest and isinstance(latest, ModelVersionResourceArmPaginatedResult):
        # Data list return object doesn't require this since its elements are already DatasetVersionResources
        latest = cast(ModelVersionData, latest)
    if not latest:
        message = (
            f"Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else f"Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        no_personal_data_message = (
            "Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else "Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        raise ValidationException(
            message=message,
            no_personal_data_message=no_personal_data_message,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        )
    return latest


def _archive_or_restore(
    asset_operations: Union[
        "DataOperations",
        "EnvironmentOperations",
        "ModelOperations",
        "ComponentOperations",
    ],
    version_operation: Union[
        "DataVersionsOperations",
        "EnvironmentVersionsOperations",
        "ModelVersionsOperations",
        "ComponentVersionsOperations",
    ],
    container_operation: Union[
        "DataContainersOperations",
        "EnvironmentContainersOperations",
        "ModelContainersOperations",
        "ComponentContainersOperations",
    ],
    is_archived: bool,
    name: str,
    version: Optional[str] = None,
    label: Optional[str] = None,
) -> None:
    resource_group_name = asset_operations._operation_scope._resource_group_name
    workspace_name = asset_operations._workspace_name
    registry_name = asset_operations._registry_name
    if version and label:
        msg = "Cannot specify both version and label."
        raise ValidationException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        )
    if label:
        version = _resolve_label_to_asset(asset_operations, name, label).version

    if version:
        version_resource = (
            version_operation.get(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
            )
            if registry_name
            else version_operation.get(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
            )
        )
        version_resource.properties.is_archived = is_archived
        version_resource.properties.stage = None
        (  # pylint: disable=expression-not-assigned
            version_operation.begin_create_or_update(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                body=version_resource,
            )
            if registry_name
            else version_operation.create_or_update(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                body=version_resource,
            )
        )
    else:
        container_resource = (
            container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
            )
            if registry_name
            else container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
            )
        )
        container_resource.properties.is_archived = is_archived
        (  # pylint: disable=expression-not-assigned
            container_operation.create_or_update(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                body=container_resource,
            )
            if registry_name
            else container_operation.create_or_update(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                body=container_resource,
            )
        )


def _resolve_label_to_asset(
    assetOperations: Union[
        "DataOperations",
        "ComponentOperations",
        "EnvironmentOperations",
        "ModelOperations",
        "IndexOperations",
    ],
    name: str,
    label: str,
) -> Asset:
    """Returns the asset referred to by the given label.

    Throws if the label does not refer to a version of the named asset.

    :param assetOperations: The operations class used to retrieve the asset
    :type assetOperations:
        Union["DataOperations", "ComponentOperations", "EnvironmentOperations", "ModelOperations", "IndexOperations"]
    :param name: The name of the asset
    :type name: str
    :param label: The label to resolve
    :type label: str
    :return: The requested asset
    :rtype: Asset
    """

    resolver = assetOperations._managed_label_resolver.get(label, None)
    if not resolver:
        scope = "registry" if assetOperations._registry_name else "workspace"
        msg = "Asset {} with version label {} does not exist in {}."
        raise ValidationException(
            message=msg.format(name, label, scope),
            no_personal_data_message=msg.format("[name]", "[label]", "[scope]"),
            target=ErrorTarget.ASSET,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        )
    return resolver(name)


def _check_or_modify_auto_delete_setting(
    autoDeleteSetting: Union[Dict, "AutoDeleteSetting"],
):
    if autoDeleteSetting is not None:
        if hasattr(autoDeleteSetting, "condition"):
            condition = getattr(autoDeleteSetting, "condition")
            condition = snake_to_camel(condition)
            setattr(autoDeleteSetting, "condition", condition)
        elif "condition" in autoDeleteSetting:
            autoDeleteSetting["condition"] = snake_to_camel(autoDeleteSetting["condition"])


def _validate_workspace_managed_datastore(path: Optional[Union[str, PathLike]]) -> Optional[Union[str, PathLike]]:
    # block customer-specified paths on the managed datastore
    if path.startswith(WORKSPACE_MANAGED_DATASTORE_WITH_SLASH) or path == WORKSPACE_MANAGED_DATASTORE:
        path = path.rstrip("/")

        if path != WORKSPACE_MANAGED_DATASTORE:
            raise AssetPathException(
                message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
                target=ErrorTarget.DATA,
                no_personal_data_message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
                error_category=ErrorCategory.USER_ERROR,
            )

        return path + "/paths"
    return path
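

# Illustrative sketch: only the bare workspace-managed datastore root is accepted and is
# normalized by appending "/paths"; any deeper, customer-specified path is rejected.
def _example_validate_managed_datastore_path() -> None:
    assert (
        _validate_workspace_managed_datastore(WORKSPACE_MANAGED_DATASTORE)
        == WORKSPACE_MANAGED_DATASTORE + "/paths"
    )
    try:
        _validate_workspace_managed_datastore(WORKSPACE_MANAGED_DATASTORE_WITH_SLASH + "my/custom/path")
    except AssetPathException:
        pass  # customer-specified paths on the managed datastore are not allowed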


def _validate_auto_delete_setting_in_data_output(
    auto_delete_setting: Optional[Union[Dict, "AutoDeleteSetting"]]
) -> None:
    # avoid specifying auto_delete_setting in job output now
    if auto_delete_setting:
        raise ValidationException(
            message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
            target=ErrorTarget.DATA,
            no_personal_data_message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
            error_category=ErrorCategory.USER_ERROR,
        )


class FileUploadProgressBar(tqdm):
    def __init__(self, msg: Optional[str] = None):
        warnings.simplefilter("ignore", category=TqdmWarning)
        is_windows = system() == "Windows"  # Default unicode progress bar doesn't display well on Windows
        super().__init__(unit="B", unit_scale=True, desc=msg, ascii=is_windows)

    def update_to(self, response):
        current = response.context["upload_stream_current"]
        self.total = response.context["data_stream_total"]
        if current:
            self.update(current - self.n)


class DirectoryUploadProgressBar(tqdm):
    def __init__(self, dir_size: int, msg: Optional[str] = None):
        super().__init__(unit="B", unit_scale=True, desc=msg, colour="green")
        self.total = dir_size
        self.completed = 0

    def update_to(self, response):
        current = None
        if response.context["upload_stream_current"]:
            current = response.context["upload_stream_current"] + self.completed
            self.completed = current
        if current:
            self.update(current - self.n)


def get_storage_info_for_non_registry_asset(
    service_client, workspace_name: str, name: str, version: str, resource_group: str
) -> Dict[str, str]:
    """Get SAS uri and blob uri for non-registry asset. Note that this function won't return the same
    SAS uri and blob uri for the same asset. It will return a new SAS uri and blob uri every time it is called.
    :param service_client: Service client
    :type service_client: AzureMachineLearningWorkspaces
    :param workspace_name: The workspace name
    :type workspace_name: str
    :param name: Asset name
    :type name: str
    :param version: Asset version
    :type version: str
    :param resource_group: Resource group
    :type resource_group: str
    :return: The sas_uri and blob_uri
    :rtype: Dict[str, str]
    """
    request_body = PendingUploadRequestDto(pending_upload_type="TemporaryBlobReference")
    response = service_client.code_versions.create_or_get_start_pending_upload(
        resource_group_name=resource_group,
        workspace_name=workspace_name,
        name=name,
        version=version,
        body=request_body,
    )

    sas_info = {
        "sas_uri": response.blob_reference_for_consumption.credential.sas_uri,
        "blob_uri": response.blob_reference_for_consumption.blob_uri,
    }

    return sas_info


def _get_existing_asset_name_and_version(existing_asset):
    import re

    regex = r"/codes/([^/]+)/versions/([^/]+)"

    arm_id = existing_asset.id
    match = re.search(regex, arm_id)
    name = match.group(1)
    version = match.group(2)

    return name, version
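

# Illustrative sketch (hypothetical ARM id): recovering the code asset name and version
# from an existing asset's id.
def _example_parse_existing_code_asset_id() -> None:
    class _FakeAsset:
        id = (
            "/subscriptions/0000/resourceGroups/rg/providers/Microsoft.MachineLearningServices"
            "/workspaces/ws/codes/my-code/versions/1"
        )

    assert _get_existing_asset_name_and_version(_FakeAsset()) == ("my-code", "1")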