Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py  1224
1 file changed, 1224 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py
new file mode 100644
index 00000000..7081d9d6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/ml/_utils/_asset_utils.py
@@ -0,0 +1,1224 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=protected-access,too-many-lines
+
+import hashlib
+import logging
+import os
+import json
+import uuid
+import warnings
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from contextlib import suppress
+from multiprocessing import cpu_count
+from os import PathLike
+from pathlib import Path
+from platform import system
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Tuple,
+ Union,
+ cast,
+)
+
+from colorama import Fore
+from tqdm import TqdmWarning, tqdm
+from typing_extensions import Literal
+
+from azure.ai.ml._artifacts._constants import (
+ AML_IGNORE_FILE_NAME,
+ ARTIFACT_ORIGIN,
+ AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
+ BLOB_STORAGE_CLIENT_NAME,
+ CHUNK_SIZE,
+ DEFAULT_CONNECTION_TIMEOUT,
+ EMPTY_DIRECTORY_ERROR,
+ GEN2_STORAGE_CLIENT_NAME,
+ GIT_IGNORE_FILE_NAME,
+ INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
+ MAX_CONCURRENCY,
+ PROCESSES_PER_CORE,
+ UPLOAD_CONFIRMATION,
+ WORKSPACE_MANAGED_DATASTORE,
+ WORKSPACE_MANAGED_DATASTORE_WITH_SLASH,
+)
+from azure.ai.ml._restclient.v2022_02_01_preview.operations import (
+ ComponentContainersOperations,
+ ComponentVersionsOperations,
+ DataContainersOperations,
+ DataVersionsOperations,
+ EnvironmentContainersOperations,
+ EnvironmentVersionsOperations,
+ ModelContainersOperations,
+ ModelVersionsOperations,
+)
+from azure.ai.ml._restclient.v2022_05_01.models import (
+ DataVersionBaseData,
+ ModelVersionData,
+ ModelVersionResourceArmPaginatedResult,
+)
+from azure.ai.ml._restclient.v2023_04_01.models import PendingUploadRequestDto
+from azure.ai.ml._utils._pathspec import GitWildMatchPattern, normalize_file
+from azure.ai.ml._utils.utils import convert_windows_path_to_unix, retry, snake_to_camel
+from azure.ai.ml.constants._common import (
+ MAX_AUTOINCREMENT_ATTEMPTS,
+ DefaultOpenEncoding,
+ OrderString,
+)
+from azure.ai.ml.entities._assets.asset import Asset
+from azure.ai.ml.exceptions import (
+ AssetPathException,
+ EmptyDirectoryError,
+ ErrorCategory,
+ ErrorTarget,
+ ValidationErrorType,
+ ValidationException,
+)
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+
+if TYPE_CHECKING:
+ from azure.ai.ml.operations import (
+ ComponentOperations,
+ DataOperations,
+ EnvironmentOperations,
+ IndexOperations,
+ ModelOperations,
+ )
+
+hash_type = type(hashlib.md5()) # nosec
+
+module_logger = logging.getLogger(__name__)
+
+
+class AssetNotChangedError(Exception):
+ pass
+
+
+class IgnoreFile(object):
+ def __init__(self, file_path: Optional[Union[str, os.PathLike]] = None):
+ """Base class for handling .gitignore and .amlignore files.
+
+ :param file_path: Relative path, or absolute path to the ignore file.
+ """
+ path = Path(file_path).resolve() if file_path else None
+ self._path = path
+ self._path_spec = None
+
+ def exists(self) -> bool:
+        """Checks if the ignore file exists.
+
+        :return: True if the file exists, False otherwise
+ :rtype: bool
+ """
+ return self._file_exists()
+
+ def _file_exists(self) -> bool:
+        return bool(self._path and self._path.exists())
+
+ @property
+ def base_path(self) -> Path:
+ return self._path.parent
+
+ def _get_ignore_list(self) -> List[str]:
+ """Get ignore list from ignore file contents.
+
+ :return: The lines of the ignore file
+ :rtype: List[str]
+ """
+ if not self.exists():
+ return []
+ if self._file_exists():
+ with open(self._path, "r", encoding=DefaultOpenEncoding.READ) as fh:
+ return [line.rstrip() for line in fh if line]
+ return []
+
+ def _create_pathspec(self) -> List[GitWildMatchPattern]:
+ """Creates path specification based on ignore list.
+
+ :return: Path specification
+ :rtype: List[GitWildMatchPattern]
+ """
+ return [GitWildMatchPattern(ignore) for ignore in self._get_ignore_list()]
+
+ def _get_rel_path(self, file_path: Union[str, os.PathLike]) -> Optional[str]:
+ """Get relative path of given file_path.
+
+ :param file_path: A file path
+ :type file_path: Union[str, os.PathLike]
+ :return: file_path relative to base_path, if computable. None otherwise
+ :rtype: Optional[str]
+ """
+ file_path = Path(file_path).absolute()
+ try:
+ # use os.path.relpath instead of Path.relative_to in case file_path is not a child of self.base_path
+ return os.path.relpath(file_path, self.base_path)
+ except ValueError:
+ # 2 paths are on different drives
+ return None
+
+ def is_file_excluded(self, file_path: Union[str, os.PathLike]) -> bool:
+ """Checks if given file_path is excluded.
+
+ :param file_path: File path to be checked against ignore file specifications
+ :type file_path: Union[str, os.PathLike]
+ :return: Whether the file is excluded by ignore file
+ :rtype: bool
+ """
+ # TODO: current design of ignore file can't distinguish between files and directories of the same name
+ if self._path_spec is None:
+ self._path_spec = self._create_pathspec()
+ if not self._path_spec:
+ return False
+ file_path = self._get_rel_path(file_path)
+ if file_path is None:
+ return True
+
+ norm_file = normalize_file(file_path)
+ matched = False
+ for pattern in self._path_spec:
+ if pattern.include is not None:
+ if pattern.match_file(norm_file) is not None:
+ matched = pattern.include
+
+ return matched
+
+ @property
+ def path(self) -> Union[Path, str]:
+ return self._path
+
+
+class AmlIgnoreFile(IgnoreFile):
+ def __init__(self, directory_path: Union[Path, str]):
+ file_path = Path(directory_path).joinpath(AML_IGNORE_FILE_NAME)
+ super(AmlIgnoreFile, self).__init__(file_path)
+
+
+class GitIgnoreFile(IgnoreFile):
+ def __init__(self, directory_path: Union[Path, str]):
+ file_path = Path(directory_path).joinpath(GIT_IGNORE_FILE_NAME)
+ super(GitIgnoreFile, self).__init__(file_path)
+
+
+def get_ignore_file(directory_path: Union[Path, str]) -> IgnoreFile:
+ """Finds and returns IgnoreFile object based on ignore file found in directory_path.
+
+ .amlignore takes precedence over .gitignore and if no file is found, an empty
+ IgnoreFile object will be returned.
+
+ The ignore file must be in the root directory.
+
+ :param directory_path: Path to the (root) directory where ignore file is located
+ :type directory_path: Union[Path, str]
+ :return: The IgnoreFile found in the directory
+ :rtype: IgnoreFile
+ """
+ aml_ignore = AmlIgnoreFile(directory_path)
+ git_ignore = GitIgnoreFile(directory_path)
+
+ if aml_ignore.exists():
+ return aml_ignore
+ if git_ignore.exists():
+ return git_ignore
+ return IgnoreFile()
+
+
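+# Illustrative usage sketch (the project and file paths below are hypothetical):
+# resolve the ignore file for a local folder, where .amlignore takes precedence
+# over .gitignore, then test whether a given path would be excluded from upload.
+#
+#     ignore_file = get_ignore_file("/path/to/project")
+#     if ignore_file.exists():
+#         print(ignore_file.path)
+#         print(ignore_file.is_file_excluded("/path/to/project/outputs/model.pkl"))
+
+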
+def _validate_path(path: Union[str, os.PathLike], _type: str) -> None:
+ path = Path(path) # Okay to do this since Path is idempotent
+ if not path.is_file() and not path.is_dir():
+ raise ValidationException(
+ message=f"No such file or directory: {path}",
+ target=_type,
+ error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
+ no_personal_data_message="No such file or directory",
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+
+def _parse_name_version(
+ name: Optional[str] = None, version_as_int: bool = True
+) -> Tuple[Optional[str], Optional[Union[str, int]]]:
+ if not name:
+ return None, None
+
+ token_list = name.split(":")
+ if len(token_list) == 1:
+ return name, None
+ *name, version = token_list
+ if version_as_int:
+ version = int(version)
+ return ":".join(name), version
+
+
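+# Illustrative sketch of the split performed above (asset names are hypothetical):
+# only the last ":"-separated token is treated as the version.
+#
+#     _parse_name_version("my-model:3")                         # -> ("my-model", 3)
+#     _parse_name_version("my-model:3", version_as_int=False)   # -> ("my-model", "3")
+#     _parse_name_version("my-model")                           # -> ("my-model", None)
+
+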
+def _get_file_hash(filename: Union[str, os.PathLike], _hash: hash_type) -> hash_type:
+ with open(str(filename), "rb") as f:
+ for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
+ _hash.update(chunk)
+ return _hash
+
+
+def delete_two_catalog_files(path):
+ """
+ Function that deletes the "catalog.json" and "catalog.json.sig" files located at 'path', if they exist
+
+ :param path: Path to the folder for signing
+ :type path: Union[Path, str]
+ :return: None
+ """
+ # catalog.json
+ file_path_json = os.path.join(path, "catalog.json")
+ if os.path.exists(file_path_json):
+ module_logger.warning("%s already exists. Deleting it", file_path_json)
+ os.remove(file_path_json)
+ # catalog.json.sig
+ file_path_json_sig = os.path.join(path, "catalog.json.sig")
+ if os.path.exists(file_path_json_sig):
+ module_logger.warning("%s already exists. Deleting it", file_path_json_sig)
+ os.remove(file_path_json_sig)
+
+
+def create_catalog_files(path, json_stub):
+ with open(os.path.join(path, "catalog.json"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile1:
+ json.dump(json_stub, jsonFile1)
+ with open(os.path.join(path, "catalog.json.sig"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile2:
+ json.dump(json_stub, jsonFile2)
+
+
+def _get_dir_hash(directory: Union[str, os.PathLike], _hash: hash_type, ignore_file: IgnoreFile) -> hash_type:
+ dir_contents = Path(directory).iterdir()
+ sorted_contents = sorted(dir_contents, key=lambda path: str(path).lower())
+ for path in sorted_contents:
+ if ignore_file.is_file_excluded(path):
+ continue
+ _hash.update(path.name.encode())
+ if os.path.islink(path): # ensure we're hashing the contents of the linked file
+ path = _resolve_path(path)
+ if path.is_file():
+ _hash = _get_file_hash(path, _hash)
+ elif path.is_dir():
+ _hash = _get_dir_hash(path, _hash, ignore_file)
+ return _hash
+
+
+def _build_metadata_dict(name: str, version: str) -> Dict[str, str]:
+ """Build metadata dictionary to attach to uploaded data.
+
+ Metadata includes an upload confirmation field, and for code uploads only, the name and version of the code asset
+ being created for that data.
+
+ :param name: The name of the uploaded data
+ :type name: str
+ :param version: The version of the uploaded data
+ :type version: str
+ :return: Metadata dict
+ :rtype: Dict[str, str]
+ """
+ if name:
+ linked_asset_arm_id = {"name": name, "version": version}
+ else:
+ msg = "'name' cannot be NoneType for asset artifact upload."
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.ASSET,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.INVALID_VALUE,
+ )
+
+ metadata_dict = {**UPLOAD_CONFIRMATION, **linked_asset_arm_id}
+ return metadata_dict
+
+
+def get_object_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> str:
+ _hash = hashlib.md5(b"Initialize for october 2021 AML CLI version") # nosec
+ if Path(path).is_dir():
+ object_hash = _get_dir_hash(directory=path, _hash=_hash, ignore_file=ignore_file)
+ else:
+ if os.path.islink(path): # ensure we're hashing the contents of the linked file
+ path = _resolve_path(Path(path))
+ object_hash = _get_file_hash(filename=path, _hash=_hash)
+ return str(object_hash.hexdigest())
+
+
+def get_content_hash_version():
+ return 202208
+
+
+def get_content_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> Optional[str]:
+    """Generate a sha256 hash for a file or folder, e.g. code snapshot
+    fingerprints to prevent tampering.
+
+ The process of hashing is:
+ 1. If it's a link, get the actual path of the link.
+ 2. If it's a file, append file content.
+ 3. If it's a folder:
+ 1. list all files under the folder
+ 2. convert file count to str and append to hash
+ 3. sort the files by lower case of relative path
+ 4. for each file append '#'+relative path+'#' and file size to hash
+ 5. do another iteration on file list to append each files content to hash.
+ The example of absolute path to relative path mapping is:
+ [
+ ('/mnt/c/codehash/code/file1.txt', 'file1.txt'),
+ ('/mnt/c/codehash/code/folder1/file1.txt', 'folder1/file1.txt'),
+ ('/mnt/c/codehash/code/Folder2/file1.txt', 'Folder2/file1.txt'),
+ ('/mnt/c/codehash/code/Folder2/folder1/file1.txt', 'Folder2/folder1/file1.txt')
+ ]
+ 4. Hash the content and convert to hex digest string.
+
+    :param path: The file or folder to hash
+    :type path: Union[str, os.PathLike]
+    :param ignore_file: An ignore file that specifies files to skip when computing the hash
+    :type ignore_file: IgnoreFile
+ :return: The content hash if the content is a link, directory, or file. None otherwise
+ :rtype: Optional[str]
+ """
+ # DO NOT change this function unless you change the verification logic together
+ actual_path = path
+ if os.path.islink(path):
+ actual_path = _resolve_path(Path(path)).as_posix()
+ if os.path.isdir(actual_path):
+ return _get_file_list_content_hash(get_upload_files_from_folder(actual_path, ignore_file=ignore_file))
+ if os.path.isfile(actual_path):
+ return _get_file_list_content_hash([(actual_path, Path(actual_path).name)])
+ return None
+
+
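+# Illustrative usage sketch (the snapshot path is hypothetical): compute the
+# sha256-based content fingerprint of a local folder, honoring its ignore file.
+#
+#     snapshot_dir = "/path/to/code"
+#     fingerprint = get_content_hash(snapshot_dir, ignore_file=get_ignore_file(snapshot_dir))
+#     print(fingerprint)  # hex digest string, or None if the path is neither file nor folder
+
+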
+def get_upload_files_from_folder(
+ path: Union[str, os.PathLike],
+ *,
+ prefix: str = "",
+ ignore_file: IgnoreFile = IgnoreFile(),
+) -> List[Tuple[str, str]]:
+ path = Path(path)
+ upload_paths = []
+ for root, _, files in os.walk(path, followlinks=True):
+ upload_paths += list(
+ traverse_directory(
+ root,
+ files,
+ prefix=Path(prefix).joinpath(Path(root).relative_to(path)).as_posix(),
+ ignore_file=ignore_file,
+ )
+ )
+ return upload_paths
+
+
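+# Illustrative usage sketch (paths and the GUID prefix are hypothetical): list the
+# (local absolute path, remote destination path) pairs that would be uploaded.
+#
+#     pairs = get_upload_files_from_folder(
+#         "/path/to/code",
+#         prefix="LocalUpload/1234/code",
+#         ignore_file=get_ignore_file("/path/to/code"),
+#     )
+#     for local_path, remote_path in pairs:
+#         print(local_path, "->", remote_path)
+
+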
+def _get_file_list_content_hash(file_list) -> str:
+ # file_list is a list of tuples, (absolute_path, relative_path)
+
+ _hash = hashlib.sha256()
+ # Add file count to the hash and add '#' around file name then add each file's size to avoid collision like:
+ # Case 1:
+ # 'a.txt' with contents 'a'
+ # 'b.txt' with contents 'b'
+ #
+ # Case 2:
+ # cspell:disable-next-line
+ # 'a.txt' with contents 'ab.txtb'
+ _hash.update(str(len(file_list)).encode())
+ # Sort by "destination" path, since in this function destination prefix is empty and keep the link name in path.
+ for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
+ _hash.update(("#" + file_name + "#").encode())
+ _hash.update(str(os.path.getsize(file_path)).encode())
+ for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
+ _hash = _get_file_hash(file_path, _hash)
+ return str(_hash.hexdigest())
+
+
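+# Illustrative sketch of what the hash above consumes for a two-file list (paths
+# and sizes are hypothetical): the file count, then "#<relative path>#" plus the
+# file size for each entry in sorted order, then every file's raw content.
+#
+#     digest = _get_file_list_content_hash(
+#         [("/tmp/code/a.txt", "a.txt"), ("/tmp/code/b.txt", "b.txt")]
+#     )
+#     # hash input: b"2", b"#a.txt#", b"<size of a.txt>", b"#b.txt#", b"<size of b.txt>",
+#     # then the contents of a.txt and b.txt
+
+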
+def traverse_directory( # pylint: disable=unused-argument
+ root: str,
+ files: List[str],
+ *,
+ prefix: str,
+ ignore_file: IgnoreFile = IgnoreFile(),
+ # keep this for backward compatibility
+ **kwargs: Any,
+) -> Iterable[Tuple[str, Union[str, Any]]]:
+ """Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage.
+ e.g.
+
+ [/mnt/c/Users/dipeck/upload_files/my_file1.txt,
+ /mnt/c/Users/dipeck/upload_files/my_file2.txt] -->
+
+ [(/mnt/c/Users/dipeck/upload_files/my_file1.txt, LocalUpload/<guid>/upload_files/my_file1.txt),
+ (/mnt/c/Users/dipeck/upload_files/my_file2.txt, LocalUpload/<guid>/upload_files/my_file2.txt))]
+
+ :param root: Root directory path
+ :type root: str
+ :param files: List of all file paths in the directory
+ :type files: List[str]
+ :keyword prefix: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
+ :paramtype prefix: str
+ :keyword ignore_file: The .amlignore or .gitignore file in the project directory
+ :paramtype ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
+ :return: Zipped list of tuples representing the local path and remote destination path for each file
+ :rtype: Iterable[Tuple[str, Union[str, Any]]]
+ """
+    # Normalize Windows paths. Note that the path should be resolved first, since long path components are
+    # converted to 8.3 short names on Windows. For example, C:\Users\too-long-user-name\test becomes
+    # C:\Users\too-lo~1\test by default.
+ # Refer to https://en.wikipedia.org/wiki/8.3_filename for more details.
+ root = Path(root).resolve().absolute()
+
+ # filter out files excluded by the ignore file
+ # TODO: inner ignore file won't take effect. A merged IgnoreFile need to be generated in code resolution.
+ origin_file_paths = [
+ root.joinpath(filename)
+ for filename in files
+ if not ignore_file.is_file_excluded(root.joinpath(filename).as_posix())
+ ]
+
+ result = []
+ for origin_file_path in origin_file_paths:
+ relative_path = origin_file_path.relative_to(root)
+ result.append(
+ (
+ _resolve_path(origin_file_path).as_posix(),
+ Path(prefix).joinpath(relative_path).as_posix(),
+ )
+ )
+ return result
+
+
+def _resolve_path(path: Path) -> Path:
+ if not path.is_symlink():
+ return path
+
+ link_path = path.resolve()
+ if not link_path.is_absolute():
+ link_path = path.parent.joinpath(link_path).resolve()
+ return _resolve_path(link_path)
+
+
+def generate_asset_id(asset_hash: str, include_directory=True) -> str:
+ asset_id = asset_hash or str(uuid.uuid4())
+ if include_directory:
+ asset_id = "/".join((ARTIFACT_ORIGIN, asset_id))
+ return asset_id
+
+
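+# Illustrative sketch: the asset id is the content hash (or a fresh GUID when no
+# hash is supplied), optionally prefixed with ARTIFACT_ORIGIN ("LocalUpload" in the
+# upload-path examples above); the hash value shown is hypothetical.
+#
+#     generate_asset_id("abc123", include_directory=True)   # -> "LocalUpload/abc123"
+#     generate_asset_id("abc123", include_directory=False)  # -> "abc123"
+
+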
+def get_directory_size(
+ root: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile(None)
+) -> Tuple[int, Dict[str, int]]:
+    """Returns total size of a directory and a dictionary itemizing each sub-path and its size.
+
+ If an optional ignore_file argument is provided, then files specified in the ignore file are not included in the
+ directory size calculation.
+
+ :param root: The directory to calculate the size of
+ :type root: Union[str, os.PathLike]
+ :param ignore_file: An ignore file that specifies files to ignore when computing the size
+ :type ignore_file: IgnoreFile
+ :return: The computed size of the directory, and the sizes of the child paths
+ :rtype: Tuple[int, Dict[str, int]]
+ """
+ total_size = 0
+ size_list = {}
+ for dirpath, _, filenames in os.walk(root, followlinks=True):
+ for name in filenames:
+ full_path = os.path.join(dirpath, name)
+ # Don't count files that are excluded by an ignore file
+ if ignore_file.is_file_excluded(full_path):
+ continue
+ if not os.path.islink(full_path):
+ path_size = os.path.getsize(full_path)
+ else:
+ # ensure we're counting the size of the linked file
+ # os.readlink returns a file path relative to dirpath, and must be
+ # re-joined to get a workable result
+ path_size = os.path.getsize(os.path.join(dirpath, os.readlink(convert_windows_path_to_unix(full_path))))
+ size_list[full_path] = path_size
+ total_size += path_size
+ return total_size, size_list
+
+
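+# Illustrative usage sketch (paths are hypothetical): total and per-file sizes for
+# a folder, skipping anything excluded by its ignore file.
+#
+#     total_bytes, per_file = get_directory_size(
+#         "/path/to/code", ignore_file=get_ignore_file("/path/to/code")
+#     )
+#     print(total_bytes)                          # e.g. 10485760
+#     print(per_file["/path/to/code/train.py"])   # size in bytes of one file
+
+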
+def upload_file(
+ storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
+ source: str,
+ dest: Optional[str] = None,
+ msg: Optional[str] = None,
+ size: int = 0,
+ show_progress: Optional[bool] = None,
+ in_directory: bool = False,
+ callback: Optional[Any] = None,
+) -> None:
+ """Upload a single file to remote storage.
+
+ :param storage_client: Storage client object
+ :type storage_client: Union[
+ azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
+ azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
+ :param source: Local path to project directory
+ :type source: str
+ :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
+ :type dest: str
+ :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
+ :type msg: str
+ :param size: Size of the file in bytes
+ :type size: int
+ :param show_progress: Whether to show progress bar or not
+ :type show_progress: bool
+ :param in_directory: Whether the file is part of a directory of files
+ :type in_directory: bool
+ :param callback: Callback to progress bar
+ :type callback: Any
+ :return: None
+ """
+ validate_content = size > 0 # don't do checksum for empty files
+
+ if (
+ type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
+ ): # Only for Gen2StorageClient, Blob Storage doesn't have true directories
+ if in_directory:
+ storage_client.temp_sub_directory_client = None
+ file_name_tail = dest.split(os.path.sep)[-1]
+ # Indexing from 2 because the first two parts of the remote path will always be LocalUpload/<asset_id>
+ all_sub_folders = dest.split(os.path.sep)[2:-1]
+
+ # Create remote directories for each nested directory if file is in a nested directory
+ for sub_folder in all_sub_folders:
+ if storage_client.temp_sub_directory_client:
+ storage_client.temp_sub_directory_client = (
+ storage_client.temp_sub_directory_client.create_sub_directory(sub_folder)
+ )
+ else:
+ storage_client.temp_sub_directory_client = storage_client.directory_client.create_sub_directory(
+ sub_folder
+ )
+
+ storage_client.file_client = storage_client.temp_sub_directory_client.create_file(file_name_tail)
+ else:
+ storage_client.file_client = storage_client.directory_client.create_file(source.split("/")[-1])
+
+ with open(source, "rb") as data:
+ if show_progress and not in_directory:
+ file_size, _ = get_directory_size(source)
+ file_size_in_mb = file_size / 10**6
+ if file_size_in_mb < 1:
+ msg += Fore.GREEN + " (< 1 MB)"
+ else:
+ msg += Fore.GREEN + f" ({round(file_size_in_mb, 2)} MBs)"
+ cntx_manager = FileUploadProgressBar(msg=msg)
+ else:
+ cntx_manager = suppress()
+
+ with cntx_manager as c:
+ callback = c.update_to if (show_progress and not in_directory) else None
+ if type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME:
+ storage_client.file_client.upload_data(
+ data=data.read(),
+ overwrite=True,
+ validate_content=validate_content,
+ raw_response_hook=callback,
+ max_concurrency=MAX_CONCURRENCY,
+ )
+ elif type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME:
+ storage_client.container_client.upload_blob(
+ name=dest,
+ data=data,
+ validate_content=validate_content,
+ overwrite=storage_client.overwrite,
+ raw_response_hook=callback,
+ max_concurrency=MAX_CONCURRENCY,
+ connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
+ )
+
+ storage_client.uploaded_file_count += 1
+
+
+def upload_directory(
+ storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
+ source: Union[str, os.PathLike],
+ dest: str,
+ msg: str,
+ show_progress: bool,
+ ignore_file: IgnoreFile,
+) -> None:
+ """Upload directory to remote storage.
+
+ :param storage_client: Storage client object
+ :type storage_client: Union[
+ azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
+ azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
+ :param source: Local path to project directory
+ :type source: Union[str, os.PathLike]
+ :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
+ :type dest: str
+ :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
+ :type msg: str
+ :param show_progress: Whether to show progress bar or not
+ :type show_progress: bool
+ :param ignore_file: The .amlignore or .gitignore file in the project directory
+ :type ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
+ :return: None
+ """
+ source_path = Path(source).resolve()
+ prefix = "" if dest == "" else dest + "/"
+ prefix += os.path.basename(source_path) + "/"
+
+ if (
+ type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
+ ): # Only for Gen2StorageClient, Blob Storage doesn't have true directories
+ storage_client.sub_directory_client = storage_client.directory_client.create_sub_directory(
+ prefix.strip("/").split("/")[-1]
+ )
+
+ # Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage
+ upload_paths = get_upload_files_from_folder(
+ source_path,
+ prefix=prefix,
+ ignore_file=ignore_file,
+ )
+ size_dict = {}
+ total_size = 0
+
+ # Get each file's size for progress bar tracking
+ for path, _ in upload_paths:
+        # TODO: symbolic links are already resolved
+ if os.path.islink(path):
+ path_size = os.path.getsize(
+ os.readlink(convert_windows_path_to_unix(path))
+ ) # ensure we're counting the size of the linked file
+ else:
+ path_size = os.path.getsize(path)
+ size_dict[path] = path_size
+ total_size += path_size
+
+ upload_paths = sorted(upload_paths)
+ if len(upload_paths) == 0:
+ raise EmptyDirectoryError(
+ message=EMPTY_DIRECTORY_ERROR.format(source),
+ no_personal_data_message=msg.format("[source]"),
+ target=ErrorTarget.ARTIFACT,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+ storage_client.total_file_count = len(upload_paths)
+
+ if (
+ type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME
+    ):
+ # Only for BlobStorageClient
+ # Azure Blob doesn't allow metadata setting at the directory level, so the first
+ # file in the directory is designated as the file where the confirmation metadata
+ # will be added at the end of the upload.
+ storage_client.indicator_file = upload_paths[0][1]
+ storage_client.check_blob_exists()
+
+ # Submit paths to workers for upload
+ num_cores = int(cpu_count()) * PROCESSES_PER_CORE
+ with ThreadPoolExecutor(max_workers=num_cores) as ex:
+ futures_dict = {
+ ex.submit(
+ upload_file,
+ storage_client=storage_client,
+ source=src,
+ dest=dest,
+ size=size_dict.get(src),
+ in_directory=True,
+ show_progress=show_progress,
+ ): (src, dest)
+ for (src, dest) in upload_paths
+ }
+ if show_progress:
+ warnings.simplefilter("ignore", category=TqdmWarning)
+ msg += f" ({round(total_size/10**6, 2)} MBs)"
+ is_windows = system() == "Windows" # Default unicode progress bar doesn't display well on Windows
+ with tqdm(total=total_size, desc=msg, ascii=is_windows) as pbar:
+ for future in as_completed(futures_dict):
+ future.result() # access result to propagate any exceptions
+ file_path_name = futures_dict[future][0]
+ pbar.update(size_dict.get(file_path_name) or 0)
+
+
+@retry(
+ exceptions=ResourceExistsError,
+ failure_msg="Asset creation exceeded maximum retries.",
+ logger=module_logger,
+ max_attempts=MAX_AUTOINCREMENT_ATTEMPTS,
+)
+def _create_or_update_autoincrement(
+ name: str,
+ body: Any,
+ version_operation: Any,
+ container_operation: Any,
+ resource_group_name: str,
+ workspace_name: str,
+ **kwargs,
+) -> Any:
+ try:
+ container = container_operation.get(
+ name=name,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ **kwargs,
+ )
+ version = container.properties.next_version
+
+ except ResourceNotFoundError:
+ version = "1"
+
+ result = version_operation.create_or_update(
+ name=name,
+ version=version,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ body=body,
+ **kwargs,
+ )
+ return result
+
+
+def _get_next_version_from_container(
+ name: str,
+ container_operation: Any,
+ resource_group_name: str,
+ workspace_name: str,
+ registry_name: str = None,
+ **kwargs,
+) -> str:
+ try:
+ container = (
+ container_operation.get(
+ name=name,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ **kwargs,
+ )
+ if registry_name
+ else container_operation.get(
+ name=name,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ **kwargs,
+ )
+ )
+ version = container.properties.next_version
+
+ except ResourceNotFoundError:
+ version = "1"
+ return version
+
+
+def _get_next_latest_versions_from_container(
+ name: str,
+ container_operation: Any,
+ resource_group_name: str,
+ workspace_name: str,
+ registry_name: str = None,
+ **kwargs,
+) -> Tuple[str, str]:
+ try:
+ container = (
+ container_operation.get(
+ name=name,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ **kwargs,
+ )
+ if registry_name
+ else container_operation.get(
+ name=name,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ **kwargs,
+ )
+ )
+ next_version = container.properties.next_version
+ latest_version = container.properties.latest_version
+
+ except ResourceNotFoundError:
+ next_version = "1"
+ latest_version = "1"
+ return next_version, latest_version
+
+
+def _get_latest_version_from_container(
+ asset_name: str,
+ container_operation: Any,
+ resource_group_name: str,
+ workspace_name: Optional[str] = None,
+ registry_name: Optional[str] = None,
+ **kwargs,
+) -> str:
+ try:
+ container = (
+ container_operation.get(
+ name=asset_name,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ **kwargs,
+ )
+ if registry_name
+ else container_operation.get(
+ name=asset_name,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ **kwargs,
+ )
+ )
+ version = container.properties.latest_version
+
+ except ResourceNotFoundError as e:
+ message = (
+ f"Asset {asset_name} does not exist in registry {registry_name}."
+ if registry_name
+ else f"Asset {asset_name} does not exist in workspace {workspace_name}."
+ )
+ no_personal_data_message = (
+ "Asset {asset_name} does not exist in registry {registry_name}."
+ if registry_name
+ else "Asset {asset_name} does not exist in workspace {workspace_name}."
+ )
+ raise ValidationException(
+ message=message,
+ no_personal_data_message=no_personal_data_message,
+ target=ErrorTarget.ASSET,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+ ) from e
+ return version
+
+
+def _get_latest(
+ asset_name: str,
+ version_operation: Any,
+ resource_group_name: str,
+ workspace_name: Optional[str] = None,
+ registry_name: Optional[str] = None,
+ order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC] = OrderString.CREATED_AT_DESC,
+ **kwargs,
+) -> Union[ModelVersionData, DataVersionBaseData]:
+ """Retrieve the latest version of the asset with the given name.
+
+ Latest is defined as the most recently created, not the most recently updated.
+
+ :param asset_name: The asset name
+ :type asset_name: str
+ :param version_operation: Any
+ :type version_operation: Any
+ :param resource_group_name: The resource group name
+ :type resource_group_name: str
+ :param workspace_name: The workspace name
+ :type workspace_name: Optional[str]
+ :param registry_name: The registry name
+ :type registry_name: Optional[str]
+ :param order_by: Specifies how to order the results. Defaults to :attr:`OrderString.CREATED_AT_DESC`
+ :type order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC]
+ :return: The latest version of the requested asset
+ :rtype: Union[ModelVersionData, DataVersionBaseData]
+ """
+ result = (
+ version_operation.list(
+ name=asset_name,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ order_by=order_by,
+ top=1,
+ **kwargs,
+ )
+ if registry_name
+ else version_operation.list(
+ name=asset_name,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ order_by=order_by,
+ top=1,
+ **kwargs,
+ )
+ )
+ try:
+ latest = result.next()
+ except StopIteration:
+ latest = None
+
+ if latest and isinstance(latest, ModelVersionResourceArmPaginatedResult):
+ # Data list return object doesn't require this since its elements are already DatasetVersionResources
+ latest = cast(ModelVersionData, latest)
+ if not latest:
+ message = (
+ f"Asset {asset_name} does not exist in registry {registry_name}."
+ if registry_name
+ else f"Asset {asset_name} does not exist in workspace {workspace_name}."
+ )
+ no_personal_data_message = (
+ "Asset {asset_name} does not exist in registry {registry_name}."
+ if registry_name
+ else "Asset {asset_name} does not exist in workspace {workspace_name}."
+ )
+ raise ValidationException(
+ message=message,
+ no_personal_data_message=no_personal_data_message,
+ target=ErrorTarget.ASSET,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+ )
+ return latest
+
+
+def _archive_or_restore(
+ asset_operations: Union[
+ "DataOperations",
+ "EnvironmentOperations",
+ "ModelOperations",
+ "ComponentOperations",
+ ],
+ version_operation: Union[
+ "DataVersionsOperations",
+ "EnvironmentVersionsOperations",
+ "ModelVersionsOperations",
+ "ComponentVersionsOperations",
+ ],
+ container_operation: Union[
+ "DataContainersOperations",
+ "EnvironmentContainersOperations",
+ "ModelContainersOperations",
+ "ComponentContainersOperations",
+ ],
+ is_archived: bool,
+ name: str,
+ version: Optional[str] = None,
+ label: Optional[str] = None,
+) -> None:
+ resource_group_name = asset_operations._operation_scope._resource_group_name
+ workspace_name = asset_operations._workspace_name
+ registry_name = asset_operations._registry_name
+ if version and label:
+ msg = "Cannot specify both version and label."
+ raise ValidationException(
+ message=msg,
+ no_personal_data_message=msg,
+ target=ErrorTarget.ASSET,
+ error_category=ErrorCategory.USER_ERROR,
+ error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+ )
+ if label:
+ version = _resolve_label_to_asset(asset_operations, name, label).version
+
+ if version:
+ version_resource = (
+ version_operation.get(
+ name=name,
+ version=version,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ )
+ if registry_name
+ else version_operation.get(
+ name=name,
+ version=version,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ )
+ )
+ version_resource.properties.is_archived = is_archived
+ version_resource.properties.stage = None
+ ( # pylint: disable=expression-not-assigned
+ version_operation.begin_create_or_update(
+ name=name,
+ version=version,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ body=version_resource,
+ )
+ if registry_name
+ else version_operation.create_or_update(
+ name=name,
+ version=version,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ body=version_resource,
+ )
+ )
+ else:
+ container_resource = (
+ container_operation.get(
+ name=name,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ )
+ if registry_name
+ else container_operation.get(
+ name=name,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ )
+ )
+ container_resource.properties.is_archived = is_archived
+ ( # pylint: disable=expression-not-assigned
+ container_operation.create_or_update(
+ name=name,
+ resource_group_name=resource_group_name,
+ registry_name=registry_name,
+ body=container_resource,
+ )
+ if registry_name
+ else container_operation.create_or_update(
+ name=name,
+ resource_group_name=resource_group_name,
+ workspace_name=workspace_name,
+ body=container_resource,
+ )
+ )
+
+
+def _resolve_label_to_asset(
+ assetOperations: Union[
+ "DataOperations",
+ "ComponentOperations",
+ "EnvironmentOperations",
+ "ModelOperations",
+ "IndexOperations",
+ ],
+ name: str,
+ label: str,
+) -> Asset:
+ """Returns the asset referred to by the given label.
+
+ Throws if label does not refer to a version of the named asset
+
+ :param assetOperations: The operations class used to retrieve the asset
+ :type assetOperations:
+ Union["DataOperations", "ComponentOperations", "EnvironmentOperations", "ModelOperations", "IndexOperations"]
+ :param name: The name of the asset
+ :type name: str
+ :param label: The label to resolve
+ :type label: str
+ :return: The requested asset
+ :rtype: Asset
+ """
+
+ resolver = assetOperations._managed_label_resolver.get(label, None)
+ if not resolver:
+ scope = "registry" if assetOperations._registry_name else "workspace"
+ msg = "Asset {} with version label {} does not exist in {}."
+ raise ValidationException(
+ message=msg.format(name, label, scope),
+ no_personal_data_message=msg.format("[name]", "[label]", "[scope]"),
+ target=ErrorTarget.ASSET,
+ error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
+ )
+ return resolver(name)
+
+
+def _check_or_modify_auto_delete_setting(
+ autoDeleteSetting: Union[Dict, "AutoDeleteSetting"],
+):
+ if autoDeleteSetting is not None:
+ if hasattr(autoDeleteSetting, "condition"):
+ condition = getattr(autoDeleteSetting, "condition")
+ condition = snake_to_camel(condition)
+ setattr(autoDeleteSetting, "condition", condition)
+ elif "condition" in autoDeleteSetting:
+ autoDeleteSetting["condition"] = snake_to_camel(autoDeleteSetting["condition"])
+
+
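+# Illustrative sketch (the condition value is hypothetical): a dict-style setting
+# has its "condition" normalized from snake_case to camelCase in place.
+#
+#     setting = {"condition": "created_greater_than"}
+#     _check_or_modify_auto_delete_setting(setting)
+#     # setting is now {"condition": "createdGreaterThan"}
+
+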
+def _validate_workspace_managed_datastore(path: Optional[Union[str, PathLike]]) -> Optional[Union[str, PathLike]]:
+    # block customer-specified paths on the managed datastore
+ if path.startswith(WORKSPACE_MANAGED_DATASTORE_WITH_SLASH) or path == WORKSPACE_MANAGED_DATASTORE:
+ path = path.rstrip("/")
+
+ if path != WORKSPACE_MANAGED_DATASTORE:
+ raise AssetPathException(
+ message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
+                target=ErrorTarget.DATA,
+ no_personal_data_message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+ return path + "/paths"
+ return path
+
+
+def _validate_auto_delete_setting_in_data_output(
+ auto_delete_setting: Optional[Union[Dict, "AutoDeleteSetting"]]
+) -> None:
+ # avoid specifying auto_delete_setting in job output now
+ if auto_delete_setting:
+ raise ValidationException(
+ message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
+            target=ErrorTarget.DATA,
+ no_personal_data_message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
+ error_category=ErrorCategory.USER_ERROR,
+ )
+
+
+class FileUploadProgressBar(tqdm):
+ def __init__(self, msg: Optional[str] = None):
+ warnings.simplefilter("ignore", category=TqdmWarning)
+ is_windows = system() == "Windows" # Default unicode progress bar doesn't display well on Windows
+ super().__init__(unit="B", unit_scale=True, desc=msg, ascii=is_windows)
+
+ def update_to(self, response):
+ current = response.context["upload_stream_current"]
+ self.total = response.context["data_stream_total"]
+ if current:
+ self.update(current - self.n)
+
+
+class DirectoryUploadProgressBar(tqdm):
+ def __init__(self, dir_size: int, msg: Optional[str] = None):
+ super().__init__(unit="B", unit_scale=True, desc=msg, colour="green")
+ self.total = dir_size
+ self.completed = 0
+
+ def update_to(self, response):
+ current = None
+ if response.context["upload_stream_current"]:
+ current = response.context["upload_stream_current"] + self.completed
+ self.completed = current
+ if current:
+ self.update(current - self.n)
+
+
+def get_storage_info_for_non_registry_asset(
+ service_client, workspace_name: str, name: str, version: str, resource_group: str
+) -> Dict[str, str]:
+ """Get SAS uri and blob uri for non-registry asset. Note that this function won't return the same
+ SAS uri and blob uri for the same asset. It will return a new SAS uri and blob uri every time it is called.
+ :param service_client: Service client
+ :type service_client: AzureMachineLearningWorkspaces
+ :param workspace_name: The workspace name
+ :type workspace_name: str
+ :param name: Asset name
+ :type name: str
+ :param version: Asset version
+ :type version: str
+ :param resource_group: Resource group
+ :type resource_group: str
+ :return: The sas_uri and blob_uri
+ :rtype: Dict[str, str]
+ """
+ request_body = PendingUploadRequestDto(pending_upload_type="TemporaryBlobReference")
+ response = service_client.code_versions.create_or_get_start_pending_upload(
+ resource_group_name=resource_group,
+ workspace_name=workspace_name,
+ name=name,
+ version=version,
+ body=request_body,
+ )
+
+ sas_info = {
+ "sas_uri": response.blob_reference_for_consumption.credential.sas_uri,
+ "blob_uri": response.blob_reference_for_consumption.blob_uri,
+ }
+
+ return sas_info
+
+
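+# Illustrative usage sketch (the client and identifiers are assumed, not defined
+# here): request a temporary blob reference for a workspace asset and read the
+# returned URIs.
+#
+#     info = get_storage_info_for_non_registry_asset(
+#         service_client,              # an authenticated AzureMachineLearningWorkspaces client
+#         workspace_name="my-workspace",
+#         name="my-code-asset",
+#         version="1",
+#         resource_group="my-resource-group",
+#     )
+#     sas_uri, blob_uri = info["sas_uri"], info["blob_uri"]
+
+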
+def _get_existing_asset_name_and_version(existing_asset):
+ import re
+
+ regex = r"/codes/([^/]+)/versions/([^/]+)"
+
+ arm_id = existing_asset.id
+ match = re.search(regex, arm_id)
+ name = match.group(1)
+ version = match.group(2)
+
+ return name, version