path: root/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro
author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro')
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/__init__.py            5
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io.py           435
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io_async.py     419
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile.py          257
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile_async.py    210
-rw-r--r--  .venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/schema.py           1178
6 files changed, 2504 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/__init__.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/__init__.py
new file mode 100644
index 00000000..5b396cd2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/__init__.py
@@ -0,0 +1,5 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io.py
new file mode 100644
index 00000000..3e46f1fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io.py
@@ -0,0 +1,435 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+# pylint: disable=docstring-missing-return, docstring-missing-rtype
+
+"""Input/output utilities.
+
+Includes:
+ - i/o-specific constants
+ - i/o-specific exceptions
+ - schema validation
+ - leaf value encoding and decoding
+ - datum reader/writer
+
+Also includes a generic representation for data, which uses the
+following mapping:
+ - Schema records are implemented as dict.
+ - Schema arrays are implemented as list.
+ - Schema maps are implemented as dict.
+ - Schema strings are implemented as unicode.
+ - Schema bytes are implemented as str.
+ - Schema ints are implemented as int.
+ - Schema longs are implemented as long.
+ - Schema floats are implemented as float.
+ - Schema doubles are implemented as float.
+ - Schema booleans are implemented as bool.
+"""
+
+import json
+import logging
+import struct
+import sys
+
+from ..avro import schema
+
+PY3 = sys.version_info[0] == 3
+
+logger = logging.getLogger(__name__)
+
+# ------------------------------------------------------------------------------
+# Constants
+
+STRUCT_FLOAT = struct.Struct('<f')  # little-endian float
+STRUCT_DOUBLE = struct.Struct('<d')  # little-endian double
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
+class SchemaResolutionException(schema.AvroException):
+    def __init__(self, fail_msg, writer_schema=None):
+        if writer_schema:
+            pretty_writers = json.dumps(json.loads(str(writer_schema)), indent=2)
+            fail_msg += f"\nWriter's Schema: {pretty_writers}"
+        schema.AvroException.__init__(self, fail_msg)
+
+# ------------------------------------------------------------------------------
+# Decoder
+
+
+class BinaryDecoder(object):
+    """Read leaf values."""
+
+    def __init__(self, reader):
+        """
+        reader is a Python object on which we can call read, seek, and tell.
+        """
+        self._reader = reader
+
+    @property
+    def reader(self):
+        """Reports the reader used by this decoder."""
+        return self._reader
+
+    def read(self, n):
+        """Read n bytes.
+
+        :param int n: Number of bytes to read.
+        :returns: The next n bytes from the input.
+        :rtype: bytes
+        """
+        assert (n >= 0), n
+        input_bytes = self.reader.read(n)
+        if n > 0 and not input_bytes:
+            raise StopIteration
+        assert (len(input_bytes) == n), input_bytes
+        return input_bytes
+
+    @staticmethod
+    def read_null():
+        """
+        null is written as zero bytes
+        """
+        return None
+
+    def read_boolean(self):
+        """
+        a boolean is written as a single byte
+        whose value is either 0 (false) or 1 (true).
+        """
+        b = ord(self.read(1))
+        if b == 1:
+            return True
+        if b == 0:
+            return False
+        fail_msg = f"Invalid value for boolean: {b}"
+        raise schema.AvroException(fail_msg)
+
+    def read_int(self):
+        """
+        int and long values are written using variable-length, zig-zag coding.
+        """
+        return self.read_long()
+
+    def read_long(self):
+        """
+        int and long values are written using variable-length, zig-zag coding.
+        """
+        b = ord(self.read(1))
+        n = b & 0x7F
+        shift = 7
+        while (b & 0x80) != 0:
+            b = ord(self.read(1))
+            n |= (b & 0x7F) << shift
+            shift += 7
+        datum = (n >> 1) ^ -(n & 1)
+        return datum
+
+    def read_float(self):
+        """
+        A float is written as 4 bytes.
+        The float is converted into a 32-bit integer using a method equivalent to
+        Java's floatToIntBits and then encoded in little-endian format.
+        """
+        return STRUCT_FLOAT.unpack(self.read(4))[0]
+
+    def read_double(self):
+        """
+        A double is written as 8 bytes.
+        The double is converted into a 64-bit integer using a method equivalent to
+        Java's doubleToLongBits and then encoded in little-endian format.
+        """
+        return STRUCT_DOUBLE.unpack(self.read(8))[0]
+
+    def read_bytes(self):
+        """
+        Bytes are encoded as a long followed by that many bytes of data.
+        """
+        nbytes = self.read_long()
+        assert (nbytes >= 0), nbytes
+        return self.read(nbytes)
+
+    def read_utf8(self):
+        """
+        A string is encoded as a long followed by
+        that many bytes of UTF-8 encoded character data.
+        """
+        input_bytes = self.read_bytes()
+        if PY3:
+            try:
+                return input_bytes.decode('utf-8')
+            except UnicodeDecodeError as exn:
+                logger.error('Invalid UTF-8 input bytes: %r', input_bytes)
+                raise exn
+        else:
+            # PY2
+            return unicode(input_bytes, "utf-8") # pylint: disable=undefined-variable
+
+    def skip_null(self):
+        pass
+
+    def skip_boolean(self):
+        self.skip(1)
+
+    def skip_int(self):
+        self.skip_long()
+
+    def skip_long(self):
+        b = ord(self.read(1))
+        while (b & 0x80) != 0:
+            b = ord(self.read(1))
+
+    def skip_float(self):
+        self.skip(4)
+
+    def skip_double(self):
+        self.skip(8)
+
+    def skip_bytes(self):
+        self.skip(self.read_long())
+
+    def skip_utf8(self):
+        self.skip_bytes()
+
+    def skip(self, n):
+        self.reader.seek(self.reader.tell() + n)
+
+
+# ------------------------------------------------------------------------------
+# DatumReader
+
+
+class DatumReader(object):
+    """Deserialize Avro-encoded data into a Python data structure."""
+
+    def __init__(self, writer_schema=None):
+        """
+        As defined in the Avro specification, we call the schema encoded
+        in the data the "writer's schema".
+        """
+        self._writer_schema = writer_schema
+
+    # read/write properties
+    def set_writer_schema(self, writer_schema):
+        self._writer_schema = writer_schema
+
+    writer_schema = property(lambda self: self._writer_schema,
+                             set_writer_schema)
+
+    def read(self, decoder):
+        return self.read_data(self.writer_schema, decoder)
+
+    def read_data(self, writer_schema, decoder):
+        # function dispatch for reading data based on type of writer's schema
+        if writer_schema.type == 'null':
+            result = decoder.read_null()
+        elif writer_schema.type == 'boolean':
+            result = decoder.read_boolean()
+        elif writer_schema.type == 'string':
+            result = decoder.read_utf8()
+        elif writer_schema.type == 'int':
+            result = decoder.read_int()
+        elif writer_schema.type == 'long':
+            result = decoder.read_long()
+        elif writer_schema.type == 'float':
+            result = decoder.read_float()
+        elif writer_schema.type == 'double':
+            result = decoder.read_double()
+        elif writer_schema.type == 'bytes':
+            result = decoder.read_bytes()
+        elif writer_schema.type == 'fixed':
+            result = self.read_fixed(writer_schema, decoder)
+        elif writer_schema.type == 'enum':
+            result = self.read_enum(writer_schema, decoder)
+        elif writer_schema.type == 'array':
+            result = self.read_array(writer_schema, decoder)
+        elif writer_schema.type == 'map':
+            result = self.read_map(writer_schema, decoder)
+        elif writer_schema.type in ['union', 'error_union']:
+            result = self.read_union(writer_schema, decoder)
+        elif writer_schema.type in ['record', 'error', 'request']:
+            result = self.read_record(writer_schema, decoder)
+        else:
+            fail_msg = f"Cannot read unknown schema type: {writer_schema.type}"
+            raise schema.AvroException(fail_msg)
+        return result
+
+    def skip_data(self, writer_schema, decoder):
+        if writer_schema.type == 'null':
+            result = decoder.skip_null()
+        elif writer_schema.type == 'boolean':
+            result = decoder.skip_boolean()
+        elif writer_schema.type == 'string':
+            result = decoder.skip_utf8()
+        elif writer_schema.type == 'int':
+            result = decoder.skip_int()
+        elif writer_schema.type == 'long':
+            result = decoder.skip_long()
+        elif writer_schema.type == 'float':
+            result = decoder.skip_float()
+        elif writer_schema.type == 'double':
+            result = decoder.skip_double()
+        elif writer_schema.type == 'bytes':
+            result = decoder.skip_bytes()
+        elif writer_schema.type == 'fixed':
+            result = self.skip_fixed(writer_schema, decoder)
+        elif writer_schema.type == 'enum':
+            result = self.skip_enum(decoder)
+        elif writer_schema.type == 'array':
+            self.skip_array(writer_schema, decoder)
+            result = None
+        elif writer_schema.type == 'map':
+            self.skip_map(writer_schema, decoder)
+            result = None
+        elif writer_schema.type in ['union', 'error_union']:
+            result = self.skip_union(writer_schema, decoder)
+        elif writer_schema.type in ['record', 'error', 'request']:
+            self.skip_record(writer_schema, decoder)
+            result = None
+        else:
+            fail_msg = f"Unknown schema type: {writer_schema.type}"
+            raise schema.AvroException(fail_msg)
+        return result
+
+    # Fixed instances are encoded using the number of bytes declared in the schema.
+    @staticmethod
+    def read_fixed(writer_schema, decoder):
+        return decoder.read(writer_schema.size)
+
+    @staticmethod
+    def skip_fixed(writer_schema, decoder):
+        return decoder.skip(writer_schema.size)
+
+    # An enum is encoded by an int, representing the zero-based position of the symbol in the schema.
+    @staticmethod
+    def read_enum(writer_schema, decoder):
+        # read data
+        index_of_symbol = decoder.read_int()
+        if index_of_symbol >= len(writer_schema.symbols):
+            fail_msg = f"Can't access enum index {index_of_symbol} for enum with {len(writer_schema.symbols)} symbols"
+            raise SchemaResolutionException(fail_msg, writer_schema)
+        read_symbol = writer_schema.symbols[index_of_symbol]
+        return read_symbol
+
+    @staticmethod
+    def skip_enum(decoder):
+        return decoder.skip_int()
+
+    # Arrays are encoded as a series of blocks.
+
+    # Each block consists of a long count value, followed by that many array items.
+    # A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
+
+    # If a block's count is negative, then the count is followed immediately by a long block size,
+    # indicating the number of bytes in the block.
+    # The actual count in this case is the absolute value of the count written.
+    def read_array(self, writer_schema, decoder):
+        read_items = []
+        block_count = decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                decoder.read_long()
+            for _ in range(block_count):
+                read_items.append(self.read_data(writer_schema.items, decoder))
+            block_count = decoder.read_long()
+        return read_items
+
+    def skip_array(self, writer_schema, decoder):
+        block_count = decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_size = decoder.read_long()
+                decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    self.skip_data(writer_schema.items, decoder)
+            block_count = decoder.read_long()
+
+    # Maps are encoded as a series of blocks.
+
+    # Each block consists of a long count value, followed by that many key/value pairs.
+    # A block with count zero indicates the end of the map. Each item is encoded per the map's value schema.
+
+    # If a block's count is negative, then the count is followed immediately by a long block size,
+    # indicating the number of bytes in the block.
+    # The actual count in this case is the absolute value of the count written.
+    def read_map(self, writer_schema, decoder):
+        read_items = {}
+        block_count = decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                decoder.read_long()
+            for _ in range(block_count):
+                key = decoder.read_utf8()
+                read_items[key] = self.read_data(writer_schema.values, decoder)
+            block_count = decoder.read_long()
+        return read_items
+
+    def skip_map(self, writer_schema, decoder):
+        block_count = decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_size = decoder.read_long()
+                decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    decoder.skip_utf8()
+                    self.skip_data(writer_schema.values, decoder)
+            block_count = decoder.read_long()
+
+    # A union is encoded by first writing a long value indicating
+    # the zero-based position within the union of the schema of its value.
+    # The value is then encoded per the indicated schema within the union.
+    def read_union(self, writer_schema, decoder):
+        # schema resolution
+        index_of_schema = int(decoder.read_long())
+        if index_of_schema >= len(writer_schema.schemas):
+            fail_msg = (f"Can't access branch index {index_of_schema} "
+                        f"for union with {len(writer_schema.schemas)} branches")
+            raise SchemaResolutionException(fail_msg, writer_schema)
+        selected_writer_schema = writer_schema.schemas[index_of_schema]
+
+        # read data
+        return self.read_data(selected_writer_schema, decoder)
+
+    def skip_union(self, writer_schema, decoder):
+        index_of_schema = int(decoder.read_long())
+        if index_of_schema >= len(writer_schema.schemas):
+            fail_msg = (f"Can't access branch index {index_of_schema} "
+                        f"for union with {len(writer_schema.schemas)} branches")
+            raise SchemaResolutionException(fail_msg, writer_schema)
+        return self.skip_data(writer_schema.schemas[index_of_schema], decoder)
+
+    # A record is encoded by encoding the values of its fields
+    # in the order that they are declared. In other words, a record
+    # is encoded as just the concatenation of the encodings of its fields.
+    # Field values are encoded per their schema.
+
+    # Schema Resolution:
+    #     * the ordering of fields may be different: fields are matched by name.
+    #     * schemas for fields with the same name in both records are resolved
+    #     recursively.
+    #     * if the writer's record contains a field with a name not present in the
+    #     reader's record, the writer's value for that field is ignored.
+    #     * if the reader's record schema has a field that contains a default value,
+    #     and writer's schema does not have a field with the same name, then the
+    #     reader should use the default value from its field.
+    #     * if the reader's record schema has a field with no default value, and
+    #     writer's schema does not have a field with the same name, then the
+    #     field's value is unset.
+    def read_record(self, writer_schema, decoder):
+        # schema resolution
+        read_record = {}
+        for field in writer_schema.fields:
+            field_val = self.read_data(field.type, decoder)
+            read_record[field.name] = field_val
+        return read_record
+
+    def skip_record(self, writer_schema, decoder):
+        for field in writer_schema.fields:
+            self.skip_data(field.type, decoder)
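To make the encodings above concrete, here is a small illustrative sketch (not part of the diff) that feeds hand-encoded bytes to BinaryDecoder and DatumReader. The byte literals and the array schema are assumptions chosen to match the zig-zag varint and block encodings described in the comments, and the import paths assume the package is importable from this site-packages tree.

    import io
    from azure.storage.blob._shared.avro import schema
    from azure.storage.blob._shared.avro.avro_io import BinaryDecoder, DatumReader

    # Zig-zag varints: 0x02 -> 1, 0x01 -> -1, 0x80 0x01 -> 64 (two-byte varint),
    # followed by a length-prefixed UTF-8 string.
    dec = BinaryDecoder(io.BytesIO(b'\x02\x01\x80\x01\x04hi'))
    print(dec.read_long())   # 1
    print(dec.read_long())   # -1
    print(dec.read_long())   # 64
    print(dec.read_utf8())   # 'hi'

    # An array of ints is a series of blocks: count 2, items 1 and 2, then a zero terminator.
    array_schema = schema.parse('{"type": "array", "items": "int"}')
    items = DatumReader().read_data(array_schema, BinaryDecoder(io.BytesIO(b'\x04\x02\x04\x00')))
    print(items)             # [1, 2]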
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io_async.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io_async.py
new file mode 100644
index 00000000..8688661b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/avro_io_async.py
@@ -0,0 +1,419 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+# pylint: disable=docstring-missing-return, docstring-missing-rtype
+
+"""Input/output utilities.
+
+Includes:
+ - i/o-specific constants
+ - i/o-specific exceptions
+ - schema validation
+ - leaf value encoding and decoding
+ - datum reader/writer
+
+Also includes a generic representation for data, which uses the
+following mapping:
+ - Schema records are implemented as dict.
+ - Schema arrays are implemented as list.
+ - Schema maps are implemented as dict.
+ - Schema strings are implemented as unicode.
+ - Schema bytes are implemented as str.
+ - Schema ints are implemented as int.
+ - Schema longs are implemented as long.
+ - Schema floats are implemented as float.
+ - Schema doubles are implemented as float.
+ - Schema booleans are implemented as bool.
+"""
+
+import logging
+import sys
+
+from ..avro import schema
+
+from .avro_io import STRUCT_FLOAT, STRUCT_DOUBLE, SchemaResolutionException
+
+PY3 = sys.version_info[0] == 3
+
+logger = logging.getLogger(__name__)
+
+# ------------------------------------------------------------------------------
+# Decoder
+
+
+class AsyncBinaryDecoder(object):
+    """Read leaf values."""
+
+    def __init__(self, reader):
+        """
+        reader is a Python object on which we can call read, seek, and tell.
+        """
+        self._reader = reader
+
+    @property
+    def reader(self):
+        """Reports the reader used by this decoder."""
+        return self._reader
+
+    async def read(self, n):
+        """Read n bytes.
+
+        :param int n: Number of bytes to read.
+        :returns: The next n bytes from the input.
+        :rtype: bytes
+        """
+        assert (n >= 0), n
+        input_bytes = await self.reader.read(n)
+        if n > 0 and not input_bytes:
+            raise StopAsyncIteration
+        assert (len(input_bytes) == n), input_bytes
+        return input_bytes
+
+    @staticmethod
+    def read_null():
+        """
+        null is written as zero bytes
+        """
+        return None
+
+    async def read_boolean(self):
+        """
+        a boolean is written as a single byte
+        whose value is either 0 (false) or 1 (true).
+        """
+        b = ord(await self.read(1))
+        if b == 1:
+            return True
+        if b == 0:
+            return False
+        fail_msg = f"Invalid value for boolean: {b}"
+        raise schema.AvroException(fail_msg)
+
+    async def read_int(self):
+        """
+        int and long values are written using variable-length, zig-zag coding.
+        """
+        return await self.read_long()
+
+    async def read_long(self):
+        """
+        int and long values are written using variable-length, zig-zag coding.
+        """
+        b = ord(await self.read(1))
+        n = b & 0x7F
+        shift = 7
+        while (b & 0x80) != 0:
+            b = ord(await self.read(1))
+            n |= (b & 0x7F) << shift
+            shift += 7
+        datum = (n >> 1) ^ -(n & 1)
+        return datum
+
+    async def read_float(self):
+        """
+        A float is written as 4 bytes.
+        The float is converted into a 32-bit integer using a method equivalent to
+        Java's floatToIntBits and then encoded in little-endian format.
+        """
+        return STRUCT_FLOAT.unpack(await self.read(4))[0]
+
+    async def read_double(self):
+        """
+        A double is written as 8 bytes.
+        The double is converted into a 64-bit integer using a method equivalent to
+        Java's doubleToLongBits and then encoded in little-endian format.
+        """
+        return STRUCT_DOUBLE.unpack(await self.read(8))[0]
+
+    async def read_bytes(self):
+        """
+        Bytes are encoded as a long followed by that many bytes of data.
+        """
+        nbytes = await self.read_long()
+        assert (nbytes >= 0), nbytes
+        return await self.read(nbytes)
+
+    async def read_utf8(self):
+        """
+        A string is encoded as a long followed by
+        that many bytes of UTF-8 encoded character data.
+        """
+        input_bytes = await self.read_bytes()
+        if PY3:
+            try:
+                return input_bytes.decode('utf-8')
+            except UnicodeDecodeError as exn:
+                logger.error('Invalid UTF-8 input bytes: %r', input_bytes)
+                raise exn
+        else:
+            # PY2
+            return unicode(input_bytes, "utf-8") # pylint: disable=undefined-variable
+
+    def skip_null(self):
+        pass
+
+    async def skip_boolean(self):
+        await self.skip(1)
+
+    async def skip_int(self):
+        await self.skip_long()
+
+    async def skip_long(self):
+        b = ord(await self.read(1))
+        while (b & 0x80) != 0:
+            b = ord(await self.read(1))
+
+    async def skip_float(self):
+        await self.skip(4)
+
+    async def skip_double(self):
+        await self.skip(8)
+
+    async def skip_bytes(self):
+        await self.skip(await self.read_long())
+
+    async def skip_utf8(self):
+        await self.skip_bytes()
+
+    async def skip(self, n):
+        await self.reader.seek(await self.reader.tell() + n)
+
+
+# ------------------------------------------------------------------------------
+# DatumReader
+
+
+class AsyncDatumReader(object):
+    """Deserialize Avro-encoded data into a Python data structure."""
+
+    def __init__(self, writer_schema=None):
+        """
+        As defined in the Avro specification, we call the schema encoded
+        in the data the "writer's schema", and the schema expected by the
+        reader the "reader's schema".
+        """
+        self._writer_schema = writer_schema
+
+    # read/write properties
+    def set_writer_schema(self, writer_schema):
+        self._writer_schema = writer_schema
+
+    writer_schema = property(lambda self: self._writer_schema,
+                             set_writer_schema)
+
+    async def read(self, decoder):
+        return await self.read_data(self.writer_schema, decoder)
+
+    async def read_data(self, writer_schema, decoder):
+        # function dispatch for reading data based on type of writer's schema
+        if writer_schema.type == 'null':
+            result = decoder.read_null()
+        elif writer_schema.type == 'boolean':
+            result = await decoder.read_boolean()
+        elif writer_schema.type == 'string':
+            result = await decoder.read_utf8()
+        elif writer_schema.type == 'int':
+            result = await decoder.read_int()
+        elif writer_schema.type == 'long':
+            result = await decoder.read_long()
+        elif writer_schema.type == 'float':
+            result = await decoder.read_float()
+        elif writer_schema.type == 'double':
+            result = await decoder.read_double()
+        elif writer_schema.type == 'bytes':
+            result = await decoder.read_bytes()
+        elif writer_schema.type == 'fixed':
+            result = await self.read_fixed(writer_schema, decoder)
+        elif writer_schema.type == 'enum':
+            result = await self.read_enum(writer_schema, decoder)
+        elif writer_schema.type == 'array':
+            result = await self.read_array(writer_schema, decoder)
+        elif writer_schema.type == 'map':
+            result = await self.read_map(writer_schema, decoder)
+        elif writer_schema.type in ['union', 'error_union']:
+            result = await self.read_union(writer_schema, decoder)
+        elif writer_schema.type in ['record', 'error', 'request']:
+            result = await self.read_record(writer_schema, decoder)
+        else:
+            fail_msg = f"Cannot read unknown schema type: {writer_schema.type}"
+            raise schema.AvroException(fail_msg)
+        return result
+
+    async def skip_data(self, writer_schema, decoder):
+        if writer_schema.type == 'null':
+            result = decoder.skip_null()
+        elif writer_schema.type == 'boolean':
+            result = await decoder.skip_boolean()
+        elif writer_schema.type == 'string':
+            result = await decoder.skip_utf8()
+        elif writer_schema.type == 'int':
+            result = await decoder.skip_int()
+        elif writer_schema.type == 'long':
+            result = await decoder.skip_long()
+        elif writer_schema.type == 'float':
+            result = await decoder.skip_float()
+        elif writer_schema.type == 'double':
+            result = await decoder.skip_double()
+        elif writer_schema.type == 'bytes':
+            result = await decoder.skip_bytes()
+        elif writer_schema.type == 'fixed':
+            result = await self.skip_fixed(writer_schema, decoder)
+        elif writer_schema.type == 'enum':
+            result = await self.skip_enum(decoder)
+        elif writer_schema.type == 'array':
+            await self.skip_array(writer_schema, decoder)
+            result = None
+        elif writer_schema.type == 'map':
+            await self.skip_map(writer_schema, decoder)
+            result = None
+        elif writer_schema.type in ['union', 'error_union']:
+            result = await self.skip_union(writer_schema, decoder)
+        elif writer_schema.type in ['record', 'error', 'request']:
+            await self.skip_record(writer_schema, decoder)
+            result = None
+        else:
+            fail_msg = f"Unknown schema type: {writer_schema.type}"
+            raise schema.AvroException(fail_msg)
+        return result
+
+    # Fixed instances are encoded using the number of bytes declared in the schema.
+    @staticmethod
+    async def read_fixed(writer_schema, decoder):
+        return await decoder.read(writer_schema.size)
+
+    @staticmethod
+    async def skip_fixed(writer_schema, decoder):
+        return await decoder.skip(writer_schema.size)
+
+    # An enum is encoded by an int, representing the zero-based position of the symbol in the schema.
+    @staticmethod
+    async def read_enum(writer_schema, decoder):
+        # read data
+        index_of_symbol = await decoder.read_int()
+        if index_of_symbol >= len(writer_schema.symbols):
+            fail_msg = f"Can't access enum index {index_of_symbol} for enum with {len(writer_schema.symbols)} symbols"
+            raise SchemaResolutionException(fail_msg, writer_schema)
+        read_symbol = writer_schema.symbols[index_of_symbol]
+        return read_symbol
+
+    @staticmethod
+    async def skip_enum(decoder):
+        return await decoder.skip_int()
+
+    # Arrays are encoded as a series of blocks.
+
+    # Each block consists of a long count value, followed by that many array items.
+    # A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
+
+    # If a block's count is negative, then the count is followed immediately by a long block size,
+    # indicating the number of bytes in the block.
+    # The actual count in this case is the absolute value of the count written.
+    async def read_array(self, writer_schema, decoder):
+        read_items = []
+        block_count = await decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                await decoder.read_long()
+            for _ in range(block_count):
+                read_items.append(await self.read_data(writer_schema.items, decoder))
+            block_count = await decoder.read_long()
+        return read_items
+
+    async def skip_array(self, writer_schema, decoder):
+        block_count = await decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_size = await decoder.read_long()
+                await decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    await self.skip_data(writer_schema.items, decoder)
+            block_count = await decoder.read_long()
+
+    # Maps are encoded as a series of blocks.
+
+    # Each block consists of a long count value, followed by that many key/value pairs.
+    # A block with count zero indicates the end of the map. Each item is encoded per the map's value schema.
+
+    # If a block's count is negative, then the count is followed immediately by a long block size,
+    # indicating the number of bytes in the block.
+    # The actual count in this case is the absolute value of the count written.
+    async def read_map(self, writer_schema, decoder):
+        read_items = {}
+        block_count = await decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                await decoder.read_long()
+            for _ in range(block_count):
+                key = await decoder.read_utf8()
+                read_items[key] = await self.read_data(writer_schema.values, decoder)
+            block_count = await decoder.read_long()
+        return read_items
+
+    async def skip_map(self, writer_schema, decoder):
+        block_count = await decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_size = await decoder.read_long()
+                await decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    await decoder.skip_utf8()
+                    await self.skip_data(writer_schema.values, decoder)
+            block_count = await decoder.read_long()
+
+    # A union is encoded by first writing a long value indicating
+    # the zero-based position within the union of the schema of its value.
+    # The value is then encoded per the indicated schema within the union.
+    async def read_union(self, writer_schema, decoder):
+        # schema resolution
+        index_of_schema = int(await decoder.read_long())
+        if index_of_schema >= len(writer_schema.schemas):
+            fail_msg = (f"Can't access branch index {index_of_schema} "
+                    f"for union with {len(writer_schema.schemas)} branches")
+            raise SchemaResolutionException(fail_msg, writer_schema)
+        selected_writer_schema = writer_schema.schemas[index_of_schema]
+
+        # read data
+        return await self.read_data(selected_writer_schema, decoder)
+
+    async def skip_union(self, writer_schema, decoder):
+        index_of_schema = int(await decoder.read_long())
+        if index_of_schema >= len(writer_schema.schemas):
+            fail_msg = (f"Can't access branch index {index_of_schema} "
+                    f"for union with {len(writer_schema.schemas)} branches")
+            raise SchemaResolutionException(fail_msg, writer_schema)
+        return await self.skip_data(writer_schema.schemas[index_of_schema], decoder)
+
+    # A record is encoded by encoding the values of its fields
+    # in the order that they are declared. In other words, a record
+    # is encoded as just the concatenation of the encodings of its fields.
+    # Field values are encoded per their schema.
+
+    # Schema Resolution:
+    #     * the ordering of fields may be different: fields are matched by name.
+    #     * schemas for fields with the same name in both records are resolved
+    #     recursively.
+    #     * if the writer's record contains a field with a name not present in the
+    #     reader's record, the writer's value for that field is ignored.
+    #     * if the reader's record schema has a field that contains a default value,
+    #     and writer's schema does not have a field with the same name, then the
+    #     reader should use the default value from its field.
+    #     * if the reader's record schema has a field with no default value, and
+    #     writer's schema does not have a field with the same name, then the
+    #     field's value is unset.
+    async def read_record(self, writer_schema, decoder):
+        # schema resolution
+        read_record = {}
+        for field in writer_schema.fields:
+            field_val = await self.read_data(field.type, decoder)
+            read_record[field.name] = field_val
+        return read_record
+
+    async def skip_record(self, writer_schema, decoder):
+        for field in writer_schema.fields:
+            await self.skip_data(field.type, decoder)
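The async decoder has the same shape but expects a reader whose read, seek, and tell are awaitable. A minimal sketch, assuming a hypothetical in-memory wrapper (_AsyncBytesReader below is illustrative only and not part of the SDK):

    import asyncio
    import io
    from azure.storage.blob._shared.avro.avro_io_async import AsyncBinaryDecoder

    class _AsyncBytesReader:
        # Hypothetical helper: awaitable read/seek/tell over an in-memory buffer.
        def __init__(self, data):
            self._buf = io.BytesIO(data)
        async def read(self, n):
            return self._buf.read(n)
        async def seek(self, offset, whence=0):
            return self._buf.seek(offset, whence)
        async def tell(self):
            return self._buf.tell()

    async def main():
        dec = AsyncBinaryDecoder(_AsyncBytesReader(b'\x01\x04hi'))
        print(await dec.read_boolean())  # True
        print(await dec.read_utf8())     # 'hi'

    asyncio.run(main())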
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile.py
new file mode 100644
index 00000000..757e0329
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile.py
@@ -0,0 +1,257 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+# pylint: disable=docstring-missing-return, docstring-missing-rtype
+
+"""Read/Write Avro File Object Containers."""
+
+import io
+import logging
+import sys
+import zlib
+
+from ..avro import avro_io
+from ..avro import schema
+
+PY3 = sys.version_info[0] == 3
+
+logger = logging.getLogger(__name__)
+
+# ------------------------------------------------------------------------------
+# Constants
+
+# Version of the container file:
+VERSION = 1
+
+if PY3:
+    MAGIC = b'Obj' + bytes([VERSION])
+    MAGIC_SIZE = len(MAGIC)
+else:
+    MAGIC = 'Obj' + chr(VERSION)
+    MAGIC_SIZE = len(MAGIC)
+
+# Size of the synchronization marker, in number of bytes:
+SYNC_SIZE = 16
+
+# Schema of the container header:
+META_SCHEMA = schema.parse("""
+{
+  "type": "record", "name": "org.apache.avro.file.Header",
+  "fields": [{
+    "name": "magic",
+    "type": {"type": "fixed", "name": "magic", "size": %(magic_size)d}
+  }, {
+    "name": "meta",
+    "type": {"type": "map", "values": "bytes"}
+  }, {
+    "name": "sync",
+    "type": {"type": "fixed", "name": "sync", "size": %(sync_size)d}
+  }]
+}
+""" % {
+    'magic_size': MAGIC_SIZE,
+    'sync_size': SYNC_SIZE,
+})
+
+# Codecs supported by container files:
+VALID_CODECS = frozenset(['null', 'deflate'])
+
+# Metadata key associated to the schema:
+SCHEMA_KEY = "avro.schema"
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
+class DataFileException(schema.AvroException):
+    """Problem reading or writing file object containers."""
+
+# ------------------------------------------------------------------------------
+
+
+class DataFileReader(object):  # pylint: disable=too-many-instance-attributes
+    """Read files written by DataFileWriter."""
+
+    def __init__(self, reader, datum_reader, **kwargs):
+        """Initializes a new data file reader.
+
+        Args:
+          reader: Open file to read from.
+          datum_reader: Avro datum reader.
+        """
+        self._reader = reader
+        self._raw_decoder = avro_io.BinaryDecoder(reader)
+        self._header_reader = kwargs.pop('header_reader', None)
+        self._header_decoder = None if self._header_reader is None else avro_io.BinaryDecoder(self._header_reader)
+        self._datum_decoder = None  # Maybe reset at every block.
+        self._datum_reader = datum_reader
+
+        # In case self._reader only has partial content (without the header),
+        # seek(0, 0) to make sure we read the (partial) content from the beginning.
+        self._reader.seek(0, 0)
+
+        # read the header: magic, meta, sync
+        self._read_header()
+
+        # ensure codec is valid
+        avro_codec_raw = self.get_meta('avro.codec')
+        if avro_codec_raw is None:
+            self.codec = "null"
+        else:
+            self.codec = avro_codec_raw.decode('utf-8')
+        if self.codec not in VALID_CODECS:
+            raise DataFileException(f"Unknown codec: {self.codec}.")
+
+        # get ready to read
+        self._block_count = 0
+
+        # object_position is to support reading from the current position in a future read;
+        # no need to download from the beginning of the avro file.
+        if hasattr(self._reader, 'object_position'):
+            self.reader.track_object_position()
+
+        self._cur_object_index = 0
+        # header_reader indicates the reader only has partial content. The reader doesn't have a block header,
+        # so we use the block count stored last time.
+        # Also, ChangeFeed only has codec==null, so using _raw_decoder is fine.
+        if self._header_reader is not None:
+            self._datum_decoder = self._raw_decoder
+
+        self.datum_reader.writer_schema = (
+            schema.parse(self.get_meta(SCHEMA_KEY).decode('utf-8')))
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, data_type, value, traceback):
+        # Perform a close if there's no exception
+        if data_type is None:
+            self.close()
+
+    def __iter__(self):
+        return self
+
+    # read-only properties
+    @property
+    def reader(self):
+        return self._reader
+
+    @property
+    def raw_decoder(self):
+        return self._raw_decoder
+
+    @property
+    def datum_decoder(self):
+        return self._datum_decoder
+
+    @property
+    def datum_reader(self):
+        return self._datum_reader
+
+    @property
+    def sync_marker(self):
+        return self._sync_marker
+
+    @property
+    def meta(self):
+        return self._meta
+
+    # read/write properties
+    @property
+    def block_count(self):
+        return self._block_count
+
+    def get_meta(self, key):
+        """Reports the value of a given metadata key.
+
+        :param str key: Metadata key to report the value of.
+        :returns: Value associated to the metadata key, as bytes.
+        :rtype: bytes
+        """
+        return self._meta.get(key)
+
+    def _read_header(self):
+        header_reader = self._header_reader if self._header_reader else self._reader
+        header_decoder = self._header_decoder if self._header_decoder else self._raw_decoder
+
+        # seek to the beginning of the file to get magic block
+        header_reader.seek(0, 0)
+
+        # read header into a dict
+        header = self.datum_reader.read_data(META_SCHEMA, header_decoder)
+
+        # check magic number
+        if header.get('magic') != MAGIC:
+            fail_msg = f"Not an Avro data file: {header.get('magic')} doesn't match {MAGIC!r}."
+            raise schema.AvroException(fail_msg)
+
+        # set metadata
+        self._meta = header['meta']
+
+        # set sync marker
+        self._sync_marker = header['sync']
+
+    def _read_block_header(self):
+        self._block_count = self.raw_decoder.read_long()
+        if self.codec == "null":
+            # Skip a long; we don't need to use the length.
+            self.raw_decoder.skip_long()
+            self._datum_decoder = self._raw_decoder
+        elif self.codec == 'deflate':
+            # Compressed data is stored as (length, data), which
+            # corresponds to how the "bytes" type is encoded.
+            data = self.raw_decoder.read_bytes()
+            # -15 is the log of the window size; negative indicates
+            # "raw" (no zlib headers) decompression.  See zlib.h.
+            uncompressed = zlib.decompress(data, -15)
+            self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
+        else:
+            raise DataFileException(f"Unknown codec: {self.codec!r}")
+
+    def _skip_sync(self):
+        """
+        Read what should be the next sync marker; if it does not match the
+        expected sync marker, seek back to where the read started.
+        """
+        proposed_sync_marker = self.reader.read(SYNC_SIZE)
+        if SYNC_SIZE > 0 and not proposed_sync_marker:
+            raise StopIteration
+        if proposed_sync_marker != self.sync_marker:
+            self.reader.seek(-SYNC_SIZE, 1)
+
+    def __next__(self):
+        """Return the next datum in the file."""
+        if self.block_count == 0:
+            self._skip_sync()
+
+            # object_position is to support reading from the current position in a future read;
+            # with this attr there is no need to download from the beginning of the avro file.
+            if hasattr(self._reader, 'object_position'):
+                self.reader.track_object_position()
+            self._cur_object_index = 0
+
+            self._read_block_header()
+
+        datum = self.datum_reader.read(self.datum_decoder)
+        self._block_count -= 1
+        self._cur_object_index += 1
+
+        # object_position is to support reading from the current position in a future read.
+        # This will track the index of the next item to be read.
+        # This will also track the offset before the next sync marker.
+        if hasattr(self._reader, 'object_position'):
+            if self.block_count == 0:
+                # the next event to be read is at index 0 in the new chunk of blocks,
+                self.reader.track_object_position()
+                self.reader.set_object_index(0)
+            else:
+                self.reader.set_object_index(self._cur_object_index)
+
+        return datum
+
+    def close(self):
+        """Close this reader."""
+        self.reader.close()
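Putting the pieces together, reading an object container file with the synchronous reader looks like the sketch below; 'changefeed.avro' is a placeholder path for any existing Avro container file using the null or deflate codec.

    from azure.storage.blob._shared.avro.avro_io import DatumReader
    from azure.storage.blob._shared.avro.datafile import DataFileReader

    with open('changefeed.avro', 'rb') as fp:
        with DataFileReader(fp, DatumReader()) as reader:
            print(reader.get_meta('avro.codec'))  # b'null', b'deflate', or None if unset
            for record in reader:                 # each datum in the generic representation (records as dicts)
                print(record)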
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile_async.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile_async.py
new file mode 100644
index 00000000..85dc5cb5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/datafile_async.py
@@ -0,0 +1,210 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+# pylint: disable=docstring-missing-return, docstring-missing-rtype
+
+"""Read/Write Avro File Object Containers."""
+
+import logging
+import sys
+
+from ..avro import avro_io_async
+from ..avro import schema
+from .datafile import DataFileException
+from .datafile import MAGIC, SYNC_SIZE, META_SCHEMA, SCHEMA_KEY
+
+
+PY3 = sys.version_info[0] == 3
+
+logger = logging.getLogger(__name__)
+
+# ------------------------------------------------------------------------------
+# Constants
+
+# Codecs supported by container files:
+VALID_CODECS = frozenset(['null'])
+
+
+class AsyncDataFileReader(object):  # pylint: disable=too-many-instance-attributes
+    """Read files written by DataFileWriter."""
+
+    def __init__(self, reader, datum_reader, **kwargs):
+        """Initializes a new data file reader.
+
+        Args:
+          reader: Open file to read from.
+          datum_reader: Avro datum reader.
+        """
+        self._reader = reader
+        self._raw_decoder = avro_io_async.AsyncBinaryDecoder(reader)
+        self._header_reader = kwargs.pop('header_reader', None)
+        self._header_decoder = None if self._header_reader is None else \
+            avro_io_async.AsyncBinaryDecoder(self._header_reader)
+        self._datum_decoder = None  # Maybe reset at every block.
+        self._datum_reader = datum_reader
+        self.codec = "null"
+        self._block_count = 0
+        self._cur_object_index = 0
+        self._meta = None
+        self._sync_marker = None
+
+    async def init(self):
+        # In case self._reader only has partial content (without the header),
+        # seek(0, 0) to make sure we read the (partial) content from the beginning.
+        await self._reader.seek(0, 0)
+
+        # read the header: magic, meta, sync
+        await self._read_header()
+
+        # ensure codec is valid
+        avro_codec_raw = self.get_meta('avro.codec')
+        if avro_codec_raw is None:
+            self.codec = "null"
+        else:
+            self.codec = avro_codec_raw.decode('utf-8')
+        if self.codec not in VALID_CODECS:
+            raise DataFileException(f"Unknown codec: {self.codec}.")
+
+        # get ready to read
+        self._block_count = 0
+
+        # object_position is to support reading from the current position in a future read;
+        # no need to download from the beginning of the avro file.
+        if hasattr(self._reader, 'object_position'):
+            self.reader.track_object_position()
+
+        # header_reader indicates the reader only has partial content. The reader doesn't have a block header,
+        # so we use the block count stored last time.
+        # Also, ChangeFeed only has codec==null, so using _raw_decoder is fine.
+        if self._header_reader is not None:
+            self._datum_decoder = self._raw_decoder
+        self.datum_reader.writer_schema = (
+            schema.parse(self.get_meta(SCHEMA_KEY).decode('utf-8')))
+        return self
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, data_type, value, traceback):
+        # Perform a close if there's no exception
+        if data_type is None:
+            self.close()
+
+    def __aiter__(self):
+        return self
+
+    # read-only properties
+    @property
+    def reader(self):
+        return self._reader
+
+    @property
+    def raw_decoder(self):
+        return self._raw_decoder
+
+    @property
+    def datum_decoder(self):
+        return self._datum_decoder
+
+    @property
+    def datum_reader(self):
+        return self._datum_reader
+
+    @property
+    def sync_marker(self):
+        return self._sync_marker
+
+    @property
+    def meta(self):
+        return self._meta
+
+    # read/write properties
+    @property
+    def block_count(self):
+        return self._block_count
+
+    def get_meta(self, key):
+        """Reports the value of a given metadata key.
+
+        :param str key: Metadata key to report the value of.
+        :returns: Value associated to the metadata key, as bytes.
+        :rtype: bytes
+        """
+        return self._meta.get(key)
+
+    async def _read_header(self):
+        header_reader = self._header_reader if self._header_reader else self._reader
+        header_decoder = self._header_decoder if self._header_decoder else self._raw_decoder
+
+        # seek to the beginning of the file to get magic block
+        await header_reader.seek(0, 0)
+
+        # read header into a dict
+        header = await self.datum_reader.read_data(META_SCHEMA, header_decoder)
+
+        # check magic number
+        if header.get('magic') != MAGIC:
+            fail_msg = f"Not an Avro data file: {header.get('magic')} doesn't match {MAGIC!r}."
+            raise schema.AvroException(fail_msg)
+
+        # set metadata
+        self._meta = header['meta']
+
+        # set sync marker
+        self._sync_marker = header['sync']
+
+    async def _read_block_header(self):
+        self._block_count = await self.raw_decoder.read_long()
+        if self.codec == "null":
+            # Skip a long; we don't need to use the length.
+            await self.raw_decoder.skip_long()
+            self._datum_decoder = self._raw_decoder
+        else:
+            raise DataFileException(f"Unknown codec: {self.codec!r}")
+
+    async def _skip_sync(self):
+        """
+        Read what should be the next sync marker; if it does not match the
+        expected sync marker, seek back to where the read started.
+        """
+        proposed_sync_marker = await self.reader.read(SYNC_SIZE)
+        if SYNC_SIZE > 0 and not proposed_sync_marker:
+            raise StopAsyncIteration
+        if proposed_sync_marker != self.sync_marker:
+            await self.reader.seek(-SYNC_SIZE, 1)
+
+    async def __anext__(self):
+        """Return the next datum in the file."""
+        if self.block_count == 0:
+            await self._skip_sync()
+
+            # object_position is to support reading from the current position in a future read;
+            # with this attr there is no need to download from the beginning of the avro file.
+            if hasattr(self._reader, 'object_position'):
+                await self.reader.track_object_position()
+            self._cur_object_index = 0
+
+            await self._read_block_header()
+
+        datum = await self.datum_reader.read(self.datum_decoder)
+        self._block_count -= 1
+        self._cur_object_index += 1
+
+        # object_position is to support reading from the current position in a future read.
+        # This will track the index of the next item to be read.
+        # This will also track the offset before the next sync marker.
+        if hasattr(self._reader, 'object_position'):
+            if self.block_count == 0:
+                # the next event to be read is at index 0 in the new chunk of blocks,
+                await self.reader.track_object_position()
+                await self.reader.set_object_index(0)
+            else:
+                await self.reader.set_object_index(self._cur_object_index)
+
+        return datum
+
+    def close(self):
+        """Close this reader."""
+        self.reader.close()
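The async variant mirrors the synchronous reader, except that init() must be awaited before iterating and the underlying stream needs awaitable read/seek/tell (for example, the hypothetical _AsyncBytesReader sketched after avro_io_async.py above). A minimal sketch:

    import asyncio
    from azure.storage.blob._shared.avro.avro_io_async import AsyncDatumReader
    from azure.storage.blob._shared.avro.datafile_async import AsyncDataFileReader

    async def dump(stream):
        # stream: any object exposing awaitable read/seek/tell over the container bytes.
        reader = await AsyncDataFileReader(stream, AsyncDatumReader()).init()
        async for record in reader:   # only codec == 'null' is supported here
            print(record)

    # e.g. asyncio.run(dump(_AsyncBytesReader(raw_container_bytes)))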
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/schema.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/schema.py
new file mode 100644
index 00000000..d5484abc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_shared/avro/schema.py
@@ -0,0 +1,1178 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+# pylint: disable=docstring-missing-return, docstring-missing-rtype, too-many-lines
+
+"""Representation of Avro schemas.
+
+A schema may be one of:
+ - A record, mapping field names to field value data;
+ - An error, equivalent to a record;
+ - An enum, containing one of a small set of symbols;
+ - An array of values, all of the same schema;
+ - A map containing string/value pairs, each of a declared schema;
+ - A union of other schemas;
+ - A fixed sized binary object;
+ - A unicode string;
+ - A sequence of bytes;
+ - A 32-bit signed int;
+ - A 64-bit signed long;
+ - A 32-bit floating-point float;
+ - A 64-bit floating-point double;
+ - A boolean;
+ - Null.
+"""
+
+import abc
+import json
+import logging
+import re
+logger = logging.getLogger(__name__)
+
+# ------------------------------------------------------------------------------
+# Constants
+
+# Log level more verbose than DEBUG=10, INFO=20, etc.
+DEBUG_VERBOSE = 5
+
+NULL = 'null'
+BOOLEAN = 'boolean'
+STRING = 'string'
+BYTES = 'bytes'
+INT = 'int'
+LONG = 'long'
+FLOAT = 'float'
+DOUBLE = 'double'
+FIXED = 'fixed'
+ENUM = 'enum'
+RECORD = 'record'
+ERROR = 'error'
+ARRAY = 'array'
+MAP = 'map'
+UNION = 'union'
+
+# Request and error unions are part of Avro protocols:
+REQUEST = 'request'
+ERROR_UNION = 'error_union'
+
+PRIMITIVE_TYPES = frozenset([
+    NULL,
+    BOOLEAN,
+    STRING,
+    BYTES,
+    INT,
+    LONG,
+    FLOAT,
+    DOUBLE,
+])
+
+NAMED_TYPES = frozenset([
+    FIXED,
+    ENUM,
+    RECORD,
+    ERROR,
+])
+
+VALID_TYPES = frozenset.union(
+    PRIMITIVE_TYPES,
+    NAMED_TYPES,
+    [
+        ARRAY,
+        MAP,
+        UNION,
+        REQUEST,
+        ERROR_UNION,
+    ],
+)
+
+SCHEMA_RESERVED_PROPS = frozenset([
+    'type',
+    'name',
+    'namespace',
+    'fields',  # Record
+    'items',  # Array
+    'size',  # Fixed
+    'symbols',  # Enum
+    'values',  # Map
+    'doc',
+])
+
+FIELD_RESERVED_PROPS = frozenset([
+    'default',
+    'name',
+    'doc',
+    'order',
+    'type',
+])
+
+VALID_FIELD_SORT_ORDERS = frozenset([
+    'ascending',
+    'descending',
+    'ignore',
+])
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
+class Error(Exception):
+    """Base class for errors in this module."""
+
+
+class AvroException(Error):
+    """Generic Avro schema error."""
+
+
+class SchemaParseException(AvroException):
+    """Error while parsing a JSON schema descriptor."""
+
+
+class Schema(metaclass=abc.ABCMeta):
+    """Abstract base class for all Schema classes."""
+
+    def __init__(self, data_type, other_props=None):
+        """Initializes a new schema object.
+
+        Args:
+          data_type: Type of the schema to initialize.
+          other_props: Optional dictionary of additional properties.
+        """
+        if data_type not in VALID_TYPES:
+            raise SchemaParseException(f'{data_type!r} is not a valid Avro type.')
+
+        # All properties of this schema, as a map: property name -> property value
+        self._props = {}
+
+        self._props['type'] = data_type
+        self._type = data_type
+
+        if other_props:
+            self._props.update(other_props)
+
+    @property
+    def namespace(self):
+        """Returns: the namespace this schema belongs to, if any, or None."""
+        return self._props.get('namespace', None)
+
+    @property
+    def type(self):
+        """Returns: the type of this schema."""
+        return self._type
+
+    @property
+    def doc(self):
+        """Returns: the documentation associated to this schema, if any, or None."""
+        return self._props.get('doc', None)
+
+    @property
+    def props(self):
+        """Reports all the properties of this schema.
+
+        Includes all properties, reserved and non-reserved.
+        JSON properties of this schema are directly generated from this dict.
+
+        Returns:
+          A dictionary of properties associated to this schema.
+        """
+        return self._props
+
+    @property
+    def other_props(self):
+        """Returns: the dictionary of non-reserved properties."""
+        return dict(filter_keys_out(items=self._props, keys=SCHEMA_RESERVED_PROPS))
+
+    def __str__(self):
+        """Returns: the JSON representation of this schema."""
+        return json.dumps(self.to_json(names=None))
+
+    # Converts the schema object into its Avro specification representation.
+
+    # Schema types that have names (records, enums, and fixed) must take care
+    # not to re-define schemas that are already listed in the names parameter.
+    @abc.abstractmethod
+    def to_json(self, names):
+        ...
+
+
+# ------------------------------------------------------------------------------
+
+
+_RE_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')
+
+_RE_FULL_NAME = re.compile(
+    r'^'
+    r'[.]?(?:[A-Za-z_][A-Za-z0-9_]*[.])*'  # optional namespace
+    r'([A-Za-z_][A-Za-z0-9_]*)'  # name
+    r'$'
+)
+
+
+class Name(object):
+    """Representation of an Avro name."""
+
+    def __init__(self, name, namespace=None):
+        """Parses an Avro name.
+
+        Args:
+          name: Avro name to parse (relative or absolute).
+          namespace: Optional explicit namespace if the name is relative.
+        """
+        # Normalize: namespace is always defined as a string, possibly empty.
+        if namespace is None:
+            namespace = ''
+
+        if '.' in name:
+            # name is absolute, namespace is ignored:
+            self._fullname = name
+
+            match = _RE_FULL_NAME.match(self._fullname)
+            if match is None:
+                raise SchemaParseException(
+                    f'Invalid absolute schema name: {self._fullname!r}.')
+
+            self._name = match.group(1)
+            self._namespace = self._fullname[:-(len(self._name) + 1)]
+
+        else:
+            # name is relative, combine with explicit namespace:
+            self._name = name
+            self._namespace = namespace
+            self._fullname = (self._name
+                              if (not self._namespace) else
+                              f'{self._namespace}.{self._name}')
+
+            # Validate the fullname:
+            if _RE_FULL_NAME.match(self._fullname) is None:
+                raise SchemaParseException(f"Invalid schema name {self._fullname!r} inferred from "
+                                           f"name {self._name!r} and namespace {self._namespace!r}.")
+
+    def __eq__(self, other):
+        if not isinstance(other, Name):
+            return NotImplemented
+        return self.fullname == other.fullname
+
+    @property
+    def simple_name(self):
+        """Returns: the simple name part of this name."""
+        return self._name
+
+    @property
+    def namespace(self):
+        """Returns: this name's namespace, possible the empty string."""
+        return self._namespace
+
+    @property
+    def fullname(self):
+        """Returns: the full name."""
+        return self._fullname
+
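+# Illustrative usage sketch (hypothetical names): an absolute name carries its
+# own namespace, while a relative name is combined with the explicit namespace.
+#
+#     Name('com.example.User').fullname               -> 'com.example.User'
+#     Name('com.example.User').simple_name            -> 'User'
+#     Name('User', namespace='com.example').fullname  -> 'com.example.User'
+#     Name('User').namespace                          -> ''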
+
+# ------------------------------------------------------------------------------
+
+
+class Names(object):
+    """Tracks Avro named schemas and default namespace during parsing."""
+
+    def __init__(self, default_namespace=None, names=None):
+        """Initializes a new name tracker.
+
+        Args:
+          default_namespace: Optional default namespace.
+          names: Optional initial mapping of known named schemas.
+        """
+        if names is None:
+            names = {}
+        self._names = names
+        self._default_namespace = default_namespace
+
+    @property
+    def names(self):
+        """Returns: the mapping of known named schemas."""
+        return self._names
+
+    @property
+    def default_namespace(self):
+        """Returns: the default namespace, if any, or None."""
+        return self._default_namespace
+
+    def new_with_default_namespace(self, namespace):
+        """Creates a new name tracker from this tracker, but with a new default ns.
+
+        :param Any namespace: New default namespace to use.
+        :returns: New name tracker with the specified default namespace.
+        :rtype: Names
+        """
+        return Names(names=self._names, default_namespace=namespace)
+
+    def get_name(self, name, namespace=None):
+        """Resolves the Avro name according to this name tracker's state.
+
+        :param Any name: Name to resolve (absolute or relative).
+        :param Optional[Any] namespace: Optional explicit namespace.
+        :returns: The specified name, resolved according to this tracker.
+        :rtype: Name
+        """
+        if namespace is None:
+            namespace = self._default_namespace
+        return Name(name=name, namespace=namespace)
+
+    def get_schema(self, name, namespace=None):
+        """Resolves an Avro schema by name.
+
+        :param Any name: Name (absolute or relative) of the Avro schema to look up.
+        :param Optional[Any] namespace: Optional explicit namespace.
+        :returns: The schema with the specified name, if any, or None.
+        :rtype: Union[Any, None]
+        """
+        avro_name = self.get_name(name=name, namespace=namespace)
+        return self._names.get(avro_name.fullname, None)
+
+    # Given a property map, returns it with the 'namespace' entry removed when
+    # that namespace matches this tracker's default namespace.
+    def prune_namespace(self, properties):
+        if self.default_namespace is None:
+            # No default namespace -- nothing to prune.
+            return properties
+        if 'namespace' not in properties:
+            # The properties carry no namespace -- nothing to prune.
+            return properties
+        if properties['namespace'] != self.default_namespace:
+            # The namespaces differ -- leave the properties untouched.
+            return properties
+        # The namespace is redundant with the default namespace: drop it.
+        prunable = properties.copy()
+        del prunable['namespace']
+        return prunable
+
+    def register(self, schema):
+        """Registers a new named schema in this tracker.
+
+        :param Any schema: Named Avro schema to register in this tracker.
+        """
+        if schema.fullname in VALID_TYPES:
+            raise SchemaParseException(
+                f'{schema.fullname} is a reserved type name.')
+        if schema.fullname in self.names:
+            raise SchemaParseException(
+                f'Avro name {schema.fullname!r} already exists.')
+
+        logger.log(DEBUG_VERBOSE, 'Register new name for %r', schema.fullname)
+        self._names[schema.fullname] = schema
+
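+# Illustrative usage sketch (hypothetical namespaces): relative names resolve
+# against the default namespace, and derived trackers share the same registry.
+#
+#     names = Names(default_namespace='com.example')
+#     names.get_name('User').fullname                  -> 'com.example.User'
+#     nested = names.new_with_default_namespace('com.example.nested')
+#     nested.names is names.names                      -> True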
+
+# ------------------------------------------------------------------------------
+
+
+class NamedSchema(Schema):
+    """Abstract base class for named schemas.
+
+    Named schemas are enumerated in NAMED_TYPES.
+    """
+
+    def __init__(
+            self,
+            data_type,
+            name=None,
+            namespace=None,
+            names=None,
+            other_props=None,
+    ):
+        """Initializes a new named schema object.
+
+        Args:
+          data_type: Type of the named schema.
+          name: Name (absolute or relative) of the schema.
+          namespace: Optional explicit namespace if name is relative.
+          names: Tracker to resolve and register Avro names.
+          other_props: Optional map of additional properties of the schema.
+        """
+        assert (data_type in NAMED_TYPES), (f'Invalid named type: {data_type!r}')
+        self._avro_name = names.get_name(name=name, namespace=namespace)
+
+        super(NamedSchema, self).__init__(data_type, other_props)
+
+        names.register(self)
+
+        self._props['name'] = self.name
+        if self.namespace:
+            self._props['namespace'] = self.namespace
+
+    @property
+    def avro_name(self):
+        """Returns: the Name object describing this schema's name."""
+        return self._avro_name
+
+    @property
+    def name(self):
+        return self._avro_name.simple_name
+
+    @property
+    def namespace(self):
+        return self._avro_name.namespace
+
+    @property
+    def fullname(self):
+        return self._avro_name.fullname
+
+    def name_ref(self, names):
+        """Reports this schema name relative to the specified name tracker.
+
+        :param Any names: Avro name tracker to relativize this schema name against.
+        :returns: This schema name, relativized against the specified name tracker.
+        :rtype: Any
+        """
+        if self.namespace == names.default_namespace:
+            return self.name
+        return self.fullname
+
+    # Converts the schema object into its Avro specification representation.
+
+    # Schema types that have names (records, enums, and fixed) must take care
+    # not to re-define schemas that are already listed in the names parameter.
+    @abc.abstractmethod
+    def to_json(self, names):
+        ...
+
+# ------------------------------------------------------------------------------
+
+
+_NO_DEFAULT = object()
+
+
+class Field(object):
+    """Representation of the schema of a field in a record."""
+
+    def __init__(
+            self,
+            data_type,
+            name,
+            index,
+            has_default,
+            default=_NO_DEFAULT,
+            order=None,
+            doc=None,
+            other_props=None
+    ):
+        """Initializes a new Field object.
+
+        Args:
+          data_type: Avro schema of the field.
+          name: Name of the field.
+          index: 0-based position of the field.
+          has_default: Whether the field has a default value.
+          default: Default value of the field, if any.
+          order: Optional sort order of the field ('ascending', 'descending' or 'ignore').
+          doc: Optional documentation of the field.
+          other_props: Optional dictionary of additional properties.
+        """
+        if (not isinstance(name, str)) or (not name):
+            raise SchemaParseException(f'Invalid record field name: {name!r}.')
+        if (order is not None) and (order not in VALID_FIELD_SORT_ORDERS):
+            raise SchemaParseException(f'Invalid record field order: {order!r}.')
+
+        # All properties of this record field:
+        self._props = {}
+
+        self._has_default = has_default
+        if other_props:
+            self._props.update(other_props)
+
+        self._index = index
+        self._type = self._props['type'] = data_type
+        self._name = self._props['name'] = name
+
+        if has_default:
+            self._props['default'] = default
+
+        if order is not None:
+            self._props['order'] = order
+
+        if doc is not None:
+            self._props['doc'] = doc
+
+    @property
+    def type(self):
+        """Returns: the schema of this field."""
+        return self._type
+
+    @property
+    def name(self):
+        """Returns: this field name."""
+        return self._name
+
+    @property
+    def index(self):
+        """Returns: the 0-based index of this field in the record."""
+        return self._index
+
+    @property
+    def default(self):
+        return self._props['default']
+
+    @property
+    def has_default(self):
+        return self._has_default
+
+    @property
+    def order(self):
+        return self._props.get('order', None)
+
+    @property
+    def doc(self):
+        return self._props.get('doc', None)
+
+    @property
+    def props(self):
+        return self._props
+
+    @property
+    def other_props(self):
+        return filter_keys_out(items=self._props, keys=FIELD_RESERVED_PROPS)
+
+    def __str__(self):
+        return json.dumps(self.to_json())
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        to_dump = self.props.copy()
+        to_dump['type'] = self.type.to_json(names)
+        return to_dump
+
+    def __eq__(self, that):
+        to_cmp = json.loads(str(self))
+        return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+# Primitive Types
+
+
+class PrimitiveSchema(Schema):
+    """Schema of a primitive Avro type.
+
+    Valid primitive types are defined in PRIMITIVE_TYPES.
+    """
+
+    def __init__(self, data_type, other_props=None):
+        """Initializes a new schema object for the specified primitive type.
+
+        Args:
+          data_type: Type of the schema to construct. Must be primitive.
+        """
+        if data_type not in PRIMITIVE_TYPES:
+            raise AvroException(f'{data_type!r} is not a valid primitive type.')
+        super(PrimitiveSchema, self).__init__(data_type, other_props=other_props)
+
+    @property
+    def name(self):
+        """Returns: the simple name of this schema."""
+        # The name of a primitive type is the type itself.
+        return self.type
+
+    @property
+    def fullname(self):
+        """Returns: the fully qualified name of this schema."""
+        # The full name is the simple name for primitive schema.
+        return self.name
+
+    def to_json(self, names=None):
+        if len(self.props) == 1:
+            return self.fullname
+        return self.props
+
+    def __eq__(self, that):
+        return self.props == that.props
+
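+# Illustrative usage sketch: a primitive schema with no extra properties
+# serializes to its bare type name; extra properties force the dict form.
+#
+#     PrimitiveSchema('int').to_json()                          -> 'int'
+#     PrimitiveSchema('int', {'logicalType': 'date'}).to_json()
+#         -> {'type': 'int', 'logicalType': 'date'}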
+
+# ------------------------------------------------------------------------------
+# Complex Types (non-recursive)
+
+
+class FixedSchema(NamedSchema):
+    def __init__(
+            self,
+            name,
+            namespace,
+            size,
+            names=None,
+            other_props=None,
+    ):
+        # Ensure valid ctor args
+        if not isinstance(size, int):
+            fail_msg = 'Fixed Schema requires a valid integer for size property.'
+            raise AvroException(fail_msg)
+
+        super(FixedSchema, self).__init__(
+            data_type=FIXED,
+            name=name,
+            namespace=namespace,
+            names=names,
+            other_props=other_props,
+        )
+        self._props['size'] = size
+
+    @property
+    def size(self):
+        """Returns: the size of this fixed schema, in bytes."""
+        return self._props['size']
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        if self.fullname in names.names:
+            return self.name_ref(names)
+        names.names[self.fullname] = self
+        return names.prune_namespace(self.props)
+
+    def __eq__(self, that):
+        return self.props == that.props
+
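+# Illustrative usage sketch (hypothetical name): named schemas register
+# themselves in the supplied tracker, so the full name becomes resolvable.
+#
+#     names = Names()
+#     md5 = FixedSchema(name='MD5', namespace='com.example', size=16, names=names)
+#     md5.fullname                       -> 'com.example.MD5'
+#     'com.example.MD5' in names.names   -> True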
+
+# ------------------------------------------------------------------------------
+
+
+class EnumSchema(NamedSchema):
+    def __init__(
+            self,
+            name,
+            namespace,
+            symbols,
+            names=None,
+            doc=None,
+            other_props=None,
+    ):
+        """Initializes a new enumeration schema object.
+
+        Args:
+          name: Simple name of this enumeration.
+          namespace: Optional namespace.
+          symbols: Ordered list of symbols defined in this enumeration.
+          names: Tracker used to resolve and register Avro names.
+          doc: Optional documentation of the enumeration.
+          other_props: Optional dictionary of additional properties.
+        """
+        symbols = tuple(symbols)
+        symbol_set = frozenset(symbols)
+        if (len(symbol_set) != len(symbols)
+                or not all(map(lambda symbol: isinstance(symbol, str), symbols))):
+            raise AvroException(
+                f'Invalid symbols for enum schema: {symbols!r}.')
+
+        super(EnumSchema, self).__init__(
+            data_type=ENUM,
+            name=name,
+            namespace=namespace,
+            names=names,
+            other_props=other_props,
+        )
+
+        self._props['symbols'] = symbols
+        if doc is not None:
+            self._props['doc'] = doc
+
+    @property
+    def symbols(self):
+        """Returns: the symbols defined in this enum."""
+        return self._props['symbols']
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        if self.fullname in names.names:
+            return self.name_ref(names)
+        names.names[self.fullname] = self
+        return names.prune_namespace(self.props)
+
+    def __eq__(self, that):
+        return self.props == that.props
+
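+# Illustrative usage sketch (hypothetical symbols): enum symbols must be unique
+# strings, and are kept in declaration order.
+#
+#     enum = EnumSchema(name='Suit', namespace=None,
+#                       symbols=['SPADES', 'HEARTS'], names=Names())
+#     enum.symbols                                    -> ('SPADES', 'HEARTS')
+#     EnumSchema('Suit2', None, ['A', 'A'], Names())  raises AvroException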
+
+# ------------------------------------------------------------------------------
+# Complex Types (recursive)
+
+
+class ArraySchema(Schema):
+    """Schema of an array."""
+
+    def __init__(self, items, other_props=None):
+        """Initializes a new array schema object.
+
+        Args:
+          items: Avro schema of the array items.
+          other_props: Optional dictionary of additional properties.
+        """
+        super(ArraySchema, self).__init__(
+            data_type=ARRAY,
+            other_props=other_props,
+        )
+        self._items_schema = items
+        self._props['items'] = items
+
+    @property
+    def items(self):
+        """Returns: the schema of the items in this array."""
+        return self._items_schema
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        to_dump = self.props.copy()
+        item_schema = self.items
+        to_dump['items'] = item_schema.to_json(names)
+        return to_dump
+
+    def __eq__(self, that):
+        to_cmp = json.loads(str(self))
+        return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
+class MapSchema(Schema):
+    """Schema of a map."""
+
+    def __init__(self, values, other_props=None):
+        """Initializes a new map schema object.
+
+        Args:
+          values: Avro schema of the map values.
+          other_props: Optional dictionary of additional properties.
+        """
+        super(MapSchema, self).__init__(
+            data_type=MAP,
+            other_props=other_props,
+        )
+        self._values_schema = values
+        self._props['values'] = values
+
+    @property
+    def values(self):
+        """Returns: the schema of the values in this map."""
+        return self._values_schema
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        to_dump = self.props.copy()
+        to_dump['values'] = self.values.to_json(names)
+        return to_dump
+
+    def __eq__(self, that):
+        to_cmp = json.loads(str(self))
+        return to_cmp == json.loads(str(that))
+
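+# Illustrative usage sketch: container schemas serialize by recursing into
+# their element schema.
+#
+#     ArraySchema(items=PrimitiveSchema(LONG)).to_json()    -> {'type': 'array', 'items': 'long'}
+#     MapSchema(values=PrimitiveSchema(BOOLEAN)).to_json()  -> {'type': 'map', 'values': 'boolean'}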
+
+# ------------------------------------------------------------------------------
+
+
+class UnionSchema(Schema):
+    """Schema of a union."""
+
+    def __init__(self, schemas):
+        """Initializes a new union schema object.
+
+        Args:
+          schemas: Ordered collection of schema branches in the union.
+        """
+        super(UnionSchema, self).__init__(data_type=UNION)
+        self._schemas = tuple(schemas)
+
+        # Validate the schema branches:
+
+        # All named schema names are unique:
+        named_branches = tuple(
+            filter(lambda schema: schema.type in NAMED_TYPES, self._schemas))
+        unique_names = frozenset(map(lambda schema: schema.fullname, named_branches))
+        if len(unique_names) != len(named_branches):
+            schemas = ''.join(map(lambda schema: (f'\n\t - {schema}'), self._schemas))
+            raise AvroException(f'Invalid union branches with duplicate schema name:{schemas}')
+
+        # Types are unique within unnamed schemas, and union is not allowed:
+        unnamed_branches = tuple(
+            filter(lambda schema: schema.type not in NAMED_TYPES, self._schemas))
+        unique_types = frozenset(map(lambda schema: schema.type, unnamed_branches))
+        if UNION in unique_types:
+            schemas = ''.join(map(lambda schema: (f'\n\t - {schema}'), self._schemas))
+            raise AvroException(f'Invalid union branches contain other unions:{schemas}')
+        if len(unique_types) != len(unnamed_branches):
+            schemas = ''.join(map(lambda schema: (f'\n\t - {schema}'), self._schemas))
+            raise AvroException(f'Invalid union branches with duplicate type:{schemas}')
+
+    @property
+    def schemas(self):
+        """Returns: the ordered list of schema branches in the union."""
+        return self._schemas
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        to_dump = []
+        for schema in self.schemas:
+            to_dump.append(schema.to_json(names))
+        return to_dump
+
+    def __eq__(self, that):
+        to_cmp = json.loads(str(self))
+        return to_cmp == json.loads(str(that))
+
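+# Illustrative usage sketch: a nullable string is the two-branch union
+# ["null", "string"]; duplicate unnamed branch types or nested unions raise.
+#
+#     union = UnionSchema([PrimitiveSchema(NULL), PrimitiveSchema(STRING)])
+#     union.to_json()   -> ['null', 'string']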
+
+# ------------------------------------------------------------------------------
+
+
+class ErrorUnionSchema(UnionSchema):
+    """Schema representing the declared errors of a protocol message."""
+
+    def __init__(self, schemas):
+        """Initializes an error-union schema.
+
+        Args:
+          schemas: Collection of error schemas.
+        """
+        # Prepend "string" to handle system errors
+        schemas = [PrimitiveSchema(data_type=STRING)] + list(schemas)
+        super(ErrorUnionSchema, self).__init__(schemas=schemas)
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        to_dump = []
+        for schema in self.schemas:
+            # Don't print the system error schema
+            if schema.type == STRING:
+                continue
+            to_dump.append(schema.to_json(names))
+        return to_dump
+
+
+# ------------------------------------------------------------------------------
+
+
+class RecordSchema(NamedSchema):
+    """Schema of a record."""
+
+    @staticmethod
+    def _make_field(index, field_desc, names):
+        """Builds field schemas from a list of field JSON descriptors.
+
+        :param int index: 0-based index of the field in the record.
+        :param Any field_desc: JSON descriptors of a record field.
+        :param Any names: The names for this schema.
+        :returns: The field schema.
+        :rtype: Field
+        """
+        field_schema = schema_from_json_data(
+            json_data=field_desc['type'],
+            names=names,
+        )
+        other_props = (
+            dict(filter_keys_out(items=field_desc, keys=FIELD_RESERVED_PROPS)))
+        return Field(
+            data_type=field_schema,
+            name=field_desc['name'],
+            index=index,
+            has_default=('default' in field_desc),
+            default=field_desc.get('default', _NO_DEFAULT),
+            order=field_desc.get('order', None),
+            doc=field_desc.get('doc', None),
+            other_props=other_props,
+        )
+
+    @staticmethod
+    def make_field_list(field_desc_list, names):
+        """Builds field schemas from a list of field JSON descriptors.
+        Guarantees field name unicity.
+
+        :param Any field_desc_list: Collection of field JSON descriptors.
+        :param Any names: The names for this schema.
+        :returns: Field schemas.
+        :rtype: Field
+        """
+        for index, field_desc in enumerate(field_desc_list):
+            yield RecordSchema._make_field(index, field_desc, names)
+
+    @staticmethod
+    def _make_field_map(fields):
+        """Builds the field map.
+        Guarantees field name unicity.
+
+        :param Any fields: Iterable of field schemas.
+        :returns: A map of field schemas, indexed by name.
+        :rtype: Dict[Any, Any]
+        """
+        field_map = {}
+        for field in fields:
+            if field.name in field_map:
+                raise SchemaParseException(
+                    f'Duplicate record field name {field.name!r}.')
+            field_map[field.name] = field
+        return field_map
+
+    def __init__(
+            self,
+            name,
+            namespace,
+            fields=None,
+            make_fields=None,
+            names=None,
+            record_type=RECORD,
+            doc=None,
+            other_props=None
+    ):
+        """Initializes a new record schema object.
+
+        Args:
+          name: Name of the record (absolute or relative).
+          namespace: Optional namespace the record belongs to, if name is relative.
+          fields: collection of fields to add to this record.
+              Exactly one of fields or make_fields must be specified.
+          make_fields: function creating the fields that belong to the record.
+              The function signature is: make_fields(names) -> ordered field list.
+              Exactly one of fields or make_fields must be specified.
+          names: Tracker used to resolve and register Avro names.
+          record_type: Type of the record: one of RECORD, ERROR or REQUEST.
+              Protocol requests are not named.
+          doc: Optional documentation of the record.
+          other_props: Optional dictionary of additional properties.
+        """
+        if record_type == REQUEST:
+            # Protocol requests are not named:
+            super(RecordSchema, self).__init__(
+                data_type=REQUEST,
+                other_props=other_props,
+            )
+        elif record_type in [RECORD, ERROR]:
+            # Register this record name in the tracker:
+            super(RecordSchema, self).__init__(
+                data_type=record_type,
+                name=name,
+                namespace=namespace,
+                names=names,
+                other_props=other_props,
+            )
+        else:
+            raise SchemaParseException(
+                f'Invalid record type: {record_type!r}.')
+
+        nested_names = []
+        if record_type in [RECORD, ERROR]:
+            avro_name = names.get_name(name=name, namespace=namespace)
+            nested_names = names.new_with_default_namespace(namespace=avro_name.namespace)
+        elif record_type == REQUEST:
+            # Protocol request has no name: no need to change default namespace:
+            nested_names = names
+
+        if fields is None:
+            fields = make_fields(names=nested_names)
+        else:
+            assert make_fields is None
+        self._fields = tuple(fields)
+
+        self._field_map = RecordSchema._make_field_map(self._fields)
+
+        self._props['fields'] = fields
+        if doc is not None:
+            self._props['doc'] = doc
+
+    @property
+    def fields(self):
+        """Returns: the field schemas, as an ordered tuple."""
+        return self._fields
+
+    @property
+    def field_map(self):
+        """Returns: a read-only map of the field schemas index by field names."""
+        return self._field_map
+
+    def to_json(self, names=None):
+        if names is None:
+            names = Names()
+        # Request records don't have names
+        if self.type == REQUEST:
+            return [f.to_json(names) for f in self.fields]
+
+        if self.fullname in names.names:
+            return self.name_ref(names)
+        names.names[self.fullname] = self
+
+        to_dump = names.prune_namespace(self.props.copy())
+        to_dump['fields'] = [f.to_json(names) for f in self.fields]
+        return to_dump
+
+    def __eq__(self, that):
+        to_cmp = json.loads(str(self))
+        return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+# Module functions
+
+
+def filter_keys_out(items, keys):
+    """Filters a collection of (key, value) items.
+    Exclude any item whose key belongs to keys.
+
+    :param Dict[Any, Any] items: Dictionary of items to filter the keys out of.
+    :param Dict[Any, Any] keys: Dictionary of keys to filter the extracted keys against.
+    :returns: Filtered items.
+    :rtype: Tuple(Any, Any)
+    """
+    for key, value in items.items():
+        if key in keys:
+            continue
+        yield key, value
+
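+# Illustrative usage sketch (hypothetical property): filter_keys_out is a
+# generator, so wrap it in dict() to materialize the filtered mapping.
+#
+#     dict(filter_keys_out({'type': 'int', 'logicalType': 'date'}, SCHEMA_RESERVED_PROPS))
+#         -> {'logicalType': 'date'}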
+
+# ------------------------------------------------------------------------------
+
+
+def _schema_from_json_string(json_string, names):
+    if json_string in PRIMITIVE_TYPES:
+        return PrimitiveSchema(data_type=json_string)
+
+    # Look for a known named schema:
+    schema = names.get_schema(name=json_string)
+    if schema is None:
+        raise SchemaParseException(f"Unknown named schema {json_string!r}, known names: {sorted(names.names)!r}.")
+    return schema
+
+
+def _schema_from_json_array(json_array, names):
+    def MakeSchema(desc):
+        return schema_from_json_data(json_data=desc, names=names)
+
+    return UnionSchema(map(MakeSchema, json_array))
+
+
+def _schema_from_json_object(json_object, names):
+    data_type = json_object.get('type')
+    if data_type is None:
+        raise SchemaParseException(
+            f'Avro schema JSON descriptor has no "type" property: {json_object!r}')
+
+    other_props = dict(
+        filter_keys_out(items=json_object, keys=SCHEMA_RESERVED_PROPS))
+
+    if data_type in PRIMITIVE_TYPES:
+        # Additional properties are preserved by passing other_props through.
+        result = PrimitiveSchema(data_type, other_props=other_props)
+
+    elif data_type in NAMED_TYPES:
+        name = json_object.get('name')
+        namespace = json_object.get('namespace', names.default_namespace)
+        if data_type == FIXED:
+            size = json_object.get('size')
+            result = FixedSchema(name, namespace, size, names, other_props)
+        elif data_type == ENUM:
+            symbols = json_object.get('symbols')
+            doc = json_object.get('doc')
+            result = EnumSchema(name, namespace, symbols, names, doc, other_props)
+
+        elif data_type in [RECORD, ERROR]:
+            field_desc_list = json_object.get('fields', ())
+
+            def MakeFields(names):
+                return tuple(RecordSchema.make_field_list(field_desc_list, names))
+
+            result = RecordSchema(
+                name=name,
+                namespace=namespace,
+                make_fields=MakeFields,
+                names=names,
+                record_type=data_type,
+                doc=json_object.get('doc'),
+                other_props=other_props,
+            )
+        else:
+            raise ValueError(f'Internal error: unknown type {data_type!r}.')
+
+    elif data_type in VALID_TYPES:
+        # Unnamed, non-primitive Avro type:
+
+        if data_type == ARRAY:
+            items_desc = json_object.get('items')
+            if items_desc is None:
+                raise SchemaParseException(f'Invalid array schema descriptor with no "items": {json_object!r}.')
+            result = ArraySchema(
+                items=schema_from_json_data(items_desc, names),
+                other_props=other_props,
+            )
+
+        elif data_type == MAP:
+            values_desc = json_object.get('values')
+            if values_desc is None:
+                raise SchemaParseException(f'Invalid map schema descriptor with no "values": {json_object!r}.')
+            result = MapSchema(
+                values=schema_from_json_data(values_desc, names=names),
+                other_props=other_props,
+            )
+
+        elif data_type == ERROR_UNION:
+            error_desc_list = json_object.get('declared_errors')
+            assert error_desc_list is not None
+            error_schemas = map(
+                lambda desc: schema_from_json_data(desc, names=names),
+                error_desc_list)
+            result = ErrorUnionSchema(schemas=error_schemas)
+
+        else:
+            raise ValueError(f'Internal error: unknown type {data_type!r}.')
+    else:
+        raise SchemaParseException(f'Invalid JSON descriptor for an Avro schema: {json_object!r}')
+    return result
+
+
+# Parsers for the JSON data types:
+_JSONDataParserTypeMap = {
+    str: _schema_from_json_string,
+    list: _schema_from_json_array,
+    dict: _schema_from_json_object,
+}
+
+
+def schema_from_json_data(json_data, names=None):
+    """Builds an Avro Schema from its JSON descriptor.
+    Raises SchemaParseException if the descriptor is invalid.
+
+    :param Any json_data: JSON data representing the descriptor of the Avro schema.
+    :param Any names: Optional tracker for Avro named schemas.
+    :returns: The Avro schema parsed from the JSON descriptor.
+    :rtype: Any
+    """
+    if names is None:
+        names = Names()
+
+    # Select the appropriate parser based on the JSON data type:
+    parser = _JSONDataParserTypeMap.get(type(json_data))
+    if parser is None:
+        raise SchemaParseException(
+            f'Invalid JSON descriptor for an Avro schema: {json_data!r}.')
+    return parser(json_data, names=names)
+
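+# Illustrative usage sketch: the parser dispatches on the Python type of the
+# decoded JSON data (str, list or dict).
+#
+#     schema_from_json_data('long')                             -> PrimitiveSchema for 'long'
+#     schema_from_json_data(['null', 'bytes'])                  -> UnionSchema
+#     schema_from_json_data({'type': 'map', 'values': 'int'})   -> MapSchema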
+
+# ------------------------------------------------------------------------------
+
+
+def parse(json_string):
+    """Constructs a Schema from its JSON descriptor in text form.
+    Raises SchemaParseException if a JSON parsing error is met, or if the JSON descriptor is invalid.
+
+    :param str json_string: String representation of the JSON descriptor of the schema.
+    :returns: The parsed schema.
+    :rtype: Any
+    """
+    try:
+        json_data = json.loads(json_string)
+    except Exception as exn:
+        raise SchemaParseException(
+            f'Error parsing schema from JSON: {json_string!r}. '
+            f'Error message: {exn!r}.') from exn
+
+    # Initialize the names object
+    names = Names()
+
+    # Construct the Avro schema object
+    return schema_from_json_data(json_data, names)
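+
+
+# Illustrative usage sketch (hypothetical record): end-to-end parsing of a
+# record schema descriptor in JSON text form.
+#
+#     point = parse(json.dumps({
+#         'type': 'record',
+#         'name': 'com.example.Point',
+#         'fields': [
+#             {'name': 'x', 'type': 'double'},
+#             {'name': 'y', 'type': 'double'},
+#         ],
+#     }))
+#     point.fullname                    -> 'com.example.Point'
+#     [f.name for f in point.fields]    -> ['x', 'y']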