Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py | 1224 |
1 files changed, 1224 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py new file mode 100644 index 00000000..7081d9d6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py @@ -0,0 +1,1224 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + +# pylint: disable=protected-access,too-many-lines + +import hashlib +import logging +import os +import json +import uuid +import warnings +from concurrent.futures import ThreadPoolExecutor, as_completed +from contextlib import suppress +from multiprocessing import cpu_count +from os import PathLike +from pathlib import Path +from platform import system +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, + cast, +) + +from colorama import Fore +from tqdm import TqdmWarning, tqdm +from typing_extensions import Literal + +from azure.ai.ml._artifacts._constants import ( + AML_IGNORE_FILE_NAME, + ARTIFACT_ORIGIN, + AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA, + BLOB_STORAGE_CLIENT_NAME, + CHUNK_SIZE, + DEFAULT_CONNECTION_TIMEOUT, + EMPTY_DIRECTORY_ERROR, + GEN2_STORAGE_CLIENT_NAME, + GIT_IGNORE_FILE_NAME, + INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA, + MAX_CONCURRENCY, + PROCESSES_PER_CORE, + UPLOAD_CONFIRMATION, + WORKSPACE_MANAGED_DATASTORE, + WORKSPACE_MANAGED_DATASTORE_WITH_SLASH, +) +from azure.ai.ml._restclient.v2022_02_01_preview.operations import ( + ComponentContainersOperations, + ComponentVersionsOperations, + DataContainersOperations, + DataVersionsOperations, + EnvironmentContainersOperations, + EnvironmentVersionsOperations, + ModelContainersOperations, + ModelVersionsOperations, +) +from azure.ai.ml._restclient.v2022_05_01.models import ( + DataVersionBaseData, + ModelVersionData, + ModelVersionResourceArmPaginatedResult, +) +from azure.ai.ml._restclient.v2023_04_01.models import PendingUploadRequestDto +from azure.ai.ml._utils._pathspec import GitWildMatchPattern, normalize_file +from azure.ai.ml._utils.utils import convert_windows_path_to_unix, retry, snake_to_camel +from azure.ai.ml.constants._common import ( + MAX_AUTOINCREMENT_ATTEMPTS, + DefaultOpenEncoding, + OrderString, +) +from azure.ai.ml.entities._assets.asset import Asset +from azure.ai.ml.exceptions import ( + AssetPathException, + EmptyDirectoryError, + ErrorCategory, + ErrorTarget, + ValidationErrorType, + ValidationException, +) +from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError + +if TYPE_CHECKING: + from azure.ai.ml.operations import ( + ComponentOperations, + DataOperations, + EnvironmentOperations, + IndexOperations, + ModelOperations, + ) + +hash_type = type(hashlib.md5()) # nosec + +module_logger = logging.getLogger(__name__) + + +class AssetNotChangedError(Exception): + pass + + +class IgnoreFile(object): + def __init__(self, file_path: Optional[Union[str, os.PathLike]] = None): + """Base class for handling .gitignore and .amlignore files. + + :param file_path: Relative path, or absolute path to the ignore file. + """ + path = Path(file_path).resolve() if file_path else None + self._path = path + self._path_spec = None + + def exists(self) -> bool: + """Checks if ignore file exists. + :return: True if file exists. 
False otherwise. + :rtype: bool + """ + return self._file_exists() + + def _file_exists(self) -> bool: + return self._path and self._path.exists() + + @property + def base_path(self) -> Path: + return self._path.parent + + def _get_ignore_list(self) -> List[str]: + """Get ignore list from ignore file contents. + + :return: The lines of the ignore file + :rtype: List[str] + """ + if not self.exists(): + return [] + if self._file_exists(): + with open(self._path, "r", encoding=DefaultOpenEncoding.READ) as fh: + return [line.rstrip() for line in fh if line] + return [] + + def _create_pathspec(self) -> List[GitWildMatchPattern]: + """Creates path specification based on ignore list. + + :return: Path specification + :rtype: List[GitWildMatchPattern] + """ + return [GitWildMatchPattern(ignore) for ignore in self._get_ignore_list()] + + def _get_rel_path(self, file_path: Union[str, os.PathLike]) -> Optional[str]: + """Get relative path of given file_path. + + :param file_path: A file path + :type file_path: Union[str, os.PathLike] + :return: file_path relative to base_path, if computable. None otherwise + :rtype: Optional[str] + """ + file_path = Path(file_path).absolute() + try: + # use os.path.relpath instead of Path.relative_to in case file_path is not a child of self.base_path + return os.path.relpath(file_path, self.base_path) + except ValueError: + # the two paths are on different drives + return None + + def is_file_excluded(self, file_path: Union[str, os.PathLike]) -> bool: + """Checks if given file_path is excluded. + + :param file_path: File path to be checked against ignore file specifications + :type file_path: Union[str, os.PathLike] + :return: Whether the file is excluded by ignore file + :rtype: bool + """ + # TODO: current design of ignore file can't distinguish between files and directories of the same name + if self._path_spec is None: + self._path_spec = self._create_pathspec() + if not self._path_spec: + return False + file_path = self._get_rel_path(file_path) + if file_path is None: + return True + + norm_file = normalize_file(file_path) + matched = False + for pattern in self._path_spec: + if pattern.include is not None: + if pattern.match_file(norm_file) is not None: + matched = pattern.include + + return matched + + @property + def path(self) -> Union[Path, str]: + return self._path + + +class AmlIgnoreFile(IgnoreFile): + def __init__(self, directory_path: Union[Path, str]): + file_path = Path(directory_path).joinpath(AML_IGNORE_FILE_NAME) + super(AmlIgnoreFile, self).__init__(file_path) + + +class GitIgnoreFile(IgnoreFile): + def __init__(self, directory_path: Union[Path, str]): + file_path = Path(directory_path).joinpath(GIT_IGNORE_FILE_NAME) + super(GitIgnoreFile, self).__init__(file_path) + + +def get_ignore_file(directory_path: Union[Path, str]) -> IgnoreFile: + """Finds and returns an IgnoreFile object based on the ignore file found in directory_path. + + .amlignore takes precedence over .gitignore; if neither file is found, an empty + IgnoreFile object is returned. + + The ignore file must be in the root directory. 
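+ For example (illustrative paths), if "./src" contains both a .amlignore and a .gitignore, only the .amlignore patterns are applied; the two files are never merged, and get_ignore_file("./src").is_file_excluded("./src/tmp.log") is True exactly when the chosen ignore file matches tmp.log.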
+ + :param directory_path: Path to the (root) directory where ignore file is located + :type directory_path: Union[Path, str] + :return: The IgnoreFile found in the directory + :rtype: IgnoreFile + """ + aml_ignore = AmlIgnoreFile(directory_path) + git_ignore = GitIgnoreFile(directory_path) + + if aml_ignore.exists(): + return aml_ignore + if git_ignore.exists(): + return git_ignore + return IgnoreFile() + + +def _validate_path(path: Union[str, os.PathLike], _type: str) -> None: + path = Path(path) # Okay to do this since Path is idempotent + if not path.is_file() and not path.is_dir(): + raise ValidationException( + message=f"No such file or directory: {path}", + target=_type, + error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND, + no_personal_data_message="No such file or directory", + error_category=ErrorCategory.USER_ERROR, + ) + + +def _parse_name_version( + name: Optional[str] = None, version_as_int: bool = True +) -> Tuple[Optional[str], Optional[Union[str, int]]]: + if not name: + return None, None + + token_list = name.split(":") + if len(token_list) == 1: + return name, None + *name, version = token_list + if version_as_int: + version = int(version) + return ":".join(name), version + + +def _get_file_hash(filename: Union[str, os.PathLike], _hash: hash_type) -> hash_type: + with open(str(filename), "rb") as f: + for chunk in iter(lambda: f.read(CHUNK_SIZE), b""): + _hash.update(chunk) + return _hash + + +def delete_two_catalog_files(path): + """ + Function that deletes the "catalog.json" and "catalog.json.sig" files located at 'path', if they exist + + :param path: Path to the folder for signing + :type path: Union[Path, str] + :return: None + """ + # catalog.json + file_path_json = os.path.join(path, "catalog.json") + if os.path.exists(file_path_json): + module_logger.warning("%s already exists. Deleting it", file_path_json) + os.remove(file_path_json) + # catalog.json.sig + file_path_json_sig = os.path.join(path, "catalog.json.sig") + if os.path.exists(file_path_json_sig): + module_logger.warning("%s already exists. Deleting it", file_path_json_sig) + os.remove(file_path_json_sig) + + +def create_catalog_files(path, json_stub): + with open(os.path.join(path, "catalog.json"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile1: + json.dump(json_stub, jsonFile1) + with open(os.path.join(path, "catalog.json.sig"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile2: + json.dump(json_stub, jsonFile2) + + +def _get_dir_hash(directory: Union[str, os.PathLike], _hash: hash_type, ignore_file: IgnoreFile) -> hash_type: + dir_contents = Path(directory).iterdir() + sorted_contents = sorted(dir_contents, key=lambda path: str(path).lower()) + for path in sorted_contents: + if ignore_file.is_file_excluded(path): + continue + _hash.update(path.name.encode()) + if os.path.islink(path): # ensure we're hashing the contents of the linked file + path = _resolve_path(path) + if path.is_file(): + _hash = _get_file_hash(path, _hash) + elif path.is_dir(): + _hash = _get_dir_hash(path, _hash, ignore_file) + return _hash + + +def _build_metadata_dict(name: str, version: str) -> Dict[str, str]: + """Build metadata dictionary to attach to uploaded data. + + Metadata includes an upload confirmation field, and for code uploads only, the name and version of the code asset + being created for that data. 
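+ For example (illustrative values), _build_metadata_dict(name="my-code", version="1") returns the UPLOAD_CONFIRMATION marker merged with {"name": "my-code", "version": "1"}, per the body below.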
+ + :param name: The name of the uploaded data + :type name: str + :param version: The version of the uploaded data + :type version: str + :return: Metadata dict + :rtype: Dict[str, str] + """ + if name: + linked_asset_arm_id = {"name": name, "version": version} + else: + msg = "'name' cannot be NoneType for asset artifact upload." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.ASSET, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.INVALID_VALUE, + ) + + metadata_dict = {**UPLOAD_CONFIRMATION, **linked_asset_arm_id} + return metadata_dict + + +def get_object_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> str: + _hash = hashlib.md5(b"Initialize for october 2021 AML CLI version") # nosec + if Path(path).is_dir(): + object_hash = _get_dir_hash(directory=path, _hash=_hash, ignore_file=ignore_file) + else: + if os.path.islink(path): # ensure we're hashing the contents of the linked file + path = _resolve_path(Path(path)) + object_hash = _get_file_hash(filename=path, _hash=_hash) + return str(object_hash.hexdigest()) + + +def get_content_hash_version(): + return 202208 + + +def get_content_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> Optional[str]: + """Generate a sha256 hash for a file or folder, + + e.g. code snapshot fingerprints used to detect tampering. + + The process of hashing is: + 1. If it's a link, get the actual path of the link. + 2. If it's a file, append file content. + 3. If it's a folder: + 1. list all files under the folder + 2. convert file count to str and append to hash + 3. sort the files by lower case of relative path + 4. for each file append '#'+relative path+'#' and file size to hash + 5. do another iteration on file list to append each file's content to hash. + The example of absolute path to relative path mapping is: + [ + ('/mnt/c/codehash/code/file1.txt', 'file1.txt'), + ('/mnt/c/codehash/code/folder1/file1.txt', 'folder1/file1.txt'), + ('/mnt/c/codehash/code/Folder2/file1.txt', 'Folder2/file1.txt'), + ('/mnt/c/codehash/code/Folder2/folder1/file1.txt', 'Folder2/folder1/file1.txt') + ] + 4. Hash the content and convert to hex digest string. + + :param path: The file or folder to hash + :type path: Union[str, os.PathLike] + :param ignore_file: An ignore file that specifies files to ignore when computing the hash + :type ignore_file: IgnoreFile + :return: The content hash if the content is a link, directory, or file. 
None otherwise + :rtype: Optional[str] + """ + # DO NOT change this function unless you change the verification logic together + actual_path = path + if os.path.islink(path): + actual_path = _resolve_path(Path(path)).as_posix() + if os.path.isdir(actual_path): + return _get_file_list_content_hash(get_upload_files_from_folder(actual_path, ignore_file=ignore_file)) + if os.path.isfile(actual_path): + return _get_file_list_content_hash([(actual_path, Path(actual_path).name)]) + return None + + +def get_upload_files_from_folder( + path: Union[str, os.PathLike], + *, + prefix: str = "", + ignore_file: IgnoreFile = IgnoreFile(), +) -> List[Tuple[str, str]]: + path = Path(path) + upload_paths = [] + for root, _, files in os.walk(path, followlinks=True): + upload_paths += list( + traverse_directory( + root, + files, + prefix=Path(prefix).joinpath(Path(root).relative_to(path)).as_posix(), + ignore_file=ignore_file, + ) + ) + return upload_paths + + +def _get_file_list_content_hash(file_list) -> str: + # file_list is a list of tuples, (absolute_path, relative_path) + + _hash = hashlib.sha256() + # Add file count to the hash and add '#' around file name then add each file's size to avoid collision like: + # Case 1: + # 'a.txt' with contents 'a' + # 'b.txt' with contents 'b' + # + # Case 2: + # cspell:disable-next-line + # 'a.txt' with contents 'ab.txtb' + _hash.update(str(len(file_list)).encode()) + # Sort by the "destination" path, since in this function the destination prefix is empty and the link name is kept in the path. + for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()): + _hash.update(("#" + file_name + "#").encode()) + _hash.update(str(os.path.getsize(file_path)).encode()) + for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()): + _hash = _get_file_hash(file_path, _hash) + return str(_hash.hexdigest()) + + +def traverse_directory( # pylint: disable=unused-argument + root: str, + files: List[str], + *, + prefix: str, + ignore_file: IgnoreFile = IgnoreFile(), + # keep this for backward compatibility + **kwargs: Any, +) -> Iterable[Tuple[str, Union[str, Any]]]: + """Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage. + e.g. + + [/mnt/c/Users/dipeck/upload_files/my_file1.txt, + /mnt/c/Users/dipeck/upload_files/my_file2.txt] --> + + [(/mnt/c/Users/dipeck/upload_files/my_file1.txt, LocalUpload/<guid>/upload_files/my_file1.txt), + (/mnt/c/Users/dipeck/upload_files/my_file2.txt, LocalUpload/<guid>/upload_files/my_file2.txt)] + + :param root: Root directory path + :type root: str + :param files: List of all file paths in the directory + :type files: List[str] + :keyword prefix: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir) + :paramtype prefix: str + :keyword ignore_file: The .amlignore or .gitignore file in the project directory + :paramtype ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile + :return: List of tuples representing the local path and remote destination path for each file + :rtype: Iterable[Tuple[str, Union[str, Any]]] + """ + # Normalize Windows paths. Note that the path should be resolved first, because long path components are shortened + # to 8.3 aliases on Windows. For example, C:\Users\too-long-user-name\test will be converted to C:\Users\too-lo~1\test by default. + # Refer to https://en.wikipedia.org/wiki/8.3_filename for more details. 
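+ # (Illustrative) resolving first means a root passed as the short form C:\Users\too-lo~1\test is expanded back to C:\Users\too-long-user-name\test, so every path composed below uses the canonical long spelling.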
+ root = Path(root).resolve().absolute() + + # filter out files excluded by the ignore file + # TODO: inner ignore file won't take effect. A merged IgnoreFile needs to be generated in code resolution. + origin_file_paths = [ + root.joinpath(filename) + for filename in files + if not ignore_file.is_file_excluded(root.joinpath(filename).as_posix()) + ] + + result = [] + for origin_file_path in origin_file_paths: + relative_path = origin_file_path.relative_to(root) + result.append( + ( + _resolve_path(origin_file_path).as_posix(), + Path(prefix).joinpath(relative_path).as_posix(), + ) + ) + return result + + +def _resolve_path(path: Path) -> Path: + if not path.is_symlink(): + return path + + link_path = path.resolve() + if not link_path.is_absolute(): + link_path = path.parent.joinpath(link_path).resolve() + return _resolve_path(link_path) + + +def generate_asset_id(asset_hash: str, include_directory=True) -> str: + asset_id = asset_hash or str(uuid.uuid4()) + if include_directory: + asset_id = "/".join((ARTIFACT_ORIGIN, asset_id)) + return asset_id + + +def get_directory_size( + root: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile(None) +) -> Tuple[int, Dict[str, int]]: + """Returns the total size of a directory and a dictionary itemizing each sub-path and its size. + + If an optional ignore_file argument is provided, then files specified in the ignore file are not included in the + directory size calculation. + + :param root: The directory to calculate the size of + :type root: Union[str, os.PathLike] + :param ignore_file: An ignore file that specifies files to ignore when computing the size + :type ignore_file: IgnoreFile + :return: The computed size of the directory, and the sizes of the child paths + :rtype: Tuple[int, Dict[str, int]] + """ + total_size = 0 + size_list = {} + for dirpath, _, filenames in os.walk(root, followlinks=True): + for name in filenames: + full_path = os.path.join(dirpath, name) + # Don't count files that are excluded by an ignore file + if ignore_file.is_file_excluded(full_path): + continue + if not os.path.islink(full_path): + path_size = os.path.getsize(full_path) + else: + # ensure we're counting the size of the linked file + # os.readlink returns a file path relative to dirpath, and must be + # re-joined to get a workable result + path_size = os.path.getsize(os.path.join(dirpath, os.readlink(convert_windows_path_to_unix(full_path)))) + size_list[full_path] = path_size + total_size += path_size + return total_size, size_list + + +def upload_file( + storage_client: Union["BlobStorageClient", "Gen2StorageClient"], + source: str, + dest: Optional[str] = None, + msg: Optional[str] = None, + size: int = 0, + show_progress: Optional[bool] = None, + in_directory: bool = False, + callback: Optional[Any] = None, +) -> None: + """Upload a single file to remote storage. + + :param storage_client: Storage client object + :type storage_client: Union[ + azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient, + azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient] + :param source: Local path to project directory + :type source: str + :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir) + :type dest: str + :param msg: Message to be shown with progress bar (e.g. 
"Uploading <source>") + :type msg: str + :param size: Size of the file in bytes + :type size: int + :param show_progress: Whether to show progress bar or not + :type show_progress: bool + :param in_directory: Whether the file is part of a directory of files + :type in_directory: bool + :param callback: Callback to progress bar + :type callback: Any + :return: None + """ + validate_content = size > 0 # don't do checksum for empty files + + if ( + type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME + ): # Only for Gen2StorageClient, Blob Storage doesn't have true directories + if in_directory: + storage_client.temp_sub_directory_client = None + file_name_tail = dest.split(os.path.sep)[-1] + # Indexing from 2 because the first two parts of the remote path will always be LocalUpload/<asset_id> + all_sub_folders = dest.split(os.path.sep)[2:-1] + + # Create remote directories for each nested directory if file is in a nested directory + for sub_folder in all_sub_folders: + if storage_client.temp_sub_directory_client: + storage_client.temp_sub_directory_client = ( + storage_client.temp_sub_directory_client.create_sub_directory(sub_folder) + ) + else: + storage_client.temp_sub_directory_client = storage_client.directory_client.create_sub_directory( + sub_folder + ) + + storage_client.file_client = storage_client.temp_sub_directory_client.create_file(file_name_tail) + else: + storage_client.file_client = storage_client.directory_client.create_file(source.split("/")[-1]) + + with open(source, "rb") as data: + if show_progress and not in_directory: + file_size, _ = get_directory_size(source) + file_size_in_mb = file_size / 10**6 + if file_size_in_mb < 1: + msg += Fore.GREEN + " (< 1 MB)" + else: + msg += Fore.GREEN + f" ({round(file_size_in_mb, 2)} MBs)" + cntx_manager = FileUploadProgressBar(msg=msg) + else: + cntx_manager = suppress() + + with cntx_manager as c: + callback = c.update_to if (show_progress and not in_directory) else None + if type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME: + storage_client.file_client.upload_data( + data=data.read(), + overwrite=True, + validate_content=validate_content, + raw_response_hook=callback, + max_concurrency=MAX_CONCURRENCY, + ) + elif type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME: + storage_client.container_client.upload_blob( + name=dest, + data=data, + validate_content=validate_content, + overwrite=storage_client.overwrite, + raw_response_hook=callback, + max_concurrency=MAX_CONCURRENCY, + connection_timeout=DEFAULT_CONNECTION_TIMEOUT, + ) + + storage_client.uploaded_file_count += 1 + + +def upload_directory( + storage_client: Union["BlobStorageClient", "Gen2StorageClient"], + source: Union[str, os.PathLike], + dest: str, + msg: str, + show_progress: bool, + ignore_file: IgnoreFile, +) -> None: + """Upload directory to remote storage. + + :param storage_client: Storage client object + :type storage_client: Union[ + azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient, + azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient] + :param source: Local path to project directory + :type source: Union[str, os.PathLike] + :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir) + :type dest: str + :param msg: Message to be shown with progress bar (e.g. 
"Uploading <source>") + :type msg: str + :param show_progress: Whether to show progress bar or not + :type show_progress: bool + :param ignore_file: The .amlignore or .gitignore file in the project directory + :type ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile + :return: None + """ + source_path = Path(source).resolve() + prefix = "" if dest == "" else dest + "/" + prefix += os.path.basename(source_path) + "/" + + if ( + type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME + ): # Only for Gen2StorageClient, Blob Storage doesn't have true directories + storage_client.sub_directory_client = storage_client.directory_client.create_sub_directory( + prefix.strip("/").split("/")[-1] + ) + + # Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage + upload_paths = get_upload_files_from_folder( + source_path, + prefix=prefix, + ignore_file=ignore_file, + ) + size_dict = {} + total_size = 0 + + # Get each file's size for progress bar tracking + for path, _ in upload_paths: + # TODO: symbol links are already resolved + if os.path.islink(path): + path_size = os.path.getsize( + os.readlink(convert_windows_path_to_unix(path)) + ) # ensure we're counting the size of the linked file + else: + path_size = os.path.getsize(path) + size_dict[path] = path_size + total_size += path_size + + upload_paths = sorted(upload_paths) + if len(upload_paths) == 0: + raise EmptyDirectoryError( + message=EMPTY_DIRECTORY_ERROR.format(source), + no_personal_data_message=msg.format("[source]"), + target=ErrorTarget.ARTIFACT, + error_category=ErrorCategory.USER_ERROR, + ) + storage_client.total_file_count = len(upload_paths) + + if ( + type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME + ): # Only for Gen2StorageClient, Blob Storage doesn't have true directories + # Only for BlobStorageClient + # Azure Blob doesn't allow metadata setting at the directory level, so the first + # file in the directory is designated as the file where the confirmation metadata + # will be added at the end of the upload. 
+ storage_client.indicator_file = upload_paths[0][1] + storage_client.check_blob_exists() + + # Submit paths to workers for upload + num_cores = int(cpu_count()) * PROCESSES_PER_CORE + with ThreadPoolExecutor(max_workers=num_cores) as ex: + futures_dict = { + ex.submit( + upload_file, + storage_client=storage_client, + source=src, + dest=dest, + size=size_dict.get(src), + in_directory=True, + show_progress=show_progress, + ): (src, dest) + for (src, dest) in upload_paths + } + if show_progress: + warnings.simplefilter("ignore", category=TqdmWarning) + msg += f" ({round(total_size/10**6, 2)} MBs)" + is_windows = system() == "Windows" # Default unicode progress bar doesn't display well on Windows + with tqdm(total=total_size, desc=msg, ascii=is_windows) as pbar: + for future in as_completed(futures_dict): + future.result() # access result to propagate any exceptions + file_path_name = futures_dict[future][0] + pbar.update(size_dict.get(file_path_name) or 0) + + +@retry( + exceptions=ResourceExistsError, + failure_msg="Asset creation exceeded maximum retries.", + logger=module_logger, + max_attempts=MAX_AUTOINCREMENT_ATTEMPTS, +) +def _create_or_update_autoincrement( + name: str, + body: Any, + version_operation: Any, + container_operation: Any, + resource_group_name: str, + workspace_name: str, + **kwargs, +) -> Any: + try: + container = container_operation.get( + name=name, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + **kwargs, + ) + version = container.properties.next_version + + except ResourceNotFoundError: + version = "1" + + result = version_operation.create_or_update( + name=name, + version=version, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + body=body, + **kwargs, + ) + return result + + +def _get_next_version_from_container( + name: str, + container_operation: Any, + resource_group_name: str, + workspace_name: str, + registry_name: Optional[str] = None, + **kwargs, +) -> str: + try: + container = ( + container_operation.get( + name=name, + resource_group_name=resource_group_name, + registry_name=registry_name, + **kwargs, + ) + if registry_name + else container_operation.get( + name=name, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + **kwargs, + ) + ) + version = container.properties.next_version + + except ResourceNotFoundError: + version = "1" + return version + + +def _get_next_latest_versions_from_container( + name: str, + container_operation: Any, + resource_group_name: str, + workspace_name: str, + registry_name: Optional[str] = None, + **kwargs, +) -> Tuple[str, str]: + try: + container = ( + container_operation.get( + name=name, + resource_group_name=resource_group_name, + registry_name=registry_name, + **kwargs, + ) + if registry_name + else container_operation.get( + name=name, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + **kwargs, + ) + ) + next_version = container.properties.next_version + latest_version = container.properties.latest_version + + except ResourceNotFoundError: + next_version = "1" + latest_version = "1" + return next_version, latest_version + + +def _get_latest_version_from_container( + asset_name: str, + container_operation: Any, + resource_group_name: str, + workspace_name: Optional[str] = None, + registry_name: Optional[str] = None, + **kwargs, +) -> str: + try: + container = ( + container_operation.get( + name=asset_name, + resource_group_name=resource_group_name, + registry_name=registry_name, + **kwargs, + ) + if registry_name + else 
container_operation.get( + name=asset_name, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + **kwargs, + ) + ) + version = container.properties.latest_version + + except ResourceNotFoundError as e: + message = ( + f"Asset {asset_name} does not exist in registry {registry_name}." + if registry_name + else f"Asset {asset_name} does not exist in workspace {workspace_name}." + ) + no_personal_data_message = ( + "Asset {asset_name} does not exist in registry {registry_name}." + if registry_name + else "Asset {asset_name} does not exist in workspace {workspace_name}." + ) + raise ValidationException( + message=message, + no_personal_data_message=no_personal_data_message, + target=ErrorTarget.ASSET, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.RESOURCE_NOT_FOUND, + ) from e + return version + + +def _get_latest( + asset_name: str, + version_operation: Any, + resource_group_name: str, + workspace_name: Optional[str] = None, + registry_name: Optional[str] = None, + order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC] = OrderString.CREATED_AT_DESC, + **kwargs, +) -> Union[ModelVersionData, DataVersionBaseData]: + """Retrieve the latest version of the asset with the given name. + + Latest is defined as the most recently created, not the most recently updated. + + :param asset_name: The asset name + :type asset_name: str + :param version_operation: Any + :type version_operation: Any + :param resource_group_name: The resource group name + :type resource_group_name: str + :param workspace_name: The workspace name + :type workspace_name: Optional[str] + :param registry_name: The registry name + :type registry_name: Optional[str] + :param order_by: Specifies how to order the results. Defaults to :attr:`OrderString.CREATED_AT_DESC` + :type order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC] + :return: The latest version of the requested asset + :rtype: Union[ModelVersionData, DataVersionBaseData] + """ + result = ( + version_operation.list( + name=asset_name, + resource_group_name=resource_group_name, + registry_name=registry_name, + order_by=order_by, + top=1, + **kwargs, + ) + if registry_name + else version_operation.list( + name=asset_name, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + order_by=order_by, + top=1, + **kwargs, + ) + ) + try: + latest = result.next() + except StopIteration: + latest = None + + if latest and isinstance(latest, ModelVersionResourceArmPaginatedResult): + # Data list return object doesn't require this since its elements are already DatasetVersionResources + latest = cast(ModelVersionData, latest) + if not latest: + message = ( + f"Asset {asset_name} does not exist in registry {registry_name}." + if registry_name + else f"Asset {asset_name} does not exist in workspace {workspace_name}." + ) + no_personal_data_message = ( + "Asset {asset_name} does not exist in registry {registry_name}." + if registry_name + else "Asset {asset_name} does not exist in workspace {workspace_name}." 
+ ) + raise ValidationException( + message=message, + no_personal_data_message=no_personal_data_message, + target=ErrorTarget.ASSET, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.RESOURCE_NOT_FOUND, + ) + return latest + + +def _archive_or_restore( + asset_operations: Union[ + "DataOperations", + "EnvironmentOperations", + "ModelOperations", + "ComponentOperations", + ], + version_operation: Union[ + "DataVersionsOperations", + "EnvironmentVersionsOperations", + "ModelVersionsOperations", + "ComponentVersionsOperations", + ], + container_operation: Union[ + "DataContainersOperations", + "EnvironmentContainersOperations", + "ModelContainersOperations", + "ComponentContainersOperations", + ], + is_archived: bool, + name: str, + version: Optional[str] = None, + label: Optional[str] = None, +) -> None: + resource_group_name = asset_operations._operation_scope._resource_group_name + workspace_name = asset_operations._workspace_name + registry_name = asset_operations._registry_name + if version and label: + msg = "Cannot specify both version and label." + raise ValidationException( + message=msg, + no_personal_data_message=msg, + target=ErrorTarget.ASSET, + error_category=ErrorCategory.USER_ERROR, + error_type=ValidationErrorType.RESOURCE_NOT_FOUND, + ) + if label: + version = _resolve_label_to_asset(asset_operations, name, label).version + + if version: + version_resource = ( + version_operation.get( + name=name, + version=version, + resource_group_name=resource_group_name, + registry_name=registry_name, + ) + if registry_name + else version_operation.get( + name=name, + version=version, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + ) + ) + version_resource.properties.is_archived = is_archived + version_resource.properties.stage = None + ( # pylint: disable=expression-not-assigned + version_operation.begin_create_or_update( + name=name, + version=version, + resource_group_name=resource_group_name, + registry_name=registry_name, + body=version_resource, + ) + if registry_name + else version_operation.create_or_update( + name=name, + version=version, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + body=version_resource, + ) + ) + else: + container_resource = ( + container_operation.get( + name=name, + resource_group_name=resource_group_name, + registry_name=registry_name, + ) + if registry_name + else container_operation.get( + name=name, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + ) + ) + container_resource.properties.is_archived = is_archived + ( # pylint: disable=expression-not-assigned + container_operation.create_or_update( + name=name, + resource_group_name=resource_group_name, + registry_name=registry_name, + body=container_resource, + ) + if registry_name + else container_operation.create_or_update( + name=name, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + body=container_resource, + ) + ) + + +def _resolve_label_to_asset( + assetOperations: Union[ + "DataOperations", + "ComponentOperations", + "EnvironmentOperations", + "ModelOperations", + "IndexOperations", + ], + name: str, + label: str, +) -> Asset: + """Returns the asset referred to by the given label. 
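+ For example, the commonly registered "latest" label resolves, via the operations class's _managed_label_resolver, to the most recently created version of the asset.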
+ + Throws if the label does not refer to a version of the named asset. + + :param assetOperations: The operations class used to retrieve the asset + :type assetOperations: + Union["DataOperations", "ComponentOperations", "EnvironmentOperations", "ModelOperations", "IndexOperations"] + :param name: The name of the asset + :type name: str + :param label: The label to resolve + :type label: str + :return: The requested asset + :rtype: Asset + """ + + resolver = assetOperations._managed_label_resolver.get(label, None) + if not resolver: + scope = "registry" if assetOperations._registry_name else "workspace" + msg = "Asset {} with version label {} does not exist in {}." + raise ValidationException( + message=msg.format(name, label, scope), + no_personal_data_message=msg.format("[name]", "[label]", "[scope]"), + target=ErrorTarget.ASSET, + error_type=ValidationErrorType.RESOURCE_NOT_FOUND, + ) + return resolver(name) + + +def _check_or_modify_auto_delete_setting( + autoDeleteSetting: Union[Dict, "AutoDeleteSetting"], +): + if autoDeleteSetting is not None: + if hasattr(autoDeleteSetting, "condition"): + condition = getattr(autoDeleteSetting, "condition") + condition = snake_to_camel(condition) + setattr(autoDeleteSetting, "condition", condition) + elif "condition" in autoDeleteSetting: + autoDeleteSetting["condition"] = snake_to_camel(autoDeleteSetting["condition"]) + + +def _validate_workspace_managed_datastore(path: Optional[Union[str, PathLike]]) -> Optional[Union[str, PathLike]]: + # block customer-specified path on managed datastore + if path.startswith(WORKSPACE_MANAGED_DATASTORE_WITH_SLASH) or path == WORKSPACE_MANAGED_DATASTORE: + path = path.rstrip("/") + + if path != WORKSPACE_MANAGED_DATASTORE: + raise AssetPathException( + message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA, + target=ErrorTarget.DATA, + no_personal_data_message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA, + error_category=ErrorCategory.USER_ERROR, + ) + + return path + "/paths" + return path + + +def _validate_auto_delete_setting_in_data_output( + auto_delete_setting: Optional[Union[Dict, "AutoDeleteSetting"]] +) -> None: + # avoid specifying auto_delete_setting in job output now + if auto_delete_setting: + raise ValidationException( + message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA, + target=ErrorTarget.DATA, + no_personal_data_message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA, + error_category=ErrorCategory.USER_ERROR, + ) + + +class FileUploadProgressBar(tqdm): + def __init__(self, msg: Optional[str] = None): + warnings.simplefilter("ignore", category=TqdmWarning) + is_windows = system() == "Windows" # Default unicode progress bar doesn't display well on Windows + super().__init__(unit="B", unit_scale=True, desc=msg, ascii=is_windows) + + def update_to(self, response): + current = response.context["upload_stream_current"] + self.total = response.context["data_stream_total"] + if current: + self.update(current - self.n) + + +class DirectoryUploadProgressBar(tqdm): + def __init__(self, dir_size: int, msg: Optional[str] = None): + super().__init__(unit="B", unit_scale=True, desc=msg, colour="green") + self.total = dir_size + self.completed = 0 + + def update_to(self, response): + current = None + if response.context["upload_stream_current"]: + current = response.context["upload_stream_current"] + self.completed + self.completed = current + if current: + self.update(current - self.n) + + +def get_storage_info_for_non_registry_asset( + service_client, workspace_name: str, 
name: str, version: str, resource_group: str + ) -> Dict[str, str]: + """Get the SAS URI and blob URI for a non-registry asset. Note that this function will not return the same + SAS URI and blob URI for the same asset; new ones are issued every time it is called. + + :param service_client: Service client + :type service_client: AzureMachineLearningWorkspaces + :param workspace_name: The workspace name + :type workspace_name: str + :param name: Asset name + :type name: str + :param version: Asset version + :type version: str + :param resource_group: Resource group + :type resource_group: str + :return: The sas_uri and blob_uri + :rtype: Dict[str, str] + """ + request_body = PendingUploadRequestDto(pending_upload_type="TemporaryBlobReference") + response = service_client.code_versions.create_or_get_start_pending_upload( + resource_group_name=resource_group, + workspace_name=workspace_name, + name=name, + version=version, + body=request_body, + ) + + sas_info = { + "sas_uri": response.blob_reference_for_consumption.credential.sas_uri, + "blob_uri": response.blob_reference_for_consumption.blob_uri, + } + + return sas_info + + +def _get_existing_asset_name_and_version(existing_asset): + import re + + regex = r"/codes/([^/]+)/versions/([^/]+)" + + arm_id = existing_asset.id + match = re.search(regex, arm_id) + name = match.group(1) + version = match.group(2) + + return name, version