aboutsummaryrefslogtreecommitdiff
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from ._deserialize import (
    process_storage_error)
from ._shared.response_handlers import return_response_headers
from ._shared.uploads import (
    upload_data_chunks,
    DataLakeFileChunkUploader, upload_substream_blocks)
from ...core.exceptions import HttpResponseError


def _any_conditions(modified_access_conditions=None, **kwargs):  # pylint: disable=unused-argument
    """Report whether any conditional-request field is set on the given object.

    :param modified_access_conditions:
        Object exposing ``if_modified_since``, ``if_unmodified_since``,
        ``if_none_match`` and ``if_match`` attributes, or ``None``.
    :param kwargs: Accepted and ignored.
    :returns: ``True`` if at least one of the four attributes is truthy;
        ``False`` if none is, or if no conditions object was supplied.
    :rtype: bool
    """
    # The parameter defaults to None, so "no object" has to mean "no conditions"
    # rather than an AttributeError on the first attribute access.
    if modified_access_conditions is None:
        return False
    return any((
        modified_access_conditions.if_modified_since,
        modified_access_conditions.if_unmodified_since,
        modified_access_conditions.if_none_match,
        modified_access_conditions.if_match,
    ))


def _prefers_chunked_upload(stream, chunk_size, validate_content, file_settings):
    """Decide whether the upload is routed to ``upload_data_chunks``.

    The checks run in a fixed order and stop at the first one that holds, so
    ``stream.seekable()`` is only called when the settings-based checks did not
    already decide the outcome.
    """
    if file_settings.use_byte_buffer or validate_content:
        return True
    if chunk_size < file_settings.min_large_chunk_upload_threshold:
        return True
    # A stream that reports itself as non-seekable goes through the chunked path.
    if hasattr(stream, 'seekable') and not stream.seekable():
        return True
    # Otherwise the substream path is used only when both seek and tell exist.
    return not (hasattr(stream, 'seek') and hasattr(stream, 'tell'))


def upload_datalake_file(
        client=None,
        stream=None,
        length=None,
        overwrite=None,
        validate_content=None,
        max_concurrency=None,
        file_settings=None,
        **kwargs):
    """Upload ``stream`` through ``client`` and finish with a ``flush_data`` call.

    With ``overwrite`` set, ``client.create`` is called first and the returned
    ``etag`` becomes the only condition applied to the final flush. Without it,
    ``if_none_match = '*'`` is applied when the caller supplied no condition.

    :param client: Object providing ``create`` and ``flush_data``; also passed
        as ``service`` to the upload helper.
    :param stream: Data source handed to the upload helper.
    :param length: Passed as ``total_size`` to the upload helper and as
        ``position`` to ``flush_data``. ``0`` short-circuits the whole call.
    :param overwrite: Truthy to create the file before uploading.
    :param validate_content: Forwarded to the upload helper; a truthy value
        also selects ``upload_data_chunks``.
    :param max_concurrency: Forwarded to the upload helper.
    :param file_settings: Object read for ``use_byte_buffer`` and
        ``min_large_chunk_upload_threshold``.
    :keyword properties: Forwarded to ``client.create``.
    :keyword umask: Forwarded to ``client.create``.
    :keyword permissions: Forwarded to ``client.create``.
    :keyword path_http_headers: Forwarded to ``create`` and ``flush_data``.
    :keyword modified_access_conditions: Conditions object; it is mutated in
        place by this function.
    :keyword chunk_size: Chunk size for the upload helper (default 100 MiB).
    :keyword encryption_context: Forwarded to ``client.create``.
    :returns: ``{}`` when ``length == 0``, otherwise the result of
        ``client.flush_data`` (invoked with ``cls=return_response_headers``).
    :raises ValueError: If ``overwrite`` is falsy and any of ``properties``,
        ``umask`` or ``permissions`` is truthy.

    An ``HttpResponseError`` raised along the way is handed to
    ``process_storage_error`` (presumably re-raised there as a storage-specific
    error -- confirm against that helper).
    """
    # An empty payload needs no request at all.
    if length == 0:
        return {}

    try:
        access_conditions = kwargs.pop('modified_access_conditions', None)
        http_headers = kwargs.pop('path_http_headers', None)
        encryption_context = kwargs.pop('encryption_context', None)
        block_size = kwargs.pop('chunk_size', 100 * 1024 * 1024)
        properties = kwargs.pop('properties', None)
        umask = kwargs.pop('umask', None)
        permissions = kwargs.pop('permissions', None)

        if overwrite:
            created = client.create(
                resource='file',
                path_http_headers=http_headers,
                properties=properties,
                modified_access_conditions=access_conditions,
                umask=umask,
                permissions=permissions,
                encryption_context=encryption_context,
                cls=return_response_headers,
                **kwargs)

            # Pin the final flush to the file just created: match on its etag and
            # clear every other condition, so a flush that happened in between
            # makes this one fail.
            access_conditions.if_match = created['etag']
            for cleared in ('if_none_match', 'if_modified_since', 'if_unmodified_since'):
                setattr(access_conditions, cleared, None)
        else:
            # With no caller-supplied condition, refuse to flush into an existing file.
            if not _any_conditions(access_conditions):
                access_conditions.if_none_match = '*'
            if properties or umask or permissions:
                raise ValueError("metadata, umask and permissions can be set only when overwrite is enabled")

        if _prefers_chunked_upload(stream, block_size, validate_content, file_settings):
            send = upload_data_chunks
        else:
            send = upload_substream_blocks
        send(
            service=client,
            uploader_class=DataLakeFileChunkUploader,
            total_size=length,
            chunk_size=block_size,
            stream=stream,
            max_concurrency=max_concurrency,
            validate_content=validate_content,
            **kwargs)

        return client.flush_data(
            position=length,
            path_http_headers=http_headers,
            modified_access_conditions=access_conditions,
            close=True,
            cls=return_response_headers,
            **kwargs)
    except HttpResponseError as error:
        process_storage_error(error)