diff options
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro')
6 files changed, 2504 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/__init__.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/__init__.py new file mode 100644 index 00000000..5b396cd2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/__init__.py @@ -0,0 +1,5 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io.py new file mode 100644 index 00000000..3e46f1fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io.py @@ -0,0 +1,435 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# pylint: disable=docstring-missing-return, docstring-missing-rtype + +"""Input/output utilities. + +Includes: + - i/o-specific constants + - i/o-specific exceptions + - schema validation + - leaf value encoding and decoding + - datum reader/writer stuff (?) + +Also includes a generic representation for data, which uses the +following mapping: + - Schema records are implemented as dict. + - Schema arrays are implemented as list. + - Schema maps are implemented as dict. + - Schema strings are implemented as unicode. + - Schema bytes are implemented as str. + - Schema ints are implemented as int. + - Schema longs are implemented as long. + - Schema floats are implemented as float. + - Schema doubles are implemented as float. + - Schema booleans are implemented as bool. +""" + +import json +import logging +import struct +import sys + +from ..avro import schema + +PY3 = sys.version_info[0] == 3 + +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------------------ +# Constants + +STRUCT_FLOAT = struct.Struct('<f') # little-endian float +STRUCT_DOUBLE = struct.Struct('<d') # little-endian double + +# ------------------------------------------------------------------------------ +# Exceptions + + +class SchemaResolutionException(schema.AvroException): + def __init__(self, fail_msg, writer_schema=None): + pretty_writers = json.dumps(json.loads(str(writer_schema)), indent=2) + if writer_schema: + fail_msg += f"\nWriter's Schema: {pretty_writers}" + schema.AvroException.__init__(self, fail_msg) + +# ------------------------------------------------------------------------------ +# Decoder + + +class BinaryDecoder(object): + """Read leaf values.""" + + def __init__(self, reader): + """ + reader is a Python object on which we can call read, seek, and tell. + """ + self._reader = reader + + @property + def reader(self): + """Reports the reader used by this decoder.""" + return self._reader + + def read(self, n): + """Read n bytes. + + :param int n: Number of bytes to read. + :returns: The next n bytes from the input. + :rtype: bytes + """ + assert (n >= 0), n + input_bytes = self.reader.read(n) + if n > 0 and not input_bytes: + raise StopIteration + assert (len(input_bytes) == n), input_bytes + return input_bytes + + @staticmethod + def read_null(): + """ + null is written as zero bytes + """ + return None + + def read_boolean(self): + """ + a boolean is written as a single byte + whose value is either 0 (false) or 1 (true). + """ + b = ord(self.read(1)) + if b == 1: + return True + if b == 0: + return False + fail_msg = f"Invalid value for boolean: {b}" + raise schema.AvroException(fail_msg) + + def read_int(self): + """ + int and long values are written using variable-length, zig-zag coding. + """ + return self.read_long() + + def read_long(self): + """ + int and long values are written using variable-length, zig-zag coding. + """ + b = ord(self.read(1)) + n = b & 0x7F + shift = 7 + while (b & 0x80) != 0: + b = ord(self.read(1)) + n |= (b & 0x7F) << shift + shift += 7 + datum = (n >> 1) ^ -(n & 1) + return datum + + def read_float(self): + """ + A float is written as 4 bytes. + The float is converted into a 32-bit integer using a method equivalent to + Java's floatToIntBits and then encoded in little-endian format. + """ + return STRUCT_FLOAT.unpack(self.read(4))[0] + + def read_double(self): + """ + A double is written as 8 bytes. + The double is converted into a 64-bit integer using a method equivalent to + Java's doubleToLongBits and then encoded in little-endian format. + """ + return STRUCT_DOUBLE.unpack(self.read(8))[0] + + def read_bytes(self): + """ + Bytes are encoded as a long followed by that many bytes of data. + """ + nbytes = self.read_long() + assert (nbytes >= 0), nbytes + return self.read(nbytes) + + def read_utf8(self): + """ + A string is encoded as a long followed by + that many bytes of UTF-8 encoded character data. + """ + input_bytes = self.read_bytes() + if PY3: + try: + return input_bytes.decode('utf-8') + except UnicodeDecodeError as exn: + logger.error('Invalid UTF-8 input bytes: %r', input_bytes) + raise exn + else: + # PY2 + return unicode(input_bytes, "utf-8") # pylint: disable=undefined-variable + + def skip_null(self): + pass + + def skip_boolean(self): + self.skip(1) + + def skip_int(self): + self.skip_long() + + def skip_long(self): + b = ord(self.read(1)) + while (b & 0x80) != 0: + b = ord(self.read(1)) + + def skip_float(self): + self.skip(4) + + def skip_double(self): + self.skip(8) + + def skip_bytes(self): + self.skip(self.read_long()) + + def skip_utf8(self): + self.skip_bytes() + + def skip(self, n): + self.reader.seek(self.reader.tell() + n) + + +# ------------------------------------------------------------------------------ +# DatumReader + + +class DatumReader(object): + """Deserialize Avro-encoded data into a Python data structure.""" + + def __init__(self, writer_schema=None): + """ + As defined in the Avro specification, we call the schema encoded + in the data the "writer's schema". + """ + self._writer_schema = writer_schema + + # read/write properties + def set_writer_schema(self, writer_schema): + self._writer_schema = writer_schema + + writer_schema = property(lambda self: self._writer_schema, + set_writer_schema) + + def read(self, decoder): + return self.read_data(self.writer_schema, decoder) + + def read_data(self, writer_schema, decoder): + # function dispatch for reading data based on type of writer's schema + if writer_schema.type == 'null': + result = decoder.read_null() + elif writer_schema.type == 'boolean': + result = decoder.read_boolean() + elif writer_schema.type == 'string': + result = decoder.read_utf8() + elif writer_schema.type == 'int': + result = decoder.read_int() + elif writer_schema.type == 'long': + result = decoder.read_long() + elif writer_schema.type == 'float': + result = decoder.read_float() + elif writer_schema.type == 'double': + result = decoder.read_double() + elif writer_schema.type == 'bytes': + result = decoder.read_bytes() + elif writer_schema.type == 'fixed': + result = self.read_fixed(writer_schema, decoder) + elif writer_schema.type == 'enum': + result = self.read_enum(writer_schema, decoder) + elif writer_schema.type == 'array': + result = self.read_array(writer_schema, decoder) + elif writer_schema.type == 'map': + result = self.read_map(writer_schema, decoder) + elif writer_schema.type in ['union', 'error_union']: + result = self.read_union(writer_schema, decoder) + elif writer_schema.type in ['record', 'error', 'request']: + result = self.read_record(writer_schema, decoder) + else: + fail_msg = f"Cannot read unknown schema type: {writer_schema.type}" + raise schema.AvroException(fail_msg) + return result + + def skip_data(self, writer_schema, decoder): + if writer_schema.type == 'null': + result = decoder.skip_null() + elif writer_schema.type == 'boolean': + result = decoder.skip_boolean() + elif writer_schema.type == 'string': + result = decoder.skip_utf8() + elif writer_schema.type == 'int': + result = decoder.skip_int() + elif writer_schema.type == 'long': + result = decoder.skip_long() + elif writer_schema.type == 'float': + result = decoder.skip_float() + elif writer_schema.type == 'double': + result = decoder.skip_double() + elif writer_schema.type == 'bytes': + result = decoder.skip_bytes() + elif writer_schema.type == 'fixed': + result = self.skip_fixed(writer_schema, decoder) + elif writer_schema.type == 'enum': + result = self.skip_enum(decoder) + elif writer_schema.type == 'array': + self.skip_array(writer_schema, decoder) + result = None + elif writer_schema.type == 'map': + self.skip_map(writer_schema, decoder) + result = None + elif writer_schema.type in ['union', 'error_union']: + result = self.skip_union(writer_schema, decoder) + elif writer_schema.type in ['record', 'error', 'request']: + self.skip_record(writer_schema, decoder) + result = None + else: + fail_msg = f"Unknown schema type: {writer_schema.type}" + raise schema.AvroException(fail_msg) + return result + + # Fixed instances are encoded using the number of bytes declared in the schema. + @staticmethod + def read_fixed(writer_schema, decoder): + return decoder.read(writer_schema.size) + + @staticmethod + def skip_fixed(writer_schema, decoder): + return decoder.skip(writer_schema.size) + + # An enum is encoded by a int, representing the zero-based position of the symbol in the schema. + @staticmethod + def read_enum(writer_schema, decoder): + # read data + index_of_symbol = decoder.read_int() + if index_of_symbol >= len(writer_schema.symbols): + fail_msg = f"Can't access enum index {index_of_symbol} for enum with {len(writer_schema.symbols)} symbols" + raise SchemaResolutionException(fail_msg, writer_schema) + read_symbol = writer_schema.symbols[index_of_symbol] + return read_symbol + + @staticmethod + def skip_enum(decoder): + return decoder.skip_int() + + # Arrays are encoded as a series of blocks. + + # Each block consists of a long count value, followed by that many array items. + # A block with count zero indicates the end of the array. Each item is encoded per the array's item schema. + + # If a block's count is negative, then the count is followed immediately by a long block size, + # indicating the number of bytes in the block. + # The actual count in this case is the absolute value of the count written. + def read_array(self, writer_schema, decoder): + read_items = [] + block_count = decoder.read_long() + while block_count != 0: + if block_count < 0: + block_count = -block_count + decoder.read_long() + for _ in range(block_count): + read_items.append(self.read_data(writer_schema.items, decoder)) + block_count = decoder.read_long() + return read_items + + def skip_array(self, writer_schema, decoder): + block_count = decoder.read_long() + while block_count != 0: + if block_count < 0: + block_size = decoder.read_long() + decoder.skip(block_size) + else: + for _ in range(block_count): + self.skip_data(writer_schema.items, decoder) + block_count = decoder.read_long() + + # Maps are encoded as a series of blocks. + + # Each block consists of a long count value, followed by that many key/value pairs. + # A block with count zero indicates the end of the map. Each item is encoded per the map's value schema. + + # If a block's count is negative, then the count is followed immediately by a long block size, + # indicating the number of bytes in the block. + # The actual count in this case is the absolute value of the count written. + def read_map(self, writer_schema, decoder): + read_items = {} + block_count = decoder.read_long() + while block_count != 0: + if block_count < 0: + block_count = -block_count + decoder.read_long() + for _ in range(block_count): + key = decoder.read_utf8() + read_items[key] = self.read_data(writer_schema.values, decoder) + block_count = decoder.read_long() + return read_items + + def skip_map(self, writer_schema, decoder): + block_count = decoder.read_long() + while block_count != 0: + if block_count < 0: + block_size = decoder.read_long() + decoder.skip(block_size) + else: + for _ in range(block_count): + decoder.skip_utf8() + self.skip_data(writer_schema.values, decoder) + block_count = decoder.read_long() + + # A union is encoded by first writing a long value indicating + # the zero-based position within the union of the schema of its value. + # The value is then encoded per the indicated schema within the union. + def read_union(self, writer_schema, decoder): + # schema resolution + index_of_schema = int(decoder.read_long()) + if index_of_schema >= len(writer_schema.schemas): + fail_msg = (f"Can't access branch index {index_of_schema} " + f"for union with {len(writer_schema.schemas)} branches") + raise SchemaResolutionException(fail_msg, writer_schema) + selected_writer_schema = writer_schema.schemas[index_of_schema] + + # read data + return self.read_data(selected_writer_schema, decoder) + + def skip_union(self, writer_schema, decoder): + index_of_schema = int(decoder.read_long()) + if index_of_schema >= len(writer_schema.schemas): + fail_msg = (f"Can't access branch index {index_of_schema} " + f"for union with {len(writer_schema.schemas)} branches") + raise SchemaResolutionException(fail_msg, writer_schema) + return self.skip_data(writer_schema.schemas[index_of_schema], decoder) + + # A record is encoded by encoding the values of its fields + # in the order that they are declared. In other words, a record + # is encoded as just the concatenation of the encodings of its fields. + # Field values are encoded per their schema. + + # Schema Resolution: + # * the ordering of fields may be different: fields are matched by name. + # * schemas for fields with the same name in both records are resolved + # recursively. + # * if the writer's record contains a field with a name not present in the + # reader's record, the writer's value for that field is ignored. + # * if the reader's record schema has a field that contains a default value, + # and writer's schema does not have a field with the same name, then the + # reader should use the default value from its field. + # * if the reader's record schema has a field with no default value, and + # writer's schema does not have a field with the same name, then the + # field's value is unset. + def read_record(self, writer_schema, decoder): + # schema resolution + read_record = {} + for field in writer_schema.fields: + field_val = self.read_data(field.type, decoder) + read_record[field.name] = field_val + return read_record + + def skip_record(self, writer_schema, decoder): + for field in writer_schema.fields: + self.skip_data(field.type, decoder) diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io_async.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io_async.py new file mode 100644 index 00000000..8688661b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io_async.py @@ -0,0 +1,419 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# pylint: disable=docstring-missing-return, docstring-missing-rtype + +"""Input/output utilities. + +Includes: + - i/o-specific constants + - i/o-specific exceptions + - schema validation + - leaf value encoding and decoding + - datum reader/writer stuff (?) + +Also includes a generic representation for data, which uses the +following mapping: + - Schema records are implemented as dict. + - Schema arrays are implemented as list. + - Schema maps are implemented as dict. + - Schema strings are implemented as unicode. + - Schema bytes are implemented as str. + - Schema ints are implemented as int. + - Schema longs are implemented as long. + - Schema floats are implemented as float. + - Schema doubles are implemented as float. + - Schema booleans are implemented as bool. +""" + +import logging +import sys + +from ..avro import schema + +from .avro_io import STRUCT_FLOAT, STRUCT_DOUBLE, SchemaResolutionException + +PY3 = sys.version_info[0] == 3 + +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------------------ +# Decoder + + +class AsyncBinaryDecoder(object): + """Read leaf values.""" + + def __init__(self, reader): + """ + reader is a Python object on which we can call read, seek, and tell. + """ + self._reader = reader + + @property + def reader(self): + """Reports the reader used by this decoder.""" + return self._reader + + async def read(self, n): + """Read n bytes. + + :param int n: Number of bytes to read. + :returns: The next n bytes from the input. + :rtype: bytes + """ + assert (n >= 0), n + input_bytes = await self.reader.read(n) + if n > 0 and not input_bytes: + raise StopAsyncIteration + assert (len(input_bytes) == n), input_bytes + return input_bytes + + @staticmethod + def read_null(): + """ + null is written as zero bytes + """ + return None + + async def read_boolean(self): + """ + a boolean is written as a single byte + whose value is either 0 (false) or 1 (true). + """ + b = ord(await self.read(1)) + if b == 1: + return True + if b == 0: + return False + fail_msg = f"Invalid value for boolean: {b}" + raise schema.AvroException(fail_msg) + + async def read_int(self): + """ + int and long values are written using variable-length, zig-zag coding. + """ + return await self.read_long() + + async def read_long(self): + """ + int and long values are written using variable-length, zig-zag coding. + """ + b = ord(await self.read(1)) + n = b & 0x7F + shift = 7 + while (b & 0x80) != 0: + b = ord(await self.read(1)) + n |= (b & 0x7F) << shift + shift += 7 + datum = (n >> 1) ^ -(n & 1) + return datum + + async def read_float(self): + """ + A float is written as 4 bytes. + The float is converted into a 32-bit integer using a method equivalent to + Java's floatToIntBits and then encoded in little-endian format. + """ + return STRUCT_FLOAT.unpack(await self.read(4))[0] + + async def read_double(self): + """ + A double is written as 8 bytes. + The double is converted into a 64-bit integer using a method equivalent to + Java's doubleToLongBits and then encoded in little-endian format. + """ + return STRUCT_DOUBLE.unpack(await self.read(8))[0] + + async def read_bytes(self): + """ + Bytes are encoded as a long followed by that many bytes of data. + """ + nbytes = await self.read_long() + assert (nbytes >= 0), nbytes + return await self.read(nbytes) + + async def read_utf8(self): + """ + A string is encoded as a long followed by + that many bytes of UTF-8 encoded character data. + """ + input_bytes = await self.read_bytes() + if PY3: + try: + return input_bytes.decode('utf-8') + except UnicodeDecodeError as exn: + logger.error('Invalid UTF-8 input bytes: %r', input_bytes) + raise exn + else: + # PY2 + return unicode(input_bytes, "utf-8") # pylint: disable=undefined-variable + + def skip_null(self): + pass + + async def skip_boolean(self): + await self.skip(1) + + async def skip_int(self): + await self.skip_long() + + async def skip_long(self): + b = ord(await self.read(1)) + while (b & 0x80) != 0: + b = ord(await self.read(1)) + + async def skip_float(self): + await self.skip(4) + + async def skip_double(self): + await self.skip(8) + + async def skip_bytes(self): + await self.skip(await self.read_long()) + + async def skip_utf8(self): + await self.skip_bytes() + + async def skip(self, n): + await self.reader.seek(await self.reader.tell() + n) + + +# ------------------------------------------------------------------------------ +# DatumReader + + +class AsyncDatumReader(object): + """Deserialize Avro-encoded data into a Python data structure.""" + + def __init__(self, writer_schema=None): + """ + As defined in the Avro specification, we call the schema encoded + in the data the "writer's schema", and the schema expected by the + reader the "reader's schema". + """ + self._writer_schema = writer_schema + + # read/write properties + def set_writer_schema(self, writer_schema): + self._writer_schema = writer_schema + + writer_schema = property(lambda self: self._writer_schema, + set_writer_schema) + + async def read(self, decoder): + return await self.read_data(self.writer_schema, decoder) + + async def read_data(self, writer_schema, decoder): + # function dispatch for reading data based on type of writer's schema + if writer_schema.type == 'null': + result = decoder.read_null() + elif writer_schema.type == 'boolean': + result = await decoder.read_boolean() + elif writer_schema.type == 'string': + result = await decoder.read_utf8() + elif writer_schema.type == 'int': + result = await decoder.read_int() + elif writer_schema.type == 'long': + result = await decoder.read_long() + elif writer_schema.type == 'float': + result = await decoder.read_float() + elif writer_schema.type == 'double': + result = await decoder.read_double() + elif writer_schema.type == 'bytes': + result = await decoder.read_bytes() + elif writer_schema.type == 'fixed': + result = await self.read_fixed(writer_schema, decoder) + elif writer_schema.type == 'enum': + result = await self.read_enum(writer_schema, decoder) + elif writer_schema.type == 'array': + result = await self.read_array(writer_schema, decoder) + elif writer_schema.type == 'map': + result = await self.read_map(writer_schema, decoder) + elif writer_schema.type in ['union', 'error_union']: + result = await self.read_union(writer_schema, decoder) + elif writer_schema.type in ['record', 'error', 'request']: + result = await self.read_record(writer_schema, decoder) + else: + fail_msg = f"Cannot read unknown schema type: {writer_schema.type}" + raise schema.AvroException(fail_msg) + return result + + async def skip_data(self, writer_schema, decoder): + if writer_schema.type == 'null': + result = decoder.skip_null() + elif writer_schema.type == 'boolean': + result = await decoder.skip_boolean() + elif writer_schema.type == 'string': + result = await decoder.skip_utf8() + elif writer_schema.type == 'int': + result = await decoder.skip_int() + elif writer_schema.type == 'long': + result = await decoder.skip_long() + elif writer_schema.type == 'float': + result = await decoder.skip_float() + elif writer_schema.type == 'double': + result = await decoder.skip_double() + elif writer_schema.type == 'bytes': + result = await decoder.skip_bytes() + elif writer_schema.type == 'fixed': + result = await self.skip_fixed(writer_schema, decoder) + elif writer_schema.type == 'enum': + result = await self.skip_enum(decoder) + elif writer_schema.type == 'array': + await self.skip_array(writer_schema, decoder) + result = None + elif writer_schema.type == 'map': + await self.skip_map(writer_schema, decoder) + result = None + elif writer_schema.type in ['union', 'error_union']: + result = await self.skip_union(writer_schema, decoder) + elif writer_schema.type in ['record', 'error', 'request']: + await self.skip_record(writer_schema, decoder) + result = None + else: + fail_msg = f"Unknown schema type: {writer_schema.type}" + raise schema.AvroException(fail_msg) + return result + + # Fixed instances are encoded using the number of bytes declared in the schema. + @staticmethod + async def read_fixed(writer_schema, decoder): + return await decoder.read(writer_schema.size) + + @staticmethod + async def skip_fixed(writer_schema, decoder): + return await decoder.skip(writer_schema.size) + + # An enum is encoded by a int, representing the zero-based position of the symbol in the schema. + @staticmethod + async def read_enum(writer_schema, decoder): + # read data + index_of_symbol = await decoder.read_int() + if index_of_symbol >= len(writer_schema.symbols): + fail_msg = f"Can't access enum index {index_of_symbol} for enum with {len(writer_schema.symbols)} symbols" + raise SchemaResolutionException(fail_msg, writer_schema) + read_symbol = writer_schema.symbols[index_of_symbol] + return read_symbol + + @staticmethod + async def skip_enum(decoder): + return await decoder.skip_int() + + # Arrays are encoded as a series of blocks. + + # Each block consists of a long count value, followed by that many array items. + # A block with count zero indicates the end of the array. Each item is encoded per the array's item schema. + + # If a block's count is negative, then the count is followed immediately by a long block size, + # indicating the number of bytes in the block. + # The actual count in this case is the absolute value of the count written. + async def read_array(self, writer_schema, decoder): + read_items = [] + block_count = await decoder.read_long() + while block_count != 0: + if block_count < 0: + block_count = -block_count + await decoder.read_long() + for _ in range(block_count): + read_items.append(await self.read_data(writer_schema.items, decoder)) + block_count = await decoder.read_long() + return read_items + + async def skip_array(self, writer_schema, decoder): + block_count = await decoder.read_long() + while block_count != 0: + if block_count < 0: + block_size = await decoder.read_long() + await decoder.skip(block_size) + else: + for _ in range(block_count): + await self.skip_data(writer_schema.items, decoder) + block_count = await decoder.read_long() + + # Maps are encoded as a series of blocks. + + # Each block consists of a long count value, followed by that many key/value pairs. + # A block with count zero indicates the end of the map. Each item is encoded per the map's value schema. + + # If a block's count is negative, then the count is followed immediately by a long block size, + # indicating the number of bytes in the block. + # The actual count in this case is the absolute value of the count written. + async def read_map(self, writer_schema, decoder): + read_items = {} + block_count = await decoder.read_long() + while block_count != 0: + if block_count < 0: + block_count = -block_count + await decoder.read_long() + for _ in range(block_count): + key = await decoder.read_utf8() + read_items[key] = await self.read_data(writer_schema.values, decoder) + block_count = await decoder.read_long() + return read_items + + async def skip_map(self, writer_schema, decoder): + block_count = await decoder.read_long() + while block_count != 0: + if block_count < 0: + block_size = await decoder.read_long() + await decoder.skip(block_size) + else: + for _ in range(block_count): + await decoder.skip_utf8() + await self.skip_data(writer_schema.values, decoder) + block_count = await decoder.read_long() + + # A union is encoded by first writing a long value indicating + # the zero-based position within the union of the schema of its value. + # The value is then encoded per the indicated schema within the union. + async def read_union(self, writer_schema, decoder): + # schema resolution + index_of_schema = int(await decoder.read_long()) + if index_of_schema >= len(writer_schema.schemas): + fail_msg = (f"Can't access branch index {index_of_schema} " + f"for union with {len(writer_schema.schemas)} branches") + raise SchemaResolutionException(fail_msg, writer_schema) + selected_writer_schema = writer_schema.schemas[index_of_schema] + + # read data + return await self.read_data(selected_writer_schema, decoder) + + async def skip_union(self, writer_schema, decoder): + index_of_schema = int(await decoder.read_long()) + if index_of_schema >= len(writer_schema.schemas): + fail_msg = (f"Can't access branch index {index_of_schema} " + f"for union with {len(writer_schema.schemas)} branches") + raise SchemaResolutionException(fail_msg, writer_schema) + return await self.skip_data(writer_schema.schemas[index_of_schema], decoder) + + # A record is encoded by encoding the values of its fields + # in the order that they are declared. In other words, a record + # is encoded as just the concatenation of the encodings of its fields. + # Field values are encoded per their schema. + + # Schema Resolution: + # * the ordering of fields may be different: fields are matched by name. + # * schemas for fields with the same name in both records are resolved + # recursively. + # * if the writer's record contains a field with a name not present in the + # reader's record, the writer's value for that field is ignored. + # * if the reader's record schema has a field that contains a default value, + # and writer's schema does not have a field with the same name, then the + # reader should use the default value from its field. + # * if the reader's record schema has a field with no default value, and + # writer's schema does not have a field with the same name, then the + # field's value is unset. + async def read_record(self, writer_schema, decoder): + # schema resolution + read_record = {} + for field in writer_schema.fields: + field_val = await self.read_data(field.type, decoder) + read_record[field.name] = field_val + return read_record + + async def skip_record(self, writer_schema, decoder): + for field in writer_schema.fields: + await self.skip_data(field.type, decoder) diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile.py new file mode 100644 index 00000000..757e0329 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile.py @@ -0,0 +1,257 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# pylint: disable=docstring-missing-return, docstring-missing-rtype + +"""Read/Write Avro File Object Containers.""" + +import io +import logging +import sys +import zlib + +from ..avro import avro_io +from ..avro import schema + +PY3 = sys.version_info[0] == 3 + +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------------------ +# Constants + +# Version of the container file: +VERSION = 1 + +if PY3: + MAGIC = b'Obj' + bytes([VERSION]) + MAGIC_SIZE = len(MAGIC) +else: + MAGIC = 'Obj' + chr(VERSION) + MAGIC_SIZE = len(MAGIC) + +# Size of the synchronization marker, in number of bytes: +SYNC_SIZE = 16 + +# Schema of the container header: +META_SCHEMA = schema.parse(""" +{ + "type": "record", "name": "org.apache.avro.file.Header", + "fields": [{ + "name": "magic", + "type": {"type": "fixed", "name": "magic", "size": %(magic_size)d} + }, { + "name": "meta", + "type": {"type": "map", "values": "bytes"} + }, { + "name": "sync", + "type": {"type": "fixed", "name": "sync", "size": %(sync_size)d} + }] +} +""" % { + 'magic_size': MAGIC_SIZE, + 'sync_size': SYNC_SIZE, +}) + +# Codecs supported by container files: +VALID_CODECS = frozenset(['null', 'deflate']) + +# Metadata key associated to the schema: +SCHEMA_KEY = "avro.schema" + + +# ------------------------------------------------------------------------------ +# Exceptions + + +class DataFileException(schema.AvroException): + """Problem reading or writing file object containers.""" + +# ------------------------------------------------------------------------------ + + +class DataFileReader(object): # pylint: disable=too-many-instance-attributes + """Read files written by DataFileWriter.""" + + def __init__(self, reader, datum_reader, **kwargs): + """Initializes a new data file reader. + + Args: + reader: Open file to read from. + datum_reader: Avro datum reader. + """ + self._reader = reader + self._raw_decoder = avro_io.BinaryDecoder(reader) + self._header_reader = kwargs.pop('header_reader', None) + self._header_decoder = None if self._header_reader is None else avro_io.BinaryDecoder(self._header_reader) + self._datum_decoder = None # Maybe reset at every block. + self._datum_reader = datum_reader + + # In case self._reader only has partial content(without header). + # seek(0, 0) to make sure read the (partial)content from beginning. + self._reader.seek(0, 0) + + # read the header: magic, meta, sync + self._read_header() + + # ensure codec is valid + avro_codec_raw = self.get_meta('avro.codec') + if avro_codec_raw is None: + self.codec = "null" + else: + self.codec = avro_codec_raw.decode('utf-8') + if self.codec not in VALID_CODECS: + raise DataFileException(f"Unknown codec: {self.codec}.") + + # get ready to read + self._block_count = 0 + + # object_position is to support reading from current position in the future read, + # no need to downloading from the beginning of avro. + if hasattr(self._reader, 'object_position'): + self.reader.track_object_position() + + self._cur_object_index = 0 + # header_reader indicates reader only has partial content. The reader doesn't have block header, + # so we read use the block count stored last time. + # Also ChangeFeed only has codec==null, so use _raw_decoder is good. + if self._header_reader is not None: + self._datum_decoder = self._raw_decoder + + self.datum_reader.writer_schema = ( + schema.parse(self.get_meta(SCHEMA_KEY).decode('utf-8'))) + + def __enter__(self): + return self + + def __exit__(self, data_type, value, traceback): + # Perform a close if there's no exception + if data_type is None: + self.close() + + def __iter__(self): + return self + + # read-only properties + @property + def reader(self): + return self._reader + + @property + def raw_decoder(self): + return self._raw_decoder + + @property + def datum_decoder(self): + return self._datum_decoder + + @property + def datum_reader(self): + return self._datum_reader + + @property + def sync_marker(self): + return self._sync_marker + + @property + def meta(self): + return self._meta + + # read/write properties + @property + def block_count(self): + return self._block_count + + def get_meta(self, key): + """Reports the value of a given metadata key. + + :param str key: Metadata key to report the value of. + :returns: Value associated to the metadata key, as bytes. + :rtype: bytes + """ + return self._meta.get(key) + + def _read_header(self): + header_reader = self._header_reader if self._header_reader else self._reader + header_decoder = self._header_decoder if self._header_decoder else self._raw_decoder + + # seek to the beginning of the file to get magic block + header_reader.seek(0, 0) + + # read header into a dict + header = self.datum_reader.read_data(META_SCHEMA, header_decoder) + + # check magic number + if header.get('magic') != MAGIC: + fail_msg = f"Not an Avro data file: {header.get('magic')} doesn't match {MAGIC!r}." + raise schema.AvroException(fail_msg) + + # set metadata + self._meta = header['meta'] + + # set sync marker + self._sync_marker = header['sync'] + + def _read_block_header(self): + self._block_count = self.raw_decoder.read_long() + if self.codec == "null": + # Skip a long; we don't need to use the length. + self.raw_decoder.skip_long() + self._datum_decoder = self._raw_decoder + elif self.codec == 'deflate': + # Compressed data is stored as (length, data), which + # corresponds to how the "bytes" type is encoded. + data = self.raw_decoder.read_bytes() + # -15 is the log of the window size; negative indicates + # "raw" (no zlib headers) decompression. See zlib.h. + uncompressed = zlib.decompress(data, -15) + self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed)) + else: + raise DataFileException(f"Unknown codec: {self.codec!r}") + + def _skip_sync(self): + """ + Read the length of the sync marker; if it matches the sync marker, + return True. Otherwise, seek back to where we started and return False. + """ + proposed_sync_marker = self.reader.read(SYNC_SIZE) + if SYNC_SIZE > 0 and not proposed_sync_marker: + raise StopIteration + if proposed_sync_marker != self.sync_marker: + self.reader.seek(-SYNC_SIZE, 1) + + def __next__(self): + """Return the next datum in the file.""" + if self.block_count == 0: + self._skip_sync() + + # object_position is to support reading from current position in the future read, + # no need to downloading from the beginning of avro file with this attr. + if hasattr(self._reader, 'object_position'): + self.reader.track_object_position() + self._cur_object_index = 0 + + self._read_block_header() + + datum = self.datum_reader.read(self.datum_decoder) + self._block_count -= 1 + self._cur_object_index += 1 + + # object_position is to support reading from current position in the future read, + # This will track the index of the next item to be read. + # This will also track the offset before the next sync marker. + if hasattr(self._reader, 'object_position'): + if self.block_count == 0: + # the next event to be read is at index 0 in the new chunk of blocks, + self.reader.track_object_position() + self.reader.set_object_index(0) + else: + self.reader.set_object_index(self._cur_object_index) + + return datum + + def close(self): + """Close this reader.""" + self.reader.close() diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile_async.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile_async.py new file mode 100644 index 00000000..85dc5cb5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile_async.py @@ -0,0 +1,210 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# pylint: disable=docstring-missing-return, docstring-missing-rtype + +"""Read/Write Avro File Object Containers.""" + +import logging +import sys + +from ..avro import avro_io_async +from ..avro import schema +from .datafile import DataFileException +from .datafile import MAGIC, SYNC_SIZE, META_SCHEMA, SCHEMA_KEY + + +PY3 = sys.version_info[0] == 3 + +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------------------ +# Constants + +# Codecs supported by container files: +VALID_CODECS = frozenset(['null']) + + +class AsyncDataFileReader(object): # pylint: disable=too-many-instance-attributes + """Read files written by DataFileWriter.""" + + def __init__(self, reader, datum_reader, **kwargs): + """Initializes a new data file reader. + + Args: + reader: Open file to read from. + datum_reader: Avro datum reader. + """ + self._reader = reader + self._raw_decoder = avro_io_async.AsyncBinaryDecoder(reader) + self._header_reader = kwargs.pop('header_reader', None) + self._header_decoder = None if self._header_reader is None else \ + avro_io_async.AsyncBinaryDecoder(self._header_reader) + self._datum_decoder = None # Maybe reset at every block. + self._datum_reader = datum_reader + self.codec = "null" + self._block_count = 0 + self._cur_object_index = 0 + self._meta = None + self._sync_marker = None + + async def init(self): + # In case self._reader only has partial content(without header). + # seek(0, 0) to make sure read the (partial)content from beginning. + await self._reader.seek(0, 0) + + # read the header: magic, meta, sync + await self._read_header() + + # ensure codec is valid + avro_codec_raw = self.get_meta('avro.codec') + if avro_codec_raw is None: + self.codec = "null" + else: + self.codec = avro_codec_raw.decode('utf-8') + if self.codec not in VALID_CODECS: + raise DataFileException(f"Unknown codec: {self.codec}.") + + # get ready to read + self._block_count = 0 + + # object_position is to support reading from current position in the future read, + # no need to downloading from the beginning of avro. + if hasattr(self._reader, 'object_position'): + self.reader.track_object_position() + + # header_reader indicates reader only has partial content. The reader doesn't have block header, + # so we read use the block count stored last time. + # Also ChangeFeed only has codec==null, so use _raw_decoder is good. + if self._header_reader is not None: + self._datum_decoder = self._raw_decoder + self.datum_reader.writer_schema = ( + schema.parse(self.get_meta(SCHEMA_KEY).decode('utf-8'))) + return self + + async def __aenter__(self): + return self + + async def __aexit__(self, data_type, value, traceback): + # Perform a close if there's no exception + if data_type is None: + self.close() + + def __aiter__(self): + return self + + # read-only properties + @property + def reader(self): + return self._reader + + @property + def raw_decoder(self): + return self._raw_decoder + + @property + def datum_decoder(self): + return self._datum_decoder + + @property + def datum_reader(self): + return self._datum_reader + + @property + def sync_marker(self): + return self._sync_marker + + @property + def meta(self): + return self._meta + + # read/write properties + @property + def block_count(self): + return self._block_count + + def get_meta(self, key): + """Reports the value of a given metadata key. + + :param str key: Metadata key to report the value of. + :returns: Value associated to the metadata key, as bytes. + :rtype: bytes + """ + return self._meta.get(key) + + async def _read_header(self): + header_reader = self._header_reader if self._header_reader else self._reader + header_decoder = self._header_decoder if self._header_decoder else self._raw_decoder + + # seek to the beginning of the file to get magic block + await header_reader.seek(0, 0) + + # read header into a dict + header = await self.datum_reader.read_data(META_SCHEMA, header_decoder) + + # check magic number + if header.get('magic') != MAGIC: + fail_msg = f"Not an Avro data file: {header.get('magic')} doesn't match {MAGIC!r}." + raise schema.AvroException(fail_msg) + + # set metadata + self._meta = header['meta'] + + # set sync marker + self._sync_marker = header['sync'] + + async def _read_block_header(self): + self._block_count = await self.raw_decoder.read_long() + if self.codec == "null": + # Skip a long; we don't need to use the length. + await self.raw_decoder.skip_long() + self._datum_decoder = self._raw_decoder + else: + raise DataFileException(f"Unknown codec: {self.codec!r}") + + async def _skip_sync(self): + """ + Read the length of the sync marker; if it matches the sync marker, + return True. Otherwise, seek back to where we started and return False. + """ + proposed_sync_marker = await self.reader.read(SYNC_SIZE) + if SYNC_SIZE > 0 and not proposed_sync_marker: + raise StopAsyncIteration + if proposed_sync_marker != self.sync_marker: + await self.reader.seek(-SYNC_SIZE, 1) + + async def __anext__(self): + """Return the next datum in the file.""" + if self.block_count == 0: + await self._skip_sync() + + # object_position is to support reading from current position in the future read, + # no need to downloading from the beginning of avro file with this attr. + if hasattr(self._reader, 'object_position'): + await self.reader.track_object_position() + self._cur_object_index = 0 + + await self._read_block_header() + + datum = await self.datum_reader.read(self.datum_decoder) + self._block_count -= 1 + self._cur_object_index += 1 + + # object_position is to support reading from current position in the future read, + # This will track the index of the next item to be read. + # This will also track the offset before the next sync marker. + if hasattr(self._reader, 'object_position'): + if self.block_count == 0: + # the next event to be read is at index 0 in the new chunk of blocks, + await self.reader.track_object_position() + await self.reader.set_object_index(0) + else: + await self.reader.set_object_index(self._cur_object_index) + + return datum + + def close(self): + """Close this reader.""" + self.reader.close() diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/schema.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/schema.py new file mode 100644 index 00000000..d5484abc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/schema.py @@ -0,0 +1,1178 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# pylint: disable=docstring-missing-return, docstring-missing-rtype, too-many-lines + +"""Representation of Avro schemas. + +A schema may be one of: + - A record, mapping field names to field value data; + - An error, equivalent to a record; + - An enum, containing one of a small set of symbols; + - An array of values, all of the same schema; + - A map containing string/value pairs, each of a declared schema; + - A union of other schemas; + - A fixed sized binary object; + - A unicode string; + - A sequence of bytes; + - A 32-bit signed int; + - A 64-bit signed long; + - A 32-bit floating-point float; + - A 64-bit floating-point double; + - A boolean; + - Null. +""" + +import abc +import json +import logging +import re +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------------------ +# Constants + +# Log level more verbose than DEBUG=10, INFO=20, etc. +DEBUG_VERBOSE = 5 + +NULL = 'null' +BOOLEAN = 'boolean' +STRING = 'string' +BYTES = 'bytes' +INT = 'int' +LONG = 'long' +FLOAT = 'float' +DOUBLE = 'double' +FIXED = 'fixed' +ENUM = 'enum' +RECORD = 'record' +ERROR = 'error' +ARRAY = 'array' +MAP = 'map' +UNION = 'union' + +# Request and error unions are part of Avro protocols: +REQUEST = 'request' +ERROR_UNION = 'error_union' + +PRIMITIVE_TYPES = frozenset([ + NULL, + BOOLEAN, + STRING, + BYTES, + INT, + LONG, + FLOAT, + DOUBLE, +]) + +NAMED_TYPES = frozenset([ + FIXED, + ENUM, + RECORD, + ERROR, +]) + +VALID_TYPES = frozenset.union( + PRIMITIVE_TYPES, + NAMED_TYPES, + [ + ARRAY, + MAP, + UNION, + REQUEST, + ERROR_UNION, + ], +) + +SCHEMA_RESERVED_PROPS = frozenset([ + 'type', + 'name', + 'namespace', + 'fields', # Record + 'items', # Array + 'size', # Fixed + 'symbols', # Enum + 'values', # Map + 'doc', +]) + +FIELD_RESERVED_PROPS = frozenset([ + 'default', + 'name', + 'doc', + 'order', + 'type', +]) + +VALID_FIELD_SORT_ORDERS = frozenset([ + 'ascending', + 'descending', + 'ignore', +]) + + +# ------------------------------------------------------------------------------ +# Exceptions + + +class Error(Exception): + """Base class for errors in this module.""" + + +class AvroException(Error): + """Generic Avro schema error.""" + + +class SchemaParseException(AvroException): + """Error while parsing a JSON schema descriptor.""" + + +class Schema(metaclass=abc.ABCMeta): + """Abstract base class for all Schema classes.""" + + def __init__(self, data_type, other_props=None): + """Initializes a new schema object. + + Args: + data_type: Type of the schema to initialize. + other_props: Optional dictionary of additional properties. + """ + if data_type not in VALID_TYPES: + raise SchemaParseException(f'{data_type!r} is not a valid Avro type.') + + # All properties of this schema, as a map: property name -> property value + self._props = {} + + self._props['type'] = data_type + self._type = data_type + + if other_props: + self._props.update(other_props) + + @property + def namespace(self): + """Returns: the namespace this schema belongs to, if any, or None.""" + return self._props.get('namespace', None) + + @property + def type(self): + """Returns: the type of this schema.""" + return self._type + + @property + def doc(self): + """Returns: the documentation associated to this schema, if any, or None.""" + return self._props.get('doc', None) + + @property + def props(self): + """Reports all the properties of this schema. + + Includes all properties, reserved and non reserved. + JSON properties of this schema are directly generated from this dict. + + Returns: + A dictionary of properties associated to this schema. + """ + return self._props + + @property + def other_props(self): + """Returns: the dictionary of non-reserved properties.""" + return dict(filter_keys_out(items=self._props, keys=SCHEMA_RESERVED_PROPS)) + + def __str__(self): + """Returns: the JSON representation of this schema.""" + return json.dumps(self.to_json(names=None)) + + # Converts the schema object into its AVRO specification representation. + + # Schema types that have names (records, enums, and fixed) must be aware of not + # re-defining schemas that are already listed in the parameter names. + @abc.abstractmethod + def to_json(self, names): + ... + + +# ------------------------------------------------------------------------------ + + +_RE_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*') + +_RE_FULL_NAME = re.compile( + r'^' + r'[.]?(?:[A-Za-z_][A-Za-z0-9_]*[.])*' # optional namespace + r'([A-Za-z_][A-Za-z0-9_]*)' # name + r'$' +) + + +class Name(object): + """Representation of an Avro name.""" + + def __init__(self, name, namespace=None): + """Parses an Avro name. + + Args: + name: Avro name to parse (relative or absolute). + namespace: Optional explicit namespace if the name is relative. + """ + # Normalize: namespace is always defined as a string, possibly empty. + if namespace is None: + namespace = '' + + if '.' in name: + # name is absolute, namespace is ignored: + self._fullname = name + + match = _RE_FULL_NAME.match(self._fullname) + if match is None: + raise SchemaParseException( + f'Invalid absolute schema name: {self._fullname!r}.') + + self._name = match.group(1) + self._namespace = self._fullname[:-(len(self._name) + 1)] + + else: + # name is relative, combine with explicit namespace: + self._name = name + self._namespace = namespace + self._fullname = (self._name + if (not self._namespace) else + f'{self._namespace}.{self._name}') + + # Validate the fullname: + if _RE_FULL_NAME.match(self._fullname) is None: + raise SchemaParseException(f"Invalid schema name {self._fullname!r} inferred from " + f"name {self._name!r} and namespace {self._namespace!r}.") + + def __eq__(self, other): + if not isinstance(other, Name): + return NotImplemented + return self.fullname == other.fullname + + @property + def simple_name(self): + """Returns: the simple name part of this name.""" + return self._name + + @property + def namespace(self): + """Returns: this name's namespace, possible the empty string.""" + return self._namespace + + @property + def fullname(self): + """Returns: the full name.""" + return self._fullname + + +# ------------------------------------------------------------------------------ + + +class Names(object): + """Tracks Avro named schemas and default namespace during parsing.""" + + def __init__(self, default_namespace=None, names=None): + """Initializes a new name tracker. + + Args: + default_namespace: Optional default namespace. + names: Optional initial mapping of known named schemas. + """ + if names is None: + names = {} + self._names = names + self._default_namespace = default_namespace + + @property + def names(self): + """Returns: the mapping of known named schemas.""" + return self._names + + @property + def default_namespace(self): + """Returns: the default namespace, if any, or None.""" + return self._default_namespace + + def new_with_default_namespace(self, namespace): + """Creates a new name tracker from this tracker, but with a new default ns. + + :param Any namespace: New default namespace to use. + :returns: New name tracker with the specified default namespace. + :rtype: Names + """ + return Names(names=self._names, default_namespace=namespace) + + def get_name(self, name, namespace=None): + """Resolves the Avro name according to this name tracker's state. + + :param Any name: Name to resolve (absolute or relative). + :param Optional[Any] namespace: Optional explicit namespace. + :returns: The specified name, resolved according to this tracker. + :rtype: Name + """ + if namespace is None: + namespace = self._default_namespace + return Name(name=name, namespace=namespace) + + def get_schema(self, name, namespace=None): + """Resolves an Avro schema by name. + + :param Any name: Name (absolute or relative) of the Avro schema to look up. + :param Optional[Any] namespace: Optional explicit namespace. + :returns: The schema with the specified name, if any, or None + :rtype: Union[Any, None] + """ + avro_name = self.get_name(name=name, namespace=namespace) + return self._names.get(avro_name.fullname, None) + + # Given a properties, return properties with namespace removed if it matches the own default namespace + def prune_namespace(self, properties): + if self.default_namespace is None: + # I have no default -- no change + return properties + if 'namespace' not in properties: + # he has no namespace - no change + return properties + if properties['namespace'] != self.default_namespace: + # we're different - leave his stuff alone + return properties + # we each have a namespace and it's redundant. delete his. + prunable = properties.copy() + del prunable['namespace'] + return prunable + + def register(self, schema): + """Registers a new named schema in this tracker. + + :param Any schema: Named Avro schema to register in this tracker. + """ + if schema.fullname in VALID_TYPES: + raise SchemaParseException( + f'{schema.fullname} is a reserved type name.') + if schema.fullname in self.names: + raise SchemaParseException( + f'Avro name {schema.fullname!r} already exists.') + + logger.log(DEBUG_VERBOSE, 'Register new name for %r', schema.fullname) + self._names[schema.fullname] = schema + + +# ------------------------------------------------------------------------------ + + +class NamedSchema(Schema): + """Abstract base class for named schemas. + + Named schemas are enumerated in NAMED_TYPES. + """ + + def __init__( + self, + data_type, + name=None, + namespace=None, + names=None, + other_props=None, + ): + """Initializes a new named schema object. + + Args: + data_type: Type of the named schema. + name: Name (absolute or relative) of the schema. + namespace: Optional explicit namespace if name is relative. + names: Tracker to resolve and register Avro names. + other_props: Optional map of additional properties of the schema. + """ + assert (data_type in NAMED_TYPES), (f'Invalid named type: {data_type!r}') + self._avro_name = names.get_name(name=name, namespace=namespace) + + super(NamedSchema, self).__init__(data_type, other_props) + + names.register(self) + + self._props['name'] = self.name + if self.namespace: + self._props['namespace'] = self.namespace + + @property + def avro_name(self): + """Returns: the Name object describing this schema's name.""" + return self._avro_name + + @property + def name(self): + return self._avro_name.simple_name + + @property + def namespace(self): + return self._avro_name.namespace + + @property + def fullname(self): + return self._avro_name.fullname + + def name_ref(self, names): + """Reports this schema name relative to the specified name tracker. + + :param Any names: Avro name tracker to relativize this schema name against. + :returns: This schema name, relativized against the specified name tracker. + :rtype: Any + """ + if self.namespace == names.default_namespace: + return self.name + return self.fullname + + # Converts the schema object into its AVRO specification representation. + + # Schema types that have names (records, enums, and fixed) must be aware + # of not re-defining schemas that are already listed in the parameter names. + @abc.abstractmethod + def to_json(self, names): + ... + +# ------------------------------------------------------------------------------ + + +_NO_DEFAULT = object() + + +class Field(object): + """Representation of the schema of a field in a record.""" + + def __init__( + self, + data_type, + name, + index, + has_default, + default=_NO_DEFAULT, + order=None, + doc=None, + other_props=None + ): + """Initializes a new Field object. + + Args: + data_type: Avro schema of the field. + name: Name of the field. + index: 0-based position of the field. + has_default: + default: + order: + doc: + other_props: + """ + if (not isinstance(name, str)) or (not name): + raise SchemaParseException(f'Invalid record field name: {name!r}.') + if (order is not None) and (order not in VALID_FIELD_SORT_ORDERS): + raise SchemaParseException(f'Invalid record field order: {order!r}.') + + # All properties of this record field: + self._props = {} + + self._has_default = has_default + if other_props: + self._props.update(other_props) + + self._index = index + self._type = self._props['type'] = data_type + self._name = self._props['name'] = name + + if has_default: + self._props['default'] = default + + if order is not None: + self._props['order'] = order + + if doc is not None: + self._props['doc'] = doc + + @property + def type(self): + """Returns: the schema of this field.""" + return self._type + + @property + def name(self): + """Returns: this field name.""" + return self._name + + @property + def index(self): + """Returns: the 0-based index of this field in the record.""" + return self._index + + @property + def default(self): + return self._props['default'] + + @property + def has_default(self): + return self._has_default + + @property + def order(self): + return self._props.get('order', None) + + @property + def doc(self): + return self._props.get('doc', None) + + @property + def props(self): + return self._props + + @property + def other_props(self): + return filter_keys_out(items=self._props, keys=FIELD_RESERVED_PROPS) + + def __str__(self): + return json.dumps(self.to_json()) + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = self.props.copy() + to_dump['type'] = self.type.to_json(names) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ +# Primitive Types + + +class PrimitiveSchema(Schema): + """Schema of a primitive Avro type. + + Valid primitive types are defined in PRIMITIVE_TYPES. + """ + + def __init__(self, data_type, other_props=None): + """Initializes a new schema object for the specified primitive type. + + Args: + data_type: Type of the schema to construct. Must be primitive. + """ + if data_type not in PRIMITIVE_TYPES: + raise AvroException(f'{data_type!r} is not a valid primitive type.') + super(PrimitiveSchema, self).__init__(data_type, other_props=other_props) + + @property + def name(self): + """Returns: the simple name of this schema.""" + # The name of a primitive type is the type itself. + return self.type + + @property + def fullname(self): + """Returns: the fully qualified name of this schema.""" + # The full name is the simple name for primitive schema. + return self.name + + def to_json(self, names=None): + if len(self.props) == 1: + return self.fullname + return self.props + + def __eq__(self, that): + return self.props == that.props + + +# ------------------------------------------------------------------------------ +# Complex Types (non-recursive) + + +class FixedSchema(NamedSchema): + def __init__( + self, + name, + namespace, + size, + names=None, + other_props=None, + ): + # Ensure valid ctor args + if not isinstance(size, int): + fail_msg = 'Fixed Schema requires a valid integer for size property.' + raise AvroException(fail_msg) + + super(FixedSchema, self).__init__( + data_type=FIXED, + name=name, + namespace=namespace, + names=names, + other_props=other_props, + ) + self._props['size'] = size + + @property + def size(self): + """Returns: the size of this fixed schema, in bytes.""" + return self._props['size'] + + def to_json(self, names=None): + if names is None: + names = Names() + if self.fullname in names.names: + return self.name_ref(names) + names.names[self.fullname] = self + return names.prune_namespace(self.props) + + def __eq__(self, that): + return self.props == that.props + + +# ------------------------------------------------------------------------------ + + +class EnumSchema(NamedSchema): + def __init__( + self, + name, + namespace, + symbols, + names=None, + doc=None, + other_props=None, + ): + """Initializes a new enumeration schema object. + + Args: + name: Simple name of this enumeration. + namespace: Optional namespace. + symbols: Ordered list of symbols defined in this enumeration. + names: + doc: + other_props: + """ + symbols = tuple(symbols) + symbol_set = frozenset(symbols) + if (len(symbol_set) != len(symbols) + or not all(map(lambda symbol: isinstance(symbol, str), symbols))): + raise AvroException( + f'Invalid symbols for enum schema: {symbols!r}.') + + super(EnumSchema, self).__init__( + data_type=ENUM, + name=name, + namespace=namespace, + names=names, + other_props=other_props, + ) + + self._props['symbols'] = symbols + if doc is not None: + self._props['doc'] = doc + + @property + def symbols(self): + """Returns: the symbols defined in this enum.""" + return self._props['symbols'] + + def to_json(self, names=None): + if names is None: + names = Names() + if self.fullname in names.names: + return self.name_ref(names) + names.names[self.fullname] = self + return names.prune_namespace(self.props) + + def __eq__(self, that): + return self.props == that.props + + +# ------------------------------------------------------------------------------ +# Complex Types (recursive) + + +class ArraySchema(Schema): + """Schema of an array.""" + + def __init__(self, items, other_props=None): + """Initializes a new array schema object. + + Args: + items: Avro schema of the array items. + other_props: + """ + super(ArraySchema, self).__init__( + data_type=ARRAY, + other_props=other_props, + ) + self._items_schema = items + self._props['items'] = items + + @property + def items(self): + """Returns: the schema of the items in this array.""" + return self._items_schema + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = self.props.copy() + item_schema = self.items + to_dump['items'] = item_schema.to_json(names) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ + + +class MapSchema(Schema): + """Schema of a map.""" + + def __init__(self, values, other_props=None): + """Initializes a new map schema object. + + Args: + values: Avro schema of the map values. + other_props: + """ + super(MapSchema, self).__init__( + data_type=MAP, + other_props=other_props, + ) + self._values_schema = values + self._props['values'] = values + + @property + def values(self): + """Returns: the schema of the values in this map.""" + return self._values_schema + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = self.props.copy() + to_dump['values'] = self.values.to_json(names) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ + + +class UnionSchema(Schema): + """Schema of a union.""" + + def __init__(self, schemas): + """Initializes a new union schema object. + + Args: + schemas: Ordered collection of schema branches in the union. + """ + super(UnionSchema, self).__init__(data_type=UNION) + self._schemas = tuple(schemas) + + # Validate the schema branches: + + # All named schema names are unique: + named_branches = tuple( + filter(lambda schema: schema.type in NAMED_TYPES, self._schemas)) + unique_names = frozenset(map(lambda schema: schema.fullname, named_branches)) + if len(unique_names) != len(named_branches): + schemas = ''.join(map(lambda schema: (f'\n\t - {schema}'), self._schemas)) + raise AvroException(f'Invalid union branches with duplicate schema name:{schemas}') + + # Types are unique within unnamed schemas, and union is not allowed: + unnamed_branches = tuple( + filter(lambda schema: schema.type not in NAMED_TYPES, self._schemas)) + unique_types = frozenset(map(lambda schema: schema.type, unnamed_branches)) + if UNION in unique_types: + schemas = ''.join(map(lambda schema: (f'\n\t - {schema}'), self._schemas)) + raise AvroException(f'Invalid union branches contain other unions:{schemas}') + if len(unique_types) != len(unnamed_branches): + schemas = ''.join(map(lambda schema: (f'\n\t - {schema}'), self._schemas)) + raise AvroException(f'Invalid union branches with duplicate type:{schemas}') + + @property + def schemas(self): + """Returns: the ordered list of schema branches in the union.""" + return self._schemas + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = [] + for schema in self.schemas: + to_dump.append(schema.to_json(names)) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ + + +class ErrorUnionSchema(UnionSchema): + """Schema representing the declared errors of a protocol message.""" + + def __init__(self, schemas): + """Initializes an error-union schema. + + Args: + schema: collection of error schema. + """ + # Prepend "string" to handle system errors + schemas = [PrimitiveSchema(data_type=STRING)] + list(schemas) + super(ErrorUnionSchema, self).__init__(schemas=schemas) + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = [] + for schema in self.schemas: + # Don't print the system error schema + if schema.type == STRING: + continue + to_dump.append(schema.to_json(names)) + return to_dump + + +# ------------------------------------------------------------------------------ + + +class RecordSchema(NamedSchema): + """Schema of a record.""" + + @staticmethod + def _make_field(index, field_desc, names): + """Builds field schemas from a list of field JSON descriptors. + + :param int index: 0-based index of the field in the record. + :param Any field_desc: JSON descriptors of a record field. + :param Any names: The names for this schema. + :returns: The field schema. + :rtype: Field + """ + field_schema = schema_from_json_data( + json_data=field_desc['type'], + names=names, + ) + other_props = ( + dict(filter_keys_out(items=field_desc, keys=FIELD_RESERVED_PROPS))) + return Field( + data_type=field_schema, + name=field_desc['name'], + index=index, + has_default=('default' in field_desc), + default=field_desc.get('default', _NO_DEFAULT), + order=field_desc.get('order', None), + doc=field_desc.get('doc', None), + other_props=other_props, + ) + + @staticmethod + def make_field_list(field_desc_list, names): + """Builds field schemas from a list of field JSON descriptors. + Guarantees field name unicity. + + :param Any field_desc_list: Collection of field JSON descriptors. + :param Any names: The names for this schema. + :returns: Field schemas. + :rtype: Field + """ + for index, field_desc in enumerate(field_desc_list): + yield RecordSchema._make_field(index, field_desc, names) + + @staticmethod + def _make_field_map(fields): + """Builds the field map. + Guarantees field name unicity. + + :param Any fields: Iterable of field schema. + :returns: A map of field schemas, indexed by name. + :rtype: Dict[Any, Any] + """ + field_map = {} + for field in fields: + if field.name in field_map: + raise SchemaParseException( + f'Duplicate record field name {field.name!r}.') + field_map[field.name] = field + return field_map + + def __init__( + self, + name, + namespace, + fields=None, + make_fields=None, + names=None, + record_type=RECORD, + doc=None, + other_props=None + ): + """Initializes a new record schema object. + + Args: + name: Name of the record (absolute or relative). + namespace: Optional namespace the record belongs to, if name is relative. + fields: collection of fields to add to this record. + Exactly one of fields or make_fields must be specified. + make_fields: function creating the fields that belong to the record. + The function signature is: make_fields(names) -> ordered field list. + Exactly one of fields or make_fields must be specified. + names: + record_type: Type of the record: one of RECORD, ERROR or REQUEST. + Protocol requests are not named. + doc: + other_props: + """ + if record_type == REQUEST: + # Protocol requests are not named: + super(RecordSchema, self).__init__( + data_type=REQUEST, + other_props=other_props, + ) + elif record_type in [RECORD, ERROR]: + # Register this record name in the tracker: + super(RecordSchema, self).__init__( + data_type=record_type, + name=name, + namespace=namespace, + names=names, + other_props=other_props, + ) + else: + raise SchemaParseException( + f'Invalid record type: {record_type!r}.') + + nested_names = [] + if record_type in [RECORD, ERROR]: + avro_name = names.get_name(name=name, namespace=namespace) + nested_names = names.new_with_default_namespace(namespace=avro_name.namespace) + elif record_type == REQUEST: + # Protocol request has no name: no need to change default namespace: + nested_names = names + + if fields is None: + fields = make_fields(names=nested_names) + else: + assert make_fields is None + self._fields = tuple(fields) + + self._field_map = RecordSchema._make_field_map(self._fields) + + self._props['fields'] = fields + if doc is not None: + self._props['doc'] = doc + + @property + def fields(self): + """Returns: the field schemas, as an ordered tuple.""" + return self._fields + + @property + def field_map(self): + """Returns: a read-only map of the field schemas index by field names.""" + return self._field_map + + def to_json(self, names=None): + if names is None: + names = Names() + # Request records don't have names + if self.type == REQUEST: + return [f.to_json(names) for f in self.fields] + + if self.fullname in names.names: + return self.name_ref(names) + names.names[self.fullname] = self + + to_dump = names.prune_namespace(self.props.copy()) + to_dump['fields'] = [f.to_json(names) for f in self.fields] + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ +# Module functions + + +def filter_keys_out(items, keys): + """Filters a collection of (key, value) items. + Exclude any item whose key belongs to keys. + + :param Dict[Any, Any] items: Dictionary of items to filter the keys out of. + :param Dict[Any, Any] keys: Dictionary of keys to filter the extracted keys against. + :returns: Filtered items. + :rtype: Tuple(Any, Any) + """ + for key, value in items.items(): + if key in keys: + continue + yield key, value + + +# ------------------------------------------------------------------------------ + + +def _schema_from_json_string(json_string, names): + if json_string in PRIMITIVE_TYPES: + return PrimitiveSchema(data_type=json_string) + + # Look for a known named schema: + schema = names.get_schema(name=json_string) + if schema is None: + raise SchemaParseException(f"Unknown named schema {json_string!r}, known names: {sorted(names.names)!r}.") + return schema + + +def _schema_from_json_array(json_array, names): + def MakeSchema(desc): + return schema_from_json_data(json_data=desc, names=names) + + return UnionSchema(map(MakeSchema, json_array)) + + +def _schema_from_json_object(json_object, names): + data_type = json_object.get('type') + if data_type is None: + raise SchemaParseException( + f'Avro schema JSON descriptor has no "type" property: {json_object!r}') + + other_props = dict( + filter_keys_out(items=json_object, keys=SCHEMA_RESERVED_PROPS)) + + if data_type in PRIMITIVE_TYPES: + # FIXME should not ignore other properties + result = PrimitiveSchema(data_type, other_props=other_props) + + elif data_type in NAMED_TYPES: + name = json_object.get('name') + namespace = json_object.get('namespace', names.default_namespace) + if data_type == FIXED: + size = json_object.get('size') + result = FixedSchema(name, namespace, size, names, other_props) + elif data_type == ENUM: + symbols = json_object.get('symbols') + doc = json_object.get('doc') + result = EnumSchema(name, namespace, symbols, names, doc, other_props) + + elif data_type in [RECORD, ERROR]: + field_desc_list = json_object.get('fields', ()) + + def MakeFields(names): + return tuple(RecordSchema.make_field_list(field_desc_list, names)) + + result = RecordSchema( + name=name, + namespace=namespace, + make_fields=MakeFields, + names=names, + record_type=data_type, + doc=json_object.get('doc'), + other_props=other_props, + ) + else: + raise ValueError(f'Internal error: unknown type {data_type!r}.') + + elif data_type in VALID_TYPES: + # Unnamed, non-primitive Avro type: + + if data_type == ARRAY: + items_desc = json_object.get('items') + if items_desc is None: + raise SchemaParseException(f'Invalid array schema descriptor with no "items" : {json_object!r}.') + result = ArraySchema( + items=schema_from_json_data(items_desc, names), + other_props=other_props, + ) + + elif data_type == MAP: + values_desc = json_object.get('values') + if values_desc is None: + raise SchemaParseException(f'Invalid map schema descriptor with no "values" : {json_object!r}.') + result = MapSchema( + values=schema_from_json_data(values_desc, names=names), + other_props=other_props, + ) + + elif data_type == ERROR_UNION: + error_desc_list = json_object.get('declared_errors') + assert error_desc_list is not None + error_schemas = map( + lambda desc: schema_from_json_data(desc, names=names), + error_desc_list) + result = ErrorUnionSchema(schemas=error_schemas) + + else: + raise ValueError(f'Internal error: unknown type {data_type!r}.') + else: + raise SchemaParseException(f'Invalid JSON descriptor for an Avro schema: {json_object!r}') + return result + + +# Parsers for the JSON data types: +_JSONDataParserTypeMap = { + str: _schema_from_json_string, + list: _schema_from_json_array, + dict: _schema_from_json_object, +} + + +def schema_from_json_data(json_data, names=None): + """Builds an Avro Schema from its JSON descriptor. + Raises SchemaParseException if the descriptor is invalid. + + :param Any json_data: JSON data representing the descriptor of the Avro schema. + :param Any names: Optional tracker for Avro named schemas. + :returns: The Avro schema parsed from the JSON descriptor. + :rtype: Any + """ + if names is None: + names = Names() + + # Select the appropriate parser based on the JSON data type: + parser = _JSONDataParserTypeMap.get(type(json_data)) + if parser is None: + raise SchemaParseException( + f'Invalid JSON descriptor for an Avro schema: {json_data!r}.') + return parser(json_data, names=names) + + +# ------------------------------------------------------------------------------ + + +def parse(json_string): + """Constructs a Schema from its JSON descriptor in text form. + Raises SchemaParseException if a JSON parsing error is met, or if the JSON descriptor is invalid. + + :param str json_string: String representation of the JSON descriptor of the schema. + :returns: The parsed schema. + :rtype: Any + """ + try: + json_data = json.loads(json_string) + except Exception as exn: + raise SchemaParseException( + f'Error parsing schema from JSON: {json_string!r}. ' + f'Error message: {exn!r}.') from exn + + # Initialize the names object + names = Names() + + # construct the Avro Schema object + return schema_from_json_data(json_data, names) |
