about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/caching.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/fsspec/caching.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/caching.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/caching.py966
1 files changed, 966 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/caching.py b/.venv/lib/python3.12/site-packages/fsspec/caching.py
new file mode 100644
index 00000000..34231c88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/caching.py
@@ -0,0 +1,966 @@
+from __future__ import annotations
+
+import collections
+import functools
+import logging
+import math
+import os
+import threading
+import warnings
+from concurrent.futures import Future, ThreadPoolExecutor
+from itertools import groupby
+from operator import itemgetter
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    ClassVar,
+    Generic,
+    NamedTuple,
+    Optional,
+    OrderedDict,
+    TypeVar,
+)
+
+if TYPE_CHECKING:
+    import mmap
+
+    from typing_extensions import ParamSpec
+
+    P = ParamSpec("P")
+else:
+    P = TypeVar("P")
+
+T = TypeVar("T")
+
+
+logger = logging.getLogger("fsspec")
+
+Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
+
+
+class BaseCache:
+    """Pass-though cache: doesn't keep anything, calls every time
+
+    Acts as base class for other cachers
+
+    Parameters
+    ----------
+    blocksize: int
+        How far to read ahead in numbers of bytes
+    fetcher: func
+        Function of the form f(start, end) which gets bytes from remote as
+        specified
+    size: int
+        How big this file is
+    """
+
+    name: ClassVar[str] = "none"
+
+    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
+        self.blocksize = blocksize
+        self.nblocks = 0
+        self.fetcher = fetcher
+        self.size = size
+        self.hit_count = 0
+        self.miss_count = 0
+        # the bytes that we actually requested
+        self.total_requested_bytes = 0
+
+    def _fetch(self, start: int | None, stop: int | None) -> bytes:
+        if start is None:
+            start = 0
+        if stop is None:
+            stop = self.size
+        if start >= self.size or start >= stop:
+            return b""
+        return self.fetcher(start, stop)
+
+    def _reset_stats(self) -> None:
+        """Reset hit and miss counts for a more ganular report e.g. by file."""
+        self.hit_count = 0
+        self.miss_count = 0
+        self.total_requested_bytes = 0
+
+    def _log_stats(self) -> str:
+        """Return a formatted string of the cache statistics."""
+        if self.hit_count == 0 and self.miss_count == 0:
+            # a cache that does nothing, this is for logs only
+            return ""
+        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"
+
+    def __repr__(self) -> str:
+        # TODO: use rich for better formatting
+        return f"""
+        <{self.__class__.__name__}:
+            block size  :   {self.blocksize}
+            block count :   {self.nblocks}
+            file size   :   {self.size}
+            cache hits  :   {self.hit_count}
+            cache misses:   {self.miss_count}
+            total requested bytes: {self.total_requested_bytes}>
+        """
+
+
+class MMapCache(BaseCache):
+    """memory-mapped sparse file cache
+
+    Opens temporary file, which is filled blocks-wise when data is requested.
+    Ensure there is enough disc space in the temporary location.
+
+    This cache method might only work on posix
+    """
+
+    name = "mmap"
+
+    def __init__(
+        self,
+        blocksize: int,
+        fetcher: Fetcher,
+        size: int,
+        location: str | None = None,
+        blocks: set[int] | None = None,
+    ) -> None:
+        super().__init__(blocksize, fetcher, size)
+        self.blocks = set() if blocks is None else blocks
+        self.location = location
+        self.cache = self._makefile()
+
+    def _makefile(self) -> mmap.mmap | bytearray:
+        import mmap
+        import tempfile
+
+        if self.size == 0:
+            return bytearray()
+
+        # posix version
+        if self.location is None or not os.path.exists(self.location):
+            if self.location is None:
+                fd = tempfile.TemporaryFile()
+                self.blocks = set()
+            else:
+                fd = open(self.location, "wb+")
+            fd.seek(self.size - 1)
+            fd.write(b"1")
+            fd.flush()
+        else:
+            fd = open(self.location, "r+b")
+
+        return mmap.mmap(fd.fileno(), self.size)
+
+    def _fetch(self, start: int | None, end: int | None) -> bytes:
+        logger.debug(f"MMap cache fetching {start}-{end}")
+        if start is None:
+            start = 0
+        if end is None:
+            end = self.size
+        if start >= self.size or start >= end:
+            return b""
+        start_block = start // self.blocksize
+        end_block = end // self.blocksize
+        block_range = range(start_block, end_block + 1)
+        # Determine which blocks need to be fetched. This sequence is sorted by construction.
+        need = (i for i in block_range if i not in self.blocks)
+        # Count the number of blocks already cached
+        self.hit_count += sum(1 for i in block_range if i in self.blocks)
+
+        # Consolidate needed blocks.
+        # Algorithm adapted from Python 2.x itertools documentation.
+        # We are grouping an enumerated sequence of blocks. By comparing when the difference
+        # between an ascending range (provided by enumerate) and the needed block numbers
+        # we can detect when the block number skips values. The key computes this difference.
+        # Whenever the difference changes, we know that we have previously cached block(s),
+        # and a new group is started. In other words, this algorithm neatly groups
+        # runs of consecutive block numbers so they can be fetched together.
+        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
+            # Extract the blocks from the enumerated sequence
+            _blocks = tuple(map(itemgetter(1), _blocks))
+            # Compute start of first block
+            sstart = _blocks[0] * self.blocksize
+            # Compute the end of the last block. Last block may not be full size.
+            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)
+
+            # Fetch bytes (could be multiple consecutive blocks)
+            self.total_requested_bytes += send - sstart
+            logger.debug(
+                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
+            )
+            self.cache[sstart:send] = self.fetcher(sstart, send)
+
+            # Update set of cached blocks
+            self.blocks.update(_blocks)
+            # Update cache statistics with number of blocks we had to cache
+            self.miss_count += len(_blocks)
+
+        return self.cache[start:end]
+
+    def __getstate__(self) -> dict[str, Any]:
+        state = self.__dict__.copy()
+        # Remove the unpicklable entries.
+        del state["cache"]
+        return state
+
+    def __setstate__(self, state: dict[str, Any]) -> None:
+        # Restore instance attributes
+        self.__dict__.update(state)
+        self.cache = self._makefile()
+
+
+class ReadAheadCache(BaseCache):
+    """Cache which reads only when we get beyond a block of data
+
+    This is a much simpler version of BytesCache, and does not attempt to
+    fill holes in the cache or keep fragments alive. It is best suited to
+    many small reads in a sequential order (e.g., reading lines from a file).
+    """
+
+    name = "readahead"
+
+    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
+        super().__init__(blocksize, fetcher, size)
+        self.cache = b""
+        self.start = 0
+        self.end = 0
+
+    def _fetch(self, start: int | None, end: int | None) -> bytes:
+        if start is None:
+            start = 0
+        if end is None or end > self.size:
+            end = self.size
+        if start >= self.size or start >= end:
+            return b""
+        l = end - start
+        if start >= self.start and end <= self.end:
+            # cache hit
+            self.hit_count += 1
+            return self.cache[start - self.start : end - self.start]
+        elif self.start <= start < self.end:
+            # partial hit
+            self.miss_count += 1
+            part = self.cache[start - self.start :]
+            l -= len(part)
+            start = self.end
+        else:
+            # miss
+            self.miss_count += 1
+            part = b""
+        end = min(self.size, end + self.blocksize)
+        self.total_requested_bytes += end - start
+        self.cache = self.fetcher(start, end)  # new block replaces old
+        self.start = start
+        self.end = self.start + len(self.cache)
+        return part + self.cache[:l]
+
+
+class FirstChunkCache(BaseCache):
+    """Caches the first block of a file only
+
+    This may be useful for file types where the metadata is stored in the header,
+    but is randomly accessed.
+    """
+
+    name = "first"
+
+    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
+        if blocksize > size:
+            # this will buffer the whole thing
+            blocksize = size
+        super().__init__(blocksize, fetcher, size)
+        self.cache: bytes | None = None
+
+    def _fetch(self, start: int | None, end: int | None) -> bytes:
+        start = start or 0
+        if start > self.size:
+            logger.debug("FirstChunkCache: requested start > file size")
+            return b""
+
+        end = min(end, self.size)
+
+        if start < self.blocksize:
+            if self.cache is None:
+                self.miss_count += 1
+                if end > self.blocksize:
+                    self.total_requested_bytes += end
+                    data = self.fetcher(0, end)
+                    self.cache = data[: self.blocksize]
+                    return data[start:]
+                self.cache = self.fetcher(0, self.blocksize)
+                self.total_requested_bytes += self.blocksize
+            part = self.cache[start:end]
+            if end > self.blocksize:
+                self.total_requested_bytes += end - self.blocksize
+                part += self.fetcher(self.blocksize, end)
+            self.hit_count += 1
+            return part
+        else:
+            self.miss_count += 1
+            self.total_requested_bytes += end - start
+            return self.fetcher(start, end)
+
+
+class BlockCache(BaseCache):
+    """
+    Cache holding memory as a set of blocks.
+
+    Requests are only ever made ``blocksize`` at a time, and are
+    stored in an LRU cache. The least recently accessed block is
+    discarded when more than ``maxblocks`` are stored.
+
+    Parameters
+    ----------
+    blocksize : int
+        The number of bytes to store in each block.
+        Requests are only ever made for ``blocksize``, so this
+        should balance the overhead of making a request against
+        the granularity of the blocks.
+    fetcher : Callable
+    size : int
+        The total size of the file being cached.
+    maxblocks : int
+        The maximum number of blocks to cache for. The maximum memory
+        use for this cache is then ``blocksize * maxblocks``.
+    """
+
+    name = "blockcache"
+
+    def __init__(
+        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
+    ) -> None:
+        super().__init__(blocksize, fetcher, size)
+        self.nblocks = math.ceil(size / blocksize)
+        self.maxblocks = maxblocks
+        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)
+
+    def cache_info(self):
+        """
+        The statistics on the block cache.
+
+        Returns
+        -------
+        NamedTuple
+            Returned directly from the LRU Cache used internally.
+        """
+        return self._fetch_block_cached.cache_info()
+
+    def __getstate__(self) -> dict[str, Any]:
+        state = self.__dict__
+        del state["_fetch_block_cached"]
+        return state
+
+    def __setstate__(self, state: dict[str, Any]) -> None:
+        self.__dict__.update(state)
+        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
+            self._fetch_block
+        )
+
+    def _fetch(self, start: int | None, end: int | None) -> bytes:
+        if start is None:
+            start = 0
+        if end is None:
+            end = self.size
+        if start >= self.size or start >= end:
+            return b""
+
+        # byte position -> block numbers
+        start_block_number = start // self.blocksize
+        end_block_number = end // self.blocksize
+
+        # these are cached, so safe to do multiple calls for the same start and end.
+        for block_number in range(start_block_number, end_block_number + 1):
+            self._fetch_block_cached(block_number)
+
+        return self._read_cache(
+            start,
+            end,
+            start_block_number=start_block_number,
+            end_block_number=end_block_number,
+        )
+
+    def _fetch_block(self, block_number: int) -> bytes:
+        """
+        Fetch the block of data for `block_number`.
+        """
+        if block_number > self.nblocks:
+            raise ValueError(
+                f"'block_number={block_number}' is greater than "
+                f"the number of blocks ({self.nblocks})"
+            )
+
+        start = block_number * self.blocksize
+        end = start + self.blocksize
+        self.total_requested_bytes += end - start
+        self.miss_count += 1
+        logger.info("BlockCache fetching block %d", block_number)
+        block_contents = super()._fetch(start, end)
+        return block_contents
+
+    def _read_cache(
+        self, start: int, end: int, start_block_number: int, end_block_number: int
+    ) -> bytes:
+        """
+        Read from our block cache.
+
+        Parameters
+        ----------
+        start, end : int
+            The start and end byte positions.
+        start_block_number, end_block_number : int
+            The start and end block numbers.
+        """
+        start_pos = start % self.blocksize
+        end_pos = end % self.blocksize
+
+        self.hit_count += 1
+        if start_block_number == end_block_number:
+            block: bytes = self._fetch_block_cached(start_block_number)
+            return block[start_pos:end_pos]
+
+        else:
+            # read from the initial
+            out = [self._fetch_block_cached(start_block_number)[start_pos:]]
+
+            # intermediate blocks
+            # Note: it'd be nice to combine these into one big request. However
+            # that doesn't play nicely with our LRU cache.
+            out.extend(
+                map(
+                    self._fetch_block_cached,
+                    range(start_block_number + 1, end_block_number),
+                )
+            )
+
+            # final block
+            out.append(self._fetch_block_cached(end_block_number)[:end_pos])
+
+            return b"".join(out)
+
+
+class BytesCache(BaseCache):
+    """Cache which holds data in a in-memory bytes object
+
+    Implements read-ahead by the block size, for semi-random reads progressing
+    through the file.
+
+    Parameters
+    ----------
+    trim: bool
+        As we read more data, whether to discard the start of the buffer when
+        we are more than a blocksize ahead of it.
+    """
+
+    name: ClassVar[str] = "bytes"
+
+    def __init__(
+        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
+    ) -> None:
+        super().__init__(blocksize, fetcher, size)
+        self.cache = b""
+        self.start: int | None = None
+        self.end: int | None = None
+        self.trim = trim
+
+    def _fetch(self, start: int | None, end: int | None) -> bytes:
+        # TODO: only set start/end after fetch, in case it fails?
+        # is this where retry logic might go?
+        if start is None:
+            start = 0
+        if end is None:
+            end = self.size
+        if start >= self.size or start >= end:
+            return b""
+        if (
+            self.start is not None
+            and start >= self.start
+            and self.end is not None
+            and end < self.end
+        ):
+            # cache hit: we have all the required data
+            offset = start - self.start
+            self.hit_count += 1
+            return self.cache[offset : offset + end - start]
+
+        if self.blocksize:
+            bend = min(self.size, end + self.blocksize)
+        else:
+            bend = end
+
+        if bend == start or start > self.size:
+            return b""
+
+        if (self.start is None or start < self.start) and (
+            self.end is None or end > self.end
+        ):
+            # First read, or extending both before and after
+            self.total_requested_bytes += bend - start
+            self.miss_count += 1
+            self.cache = self.fetcher(start, bend)
+            self.start = start
+        else:
+            assert self.start is not None
+            assert self.end is not None
+            self.miss_count += 1
+
+            if start < self.start:
+                if self.end is None or self.end - end > self.blocksize:
+                    self.total_requested_bytes += bend - start
+                    self.cache = self.fetcher(start, bend)
+                    self.start = start
+                else:
+                    self.total_requested_bytes += self.start - start
+                    new = self.fetcher(start, self.start)
+                    self.start = start
+                    self.cache = new + self.cache
+            elif self.end is not None and bend > self.end:
+                if self.end > self.size:
+                    pass
+                elif end - self.end > self.blocksize:
+                    self.total_requested_bytes += bend - start
+                    self.cache = self.fetcher(start, bend)
+                    self.start = start
+                else:
+                    self.total_requested_bytes += bend - self.end
+                    new = self.fetcher(self.end, bend)
+                    self.cache = self.cache + new
+
+        self.end = self.start + len(self.cache)
+        offset = start - self.start
+        out = self.cache[offset : offset + end - start]
+        if self.trim:
+            num = (self.end - self.start) // (self.blocksize + 1)
+            if num > 1:
+                self.start += self.blocksize * num
+                self.cache = self.cache[self.blocksize * num :]
+        return out
+
+    def __len__(self) -> int:
+        return len(self.cache)
+
+
+class AllBytes(BaseCache):
+    """Cache entire contents of the file"""
+
+    name: ClassVar[str] = "all"
+
+    def __init__(
+        self,
+        blocksize: int | None = None,
+        fetcher: Fetcher | None = None,
+        size: int | None = None,
+        data: bytes | None = None,
+    ) -> None:
+        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
+        if data is None:
+            self.miss_count += 1
+            self.total_requested_bytes += self.size
+            data = self.fetcher(0, self.size)
+        self.data = data
+
+    def _fetch(self, start: int | None, stop: int | None) -> bytes:
+        self.hit_count += 1
+        return self.data[start:stop]
+
+
+class KnownPartsOfAFile(BaseCache):
+    """
+    Cache holding known file parts.
+
+    Parameters
+    ----------
+    blocksize: int
+        How far to read ahead in numbers of bytes
+    fetcher: func
+        Function of the form f(start, end) which gets bytes from remote as
+        specified
+    size: int
+        How big this file is
+    data: dict
+        A dictionary mapping explicit `(start, stop)` file-offset tuples
+        with known bytes.
+    strict: bool, default True
+        Whether to fetch reads that go beyond a known byte-range boundary.
+        If `False`, any read that ends outside a known part will be zero
+        padded. Note that zero padding will not be used for reads that
+        begin outside a known byte-range.
+    """
+
+    name: ClassVar[str] = "parts"
+
+    def __init__(
+        self,
+        blocksize: int,
+        fetcher: Fetcher,
+        size: int,
+        data: Optional[dict[tuple[int, int], bytes]] = None,
+        strict: bool = True,
+        **_: Any,
+    ):
+        super().__init__(blocksize, fetcher, size)
+        self.strict = strict
+
+        # simple consolidation of contiguous blocks
+        if data:
+            old_offsets = sorted(data.keys())
+            offsets = [old_offsets[0]]
+            blocks = [data.pop(old_offsets[0])]
+            for start, stop in old_offsets[1:]:
+                start0, stop0 = offsets[-1]
+                if start == stop0:
+                    offsets[-1] = (start0, stop)
+                    blocks[-1] += data.pop((start, stop))
+                else:
+                    offsets.append((start, stop))
+                    blocks.append(data.pop((start, stop)))
+
+            self.data = dict(zip(offsets, blocks))
+        else:
+            self.data = {}
+
+    def _fetch(self, start: int | None, stop: int | None) -> bytes:
+        if start is None:
+            start = 0
+        if stop is None:
+            stop = self.size
+
+        out = b""
+        for (loc0, loc1), data in self.data.items():
+            # If self.strict=False, use zero-padded data
+            # for reads beyond the end of a "known" buffer
+            if loc0 <= start < loc1:
+                off = start - loc0
+                out = data[off : off + stop - start]
+                if not self.strict or loc0 <= stop <= loc1:
+                    # The request is within a known range, or
+                    # it begins within a known range, and we
+                    # are allowed to pad reads beyond the
+                    # buffer with zero
+                    out += b"\x00" * (stop - start - len(out))
+                    self.hit_count += 1
+                    return out
+                else:
+                    # The request ends outside a known range,
+                    # and we are being "strict" about reads
+                    # beyond the buffer
+                    start = loc1
+                    break
+
+        # We only get here if there is a request outside the
+        # known parts of the file. In an ideal world, this
+        # should never happen
+        if self.fetcher is None:
+            # We cannot fetch the data, so raise an error
+            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
+        # We can fetch the data, but should warn the user
+        # that this may be slow
+        warnings.warn(
+            f"Read is outside the known file parts: {(start, stop)}. "
+            f"IO/caching performance may be poor!"
+        )
+        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
+        self.total_requested_bytes += stop - start
+        self.miss_count += 1
+        return out + super()._fetch(start, stop)
+
+
+class UpdatableLRU(Generic[P, T]):
+    """
+    Custom implementation of LRU cache that allows updating keys
+
+    Used by BackgroudBlockCache
+    """
+
+    class CacheInfo(NamedTuple):
+        hits: int
+        misses: int
+        maxsize: int
+        currsize: int
+
+    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
+        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
+        self._func = func
+        self._max_size = max_size
+        self._hits = 0
+        self._misses = 0
+        self._lock = threading.Lock()
+
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
+        if kwargs:
+            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
+        with self._lock:
+            if args in self._cache:
+                self._cache.move_to_end(args)
+                self._hits += 1
+                return self._cache[args]
+
+        result = self._func(*args, **kwargs)
+
+        with self._lock:
+            self._cache[args] = result
+            self._misses += 1
+            if len(self._cache) > self._max_size:
+                self._cache.popitem(last=False)
+
+        return result
+
+    def is_key_cached(self, *args: Any) -> bool:
+        with self._lock:
+            return args in self._cache
+
+    def add_key(self, result: T, *args: Any) -> None:
+        with self._lock:
+            self._cache[args] = result
+            if len(self._cache) > self._max_size:
+                self._cache.popitem(last=False)
+
+    def cache_info(self) -> UpdatableLRU.CacheInfo:
+        with self._lock:
+            return self.CacheInfo(
+                maxsize=self._max_size,
+                currsize=len(self._cache),
+                hits=self._hits,
+                misses=self._misses,
+            )
+
+
+class BackgroundBlockCache(BaseCache):
+    """
+    Cache holding memory as a set of blocks with pre-loading of
+    the next block in the background.
+
+    Requests are only ever made ``blocksize`` at a time, and are
+    stored in an LRU cache. The least recently accessed block is
+    discarded when more than ``maxblocks`` are stored. If the
+    next block is not in cache, it is loaded in a separate thread
+    in non-blocking way.
+
+    Parameters
+    ----------
+    blocksize : int
+        The number of bytes to store in each block.
+        Requests are only ever made for ``blocksize``, so this
+        should balance the overhead of making a request against
+        the granularity of the blocks.
+    fetcher : Callable
+    size : int
+        The total size of the file being cached.
+    maxblocks : int
+        The maximum number of blocks to cache for. The maximum memory
+        use for this cache is then ``blocksize * maxblocks``.
+    """
+
+    name: ClassVar[str] = "background"
+
+    def __init__(
+        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
+    ) -> None:
+        super().__init__(blocksize, fetcher, size)
+        self.nblocks = math.ceil(size / blocksize)
+        self.maxblocks = maxblocks
+        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)
+
+        self._thread_executor = ThreadPoolExecutor(max_workers=1)
+        self._fetch_future_block_number: int | None = None
+        self._fetch_future: Future[bytes] | None = None
+        self._fetch_future_lock = threading.Lock()
+
+    def cache_info(self) -> UpdatableLRU.CacheInfo:
+        """
+        The statistics on the block cache.
+
+        Returns
+        -------
+        NamedTuple
+            Returned directly from the LRU Cache used internally.
+        """
+        return self._fetch_block_cached.cache_info()
+
+    def __getstate__(self) -> dict[str, Any]:
+        state = self.__dict__
+        del state["_fetch_block_cached"]
+        del state["_thread_executor"]
+        del state["_fetch_future_block_number"]
+        del state["_fetch_future"]
+        del state["_fetch_future_lock"]
+        return state
+
+    def __setstate__(self, state) -> None:
+        self.__dict__.update(state)
+        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
+        self._thread_executor = ThreadPoolExecutor(max_workers=1)
+        self._fetch_future_block_number = None
+        self._fetch_future = None
+        self._fetch_future_lock = threading.Lock()
+
+    def _fetch(self, start: int | None, end: int | None) -> bytes:
+        if start is None:
+            start = 0
+        if end is None:
+            end = self.size
+        if start >= self.size or start >= end:
+            return b""
+
+        # byte position -> block numbers
+        start_block_number = start // self.blocksize
+        end_block_number = end // self.blocksize
+
+        fetch_future_block_number = None
+        fetch_future = None
+        with self._fetch_future_lock:
+            # Background thread is running. Check we we can or must join it.
+            if self._fetch_future is not None:
+                assert self._fetch_future_block_number is not None
+                if self._fetch_future.done():
+                    logger.info("BlockCache joined background fetch without waiting.")
+                    self._fetch_block_cached.add_key(
+                        self._fetch_future.result(), self._fetch_future_block_number
+                    )
+                    # Cleanup the fetch variables. Done with fetching the block.
+                    self._fetch_future_block_number = None
+                    self._fetch_future = None
+                else:
+                    # Must join if we need the block for the current fetch
+                    must_join = bool(
+                        start_block_number
+                        <= self._fetch_future_block_number
+                        <= end_block_number
+                    )
+                    if must_join:
+                        # Copy to the local variables to release lock
+                        # before waiting for result
+                        fetch_future_block_number = self._fetch_future_block_number
+                        fetch_future = self._fetch_future
+
+                        # Cleanup the fetch variables. Have a local copy.
+                        self._fetch_future_block_number = None
+                        self._fetch_future = None
+
+        # Need to wait for the future for the current read
+        if fetch_future is not None:
+            logger.info("BlockCache waiting for background fetch.")
+            # Wait until result and put it in cache
+            self._fetch_block_cached.add_key(
+                fetch_future.result(), fetch_future_block_number
+            )
+
+        # these are cached, so safe to do multiple calls for the same start and end.
+        for block_number in range(start_block_number, end_block_number + 1):
+            self._fetch_block_cached(block_number)
+
+        # fetch next block in the background if nothing is running in the background,
+        # the block is within file and it is not already cached
+        end_block_plus_1 = end_block_number + 1
+        with self._fetch_future_lock:
+            if (
+                self._fetch_future is None
+                and end_block_plus_1 <= self.nblocks
+                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
+            ):
+                self._fetch_future_block_number = end_block_plus_1
+                self._fetch_future = self._thread_executor.submit(
+                    self._fetch_block, end_block_plus_1, "async"
+                )
+
+        return self._read_cache(
+            start,
+            end,
+            start_block_number=start_block_number,
+            end_block_number=end_block_number,
+        )
+
+    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
+        """
+        Fetch the block of data for `block_number`.
+        """
+        if block_number > self.nblocks:
+            raise ValueError(
+                f"'block_number={block_number}' is greater than "
+                f"the number of blocks ({self.nblocks})"
+            )
+
+        start = block_number * self.blocksize
+        end = start + self.blocksize
+        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
+        self.total_requested_bytes += end - start
+        self.miss_count += 1
+        block_contents = super()._fetch(start, end)
+        return block_contents
+
+    def _read_cache(
+        self, start: int, end: int, start_block_number: int, end_block_number: int
+    ) -> bytes:
+        """
+        Read from our block cache.
+
+        Parameters
+        ----------
+        start, end : int
+            The start and end byte positions.
+        start_block_number, end_block_number : int
+            The start and end block numbers.
+        """
+        start_pos = start % self.blocksize
+        end_pos = end % self.blocksize
+
+        # kind of pointless to count this as a hit, but it is
+        self.hit_count += 1
+
+        if start_block_number == end_block_number:
+            block = self._fetch_block_cached(start_block_number)
+            return block[start_pos:end_pos]
+
+        else:
+            # read from the initial
+            out = [self._fetch_block_cached(start_block_number)[start_pos:]]
+
+            # intermediate blocks
+            # Note: it'd be nice to combine these into one big request. However
+            # that doesn't play nicely with our LRU cache.
+            out.extend(
+                map(
+                    self._fetch_block_cached,
+                    range(start_block_number + 1, end_block_number),
+                )
+            )
+
+            # final block
+            out.append(self._fetch_block_cached(end_block_number)[:end_pos])
+
+            return b"".join(out)
+
+
+caches: dict[str | None, type[BaseCache]] = {
+    # one custom case
+    None: BaseCache,
+}
+
+
+def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
+    """'Register' cache implementation.
+
+    Parameters
+    ----------
+    clobber: bool, optional
+        If set to True (default is False) - allow to overwrite existing
+        entry.
+
+    Raises
+    ------
+    ValueError
+    """
+    name = cls.name
+    if not clobber and name in caches:
+        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
+    caches[name] = cls
+
+
+for c in (
+    BaseCache,
+    MMapCache,
+    BytesCache,
+    ReadAheadCache,
+    BlockCache,
+    FirstChunkCache,
+    AllBytes,
+    KnownPartsOfAFile,
+    BackgroundBlockCache,
+):
+    register_cache(c)