# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access,too-many-lines

import hashlib
import logging
import os
import json
import uuid
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import suppress
from multiprocessing import cpu_count
from os import PathLike
from pathlib import Path
from platform import system
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Union,
    cast,
)

from colorama import Fore
from tqdm import TqdmWarning, tqdm
from typing_extensions import Literal

from azure.ai.ml._artifacts._constants import (
    AML_IGNORE_FILE_NAME,
    ARTIFACT_ORIGIN,
    AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
    BLOB_STORAGE_CLIENT_NAME,
    CHUNK_SIZE,
    DEFAULT_CONNECTION_TIMEOUT,
    EMPTY_DIRECTORY_ERROR,
    GEN2_STORAGE_CLIENT_NAME,
    GIT_IGNORE_FILE_NAME,
    INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
    MAX_CONCURRENCY,
    PROCESSES_PER_CORE,
    UPLOAD_CONFIRMATION,
    WORKSPACE_MANAGED_DATASTORE,
    WORKSPACE_MANAGED_DATASTORE_WITH_SLASH,
)
from azure.ai.ml._restclient.v2022_02_01_preview.operations import (
    ComponentContainersOperations,
    ComponentVersionsOperations,
    DataContainersOperations,
    DataVersionsOperations,
    EnvironmentContainersOperations,
    EnvironmentVersionsOperations,
    ModelContainersOperations,
    ModelVersionsOperations,
)
from azure.ai.ml._restclient.v2022_05_01.models import (
    DataVersionBaseData,
    ModelVersionData,
    ModelVersionResourceArmPaginatedResult,
)
from azure.ai.ml._restclient.v2023_04_01.models import PendingUploadRequestDto
from azure.ai.ml._utils._pathspec import GitWildMatchPattern, normalize_file
from azure.ai.ml._utils.utils import convert_windows_path_to_unix, retry, snake_to_camel
from azure.ai.ml.constants._common import (
    MAX_AUTOINCREMENT_ATTEMPTS,
    DefaultOpenEncoding,
    OrderString,
)
from azure.ai.ml.entities._assets.asset import Asset
from azure.ai.ml.exceptions import (
    AssetPathException,
    EmptyDirectoryError,
    ErrorCategory,
    ErrorTarget,
    ValidationErrorType,
    ValidationException,
)
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError

if TYPE_CHECKING:
    from azure.ai.ml.operations import (
        ComponentOperations,
        DataOperations,
        EnvironmentOperations,
        IndexOperations,
        ModelOperations,
    )

hash_type = type(hashlib.md5())  # nosec

module_logger = logging.getLogger(__name__)


class AssetNotChangedError(Exception):
    pass


class IgnoreFile(object):
    def __init__(self, file_path: Optional[Union[str, os.PathLike]] = None):
        """Base class for handling .gitignore and .amlignore files.

        :param file_path: Relative path, or absolute path to the ignore file.
        """
        path = Path(file_path).resolve() if file_path else None
        self._path = path
        self._path_spec = None

    def exists(self) -> bool:
        """Checks if ignore file exists.
        :return: True if file exists. False Otherwise
        :rtype: bool
        """
        return self._file_exists()

    def _file_exists(self) -> bool:
        return self._path and self._path.exists()

    @property
    def base_path(self) -> Path:
        return self._path.parent

    def _get_ignore_list(self) -> List[str]:
        """Get ignore list from ignore file contents.

        :return: The lines of the ignore file
        :rtype: List[str]
        """
        if not self.exists():
            return []
        if self._file_exists():
            with open(self._path, "r", encoding=DefaultOpenEncoding.READ) as fh:
                return [line.rstrip() for line in fh if line]
        return []

    def _create_pathspec(self) -> List[GitWildMatchPattern]:
        """Creates path specification based on ignore list.

        :return: Path specification
        :rtype: List[GitWildMatchPattern]
        """
        return [GitWildMatchPattern(ignore) for ignore in self._get_ignore_list()]

    def _get_rel_path(self, file_path: Union[str, os.PathLike]) -> Optional[str]:
        """Get relative path of given file_path.

        :param file_path: A file path
        :type file_path: Union[str, os.PathLike]
        :return: file_path relative to base_path, if computable. None otherwise
        :rtype: Optional[str]
        """
        file_path = Path(file_path).absolute()
        try:
            # use os.path.relpath instead of Path.relative_to in case file_path is not a child of self.base_path
            return os.path.relpath(file_path, self.base_path)
        except ValueError:
            # 2 paths are on different drives
            return None

    def is_file_excluded(self, file_path: Union[str, os.PathLike]) -> bool:
        """Checks if given file_path is excluded.

        :param file_path: File path to be checked against ignore file specifications
        :type file_path: Union[str, os.PathLike]
        :return: Whether the file is excluded by ignore file
        :rtype: bool
        """
        # TODO: current design of ignore file can't distinguish between files and directories of the same name
        if self._path_spec is None:
            self._path_spec = self._create_pathspec()
        if not self._path_spec:
            return False
        file_path = self._get_rel_path(file_path)
        if file_path is None:
            return True

        norm_file = normalize_file(file_path)
        matched = False
        for pattern in self._path_spec:
            if pattern.include is not None:
                if pattern.match_file(norm_file) is not None:
                    matched = pattern.include

        return matched

    @property
    def path(self) -> Union[Path, str]:
        return self._path


class AmlIgnoreFile(IgnoreFile):
    def __init__(self, directory_path: Union[Path, str]):
        file_path = Path(directory_path).joinpath(AML_IGNORE_FILE_NAME)
        super(AmlIgnoreFile, self).__init__(file_path)


class GitIgnoreFile(IgnoreFile):
    def __init__(self, directory_path: Union[Path, str]):
        file_path = Path(directory_path).joinpath(GIT_IGNORE_FILE_NAME)
        super(GitIgnoreFile, self).__init__(file_path)


def get_ignore_file(directory_path: Union[Path, str]) -> IgnoreFile:
    """Finds and returns IgnoreFile object based on ignore file found in directory_path.

    .amlignore takes precedence over .gitignore and if no file is found, an empty
    IgnoreFile object will be returned.

    The ignore file must be in the root directory.

    :param directory_path: Path to the (root) directory where ignore file is located
    :type directory_path: Union[Path, str]
    :return: The IgnoreFile found in the directory
    :rtype: IgnoreFile
    """
    aml_ignore = AmlIgnoreFile(directory_path)
    git_ignore = GitIgnoreFile(directory_path)

    if aml_ignore.exists():
        return aml_ignore
    if git_ignore.exists():
        return git_ignore
    return IgnoreFile()
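
# Illustrative usage sketch (not part of the original module): get_ignore_file applies the
# precedence documented above -- .amlignore wins over .gitignore. "./project" is a hypothetical
# local directory.
#
#   ignore = get_ignore_file("./project")
#   if ignore.exists():
#       print(ignore.is_file_excluded("./project/outputs/tmp.log"))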


def _validate_path(path: Union[str, os.PathLike], _type: str) -> None:
    path = Path(path)  # Okay to do this since Path is idempotent
    if not path.is_file() and not path.is_dir():
        raise ValidationException(
            message=f"No such file or directory: {path}",
            target=_type,
            error_type=ValidationErrorType.FILE_OR_FOLDER_NOT_FOUND,
            no_personal_data_message="No such file or directory",
            error_category=ErrorCategory.USER_ERROR,
        )


def _parse_name_version(
    name: Optional[str] = None, version_as_int: bool = True
) -> Tuple[Optional[str], Optional[Union[str, int]]]:
    if not name:
        return None, None

    token_list = name.split(":")
    if len(token_list) == 1:
        return name, None
    *name, version = token_list
    if version_as_int:
        version = int(version)
    return ":".join(name), version


def _get_file_hash(filename: Union[str, os.PathLike], _hash: hash_type) -> hash_type:
    with open(str(filename), "rb") as f:
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
            _hash.update(chunk)
    return _hash


def delete_two_catalog_files(path):
    """
    Function that deletes the "catalog.json" and "catalog.json.sig" files located at 'path', if they exist

    :param path: Path to the folder for signing
    :type path: Union[Path, str]
    :return: None
    """
    # catalog.json
    file_path_json = os.path.join(path, "catalog.json")
    if os.path.exists(file_path_json):
        module_logger.warning("%s already exists. Deleting it", file_path_json)
        os.remove(file_path_json)
    # catalog.json.sig
    file_path_json_sig = os.path.join(path, "catalog.json.sig")
    if os.path.exists(file_path_json_sig):
        module_logger.warning("%s already exists. Deleting it", file_path_json_sig)
        os.remove(file_path_json_sig)


def create_catalog_files(path, json_stub):
    with open(os.path.join(path, "catalog.json"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile1:
        json.dump(json_stub, jsonFile1)
    with open(os.path.join(path, "catalog.json.sig"), "w", encoding=DefaultOpenEncoding.WRITE) as jsonFile2:
        json.dump(json_stub, jsonFile2)


def _get_dir_hash(directory: Union[str, os.PathLike], _hash: hash_type, ignore_file: IgnoreFile) -> hash_type:
    dir_contents = Path(directory).iterdir()
    sorted_contents = sorted(dir_contents, key=lambda path: str(path).lower())
    for path in sorted_contents:
        if ignore_file.is_file_excluded(path):
            continue
        _hash.update(path.name.encode())
        if os.path.islink(path):  # ensure we're hashing the contents of the linked file
            path = _resolve_path(path)
        if path.is_file():
            _hash = _get_file_hash(path, _hash)
        elif path.is_dir():
            _hash = _get_dir_hash(path, _hash, ignore_file)
    return _hash


def _build_metadata_dict(name: str, version: str) -> Dict[str, str]:
    """Build metadata dictionary to attach to uploaded data.

    Metadata includes an upload confirmation field, and for code uploads only, the name and version of the code asset
    being created for that data.

    :param name: The name of the uploaded data
    :type name: str
    :param version: The version of the uploaded data
    :type version: str
    :return: Metadata dict
    :rtype: Dict[str, str]
    """
    if name:
        linked_asset_arm_id = {"name": name, "version": version}
    else:
        msg = "'name' cannot be NoneType for asset artifact upload."
        raise ValidationException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.INVALID_VALUE,
        )

    metadata_dict = {**UPLOAD_CONFIRMATION, **linked_asset_arm_id}
    return metadata_dict


def get_object_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> str:
    _hash = hashlib.md5(b"Initialize for october 2021 AML CLI version")  # nosec
    if Path(path).is_dir():
        object_hash = _get_dir_hash(directory=path, _hash=_hash, ignore_file=ignore_file)
    else:
        if os.path.islink(path):  # ensure we're hashing the contents of the linked file
            path = _resolve_path(Path(path))
        object_hash = _get_file_hash(filename=path, _hash=_hash)
    return str(object_hash.hexdigest())
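
# Illustrative usage sketch (not part of the original module): hash a local folder while
# honouring its ignore file. "./snapshot" is a hypothetical directory.
#
#   folder = "./snapshot"
#   digest = get_object_hash(folder, ignore_file=get_ignore_file(folder))
#   print(digest)  # hex md5 digest of the folder's non-ignored contents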


def get_content_hash_version():
    return 202208


def get_content_hash(path: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile()) -> Optional[str]:
    """Generating sha256 hash for file/folder,

    e.g. Code snapshot fingerprints to prevent tampering.

    The process of hashing is:
    1. If it's a link, get the actual path of the link.
    2. If it's a file, append file content.
    3. If it's a folder:
        1. list all files under the folder
        2. convert file count to str and append to hash
        3. sort the files by lower case of relative path
        4. for each file append '#'+relative path+'#' and file size to hash
        5. do another iteration on the file list to append each file's content to hash.
        The example of absolute path to relative path mapping is:
        [
            ('/mnt/c/codehash/code/file1.txt', 'file1.txt'),
            ('/mnt/c/codehash/code/folder1/file1.txt', 'folder1/file1.txt'),
            ('/mnt/c/codehash/code/Folder2/file1.txt', 'Folder2/file1.txt'),
            ('/mnt/c/codehash/code/Folder2/folder1/file1.txt', 'Folder2/folder1/file1.txt')
        ]
    4. Hash the content and convert to hex digest string.

    :param path: The file or folder to hash
    :type path: Union[str, os.PathLike]
    :param ignore_file: An ignore file that specifies files to ignore when computing the hash
    :type ignore_file: IgnoreFile
    :return: The content hash if the content is a link, directory, or file. None otherwise
    :rtype: Optional[str]
    """
    # DO NOT change this function unless you change the verification logic together
    actual_path = path
    if os.path.islink(path):
        actual_path = _resolve_path(Path(path)).as_posix()
    if os.path.isdir(actual_path):
        return _get_file_list_content_hash(get_upload_files_from_folder(actual_path, ignore_file=ignore_file))
    if os.path.isfile(actual_path):
        return _get_file_list_content_hash([(actual_path, Path(actual_path).name)])
    return None
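
# Illustrative sketch (not from the original source): get_content_hash drives the sha256
# snapshot fingerprint described above; get_content_hash_version() identifies the hashing
# scheme the digest belongs to. "./snapshot" is a hypothetical directory.
#
#   digest = get_content_hash("./snapshot", ignore_file=get_ignore_file("./snapshot"))
#   print(get_content_hash_version(), digest)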


def get_upload_files_from_folder(
    path: Union[str, os.PathLike],
    *,
    prefix: str = "",
    ignore_file: IgnoreFile = IgnoreFile(),
) -> List[Tuple[str, str]]:
    path = Path(path)
    upload_paths = []
    for root, _, files in os.walk(path, followlinks=True):
        upload_paths += list(
            traverse_directory(
                root,
                files,
                prefix=Path(prefix).joinpath(Path(root).relative_to(path)).as_posix(),
                ignore_file=ignore_file,
            )
        )
    return upload_paths
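
# Illustrative sketch (not from the original source): the helper returns (local_path, remote_path)
# tuples, mirroring the traverse_directory example below. "./project" and the <guid> placeholder
# are hypothetical.
#
#   pairs = get_upload_files_from_folder("./project", prefix="LocalUpload/<guid>/project")
#   # e.g. [("/abs/path/project/train.py", "LocalUpload/<guid>/project/train.py"), ...]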


def _get_file_list_content_hash(file_list) -> str:
    # file_list is a list of tuples, (absolute_path, relative_path)

    _hash = hashlib.sha256()
    # Add file count to the hash and add '#' around file name then add each file's size to avoid collision like:
    # Case 1:
    # 'a.txt' with contents 'a'
    # 'b.txt' with contents 'b'
    #
    # Case 2:
    # cspell:disable-next-line
    # 'a.txt' with contents 'ab.txtb'
    _hash.update(str(len(file_list)).encode())
    # Sort by "destination" path, since in this function destination prefix is empty and keep the link name in path.
    for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
        _hash.update(("#" + file_name + "#").encode())
        _hash.update(str(os.path.getsize(file_path)).encode())
    for file_path, file_name in sorted(file_list, key=lambda x: str(x[1]).lower()):
        _hash = _get_file_hash(file_path, _hash)
    return str(_hash.hexdigest())
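
# Worked illustration (not from the original source) of the collision that the file count,
# '#<name>#' delimiters and file sizes above guard against: without them, the snapshots
#   {"a.txt": "a", "b.txt": "b"}   and   {"a.txt": "ab.txtb"}
# would both feed the byte stream "a.txtab.txtb" into a naive name+content hash; with the
# extra fields mixed in first, the two digests differ.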


def traverse_directory(  # pylint: disable=unused-argument
    root: str,
    files: List[str],
    *,
    prefix: str,
    ignore_file: IgnoreFile = IgnoreFile(),
    # keep this for backward compatibility
    **kwargs: Any,
) -> Iterable[Tuple[str, Union[str, Any]]]:
    """Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage.
    e.g.

    [/mnt/c/Users/dipeck/upload_files/my_file1.txt,
    /mnt/c/Users/dipeck/upload_files/my_file2.txt] -->

        [(/mnt/c/Users/dipeck/upload_files/my_file1.txt, LocalUpload/<guid>/upload_files/my_file1.txt),
        (/mnt/c/Users/dipeck/upload_files/my_file2.txt, LocalUpload/<guid>/upload_files/my_file2.txt)]

    :param root: Root directory path
    :type root: str
    :param files: List of all file paths in the directory
    :type files: List[str]
    :keyword prefix: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
    :paramtype prefix: str
    :keyword ignore_file: The .amlignore or .gitignore file in the project directory
    :paramtype ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
    :return: Zipped list of tuples representing the local path and remote destination path for each file
    :rtype: Iterable[Tuple[str, Union[str, Any]]]
    """
    # Normalize Windows paths. Note that path should be resolved first as long part will be converted to a shortcut in
    # Windows. For example, C:\Users\too-long-user-name\test will be converted to C:\Users\too-lo~1\test by default.
    # Refer to https://en.wikipedia.org/wiki/8.3_filename for more details.
    root = Path(root).resolve().absolute()

    # filter out files excluded by the ignore file
    # TODO: inner ignore file won't take effect. A merged IgnoreFile need to be generated in code resolution.
    origin_file_paths = [
        root.joinpath(filename)
        for filename in files
        if not ignore_file.is_file_excluded(root.joinpath(filename).as_posix())
    ]

    result = []
    for origin_file_path in origin_file_paths:
        relative_path = origin_file_path.relative_to(root)
        result.append(
            (
                _resolve_path(origin_file_path).as_posix(),
                Path(prefix).joinpath(relative_path).as_posix(),
            )
        )
    return result


def _resolve_path(path: Path) -> Path:
    if not path.is_symlink():
        return path

    link_path = path.resolve()
    if not link_path.is_absolute():
        link_path = path.parent.joinpath(link_path).resolve()
    return _resolve_path(link_path)


def generate_asset_id(asset_hash: str, include_directory=True) -> str:
    asset_id = asset_hash or str(uuid.uuid4())
    if include_directory:
        asset_id = "/".join((ARTIFACT_ORIGIN, asset_id))
    return asset_id
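
# Illustrative sketch (not from the original source): generate_asset_id prefixes the hash with
# ARTIFACT_ORIGIN (assumed here to be the "LocalUpload" segment seen in the remote-path examples
# elsewhere in this module), or falls back to a random GUID when no hash is given.
#
#   generate_asset_id("abc123")                       # -> "LocalUpload/abc123"
#   generate_asset_id(None, include_directory=False)  # -> "<random uuid4>"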


def get_directory_size(
    root: Union[str, os.PathLike], ignore_file: IgnoreFile = IgnoreFile(None)
) -> Tuple[int, Dict[str, int]]:
    """Returns total size of a directory and a dictionary itemizing each sub- path and its size.

    If an optional ignore_file argument is provided, then files specified in the ignore file are not included in the
    directory size calculation.

    :param root: The directory to calculate the size of
    :type root: Union[str, os.PathLike]
    :param ignore_file: An ignore file that specifies files to ignore when computing the size
    :type ignore_file: IgnoreFile
    :return: The computed size of the directory, and the sizes of the child paths
    :rtype: Tuple[int, Dict[str, int]]
    """
    total_size = 0
    size_list = {}
    for dirpath, _, filenames in os.walk(root, followlinks=True):
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            # Don't count files that are excluded by an ignore file
            if ignore_file.is_file_excluded(full_path):
                continue
            if not os.path.islink(full_path):
                path_size = os.path.getsize(full_path)
            else:
                # ensure we're counting the size of the linked file
                # os.readlink returns a file path relative to dirpath, and must be
                # re-joined to get a workable result
                path_size = os.path.getsize(os.path.join(dirpath, os.readlink(convert_windows_path_to_unix(full_path))))
            size_list[full_path] = path_size
            total_size += path_size
    return total_size, size_list
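
# Illustrative usage sketch (not part of the original module), assuming "./project" is a local
# directory:
#
#   total, per_file = get_directory_size("./project", ignore_file=get_ignore_file("./project"))
#   print(f"{total / 10**6:.2f} MB across {len(per_file)} files")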


def upload_file(
    storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
    source: str,
    dest: Optional[str] = None,
    msg: Optional[str] = None,
    size: int = 0,
    show_progress: Optional[bool] = None,
    in_directory: bool = False,
    callback: Optional[Any] = None,
) -> None:
    """Upload a single file to remote storage.

    :param storage_client: Storage client object
    :type storage_client: Union[
        azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
        azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
    :param source: Local path to project directory
    :type source: str
    :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
    :type dest: str
    :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
    :type msg: str
    :param size: Size of the file in bytes
    :type size: int
    :param show_progress: Whether to show progress bar or not
    :type show_progress: bool
    :param in_directory: Whether the file is part of a directory of files
    :type in_directory: bool
    :param callback: Callback to progress bar
    :type callback: Any
    :return: None
    """
    validate_content = size > 0  # don't do checksum for empty files

    if (
        type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
    ):  # Only for Gen2StorageClient, Blob Storage doesn't have true directories
        if in_directory:
            storage_client.temp_sub_directory_client = None
            file_name_tail = dest.split(os.path.sep)[-1]
            # Indexing from 2 because the first two parts of the remote path will always be LocalUpload/<asset_id>
            all_sub_folders = dest.split(os.path.sep)[2:-1]

            # Create remote directories for each nested directory if file is in a nested directory
            for sub_folder in all_sub_folders:
                if storage_client.temp_sub_directory_client:
                    storage_client.temp_sub_directory_client = (
                        storage_client.temp_sub_directory_client.create_sub_directory(sub_folder)
                    )
                else:
                    storage_client.temp_sub_directory_client = storage_client.directory_client.create_sub_directory(
                        sub_folder
                    )

            storage_client.file_client = storage_client.temp_sub_directory_client.create_file(file_name_tail)
        else:
            storage_client.file_client = storage_client.directory_client.create_file(source.split("/")[-1])

    with open(source, "rb") as data:
        if show_progress and not in_directory:
            file_size, _ = get_directory_size(source)
            file_size_in_mb = file_size / 10**6
            if file_size_in_mb < 1:
                msg += Fore.GREEN + " (< 1 MB)"
            else:
                msg += Fore.GREEN + f" ({round(file_size_in_mb, 2)} MBs)"
            cntx_manager = FileUploadProgressBar(msg=msg)
        else:
            cntx_manager = suppress()

        with cntx_manager as c:
            callback = c.update_to if (show_progress and not in_directory) else None
            if type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME:
                storage_client.file_client.upload_data(
                    data=data.read(),
                    overwrite=True,
                    validate_content=validate_content,
                    raw_response_hook=callback,
                    max_concurrency=MAX_CONCURRENCY,
                )
            elif type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME:
                storage_client.container_client.upload_blob(
                    name=dest,
                    data=data,
                    validate_content=validate_content,
                    overwrite=storage_client.overwrite,
                    raw_response_hook=callback,
                    max_concurrency=MAX_CONCURRENCY,
                    connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
                )

    storage_client.uploaded_file_count += 1


def upload_directory(
    storage_client: Union["BlobStorageClient", "Gen2StorageClient"],
    source: Union[str, os.PathLike],
    dest: str,
    msg: str,
    show_progress: bool,
    ignore_file: IgnoreFile,
) -> None:
    """Upload directory to remote storage.

    :param storage_client: Storage client object
    :type storage_client: Union[
        azure.ai.ml._artifacts._blob_storage_helper.BlobStorageClient,
        azure.ai.ml._artifacts._gen2_storage_helper.Gen2StorageClient]
    :param source: Local path to project directory
    :type source: Union[str, os.PathLike]
    :param dest: Remote upload path for project directory (e.g. LocalUpload/<guid>/project_dir)
    :type dest: str
    :param msg: Message to be shown with progress bar (e.g. "Uploading <source>")
    :type msg: str
    :param show_progress: Whether to show progress bar or not
    :type show_progress: bool
    :param ignore_file: The .amlignore or .gitignore file in the project directory
    :type ignore_file: azure.ai.ml._utils._asset_utils.IgnoreFile
    :return: None
    """
    source_path = Path(source).resolve()
    prefix = "" if dest == "" else dest + "/"
    prefix += os.path.basename(source_path) + "/"

    if (
        type(storage_client).__name__ == GEN2_STORAGE_CLIENT_NAME
    ):  # Only for Gen2StorageClient, Blob Storage doesn't have true directories
        storage_client.sub_directory_client = storage_client.directory_client.create_sub_directory(
            prefix.strip("/").split("/")[-1]
        )

    # Enumerate all files in the given directory and compose paths for them to be uploaded to in the remote storage
    upload_paths = get_upload_files_from_folder(
        source_path,
        prefix=prefix,
        ignore_file=ignore_file,
    )
    size_dict = {}
    total_size = 0

    # Get each file's size for progress bar tracking
    for path, _ in upload_paths:
        # TODO: symbol links are already resolved
        if os.path.islink(path):
            path_size = os.path.getsize(
                os.readlink(convert_windows_path_to_unix(path))
            )  # ensure we're counting the size of the linked file
        else:
            path_size = os.path.getsize(path)
        size_dict[path] = path_size
        total_size += path_size

    upload_paths = sorted(upload_paths)
    if len(upload_paths) == 0:
        raise EmptyDirectoryError(
            message=EMPTY_DIRECTORY_ERROR.format(source),
            no_personal_data_message=msg.format("[source]"),
            target=ErrorTarget.ARTIFACT,
            error_category=ErrorCategory.USER_ERROR,
        )
    storage_client.total_file_count = len(upload_paths)

    if type(storage_client).__name__ == BLOB_STORAGE_CLIENT_NAME:
        # Only for BlobStorageClient
        # Azure Blob doesn't allow metadata setting at the directory level, so the first
        # file in the directory is designated as the file where the confirmation metadata
        # will be added at the end of the upload.
        storage_client.indicator_file = upload_paths[0][1]
        storage_client.check_blob_exists()

    # Submit paths to workers for upload
    num_cores = int(cpu_count()) * PROCESSES_PER_CORE
    with ThreadPoolExecutor(max_workers=num_cores) as ex:
        futures_dict = {
            ex.submit(
                upload_file,
                storage_client=storage_client,
                source=src,
                dest=dest,
                size=size_dict.get(src),
                in_directory=True,
                show_progress=show_progress,
            ): (src, dest)
            for (src, dest) in upload_paths
        }
        if show_progress:
            warnings.simplefilter("ignore", category=TqdmWarning)
            msg += f" ({round(total_size/10**6, 2)} MBs)"
            is_windows = system() == "Windows"  # Default unicode progress bar doesn't display well on Windows
            with tqdm(total=total_size, desc=msg, ascii=is_windows) as pbar:
                for future in as_completed(futures_dict):
                    future.result()  # access result to propagate any exceptions
                    file_path_name = futures_dict[future][0]
                    pbar.update(size_dict.get(file_path_name) or 0)


@retry(
    exceptions=ResourceExistsError,
    failure_msg="Asset creation exceeded maximum retries.",
    logger=module_logger,
    max_attempts=MAX_AUTOINCREMENT_ATTEMPTS,
)
def _create_or_update_autoincrement(
    name: str,
    body: Any,
    version_operation: Any,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: str,
    **kwargs,
) -> Any:
    try:
        container = container_operation.get(
            name=name,
            resource_group_name=resource_group_name,
            workspace_name=workspace_name,
            **kwargs,
        )
        version = container.properties.next_version

    except ResourceNotFoundError:
        version = "1"

    result = version_operation.create_or_update(
        name=name,
        version=version,
        resource_group_name=resource_group_name,
        workspace_name=workspace_name,
        body=body,
        **kwargs,
    )
    return result


def _get_next_version_from_container(
    name: str,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: str,
    registry_name: Optional[str] = None,
    **kwargs,
) -> str:
    try:
        container = (
            container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                **kwargs,
            )
            if registry_name
            else container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                **kwargs,
            )
        )
        version = container.properties.next_version

    except ResourceNotFoundError:
        version = "1"
    return version


def _get_next_latest_versions_from_container(
    name: str,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: str,
    registry_name: Optional[str] = None,
    **kwargs,
) -> Tuple[str, str]:
    try:
        container = (
            container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                **kwargs,
            )
            if registry_name
            else container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                **kwargs,
            )
        )
        next_version = container.properties.next_version
        latest_version = container.properties.latest_version

    except ResourceNotFoundError:
        next_version = "1"
        latest_version = "1"
    return next_version, latest_version


def _get_latest_version_from_container(
    asset_name: str,
    container_operation: Any,
    resource_group_name: str,
    workspace_name: Optional[str] = None,
    registry_name: Optional[str] = None,
    **kwargs,
) -> str:
    try:
        container = (
            container_operation.get(
                name=asset_name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                **kwargs,
            )
            if registry_name
            else container_operation.get(
                name=asset_name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                **kwargs,
            )
        )
        version = container.properties.latest_version

    except ResourceNotFoundError as e:
        message = (
            f"Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else f"Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        no_personal_data_message = (
            "Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else "Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        raise ValidationException(
            message=message,
            no_personal_data_message=no_personal_data_message,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        ) from e
    return version


def _get_latest(
    asset_name: str,
    version_operation: Any,
    resource_group_name: str,
    workspace_name: Optional[str] = None,
    registry_name: Optional[str] = None,
    order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC] = OrderString.CREATED_AT_DESC,
    **kwargs,
) -> Union[ModelVersionData, DataVersionBaseData]:
    """Retrieve the latest version of the asset with the given name.

    Latest is defined as the most recently created, not the most recently updated.

    :param asset_name: The asset name
    :type asset_name: str
    :param version_operation: Any
    :type version_operation: Any
    :param resource_group_name: The resource group name
    :type resource_group_name: str
    :param workspace_name: The workspace name
    :type workspace_name: Optional[str]
    :param registry_name: The registry name
    :type registry_name: Optional[str]
    :param order_by: Specifies how to order the results. Defaults to :attr:`OrderString.CREATED_AT_DESC`
    :type order_by: Literal[OrderString.CREATED_AT, OrderString.CREATED_AT_DESC]
    :return: The latest version of the requested asset
    :rtype: Union[ModelVersionData, DataVersionBaseData]
    """
    result = (
        version_operation.list(
            name=asset_name,
            resource_group_name=resource_group_name,
            registry_name=registry_name,
            order_by=order_by,
            top=1,
            **kwargs,
        )
        if registry_name
        else version_operation.list(
            name=asset_name,
            resource_group_name=resource_group_name,
            workspace_name=workspace_name,
            order_by=order_by,
            top=1,
            **kwargs,
        )
    )
    try:
        latest = result.next()
    except StopIteration:
        latest = None

    if latest and isinstance(latest, ModelVersionResourceArmPaginatedResult):
        # Data list return object doesn't require this since its elements are already DatasetVersionResources
        latest = cast(ModelVersionData, latest)
    if not latest:
        message = (
            f"Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else f"Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        no_personal_data_message = (
            "Asset {asset_name} does not exist in registry {registry_name}."
            if registry_name
            else "Asset {asset_name} does not exist in workspace {workspace_name}."
        )
        raise ValidationException(
            message=message,
            no_personal_data_message=no_personal_data_message,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        )
    return latest


def _archive_or_restore(
    asset_operations: Union[
        "DataOperations",
        "EnvironmentOperations",
        "ModelOperations",
        "ComponentOperations",
    ],
    version_operation: Union[
        "DataVersionsOperations",
        "EnvironmentVersionsOperations",
        "ModelVersionsOperations",
        "ComponentVersionsOperations",
    ],
    container_operation: Union[
        "DataContainersOperations",
        "EnvironmentContainersOperations",
        "ModelContainersOperations",
        "ComponentContainersOperations",
    ],
    is_archived: bool,
    name: str,
    version: Optional[str] = None,
    label: Optional[str] = None,
) -> None:
    resource_group_name = asset_operations._operation_scope._resource_group_name
    workspace_name = asset_operations._workspace_name
    registry_name = asset_operations._registry_name
    if version and label:
        msg = "Cannot specify both version and label."
        raise ValidationException(
            message=msg,
            no_personal_data_message=msg,
            target=ErrorTarget.ASSET,
            error_category=ErrorCategory.USER_ERROR,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        )
    if label:
        version = _resolve_label_to_asset(asset_operations, name, label).version

    if version:
        version_resource = (
            version_operation.get(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
            )
            if registry_name
            else version_operation.get(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
            )
        )
        version_resource.properties.is_archived = is_archived
        version_resource.properties.stage = None
        (  # pylint: disable=expression-not-assigned
            version_operation.begin_create_or_update(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                body=version_resource,
            )
            if registry_name
            else version_operation.create_or_update(
                name=name,
                version=version,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                body=version_resource,
            )
        )
    else:
        container_resource = (
            container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
            )
            if registry_name
            else container_operation.get(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
            )
        )
        container_resource.properties.is_archived = is_archived
        (  # pylint: disable=expression-not-assigned
            container_operation.create_or_update(
                name=name,
                resource_group_name=resource_group_name,
                registry_name=registry_name,
                body=container_resource,
            )
            if registry_name
            else container_operation.create_or_update(
                name=name,
                resource_group_name=resource_group_name,
                workspace_name=workspace_name,
                body=container_resource,
            )
        )


def _resolve_label_to_asset(
    assetOperations: Union[
        "DataOperations",
        "ComponentOperations",
        "EnvironmentOperations",
        "ModelOperations",
        "IndexOperations",
    ],
    name: str,
    label: str,
) -> Asset:
    """Returns the asset referred to by the given label.

    Throws if label does not refer to a version of the named asset

    :param assetOperations: The operations class used to retrieve the asset
    :type assetOperations:
        Union["DataOperations", "ComponentOperations", "EnvironmentOperations", "ModelOperations", "IndexOperations"]
    :param name: The name of the asset
    :type name: str
    :param label: The label to resolve
    :type label: str
    :return: The requested asset
    :rtype: Asset
    """

    resolver = assetOperations._managed_label_resolver.get(label, None)
    if not resolver:
        scope = "registry" if assetOperations._registry_name else "workspace"
        msg = "Asset {} with version label {} does not exist in {}."
        raise ValidationException(
            message=msg.format(name, label, scope),
            no_personal_data_message=msg.format("[name]", "[label]", "[scope]"),
            target=ErrorTarget.ASSET,
            error_type=ValidationErrorType.RESOURCE_NOT_FOUND,
        )
    return resolver(name)


def _check_or_modify_auto_delete_setting(
    autoDeleteSetting: Union[Dict, "AutoDeleteSetting"],
):
    if autoDeleteSetting is not None:
        if hasattr(autoDeleteSetting, "condition"):
            condition = getattr(autoDeleteSetting, "condition")
            condition = snake_to_camel(condition)
            setattr(autoDeleteSetting, "condition", condition)
        elif "condition" in autoDeleteSetting:
            autoDeleteSetting["condition"] = snake_to_camel(autoDeleteSetting["condition"])


def _validate_workspace_managed_datastore(path: Optional[Union[str, PathLike]]) -> Optional[Union[str, PathLike]]:
    # block customer-specified path on managed datastore
    if path.startswith(WORKSPACE_MANAGED_DATASTORE_WITH_SLASH) or path == WORKSPACE_MANAGED_DATASTORE:
        path = path.rstrip("/")

        if path != WORKSPACE_MANAGED_DATASTORE:
            raise AssetPathException(
                message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
                target=ErrorTarget.DATA,
                no_personal_data_message=INVALID_MANAGED_DATASTORE_PATH_ERROR_NO_PERSONAL_DATA,
                error_category=ErrorCategory.USER_ERROR,
            )

        return path + "/paths"
    return path


def _validate_auto_delete_setting_in_data_output(
    auto_delete_setting: Optional[Union[Dict, "AutoDeleteSetting"]]
) -> None:
    # avoid specifying auto_delete_setting in job output now
    if auto_delete_setting:
        raise ValidationException(
            message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
            target=ErrorTarget.DATA,
            no_personal_data_message=AUTO_DELETE_SETTING_NOT_ALLOWED_ERROR_NO_PERSONAL_DATA,
            error_category=ErrorCategory.USER_ERROR,
        )


class FileUploadProgressBar(tqdm):
    def __init__(self, msg: Optional[str] = None):
        warnings.simplefilter("ignore", category=TqdmWarning)
        is_windows = system() == "Windows"  # Default unicode progress bar doesn't display well on Windows
        super().__init__(unit="B", unit_scale=True, desc=msg, ascii=is_windows)

    def update_to(self, response):
        current = response.context["upload_stream_current"]
        self.total = response.context["data_stream_total"]
        if current:
            self.update(current - self.n)


class DirectoryUploadProgressBar(tqdm):
    def __init__(self, dir_size: int, msg: Optional[str] = None):
        super().__init__(unit="B", unit_scale=True, desc=msg, colour="green")
        self.total = dir_size
        self.completed = 0

    def update_to(self, response):
        current = None
        if response.context["upload_stream_current"]:
            current = response.context["upload_stream_current"] + self.completed
            self.completed = current
        if current:
            self.update(current - self.n)


def get_storage_info_for_non_registry_asset(
    service_client, workspace_name: str, name: str, version: str, resource_group: str
) -> Dict[str, str]:
    """Get SAS uri and blob uri for non-registry asset. Note that this function won't return the same
    SAS uri and blob uri for the same asset. It will return a new SAS uri and blob uri every time it is called.

    :param service_client: Service client
    :type service_client: AzureMachineLearningWorkspaces
    :param workspace_name: The workspace name
    :type workspace_name: str
    :param name: Asset name
    :type name: str
    :param version: Asset version
    :type version: str
    :param resource_group: Resource group
    :type resource_group: str
    :return: The sas_uri and blob_uri
    :rtype: Dict[str, str]
    """
    request_body = PendingUploadRequestDto(pending_upload_type="TemporaryBlobReference")
    response = service_client.code_versions.create_or_get_start_pending_upload(
        resource_group_name=resource_group,
        workspace_name=workspace_name,
        name=name,
        version=version,
        body=request_body,
    )

    sas_info = {
        "sas_uri": response.blob_reference_for_consumption.credential.sas_uri,
        "blob_uri": response.blob_reference_for_consumption.blob_uri,
    }

    return sas_info


def _get_existing_asset_name_and_version(existing_asset):
    import re

    regex = r"/codes/([^/]+)/versions/([^/]+)"

    arm_id = existing_asset.id
    match = re.search(regex, arm_id)
    name = match.group(1)
    version = match.group(2)

    return name, version
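
# Illustrative sketch (not from the original source): the regex above pulls the code asset name
# and version out of an ARM id such as the hypothetical ".../codes/my-code/versions/7",
# yielding ("my-code", "7").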