author     S. Solomon Darnell 2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell 2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed
parent     cc961e04ba734dd72309fb548a2f97d67d578813
download   gn-ai-master.tar.gz

two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py')
-rw-r--r-- .venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py | 140
1 file changed, 140 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py b/.venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py
new file mode 100644
index 00000000..6cd89540
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py
@@ -0,0 +1,140 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+
+from ._deserialize import (
+    process_storage_error)
+from ._shared.response_handlers import return_response_headers
+from ._shared.uploads import (
+    upload_data_chunks,
+    DataLakeFileChunkUploader, upload_substream_blocks)
+from ...core.exceptions import HttpResponseError
+
+
+def _any_conditions(modified_access_conditions=None, **kwargs):  # pylint: disable=unused-argument
+    # True when the caller supplied at least one conditional-request header.
+    if modified_access_conditions is None:
+        return False
+    return any([
+        modified_access_conditions.if_modified_since,
+        modified_access_conditions.if_unmodified_since,
+        modified_access_conditions.if_none_match,
+        modified_access_conditions.if_match
+    ])
+
+
+def upload_datalake_file(
+        client=None,
+        stream=None,
+        length=None,
+        overwrite=None,
+        validate_content=None,
+        max_concurrency=None,
+        file_settings=None,
+        **kwargs):
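+    """Create (when overwrite is set) and upload a DataLake file in chunks,
+    then flush to commit. Returns the flush response headers, or an empty
+    dict for zero-length uploads.
+    """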
+    try:
+        if length == 0:
+            return {}
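+        # Pop helper-specific options here so they are not forwarded again
+        # through **kwargs to the service calls below; chunk_size defaults to 100 MiB.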
+        properties = kwargs.pop('properties', None)
+        umask = kwargs.pop('umask', None)
+        permissions = kwargs.pop('permissions', None)
+        path_http_headers = kwargs.pop('path_http_headers', None)
+        modified_access_conditions = kwargs.pop('modified_access_conditions', None)
+        chunk_size = kwargs.pop('chunk_size', 100 * 1024 * 1024)
+        encryption_context = kwargs.pop('encryption_context', None)
+
+        if not overwrite:
+            # If the caller supplied no access conditions, require that the
+            # file does not already exist: if-none-match='*' makes the flush
+            # fail rather than silently overwrite an existing file.
+            if not _any_conditions(modified_access_conditions):
+                modified_access_conditions.if_none_match = '*'
+            if properties or umask or permissions:
+                raise ValueError("metadata, umask and permissions can be set only when overwrite is enabled")
+
+        if overwrite:
+            response = client.create(
+                resource='file',
+                path_http_headers=path_http_headers,
+                properties=properties,
+                modified_access_conditions=modified_access_conditions,
+                umask=umask,
+                permissions=permissions,
+                encryption_context=encryption_context,
+                cls=return_response_headers,
+                **kwargs)
+
+            # Pin the subsequent flush_data call to the file version just
+            # created: matching on the new etag ensures no concurrent writer
+            # replaced the file between create and this flush.
+            modified_access_conditions.if_match = response['etag']
+            modified_access_conditions.if_none_match = None
+            modified_access_conditions.if_modified_since = None
+            modified_access_conditions.if_unmodified_since = None
+
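+        # A stream that cannot seek cannot be carved into independent substream
+        # views, so such input falls back to the buffered chunk-upload path.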
+        use_original_upload_path = (
+            file_settings.use_byte_buffer or
+            validate_content or
+            chunk_size < file_settings.min_large_chunk_upload_threshold or
+            (hasattr(stream, 'seekable') and not stream.seekable()) or
+            not hasattr(stream, 'seek') or
+            not hasattr(stream, 'tell')
+        )
+
+        if use_original_upload_path:
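+            # Buffered path: each chunk is read from the stream before upload.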
+            upload_data_chunks(
+                service=client,
+                uploader_class=DataLakeFileChunkUploader,
+                total_size=length,
+                chunk_size=chunk_size,
+                stream=stream,
+                max_concurrency=max_concurrency,
+                validate_content=validate_content,
+                **kwargs)
+        else:
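+            # Seekable path: the stream is split into substream views that can
+            # be uploaded with up to max_concurrency parallel workers.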
+            upload_substream_blocks(
+                service=client,
+                uploader_class=DataLakeFileChunkUploader,
+                total_size=length,
+                chunk_size=chunk_size,
+                max_concurrency=max_concurrency,
+                stream=stream,
+                validate_content=validate_content,
+                **kwargs
+            )
+
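+        # flush_data commits the uploaded bytes at offset `length`; close=True
+        # also marks the file handle closed.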
+        return client.flush_data(position=length,
+                                 path_http_headers=path_http_headers,
+                                 modified_access_conditions=modified_access_conditions,
+                                 close=True,
+                                 cls=return_response_headers,
+                                 **kwargs)
+    except HttpResponseError as error:
+        process_storage_error(error)
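+
+
+# Minimal usage sketch (illustrative, not part of the public API): callers
+# normally reach this helper through DataLakeFileClient.upload_data, which
+# supplies the generated path-operations client and its configuration. The
+# names path_operations_client and client_config below are assumptions:
+#
+#     with open("data.bin", "rb") as stream:
+#         upload_datalake_file(client=path_operations_client, stream=stream,
+#                              length=os.path.getsize("data.bin"),
+#                              overwrite=True, max_concurrency=4,
+#                              file_settings=client_config)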