about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py1224
1 files changed, 1224 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py
new file mode 100644
index 00000000..7081d9d6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py
@@ -0,0 +1,1224 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access,too-many-lines
+
+import hashlib
+import logging
+import os
+import json
+import uuid
+import warnings
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from contextlib import suppress
+from multiprocessing import cpu_count
+from os import PathLike
+from pathlib import Path
+from platform import system
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+    Union,
+    cast,
+)
+
+from colorama import Fore
+from tqdm import TqdmWarning, tqdm
+from typing_extensions import Literal
+
+from azure.ai.ml._artifacts._constants import (
+    AML_IGNORE_FILE_NAME,
+    ARTIFACT_ORIGIN,
+    AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
+    BLOB_STORAGE_CLIENT_NAME,
+    CHUNK_SIZE,
+    DEFAULT_CONNECTION_TIMEOUT,
+    EMPTY_DIRECTORY_ERROR,
+    GEN2_STORAGE_CLIENT_NAME,
+    GIT_IGNORE_FILE_NAME,
+    INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
+    MAX_CONCURRENCY,
+    PROCESSES_PER_CORE,
+    UPLOAD_CONFIRMATION,
+    WORKSPACE_MANAGED_DATASTORE,
+    WORKSPACE_MANAGED_DATASTORE_WITH_SLASH,
+)
+from azure.ai.ml._restclient.v2022_02_01_preview.operations import (
+    ComponentContainersOperations,
+    ComponentVersionsOperations,
+    DataContainersOperations,
+    DataVersionsOperations,
+    EnvironmentContainersOperations,
+    EnvironmentVersionsOperations,
+    ModelContainersOperations,
+    ModelVersionsOperations,
+)
+from azure.ai.ml._restclient.v2022_05_01.models import (
+    DataVersionBaseData,
+    ModelVersionData,
+    ModelVersionResourceArmPaginatedResult,
+)
+from azure.ai.ml._restclient.v2023_04_01.models import PendingUploadRequestDto
+from azure.ai.ml._utils._pathspec import GitWildMatchPattern, normalize_file
+from azure.ai.ml._utils.utils import convert_windows_path_to_unix, retry, snake_to_camel
+from azure.ai.ml.constants._common import (
+    MAX_AUTOINCREMENT_ATTEMPTS,
+    DefaultOpenEncoding,
+    OrderString,
+)
+from azure.ai.ml.entities._assets.asset import Asset
+from azure.ai.ml.exceptions import (
+    AssetPathException,
+    EmptyDirectoryError,
+    ErrorCategory,
+    ErrorTarget,
+    ValidationErrorType,
+    ValidationException,
+)
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+
+if TYPE_CHECKING:
+    from azure.ai.ml.operations import (
+        ComponentOperations,
+        DataOperations,
+        EnvironmentOperations,
+        IndexOperations,
+        ModelOperations,
+    )
+
+hash_type = type(hashlib.md5())  # nosec
+
+module_logger = logging.getLogger(__name__)
+
+
+class AssetNotChangedError(Exception):
+    pass
+
+
+class IgnoreFile(object):
+    def __init__(self, file_path: Optional[Union[str, os.PathLike]] = None):
+        """Base class for handling .gitignore and .amlignore files.
+
+        :param file_path: Relative path, or absolute path to the ignore file.
+        """
+        path = Path(file_path).resolve() if file_path else None
+        self._path = path
+        self._path_spec = None
+
+    def exists(self) -> bool:
+        """Checks if ignore file exists.
+        :return: True if file exists. False Otherwise
+        :rtype: bool
+        """
+        return self._file_exists()
+
+    def _file_exists(self) -> bool:
+        return self._path and self._path.exists()
+
+    @property
+    def base_path(self) -> Path:
+        return self._path.parent
+
+    def _get_ignore_list(self) -> List[str]:
+        """Get ignore list from ignore file contents.
+
+        :return: The lines of the ignore file
+        :rtype: List[str]
+        """
+        if not self.exists():
+            return []
+        if self._file_exists():
+            with open(self._path, "r", encoding=DefaultOpenEncoding.READ) as fh:
+                return [line.rstrip() for line in fh if line]
+        return []
+
+    def _create_pathspec(self) -> List[GitWildMatchPattern]:
+        """Creates path specification based on ignore list.
+
+        :return: Path specification
+        :rtype: List[GitWildMatchPattern]
+        """
+        return [GitWildMatchPattern(ignore) for ignore in self._get_ignore_list()]
+
+    def _get_rel_path(self, file_path: Union[str, os.PathLike]) -> Optional[str]:
+        """Get relative path of given file_path.
+
+        :param file_path: A file path
+        :type file_path: Union[str, os.PathLike]
+        :return: file_path relative to base_path, if computable. None otherwise
+        :rtype: Optional[str]
+        """
+        file_path = Path(file_path).absolute()
+        try:
+            # use os.path.relpath instead of Path.relative_to in case file_path is not a child of self.base_path
+            return os.path.relpath(file_path, self.base_path)
+        except ValueError:
+            # 2 paths are on different drives
+            return None
+
+    def is_file_excluded(self, file_path: Union[str, os.PathLike]) -> bool:
+        """Checks if given file_path is excluded.
+
+        :param file_path: File path to be checked against ignore file specifications
+        :type file_path: Union[str, os.PathLike]
+        :return: Whether the file is excluded by ignore file
+        :rtype: bool
+        """
+        # TODO: current design of ignore file can't distinguish between files and directories of the same name
+        if self._path_spec is None:
+            self._path_spec = self._create_pathspec()
+        if not self._path_spec:
+            return False
+        file_path = self._get_rel_path(file_path)
+        if file_path is None:
+            return True
+
+        norm_file = normalize_file(file_path)
+        matched = False
+        for pattern in self._path_spec:
+            if pattern.include is not None:
+                if pattern.match_file(norm_file) is not None:
+                    matched = pattern.include
+
+        return matched
+
+    @property
+    def path(self) -> Union[Path, str]:
+        return self._path
+
+
+class AmlIgnoreFile(IgnoreFile):
+    def __init__(self, directory_path: Union[Path, str]):
+        file_path = Path(directory_path).joinpath(AML_IGNORE_FILE_NAME)
+        super(AmlIgnoreFile, self).__init__(file_path)
+
+
+class GitIgnoreFile(IgnoreFile):
+    def __init__(self, directory_path: Union[Path, str]):
+        file_path = Path(directory_path).joinpath(GIT_IGNORE_FILE_NAME)
+        super(GitIgnoreFile, self).__init__(file_path)
+
+
+def get_ignore_file(directory_path: Union[Path, str]) -> IgnoreFile:
+    """Finds and returns IgnoreFile object based on ignore file found in directory_path.
+
+    .amlignore takes precedence over .gitignore and if no file is found, an empty
+    IgnoreFile object will be returned.
+
+    The ignore file must be in the root directory.
+
+    :param directory_path: Path to the (root) directory where ignore file is located
+    :type directory_path: Union[Path, str]
+    :return: The IgnoreFile found in the directory
+    :rtype: IgnoreFile
+    """
+    aml_ignore = AmlIgnoreFile(directory_path)
+    git_ignore = GitIgnoreFile(directory_path)
+
+    if aml_ignore.exists():
+        return aml_ignore
+    if git_ignore.exists():
+        return git_ignore
+    return IgnoreFile()
+
+
+def _validate_path(path: Union[str, os.PathLike], _type: str) -> None:
+    path = Path(path)  # Okay to do this since Path is idempotent
+    if not path.is_file() and not path.is_dir():
+        raise ValidationException(
+            message=f"No such file or directory: {path}",
+            target=_type,
+            error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+            no_personal_data_message="No such file or directory",
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+
+def _parse_name_version(
+    name: Optional[str] = None, version_as_int: bool = True
+) -> Tuple[Optional[str], Optional[Union[str, int]]]:
+    if not name:
+        return None, None
+
+    token_list = name.split(":")
+    if len(token_list) == 1:
+        return name, None
+    *name, version = token_list
+    if version_as_int:
+        version = int(version)
+    return ":".join(name), version
+
+
+def _get_file_hash(filename: Union[str, os.PathLike], _hash: hash_type) -> hash_type:
+    with open(str(filename), "rb") as f:
+        for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
+            _hash.update(chunk)
+    return _hash
+
+
+def delete_two_catalog_files(path):
+    """
+    Function that deletes the "catalog.json" and "catalog.json.sig" files located at 'path', if they exist
+
+    :param path: Path to the folder for signing
+    :type path: Union[Path, str]
+    :return: None
+    """
+    # catalog.json
+    file_path_json = os.path.join(path, "catalog.json")
+    if os.path.exists(file_path_json):
+        module_logger.warning("%s already exists. Deleting it", file_path_json)
+        os.remove(file_path_json)
+    # catalog.json.sig
+    file_path_json_sig = os.path.join(path, "catalog.json.sig")
+    if os.path.exists(file_path_json_sig):
+        module_logger.warning("%s already exists. Deleting it", file_path_json_sig)
+        os.remove(file_path_json_sig)
+
+
+def create_catalog_files(path, json_stub):
+    with open(os.path.join(path, "catalog.json"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile1:
+        json.dump(json_stub, jsonFile1)
+    with open(os.path.join(path, "catalog.json.sig"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile2:
+        json.dump(json_stub, jsonFile2)
+
+
+def _get_dir_hash(directory: Union[str, os.PathLike], _hash: hash_type, ignore_file: IgnoreFile) -> hash_type:
+    dir_contents = Path(directory).iterdir()
+    sorted_contents = sorted(dir_contents, key=lambda path: str(path).lower())
+    for path in sorted_contents:
+        if ignore_file.is_file_excluded(path):
+            continue
+        _hash.update(path.name.encode())
+        if os.path.islink(path):  # ensure we're hashing the contents of the linked file
+            path = _resolve_path(path)
+        if path.is_file():
+            _hash = _get_file_hash(path, _hash)
+        elif path.is_dir():
+            _hash = _get_dir_hash(path, _hash, ignore_file)
+    return _hash
+
+
+def _build_metadata_dict(name: str, version: str) -> Dict[str, str]:
+    """Build metadata dictionary to attach to uploaded data.
+
+    Metadata includes an upload confirmation field, and for code uploads only, the name and version of the code asset
+    being created for that data.
+
+    :param name: The name of the uploaded data
+    :type name: str
+    :param version: The version of the uploaded data
+    :type version: str
+    :return: Metadata dict
+    :rtype: Dict[str, str]
+    """
+    if name:
+        linked_asset_arm_id = {"name": name, "version": version}
+    else:
+        msg = "'name' cannot be NoneType for asset artifact upload."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.ASSET,
+            error_category=ErrorCategory.USER_ERROR,
+            error_type=ValidationErrorType.INVALID_VALUE,
+        )
+
+    metadata_dict = {**UPLOAD_CONFIRMATION, **linked_asset_arm_id}
+    return metadata_dict
+
+
+def get_object_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> str:
+    _hash = hashlib.md5(b"Initialize for october 2021 AML CLI version")  # nosec
+    if Path(path).is_dir():
+        object_hash = _get_dir_hash(directory=path, _hash=_hash, ignore_file=ignore_file)
+    else:
+        if os.path.islink(path):  # ensure we're hashing the contents of the linked file
+            path = _resolve_path(Path(path))
+        object_hash = _get_file_hash(filename=path, _hash=_hash)
+    return str(object_hash.hexdigest())
+
+
+def get_content_hash_version():
+    return 202208
+
+
+def get_content_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> Optional[str]:
+    """Generating sha256 hash for file/folder,
+
+    e.g. Code snapshot fingerprints to prevent tampering.
+
+    The process of hashing is:
+    1. If it's a link, get the actual path of the link.
+    2. If it's a file, append file content.
+    3. If it's a folder:
+        1. list all files under the folder
+        2. convert file count to str and append to hash
+        3. sort the files by lower case of relative path
+        4. for each file append '#'+relative path+'#' and file size to hash
+        5. do another iteration on file list to append each files content to hash.
+        The example of absolute path to relative path mapping is:
+        [
+            ('/mnt/c/codehash/code/file1.txt', 'file1.txt'),
+            ('/mnt/c/codehash/code/folder1/file1.txt', 'folder1/file1.txt'),
+            ('/mnt/c/codehash/code/Folder2/file1.txt', 'Folder2/file1.txt'),
+            ('/mnt/c/codehash/code/Folder2/folder1/file1.txt', 'Folder2/folder1/file1.txt')
+        ]
+    4. Hash the content and convert to hex digest string.
+
+    :param path: The directory to calculate the size of
+    :type path: Union[str, os.PathLike]
+    :param ignore_file: An ignore file that specifies files to ignore when computing the size
+    :type ignore_file: IgnoreFile
+    :return: The content hash if the content is a link, directory, or file. None otherwise
+    :rtype: Optional[str]
+    """
+    # DO NOT change this function unless you change the verification logic together
+    actual_path = path
+    if os.path.islink(path):
+        actual_path = _resolve_path(Path(path)).as_posix()
+    if os.path.isdir(actual_path):
+        return _get_file_list_content_hash(get_upload_files_from_folder(actual_path, ignore_file=ignore_file))
+    if os.path.isfile(actual_path):
+        return _get_file_list_content_hash([(actual_path, Path(actual_path).name)])
+    return None
+
+
+def get_upload_files_from_folder(
+    path: Union[str, os.PathLike],
+    *,
+    prefix: str = "",
+    ignore_file: IgnoreFile = IgnoreFile(),
+) -> List[str]:
+    path = Path(path)
+    upload_paths = []
+    for root, _, files in os.walk(path, followlinks=True):
+        upload_paths += list(
+            traverse_directory(
+                root,
+                files,
+                prefix=Path(prefix).joinpath(Path(root).relative_to(path)).as_posix(),
+                ignore_file=ignore_file,
+            )
+        )
+    return upload_paths
+
+
+def _get_file_list_content_hash(file_list) -> str:
+    # file_list is a list of tuples, (absolute_path, relative_path)
+
+    _hash = hashlib.sha256()
+    # Add file count to the hash and add '#' around file name then add each file's size to avoid collision like:
+    # Case 1:
+    # 'a.txt' with contents 'a'
+    # 'b.txt' with contents 'b'
+    #
+    # Case 2:
+    # cspell:disable-next-line
+    # 'a.txt' with contents 'ab.txtb'
+    _hash.update(str(len(file_list)).encode())
+    # Sort by "destination" path, since in this function destination prefix is empty and keep the link name in path.
+    for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
+        _hash.update(("#" + file_name + "#").encode())
+        _hash.update(str(os.path.getsize(file_path)).encode())
+    for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
+        _hash = _get_file_hash(file_path, _hash)
+    return str(_hash.hexdigest())
+
+
+def traverse_directory(  # pylint: disable=unused-argument
+    root: str,
+    files: List[str],
+    *,
+    prefix: str,
+    ignore_file: IgnoreFile = IgnoreFile(),
+    # keep this for backward compatibility
+    **kwargs: Any,
+) -> Iterable[Tuple[str, Union[str, Any]]]:
+    """Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage.
+    e.g.
+
+    [/mnt/c/Users/dipeck/upload_files/my_file1.txt,
+    /mnt/c/Users/dipeck/upload_files/my_file2.txt] -->
+
+        [(/mnt/c/Users/dipeck/upload_files/my_file1.txt, LocalUpload/<guid>/upload_files/my_file1.txt),
+        (/mnt/c/Users/dipeck/upload_files/my_file2.txt, LocalUpload/<guid>/upload_files/my_file2.txt))]
+
+    :param root: Root directory path
+    :type root: str
+    :param files: List of all file paths in the directory
+    :type files: List[str]
+    :keyword prefix: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
+    :paramtype prefix: str
+    :keyword ignore_file: The .amlignore or .gitignore file in the project directory
+    :paramtype ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
+    :return: Zipped list of tuples representing the local path and remote destination path for each file
+    :rtype: Iterable[Tuple[str, Union[str, Any]]]
+    """
+    # Normalize Windows paths. Note that path should be resolved first as long part will be converted to a shortcut in
+    # Windows. For example, C:\Users\too-long-user-name\test will be converted to C:\Users\too-lo~1\test by default.
+    # Refer to https://en.wikipedia.org/wiki/8.3_filename for more details.
+    root = Path(root).resolve().absolute()
+
+    # filter out files excluded by the ignore file
+    # TODO: inner ignore file won't take effect. A merged IgnoreFile need to be generated in code resolution.
+    origin_file_paths = [
+        root.joinpath(filename)
+        for filename in files
+        if not ignore_file.is_file_excluded(root.joinpath(filename).as_posix())
+    ]
+
+    result = []
+    for origin_file_path in origin_file_paths:
+        relative_path = origin_file_path.relative_to(root)
+        result.append(
+            (
+                _resolve_path(origin_file_path).as_posix(),
+                Path(prefix).joinpath(relative_path).as_posix(),
+            )
+        )
+    return result
+
+
+def _resolve_path(path: Path) -> Path:
+    if not path.is_symlink():
+        return path
+
+    link_path = path.resolve()
+    if not link_path.is_absolute():
+        link_path = path.parent.joinpath(link_path).resolve()
+    return _resolve_path(link_path)
+
+
+def generate_asset_id(asset_hash: str, include_directory=True) -> str:
+    asset_id = asset_hash or str(uuid.uuid4())
+    if include_directory:
+        asset_id = "/".join((ARTIFACT_ORIGIN, asset_id))
+    return asset_id
+
+
+def get_directory_size(
+    root: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile(None)
+) -> Tuple[int, Dict[str, int]]:
+    """Returns total size of a directory and a dictionary itemizing each sub- path and its size.
+
+    If an optional ignore_file argument is provided, then files specified in the ignore file are not included in the
+    directory size calculation.
+
+    :param root: The directory to calculate the size of
+    :type root: Union[str, os.PathLike]
+    :param ignore_file: An ignore file that specifies files to ignore when computing the size
+    :type ignore_file: IgnoreFile
+    :return: The computed size of the directory, and the sizes of the child paths
+    :rtype: Tuple[int, Dict[str, int]]
+    """
+    total_size = 0
+    size_list = {}
+    for dirpath, _, filenames in os.walk(root, followlinks=True):
+        for name in filenames:
+            full_path = os.path.join(dirpath, name)
+            # Don't count files that are excluded by an ignore file
+            if ignore_file.is_file_excluded(full_path):
+                continue
+            if not os.path.islink(full_path):
+                path_size = os.path.getsize(full_path)
+            else:
+                # ensure we're counting the size of the linked file
+                # os.readlink returns a file path relative to dirpath, and must be
+                # re-joined to get a workable result
+                path_size = os.path.getsize(os.path.join(dirpath, os.readlink(convert_windows_path_to_unix(full_path))))
+            size_list[full_path] = path_size
+            total_size += path_size
+    return total_size, size_list
+
+
+def upload_file(
+    storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
+    source: str,
+    dest: Optional[str] = None,
+    msg: Optional[str] = None,
+    size: int = 0,
+    show_progress: Optional[bool] = None,
+    in_directory: bool = False,
+    callback: Optional[Any] = None,
+) -> None:
+    """Upload a single file to remote storage.
+
+    :param storage_client: Storage client object
+    :type storage_client: Union[
+        azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
+        azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
+    :param source: Local path to project directory
+    :type source: str
+    :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
+    :type dest: str
+    :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
+    :type msg: str
+    :param size: Size of the file in bytes
+    :type size: int
+    :param show_progress: Whether to show progress bar or not
+    :type show_progress: bool
+    :param in_directory: Whether the file is part of a directory of files
+    :type in_directory: bool
+    :param callback: Callback to progress bar
+    :type callback: Any
+    :return: None
+    """
+    validate_content = size > 0  # don't do checksum for empty files
+
+    if (
+        type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
+    ):  # Only for Gen2StorageClient, Blob Storage doesn't have true directories
+        if in_directory:
+            storage_client.temp_sub_directory_client = None
+            file_name_tail = dest.split(os.path.sep)[-1]
+            # Indexing from 2 because the first two parts of the remote path will always be LocalUpload/<asset_id>
+            all_sub_folders = dest.split(os.path.sep)[2:-1]
+
+            # Create remote directories for each nested directory if file is in a nested directory
+            for sub_folder in all_sub_folders:
+                if storage_client.temp_sub_directory_client:
+                    storage_client.temp_sub_directory_client = (
+                        storage_client.temp_sub_directory_client.create_sub_directory(sub_folder)
+                    )
+                else:
+                    storage_client.temp_sub_directory_client = storage_client.directory_client.create_sub_directory(
+                        sub_folder
+                    )
+
+            storage_client.file_client = storage_client.temp_sub_directory_client.create_file(file_name_tail)
+        else:
+            storage_client.file_client = storage_client.directory_client.create_file(source.split("/")[-1])
+
+    with open(source, "rb") as data:
+        if show_progress and not in_directory:
+            file_size, _ = get_directory_size(source)
+            file_size_in_mb = file_size / 10**6
+            if file_size_in_mb < 1:
+                msg += Fore.GREEN + " (< 1 MB)"
+            else:
+                msg += Fore.GREEN + f" ({round(file_size_in_mb, 2)} MBs)"
+            cntx_manager = FileUploadProgressBar(msg=msg)
+        else:
+            cntx_manager = suppress()
+
+        with cntx_manager as c:
+            callback = c.update_to if (show_progress and not in_directory) else None
+            if type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME:
+                storage_client.file_client.upload_data(
+                    data=data.read(),
+                    overwrite=True,
+                    validate_content=validate_content,
+                    raw_response_hook=callback,
+                    max_concurrency=MAX_CONCURRENCY,
+                )
+            elif type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME:
+                storage_client.container_client.upload_blob(
+                    name=dest,
+                    data=data,
+                    validate_content=validate_content,
+                    overwrite=storage_client.overwrite,
+                    raw_response_hook=callback,
+                    max_concurrency=MAX_CONCURRENCY,
+                    connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
+                )
+
+    storage_client.uploaded_file_count += 1
+
+
+def upload_directory(
+    storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
+    source: Union[str, os.PathLike],
+    dest: str,
+    msg: str,
+    show_progress: bool,
+    ignore_file: IgnoreFile,
+) -> None:
+    """Upload directory to remote storage.
+
+    :param storage_client: Storage client object
+    :type storage_client: Union[
+        azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
+        azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
+    :param source: Local path to project directory
+    :type source: Union[str, os.PathLike]
+    :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
+    :type dest: str
+    :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
+    :type msg: str
+    :param show_progress: Whether to show progress bar or not
+    :type show_progress: bool
+    :param ignore_file: The .amlignore or .gitignore file in the project directory
+    :type ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
+    :return: None
+    """
+    source_path = Path(source).resolve()
+    prefix = "" if dest == "" else dest + "/"
+    prefix += os.path.basename(source_path) + "/"
+
+    if (
+        type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
+    ):  # Only for Gen2StorageClient, Blob Storage doesn't have true directories
+        storage_client.sub_directory_client = storage_client.directory_client.create_sub_directory(
+            prefix.strip("/").split("/")[-1]
+        )
+
+    # Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage
+    upload_paths = get_upload_files_from_folder(
+        source_path,
+        prefix=prefix,
+        ignore_file=ignore_file,
+    )
+    size_dict = {}
+    total_size = 0
+
+    # Get each file's size for progress bar tracking
+    for path, _ in upload_paths:
+        # TODO: symbol links are already resolved
+        if os.path.islink(path):
+            path_size = os.path.getsize(
+                os.readlink(convert_windows_path_to_unix(path))
+            )  # ensure we're counting the size of the linked file
+        else:
+            path_size = os.path.getsize(path)
+        size_dict[path] = path_size
+        total_size += path_size
+
+    upload_paths = sorted(upload_paths)
+    if len(upload_paths) == 0:
+        raise EmptyDirectoryError(
+            message=EMPTY_DIRECTORY_ERROR.format(source),
+            no_personal_data_message=msg.format("[source]"),
+            target=ErrorTarget.ARTIFACT,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+    storage_client.total_file_count = len(upload_paths)
+
+    if (
+        type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME
+    ):  # Only for Gen2StorageClient, Blob Storage doesn't have true directories
+        # Only for BlobStorageClient
+        # Azure Blob doesn't allow metadata setting at the directory level, so the first
+        # file in the directory is designated as the file where the confirmation metadata
+        # will be added at the end of the upload.
+        storage_client.indicator_file = upload_paths[0][1]
+        storage_client.check_blob_exists()
+
+    # Submit paths to workers for upload
+    num_cores = int(cpu_count()) * PROCESSES_PER_CORE
+    with ThreadPoolExecutor(max_workers=num_cores) as ex:
+        futures_dict = {
+            ex.submit(
+                upload_file,
+                storage_client=storage_client,
+                source=src,
+                dest=dest,
+                size=size_dict.get(src),
+                in_directory=True,
+                show_progress=show_progress,
+            ): (src, dest)
+            for (src, dest) in upload_paths
+        }
+        if show_progress:
+            warnings.simplefilter("ignore", category=TqdmWarning)
+            msg += f" ({round(total_size/10**6, 2)} MBs)"
+            is_windows = system() == "Windows"  # Default unicode progress bar doesn't display well on Windows
+            with tqdm(total=total_size, desc=msg, ascii=is_windows) as pbar:
+                for future in as_completed(futures_dict):
+                    future.result()  # access result to propagate any exceptions
+                    file_path_name = futures_dict[future][0]
+                    pbar.update(size_dict.get(file_path_name) or 0)
+
+
+@retry(
+    exceptions=ResourceExistsError,
+    failure_msg="Asset creation exceeded maximum retries.",
+    logger=module_logger,
+    max_attempts=MAX_AUTOINCREMENT_ATTEMPTS,
+)
+def _create_or_update_autoincrement(
+    name: str,
+    body: Any,
+    version_operation: Any,
+    container_operation: Any,
+    resource_group_name: str,
+    workspace_name: str,
+    **kwargs,
+) -> Any:
+    try:
+        container = container_operation.get(
+            name=name,
+            resource_group_name=resource_group_name,
+            workspace_name=workspace_name,
+            **kwargs,
+        )
+        version = container.properties.next_version
+
+    except ResourceNotFoundError:
+        version = "1"
+
+    result = version_operation.create_or_update(
+        name=name,
+        version=version,
+        resource_group_name=resource_group_name,
+        workspace_name=workspace_name,
+        body=body,
+        **kwargs,
+    )
+    return result
+
+
+def _get_next_version_from_container(
+    name: str,
+    container_operation: Any,
+    resource_group_name: str,
+    workspace_name: str,
+    registry_name: str = None,
+    **kwargs,
+) -> str:
+    try:
+        container = (
+            container_operation.get(
+                name=name,
+                resource_group_name=resource_group_name,
+                registry_name=registry_name,
+                **kwargs,
+            )
+            if registry_name
+            else container_operation.get(
+                name=name,
+                resource_group_name=resource_group_name,
+                workspace_name=workspace_name,
+                **kwargs,
+            )
+        )
+        version = container.properties.next_version
+
+    except ResourceNotFoundError:
+        version = "1"
+    return version
+
+
+def _get_next_latest_versions_from_container(
+    name: str,
+    container_operation: Any,
+    resource_group_name: str,
+    workspace_name: str,
+    registry_name: str = None,
+    **kwargs,
+) -> str:
+    try:
+        container = (
+            container_operation.get(
+                name=name,
+                resource_group_name=resource_group_name,
+                registry_name=registry_name,
+                **kwargs,
+            )
+            if registry_name
+            else container_operation.get(
+                name=name,
+                resource_group_name=resource_group_name,
+                workspace_name=workspace_name,
+                **kwargs,
+            )
+        )
+        next_version = container.properties.next_version
+        latest_version = container.properties.latest_version
+
+    except ResourceNotFoundError:
+        next_version = "1"
+        latest_version = "1"
+    return next_version, latest_version
+
+
+def _get_latest_version_from_container(
+    asset_name: str,
+    container_operation: Any,
+    resource_group_name: str,
+    workspace_name: Optional[str] = None,
+    registry_name: Optional[str] = None,
+    **kwargs,
+) -> str:
+    try:
+        container = (
+            container_operation.get(
+                name=asset_name,
+                resource_group_name=resource_group_name,
+                registry_name=registry_name,
+                **kwargs,
+            )
+            if registry_name
+            else container_operation.get(
+                name=asset_name,
+                resource_group_name=resource_group_name,
+                workspace_name=workspace_name,
+                **kwargs,
+            )
+        )
+        version = container.properties.latest_version
+
+    except ResourceNotFoundError as e:
+        message = (
+            f"Asset {asset_name} does not exist in registry {registry_name}."
+            if registry_name
+            else f"Asset {asset_name} does not exist in workspace {workspace_name}."
+        )
+        no_personal_data_message = (
+            "Asset {asset_name} does not exist in registry {registry_name}."
+            if registry_name
+            else "Asset {asset_name} does not exist in workspace {workspace_name}."
+        )
+        raise ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.ASSET,
+            error_category=ErrorCategory.USER_ERROR,
+            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+        ) from e
+    return version
+
+
+def _get_latest(
+    asset_name: str,
+    version_operation: Any,
+    resource_group_name: str,
+    workspace_name: Optional[str] = None,
+    registry_name: Optional[str] = None,
+    order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC] = OrderString.CREATED_AT_DESC,
+    **kwargs,
+) -> Union[ModelVersionData, DataVersionBaseData]:
+    """Retrieve the latest version of the asset with the given name.
+
+    Latest is defined as the most recently created, not the most recently updated.
+
+    :param asset_name: The asset name
+    :type asset_name: str
+    :param version_operation: Any
+    :type version_operation: Any
+    :param resource_group_name: The resource group name
+    :type resource_group_name: str
+    :param workspace_name: The workspace name
+    :type workspace_name: Optional[str]
+    :param registry_name: The registry name
+    :type registry_name: Optional[str]
+    :param order_by: Specifies how to order the results. Defaults to :attr:`OrderString.CREATED_AT_DESC`
+    :type order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC]
+    :return: The latest version of the requested asset
+    :rtype: Union[ModelVersionData, DataVersionBaseData]
+    """
+    result = (
+        version_operation.list(
+            name=asset_name,
+            resource_group_name=resource_group_name,
+            registry_name=registry_name,
+            order_by=order_by,
+            top=1,
+            **kwargs,
+        )
+        if registry_name
+        else version_operation.list(
+            name=asset_name,
+            resource_group_name=resource_group_name,
+            workspace_name=workspace_name,
+            order_by=order_by,
+            top=1,
+            **kwargs,
+        )
+    )
+    try:
+        latest = result.next()
+    except StopIteration:
+        latest = None
+
+    if latest and isinstance(latest, ModelVersionResourceArmPaginatedResult):
+        # Data list return object doesn't require this since its elements are already DatasetVersionResources
+        latest = cast(ModelVersionData, latest)
+    if not latest:
+        message = (
+            f"Asset {asset_name} does not exist in registry {registry_name}."
+            if registry_name
+            else f"Asset {asset_name} does not exist in workspace {workspace_name}."
+        )
+        no_personal_data_message = (
+            "Asset {asset_name} does not exist in registry {registry_name}."
+            if registry_name
+            else "Asset {asset_name} does not exist in workspace {workspace_name}."
+        )
+        raise ValidationException(
+            message=message,
+            no_personal_data_message=no_personal_data_message,
+            target=ErrorTarget.ASSET,
+            error_category=ErrorCategory.USER_ERROR,
+            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+        )
+    return latest
+
+
+def _archive_or_restore(
+    asset_operations: Union[
+        "DataOperations",
+        "EnvironmentOperations",
+        "ModelOperations",
+        "ComponentOperations",
+    ],
+    version_operation: Union[
+        "DataVersionsOperations",
+        "EnvironmentVersionsOperations",
+        "ModelVersionsOperations",
+        "ComponentVersionsOperations",
+    ],
+    container_operation: Union[
+        "DataContainersOperations",
+        "EnvironmentContainersOperations",
+        "ModelContainersOperations",
+        "ComponentContainersOperations",
+    ],
+    is_archived: bool,
+    name: str,
+    version: Optional[str] = None,
+    label: Optional[str] = None,
+) -> None:
+    resource_group_name = asset_operations._operation_scope._resource_group_name
+    workspace_name = asset_operations._workspace_name
+    registry_name = asset_operations._registry_name
+    if version and label:
+        msg = "Cannot specify both version and label."
+        raise ValidationException(
+            message=msg,
+            no_personal_data_message=msg,
+            target=ErrorTarget.ASSET,
+            error_category=ErrorCategory.USER_ERROR,
+            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+        )
+    if label:
+        version = _resolve_label_to_asset(asset_operations, name, label).version
+
+    if version:
+        version_resource = (
+            version_operation.get(
+                name=name,
+                version=version,
+                resource_group_name=resource_group_name,
+                registry_name=registry_name,
+            )
+            if registry_name
+            else version_operation.get(
+                name=name,
+                version=version,
+                resource_group_name=resource_group_name,
+                workspace_name=workspace_name,
+            )
+        )
+        version_resource.properties.is_archived = is_archived
+        version_resource.properties.stage = None
+        (  # pylint: disable=expression-not-assigned
+            version_operation.begin_create_or_update(
+                name=name,
+                version=version,
+                resource_group_name=resource_group_name,
+                registry_name=registry_name,
+                body=version_resource,
+            )
+            if registry_name
+            else version_operation.create_or_update(
+                name=name,
+                version=version,
+                resource_group_name=resource_group_name,
+                workspace_name=workspace_name,
+                body=version_resource,
+            )
+        )
+    else:
+        container_resource = (
+            container_operation.get(
+                name=name,
+                resource_group_name=resource_group_name,
+                registry_name=registry_name,
+            )
+            if registry_name
+            else container_operation.get(
+                name=name,
+                resource_group_name=resource_group_name,
+                workspace_name=workspace_name,
+            )
+        )
+        container_resource.properties.is_archived = is_archived
+        (  # pylint: disable=expression-not-assigned
+            container_operation.create_or_update(
+                name=name,
+                resource_group_name=resource_group_name,
+                registry_name=registry_name,
+                body=container_resource,
+            )
+            if registry_name
+            else container_operation.create_or_update(
+                name=name,
+                resource_group_name=resource_group_name,
+                workspace_name=workspace_name,
+                body=container_resource,
+            )
+        )
+
+
+def _resolve_label_to_asset(
+    assetOperations: Union[
+        "DataOperations",
+        "ComponentOperations",
+        "EnvironmentOperations",
+        "ModelOperations",
+        "IndexOperations",
+    ],
+    name: str,
+    label: str,
+) -> Asset:
+    """Returns the asset referred to by the given label.
+
+    Throws if label does not refer to a version of the named asset
+
+    :param assetOperations: The operations class used to retrieve the asset
+    :type assetOperations:
+        Union["DataOperations", "ComponentOperations", "EnvironmentOperations", "ModelOperations", "IndexOperations"]
+    :param name: The name of the asset
+    :type name: str
+    :param label: The label to resolve
+    :type label: str
+    :return: The requested asset
+    :rtype: Asset
+    """
+
+    resolver = assetOperations._managed_label_resolver.get(label, None)
+    if not resolver:
+        scope = "registry" if assetOperations._registry_name else "workspace"
+        msg = "Asset {} with version label {} does not exist in {}."
+        raise ValidationException(
+            message=msg.format(name, label, scope),
+            no_personal_data_message=msg.format("[name]", "[label]", "[scope]"),
+            target=ErrorTarget.ASSET,
+            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+        )
+    return resolver(name)
+
+
+def _check_or_modify_auto_delete_setting(
+    autoDeleteSetting: Union[Dict, "AutoDeleteSetting"],
+):
+    if autoDeleteSetting is not None:
+        if hasattr(autoDeleteSetting, "condition"):
+            condition = getattr(autoDeleteSetting, "condition")
+            condition = snake_to_camel(condition)
+            setattr(autoDeleteSetting, "condition", condition)
+        elif "condition" in autoDeleteSetting:
+            autoDeleteSetting["condition"] = snake_to_camel(autoDeleteSetting["condition"])
+
+
+def _validate_workspace_managed_datastore(path: Optional[Union[str, PathLike]]) -> Optional[Union[str, PathLike]]:
+    # block cumtomer specified path on managed datastore
+    if path.startswith(WORKSPACE_MANAGED_DATASTORE_WITH_SLASH) or path == WORKSPACE_MANAGED_DATASTORE:
+        path = path.rstrip("/")
+
+        if path != WORKSPACE_MANAGED_DATASTORE:
+            raise AssetPathException(
+                message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
+                tartget=ErrorTarget.DATA,
+                no_personal_data_message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
+                error_category=ErrorCategory.USER_ERROR,
+            )
+
+        return path + "/paths"
+    return path
+
+
+def _validate_auto_delete_setting_in_data_output(
+    auto_delete_setting: Optional[Union[Dict, "AutoDeleteSetting"]]
+) -> None:
+    # avoid specifying auto_delete_setting in job output now
+    if auto_delete_setting:
+        raise ValidationException(
+            message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
+            tartget=ErrorTarget.DATA,
+            no_personal_data_message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
+            error_category=ErrorCategory.USER_ERROR,
+        )
+
+
+class FileUploadProgressBar(tqdm):
+    def __init__(self, msg: Optional[str] = None):
+        warnings.simplefilter("ignore", category=TqdmWarning)
+        is_windows = system() == "Windows"  # Default unicode progress bar doesn't display well on Windows
+        super().__init__(unit="B", unit_scale=True, desc=msg, ascii=is_windows)
+
+    def update_to(self, response):
+        current = response.context["upload_stream_current"]
+        self.total = response.context["data_stream_total"]
+        if current:
+            self.update(current - self.n)
+
+
+class DirectoryUploadProgressBar(tqdm):
+    def __init__(self, dir_size: int, msg: Optional[str] = None):
+        super().__init__(unit="B", unit_scale=True, desc=msg, colour="green")
+        self.total = dir_size
+        self.completed = 0
+
+    def update_to(self, response):
+        current = None
+        if response.context["upload_stream_current"]:
+            current = response.context["upload_stream_current"] + self.completed
+            self.completed = current
+        if current:
+            self.update(current - self.n)
+
+
+def get_storage_info_for_non_registry_asset(
+    service_client, workspace_name: str, name: str, version: str, resource_group: str
+) -> Dict[str, str]:
+    """Get SAS uri and blob uri for non-registry asset. Note that this function won't return the same
+    SAS uri and blob uri for the same asset. It will return a new SAS uri and blob uri every time it is called.
+    :param service_client: Service client
+    :type service_client: AzureMachineLearningWorkspaces
+    :param workspace_name: The workspace name
+    :type workspace_name: str
+    :param name: Asset name
+    :type name: str
+    :param version: Asset version
+    :type version: str
+    :param resource_group: Resource group
+    :type resource_group: str
+    :return: The sas_uri and blob_uri
+    :rtype: Dict[str, str]
+    """
+    request_body = PendingUploadRequestDto(pending_upload_type="TemporaryBlobReference")
+    response = service_client.code_versions.create_or_get_start_pending_upload(
+        resource_group_name=resource_group,
+        workspace_name=workspace_name,
+        name=name,
+        version=version,
+        body=request_body,
+    )
+
+    sas_info = {
+        "sas_uri": response.blob_reference_for_consumption.credential.sas_uri,
+        "blob_uri": response.blob_reference_for_consumption.blob_uri,
+    }
+
+    return sas_info
+
+
+def _get_existing_asset_name_and_version(existing_asset):
+    import re
+
+    regex = r"/codes/([^/]+)/versions/([^/]+)"
+
+    arm_id = existing_asset.id
+    match = re.search(regex, arm_id)
+    name = match.group(1)
+    version = match.group(2)
+
+    return name, version