# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import asyncio
import inspect
import threading
from asyncio import Lock
from io import UnsupportedOperation
from itertools import islice
from math import ceil
from typing import AsyncGenerator, Union

from . import encode_base64, url_quote
from .request_handlers import get_length
from .response_handlers import return_response_headers
from .uploads import SubStream, IterStreamer  # pylint: disable=unused-import


async def _async_parallel_uploads(uploader, pending, running):
    """Drive a bounded set of concurrent uploads fed by an async iterator.

    :param uploader: Coroutine function called with each item taken from ``pending``.
    :param pending: Async iterator yielding the remaining work items.
    :param running: Collection of already-scheduled futures (may be empty).
    :returns: List of the results of every completed future, in completion order.
    """
    range_ids = []
    # asyncio.wait() raises ValueError on an empty collection, so only wait
    # while there is actually something in flight.
    while running:
        # Wait for some upload to finish before adding a new one
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        range_ids.extend([chunk.result() for chunk in done])
        try:
            # Replace each finished upload with a new one to keep the window full.
            for _ in range(0, len(done)):
                next_chunk = await pending.__anext__()
                running.add(asyncio.ensure_future(uploader(next_chunk)))
        except StopAsyncIteration:
            break

    # Wait for the remaining uploads to finish
    if running:
        done, _running = await asyncio.wait(running)
        range_ids.extend([chunk.result() for chunk in done])
    return range_ids


async def _parallel_uploads(uploader, pending, running):
    """Drive a bounded set of concurrent uploads fed by a synchronous iterator.

    :param uploader: Coroutine function called with each item taken from ``pending``.
    :param pending: Synchronous iterator yielding the remaining work items.
    :param running: Collection of already-scheduled futures (may be empty).
    :returns: List of the results of every completed future, in completion order.
    """
    range_ids = []
    # asyncio.wait() raises ValueError on an empty collection, so only wait
    # while there is actually something in flight.
    while running:
        # Wait for some upload to finish before adding a new one
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        range_ids.extend([chunk.result() for chunk in done])
        try:
            # Replace each finished upload with a new one to keep the window full.
            for _ in range(0, len(done)):
                next_chunk = next(pending)
                running.add(asyncio.ensure_future(uploader(next_chunk)))
        except StopIteration:
            break

    # Wait for the remaining uploads to finish
    if running:
        done, _running = await asyncio.wait(running)
        range_ids.extend([chunk.result() for chunk in done])
    return range_ids


async def upload_data_chunks(
        service=None,
        uploader_class=None,
        total_size=None,
        chunk_size=None,
        max_concurrency=None,
        stream=None,
        progress_hook=None,
        **kwargs):
    """Upload ``stream`` in chunks read by ``uploader_class.get_chunk_streams``.

    Uploads run concurrently when ``max_concurrency`` is greater than 1,
    otherwise one chunk at a time. ``max_concurrency=None`` is treated as
    non-parallel.

    :returns: If any chunk upload returned a truthy result, the second element
        of each result ordered by the first element; otherwise the uploader's
        ``response_headers``.
    """
    parallel = max_concurrency is not None and max_concurrency > 1
    if parallel and 'modified_access_conditions' in kwargs:
        # Access conditions do not work with parallelism
        kwargs['modified_access_conditions'] = None

    uploader = uploader_class(
        service=service,
        total_size=total_size,
        chunk_size=chunk_size,
        stream=stream,
        parallel=parallel,
        progress_hook=progress_hook,
        **kwargs)

    if parallel:
        upload_tasks = uploader.get_chunk_streams()
        running_futures = []
        # Prime the window with up to max_concurrency uploads.
        for _ in range(max_concurrency):
            try:
                chunk = await upload_tasks.__anext__()
                running_futures.append(asyncio.ensure_future(uploader.process_chunk(chunk)))
            except StopAsyncIteration:
                break

        range_ids = await _async_parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
    else:
        range_ids = []
        async for chunk in uploader.get_chunk_streams():
            range_ids.append(await uploader.process_chunk(chunk))

    if any(range_ids):
        return [r[1] for r in sorted(range_ids, key=lambda r: r[0])]
    return uploader.response_headers


async def upload_substream_blocks(
        service=None,
        uploader_class=None,
        total_size=None,
        chunk_size=None,
        max_concurrency=None,
        stream=None,
        progress_hook=None,
        **kwargs):
    """Upload ``stream`` as blocks produced by ``uploader_class.get_substream_blocks``.

    Uploads run concurrently when ``max_concurrency`` is greater than 1,
    otherwise one block at a time. ``max_concurrency=None`` is treated as
    non-parallel.

    :returns: The sorted list of block upload results if any of them is
        truthy, otherwise ``None``.
    """
    parallel = max_concurrency is not None and max_concurrency > 1
    if parallel and 'modified_access_conditions' in kwargs:
        # Access conditions do not work with parallelism
        kwargs['modified_access_conditions'] = None
    uploader = uploader_class(
        service=service,
        total_size=total_size,
        chunk_size=chunk_size,
        stream=stream,
        parallel=parallel,
        progress_hook=progress_hook,
        **kwargs)

    if parallel:
        upload_tasks = uploader.get_substream_blocks()
        # Prime the window with up to max_concurrency uploads.
        running_futures = [
            asyncio.ensure_future(uploader.process_substream_block(u))
            for u in islice(upload_tasks, 0, max_concurrency)
        ]
        range_ids = await _parallel_uploads(uploader.process_substream_block, upload_tasks, running_futures)
    else:
        range_ids = []
        for block in uploader.get_substream_blocks():
            range_ids.append(await uploader.process_substream_block(block))
    if any(range_ids):
        return sorted(range_ids)
    return


class _ChunkUploader(object):  # pylint: disable=too-many-instance-attributes
    """Base class that splits a stream into chunks/blocks and uploads them.

    Subclasses implement ``_upload_chunk`` and ``_upload_substream_block``.
    """

    def __init__(
            self, service,
            total_size,
            chunk_size,
            stream,
            parallel,
            encryptor=None,
            padder=None,
            progress_hook=None,
            **kwargs):
        self.service = service
        self.total_size = total_size
        self.chunk_size = chunk_size
        self.stream = stream
        self.parallel = parallel

        # Stream management
        self.stream_lock = threading.Lock() if parallel else None

        # Progress feedback
        self.progress_total = 0
        self.progress_lock = Lock() if parallel else None
        self.progress_hook = progress_hook

        # Encryption
        self.encryptor = encryptor
        self.padder = padder
        self.response_headers = None
        self.etag = None
        self.last_modified = None
        # Remaining keyword arguments are forwarded to every service call.
        self.request_options = kwargs

    async def get_chunk_streams(self):
        """Yield ``(offset, data)`` tuples of at most ``chunk_size`` bytes each.

        ``stream.read`` may be synchronous or return an awaitable. The final
        chunk may be shorter than ``chunk_size``; padder/encryptor (if set)
        are finalized on it.

        :raises TypeError: If the stream yields anything other than ``bytes``.
        """
        index = 0
        while True:
            data = b''

            # Buffer until we either reach the end of the stream or get a whole chunk.
            while True:
                # Only request what is still missing from this chunk, so a
                # short read can never make the buffer overshoot chunk_size.
                read_size = self.chunk_size - len(data)
                if self.total_size:
                    read_size = min(read_size, self.total_size - (index + len(data)))
                temp = self.stream.read(read_size)
                if inspect.isawaitable(temp):
                    temp = await temp
                if not isinstance(temp, bytes):
                    raise TypeError('Blob data should be of type bytes.')
                data += temp

                # We have read an empty string and so are at the end
                # of the buffer or we have read a full chunk.
                if temp == b'' or len(data) == self.chunk_size:
                    break

            if len(data) == self.chunk_size:
                if self.padder:
                    data = self.padder.update(data)
                if self.encryptor:
                    data = self.encryptor.update(data)
                yield index, data
            else:
                # Short (final) chunk: flush the padder and encryptor.
                if self.padder:
                    data = self.padder.update(data) + self.padder.finalize()
                if self.encryptor:
                    data = self.encryptor.update(data) + self.encryptor.finalize()
                if data:
                    yield index, data
                break
            index += len(data)

    async def process_chunk(self, chunk_data):
        """Upload one ``(offset, bytes)`` tuple and report progress."""
        chunk_bytes = chunk_data[1]
        chunk_offset = chunk_data[0]
        return await self._upload_chunk_with_progress(chunk_offset, chunk_bytes)

    async def _update_progress(self, length):
        """Add ``length`` to the running total and invoke the progress hook, if any."""
        if self.progress_lock is not None:
            async with self.progress_lock:
                self.progress_total += length
        else:
            self.progress_total += length

        if self.progress_hook:
            await self.progress_hook(self.progress_total, self.total_size)

    async def _upload_chunk(self, chunk_offset, chunk_data):
        raise NotImplementedError("Must be implemented by child class.")

    async def _upload_chunk_with_progress(self, chunk_offset, chunk_data):
        """Upload a chunk, then update progress by the chunk's length."""
        range_id = await self._upload_chunk(chunk_offset, chunk_data)
        await self._update_progress(len(chunk_data))
        return range_id

    def get_substream_blocks(self):
        """Yield ``(offset, SubStream)`` tuples covering the whole stream.

        Uses ``total_size`` when set, otherwise ``get_length(stream)``.

        :raises ValueError: If the content length cannot be determined.
        """
        assert self.chunk_size is not None
        lock = self.stream_lock
        blob_length = self.total_size

        if blob_length is None:
            blob_length = get_length(self.stream)
            if blob_length is None:
                raise ValueError("Unable to determine content length of upload data.")

        blocks = int(ceil(blob_length / (self.chunk_size * 1.0)))
        # The last block carries the remainder, or a full chunk when the length divides evenly.
        last_block_size = self.chunk_size if blob_length % self.chunk_size == 0 else blob_length % self.chunk_size

        for i in range(blocks):
            index = i * self.chunk_size
            length = last_block_size if i == blocks - 1 else self.chunk_size
            yield index, SubStream(self.stream, index, length, lock)

    async def process_substream_block(self, block_data):
        """Upload one ``(offset, block_stream)`` tuple and report progress."""
        return await self._upload_substream_block_with_progress(block_data[0], block_data[1])

    async def _upload_substream_block(self, index, block_stream):
        raise NotImplementedError("Must be implemented by child class.")

    async def _upload_substream_block_with_progress(self, index, block_stream):
        """Upload a block, then update progress by the block's length."""
        range_id = await self._upload_substream_block(index, block_stream)
        await self._update_progress(len(block_stream))
        return range_id

    def set_response_properties(self, resp):
        """Record ``etag`` and ``last_modified`` from ``resp``."""
        self.etag = resp.etag
        self.last_modified = resp.last_modified


class BlockBlobChunkUploader(_ChunkUploader):
    """Uploads chunks/blocks through ``service.stage_block``."""

    def __init__(self, *args, **kwargs):
        # modified_access_conditions is dropped so it is never forwarded to stage_block.
        kwargs.pop('modified_access_conditions', None)
        super().__init__(*args, **kwargs)
        self.current_length = None

    async def _upload_chunk(self, chunk_offset, chunk_data):
        """Stage one chunk; returns ``(index, block_id)``."""
        # TODO: This is incorrect, but works with recording.
        index = f'{chunk_offset:032d}'
        block_id = encode_base64(url_quote(encode_base64(index)))
        await self.service.stage_block(
            block_id,
            len(chunk_data),
            body=chunk_data,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options)
        return index, block_id

    async def _upload_substream_block(self, index, block_stream):
        """Stage one substream block; the stream is always closed afterwards."""
        try:
            block_id = f'BlockId{(index//self.chunk_size):05}'
            await self.service.stage_block(
                block_id,
                len(block_stream),
                block_stream,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)
        finally:
            block_stream.close()
        return block_id


class PageBlobChunkUploader(_ChunkUploader):
    """Uploads chunks through ``service.upload_pages``, skipping all-zero chunks."""

    def _is_chunk_empty(self, chunk_data):
        """Return True when every element of ``chunk_data`` is a zero byte."""
        # Stops at the first non-zero byte; an empty chunk counts as empty.
        return all(each_byte in (0, b'\x00') for each_byte in chunk_data)

    async def _upload_chunk(self, chunk_offset, chunk_data):
        # avoid uploading the empty pages
        if not self._is_chunk_empty(chunk_data):
            chunk_end = chunk_offset + len(chunk_data) - 1
            content_range = f'bytes={chunk_offset}-{chunk_end}'
            computed_md5 = None
            self.response_headers = await self.service.upload_pages(
                body=chunk_data,
                content_length=len(chunk_data),
                transactional_content_md5=computed_md5,
                range=content_range,
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)

            # In sequential mode, chain each request on the etag returned by the previous one.
            if not self.parallel and self.request_options.get('modified_access_conditions'):
                self.request_options['modified_access_conditions'].if_match = self.response_headers['etag']

    async def _upload_substream_block(self, index, block_stream):
        pass


class AppendBlobChunkUploader(_ChunkUploader):
    """Uploads chunks through ``service.append_block``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Append offset reported by the first append_block call; None until then.
        self.current_length = None

    async def _upload_chunk(self, chunk_offset, chunk_data):
        if self.current_length is None:
            # First chunk: remember the append offset returned by the service.
            self.response_headers = await self.service.append_block(
                body=chunk_data,
                content_length=len(chunk_data),
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)
            self.current_length = int(self.response_headers['blob_append_offset'])
        else:
            # Later chunks: set the expected append position before appending.
            self.request_options['append_position_access_conditions'].append_position = \
                self.current_length + chunk_offset
            self.response_headers = await self.service.append_block(
                body=chunk_data,
                content_length=len(chunk_data),
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options)

    async def _upload_substream_block(self, index, block_stream):
        pass


class DataLakeFileChunkUploader(_ChunkUploader):
    """Uploads chunks/blocks through ``service.append_data``."""

    async def _upload_chunk(self, chunk_offset, chunk_data):
        self.response_headers = await self.service.append_data(
            body=chunk_data,
            position=chunk_offset,
            content_length=len(chunk_data),
            cls=return_response_headers,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options
        )

        # In sequential mode, chain each request on the etag returned by the previous one.
        if not self.parallel and self.request_options.get('modified_access_conditions'):
            self.request_options['modified_access_conditions'].if_match = self.response_headers['etag']

    async def _upload_substream_block(self, index, block_stream):
        """Append one substream block; the stream is always closed afterwards."""
        try:
            await self.service.append_data(
                body=block_stream,
                position=index,
                content_length=len(block_stream),
                cls=return_response_headers,
                data_stream_total=self.total_size,
                upload_stream_current=self.progress_total,
                **self.request_options
            )
        finally:
            block_stream.close()


class FileChunkUploader(_ChunkUploader):
    """Uploads chunks through ``service.upload_range``."""

    async def _upload_chunk(self, chunk_offset, chunk_data):
        """Upload one range; returns ``(range_id, response)``."""
        length = len(chunk_data)
        chunk_end = chunk_offset + length - 1
        response = await self.service.upload_range(
            chunk_data,
            chunk_offset,
            length,
            data_stream_total=self.total_size,
            upload_stream_current=self.progress_total,
            **self.request_options
        )
        range_id = f'bytes={chunk_offset}-{chunk_end}'
        return range_id, response

    # TODO: Implement this method.
    async def _upload_substream_block(self, index, block_stream):
        pass


class AsyncIterStreamer():
    """
    File-like streaming object for AsyncGenerators.
    """
    def __init__(self, generator: AsyncGenerator[Union[bytes, str], None], encoding: str = "UTF-8"):
        self.iterator = generator.__aiter__()
        # Bytes pulled from the generator but not yet returned by read().
        self.leftover = b""
        self.encoding = encoding

    def seekable(self):
        return False

    def tell(self, *args, **kwargs):
        raise UnsupportedOperation("Data generator does not support tell.")

    def seek(self, *args, **kwargs):
        raise UnsupportedOperation("Data generator is not seekable.")

    async def read(self, size: int) -> bytes:
        """Return up to ``size`` bytes, fewer only when the generator is exhausted.

        ``str`` items are encoded with ``self.encoding``; any surplus beyond
        ``size`` is kept for the next call.
        """
        data = self.leftover
        count = len(self.leftover)
        try:
            while count < size:
                chunk = await self.iterator.__anext__()
                if isinstance(chunk, str):
                    chunk = chunk.encode(self.encoding)
                data += chunk
                count += len(chunk)
        # This means count < size and what's leftover will be returned in this call.
        except StopAsyncIteration:
            self.leftover = b""

        if count >= size:
            self.leftover = data[size:]

        return data[:size]