diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/storage/filedatalake/_upload_helper.py | 105 |
1 file changed, 105 insertions, 0 deletions
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""Upload helper for Azure Data Lake Storage files.

Creates the target file when ``overwrite`` is requested, uploads the data in
chunks (choosing between the byte-buffer and substream paths based on the
stream's capabilities), then flushes with ETag preconditions so no concurrent
writer can slip in between create and flush.
"""

from ._deserialize import (
    process_storage_error)
from ._shared.response_handlers import return_response_headers
from ._shared.uploads import (
    upload_data_chunks,
    DataLakeFileChunkUploader, upload_substream_blocks)
from ...core.exceptions import HttpResponseError


def _any_conditions(modified_access_conditions=None, **kwargs):  # pylint: disable=unused-argument
    """Return True if any modified-access precondition is populated.

    A ``None`` conditions object is treated as "no conditions set".
    (Fix: the original dereferenced the argument unconditionally, so the
    advertised default of ``None`` raised AttributeError.)
    """
    if modified_access_conditions is None:
        return False
    return any([
        modified_access_conditions.if_modified_since,
        modified_access_conditions.if_unmodified_since,
        modified_access_conditions.if_none_match,
        modified_access_conditions.if_match
    ])


def upload_datalake_file(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        validate_content=None,
        max_concurrency=None,
        file_settings=None,
        **kwargs):
    """Upload *stream* to a Data Lake file and flush the result.

    :param client: Generated path-operations client providing ``create`` and
        ``flush_data``.
    :param stream: Readable source of the bytes to upload.
    :param int length: Total number of bytes to upload. ``0`` short-circuits
        and returns ``{}`` without contacting the service.
    :param bool overwrite: When truthy, (re)create the file first and pin the
        subsequent flush to the returned ETag. When falsy, the upload only
        proceeds against a file that does not already exist, unless the caller
        supplied explicit access conditions.
    :param bool validate_content: Whether per-chunk content validation is
        requested (forces the byte-buffer upload path).
    :param int max_concurrency: Maximum number of parallel chunk uploads.
    :param file_settings: Upload tuning object; this function reads its
        ``use_byte_buffer`` and ``min_large_chunk_upload_threshold`` attributes.
    :returns: Response headers of the final ``flush_data`` call (a dict), or
        ``{}`` for a zero-length upload.
    :raises ValueError: If ``properties``, ``umask`` or ``permissions`` are
        given while ``overwrite`` is disabled.
    """
    try:
        if length == 0:
            # Nothing to send; skip all service calls.
            return {}
        properties = kwargs.pop('properties', None)
        umask = kwargs.pop('umask', None)
        permissions = kwargs.pop('permissions', None)
        path_http_headers = kwargs.pop('path_http_headers', None)
        modified_access_conditions = kwargs.pop('modified_access_conditions', None)
        chunk_size = kwargs.pop('chunk_size', 100 * 1024 * 1024)
        encryption_context = kwargs.pop('encryption_context', None)

        if not overwrite:
            # If customers didn't specify access conditions, they cannot flush
            # data to an existing file, so require that the file not exist yet.
            # (Fix: guard against a missing conditions object — previously this
            # raised AttributeError instead of proceeding.)
            if modified_access_conditions is not None and \
                    not _any_conditions(modified_access_conditions):
                modified_access_conditions.if_none_match = '*'
            if properties or umask or permissions:
                raise ValueError("metadata, umask and permissions can be set only when overwrite is enabled")

        if overwrite:
            response = client.create(
                resource='file',
                path_http_headers=path_http_headers,
                properties=properties,
                modified_access_conditions=modified_access_conditions,
                umask=umask,
                permissions=permissions,
                encryption_context=encryption_context,
                cls=return_response_headers,
                **kwargs)

            # this modified_access_conditions will be applied to flush_data to make sure
            # no other flush between create and the current flush
            # NOTE(review): assumes callers always pass a conditions object when
            # overwrite is enabled — confirm against the public client entry points.
            modified_access_conditions.if_match = response['etag']
            modified_access_conditions.if_none_match = None
            modified_access_conditions.if_modified_since = None
            modified_access_conditions.if_unmodified_since = None

        # Fall back to the byte-buffer path when content validation is on, the
        # chunks are small, or the stream cannot seek/tell.
        use_original_upload_path = file_settings.use_byte_buffer or \
            validate_content or chunk_size < file_settings.min_large_chunk_upload_threshold or \
            hasattr(stream, 'seekable') and not stream.seekable() or \
            not hasattr(stream, 'seek') or not hasattr(stream, 'tell')

        if use_original_upload_path:
            upload_data_chunks(
                service=client,
                uploader_class=DataLakeFileChunkUploader,
                total_size=length,
                chunk_size=chunk_size,
                stream=stream,
                max_concurrency=max_concurrency,
                validate_content=validate_content,
                **kwargs)
        else:
            upload_substream_blocks(
                service=client,
                uploader_class=DataLakeFileChunkUploader,
                total_size=length,
                chunk_size=chunk_size,
                max_concurrency=max_concurrency,
                stream=stream,
                validate_content=validate_content,
                **kwargs
            )

        return client.flush_data(position=length,
                                 path_http_headers=path_http_headers,
                                 modified_access_conditions=modified_access_conditions,
                                 close=True,
                                 cls=return_response_headers,
                                 **kwargs)
    except HttpResponseError as error:
        # Translate service errors into the SDK's storage exception types.
        process_storage_error(error)