about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/implementations/reference.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/fsspec/implementations/reference.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/reference.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/implementations/reference.py1216
1 files changed, 1216 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/reference.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/reference.py
new file mode 100644
index 00000000..60fb5d57
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/reference.py
@@ -0,0 +1,1216 @@
+import base64
+import collections
+import io
+import itertools
+import logging
+import math
+import os
+from functools import lru_cache
+from itertools import chain
+from typing import TYPE_CHECKING, Literal
+
+import fsspec.core
+
+try:
+    import ujson as json
+except ImportError:
+    if not TYPE_CHECKING:
+        import json
+
+from fsspec.asyn import AsyncFileSystem
+from fsspec.callbacks import DEFAULT_CALLBACK
+from fsspec.core import filesystem, open, split_protocol
+from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
+from fsspec.utils import isfilelike, merge_offset_ranges, other_paths
+
+logger = logging.getLogger("fsspec.reference")
+
+
+class ReferenceNotReachable(RuntimeError):
+    def __init__(self, reference, target, *args):
+        super().__init__(*args)
+        self.reference = reference
+        self.target = target
+
+    def __str__(self):
+        return f'Reference "{self.reference}" failed to fetch target {self.target}'
+
+
+def _first(d):
+    return next(iter(d.values()))
+
+
+def _prot_in_references(path, references):
+    ref = references.get(path)
+    if isinstance(ref, (list, tuple)) and isinstance(ref[0], str):
+        return split_protocol(ref[0])[0] if ref[0] else ref[0]
+
+
+def _protocol_groups(paths, references):
+    if isinstance(paths, str):
+        return {_prot_in_references(paths, references): [paths]}
+    out = {}
+    for path in paths:
+        protocol = _prot_in_references(path, references)
+        out.setdefault(protocol, []).append(path)
+    return out
+
+
+class RefsValuesView(collections.abc.ValuesView):
+    def __iter__(self):
+        for val in self._mapping.zmetadata.values():
+            yield json.dumps(val).encode()
+        yield from self._mapping._items.values()
+        for field in self._mapping.listdir():
+            chunk_sizes = self._mapping._get_chunk_sizes(field)
+            if len(chunk_sizes) == 0:
+                yield self._mapping[field + "/0"]
+                continue
+            yield from self._mapping._generate_all_records(field)
+
+
+class RefsItemsView(collections.abc.ItemsView):
+    def __iter__(self):
+        return zip(self._mapping.keys(), self._mapping.values())
+
+
+def ravel_multi_index(idx, sizes):
+    val = 0
+    mult = 1
+    for i, s in zip(idx[::-1], sizes[::-1]):
+        val += i * mult
+        mult *= s
+    return val
+
+
+class LazyReferenceMapper(collections.abc.MutableMapping):
+    """This interface can be used to read/write references from Parquet stores.
+    It is not intended for other types of references.
+    It can be used with Kerchunk's MultiZarrToZarr method to combine
+    references into a parquet store.
+    Examples of this use-case can be found here:
+    https://fsspec.github.io/kerchunk/advanced.html?highlight=parquet#parquet-storage"""
+
+    # import is class level to prevent numpy dep requirement for fsspec
+    @property
+    def np(self):
+        import numpy as np
+
+        return np
+
+    @property
+    def pd(self):
+        import pandas as pd
+
+        return pd
+
+    def __init__(
+        self,
+        root,
+        fs=None,
+        out_root=None,
+        cache_size=128,
+        categorical_threshold=10,
+        engine: Literal["fastparquet", "pyarrow"] = "fastparquet",
+    ):
+        """
+
+        This instance will be writable, storing changes in memory until full partitions
+        are accumulated or .flush() is called.
+
+        To create an empty lazy store, use .create()
+
+        Parameters
+        ----------
+        root : str
+            Root of parquet store
+        fs : fsspec.AbstractFileSystem
+            fsspec filesystem object, default is local filesystem.
+        cache_size : int, default=128
+            Maximum size of LRU cache, where cache_size*record_size denotes
+            the total number of references that can be loaded in memory at once.
+        categorical_threshold : int
+            Encode urls as pandas.Categorical to reduce memory footprint if the ratio
+            of the number of unique urls to total number of refs for each variable
+            is greater than or equal to this number. (default 10)
+        engine: Literal["fastparquet","pyarrow"]
+            Engine choice for reading parquet files. (default is "fastparquet")
+        """
+
+        self.root = root
+        self.chunk_sizes = {}
+        self.out_root = out_root or self.root
+        self.cat_thresh = categorical_threshold
+        self.engine = engine
+        self.cache_size = cache_size
+        self.url = self.root + "/{field}/refs.{record}.parq"
+        # TODO: derive fs from `root`
+        self.fs = fsspec.filesystem("file") if fs is None else fs
+
+        from importlib.util import find_spec
+
+        if self.engine == "pyarrow" and find_spec("pyarrow") is None:
+            raise ImportError("engine choice `pyarrow` is not installed.")
+
+    def __getattr__(self, item):
+        if item in ("_items", "record_size", "zmetadata"):
+            self.setup()
+            # avoid possible recursion if setup fails somehow
+            return self.__dict__[item]
+        raise AttributeError(item)
+
+    def setup(self):
+        self._items = {}
+        self._items[".zmetadata"] = self.fs.cat_file(
+            "/".join([self.root, ".zmetadata"])
+        )
+        met = json.loads(self._items[".zmetadata"])
+        self.record_size = met["record_size"]
+        self.zmetadata = met["metadata"]
+
+        # Define function to open and decompress refs
+        @lru_cache(maxsize=self.cache_size)
+        def open_refs(field, record):
+            """cached parquet file loader"""
+            path = self.url.format(field=field, record=record)
+            data = io.BytesIO(self.fs.cat_file(path))
+            try:
+                df = self.pd.read_parquet(data, engine=self.engine)
+                refs = {c: df[c].to_numpy() for c in df.columns}
+            except OSError:
+                refs = None
+            return refs
+
+        self.open_refs = open_refs
+
+    @staticmethod
+    def create(root, storage_options=None, fs=None, record_size=10000, **kwargs):
+        """Make empty parquet reference set
+
+        First deletes the contents of the given directory, if it exists.
+
+        Parameters
+        ----------
+        root: str
+            Directory to contain the output; will be created
+        storage_options: dict | None
+            For making the filesystem to use for writing is fs is None
+        fs: FileSystem | None
+            Filesystem for writing
+        record_size: int
+            Number of references per parquet file
+        kwargs: passed to __init__
+
+        Returns
+        -------
+        LazyReferenceMapper instance
+        """
+        met = {"metadata": {}, "record_size": record_size}
+        if fs is None:
+            fs, root = fsspec.core.url_to_fs(root, **(storage_options or {}))
+        if fs.exists(root):
+            fs.rm(root, recursive=True)
+        fs.makedirs(root, exist_ok=True)
+        fs.pipe("/".join([root, ".zmetadata"]), json.dumps(met).encode())
+        return LazyReferenceMapper(root, fs, **kwargs)
+
+    @lru_cache()
+    def listdir(self):
+        """List top-level directories"""
+        dirs = (p.rsplit("/", 1)[0] for p in self.zmetadata if not p.startswith(".z"))
+        return set(dirs)
+
+    def ls(self, path="", detail=True):
+        """Shortcut file listings"""
+        path = path.rstrip("/")
+        pathdash = path + "/" if path else ""
+        dirnames = self.listdir()
+        dirs = [
+            d
+            for d in dirnames
+            if d.startswith(pathdash) and "/" not in d.lstrip(pathdash)
+        ]
+        if dirs:
+            others = {
+                f
+                for f in chain(
+                    [".zmetadata"],
+                    (name for name in self.zmetadata),
+                    (name for name in self._items),
+                )
+                if f.startswith(pathdash) and "/" not in f.lstrip(pathdash)
+            }
+            if detail is False:
+                others.update(dirs)
+                return sorted(others)
+            dirinfo = [{"name": name, "type": "directory", "size": 0} for name in dirs]
+            fileinfo = [
+                {
+                    "name": name,
+                    "type": "file",
+                    "size": len(
+                        json.dumps(self.zmetadata[name])
+                        if name in self.zmetadata
+                        else self._items[name]
+                    ),
+                }
+                for name in others
+            ]
+            return sorted(dirinfo + fileinfo, key=lambda s: s["name"])
+        field = path
+        others = set(
+            [name for name in self.zmetadata if name.startswith(f"{path}/")]
+            + [name for name in self._items if name.startswith(f"{path}/")]
+        )
+        fileinfo = [
+            {
+                "name": name,
+                "type": "file",
+                "size": len(
+                    json.dumps(self.zmetadata[name])
+                    if name in self.zmetadata
+                    else self._items[name]
+                ),
+            }
+            for name in others
+        ]
+        keys = self._keys_in_field(field)
+
+        if detail is False:
+            return list(others) + list(keys)
+        recs = self._generate_all_records(field)
+        recinfo = [
+            {"name": name, "type": "file", "size": rec[-1]}
+            for name, rec in zip(keys, recs)
+            if rec[0]  # filters out path==None, deleted/missing
+        ]
+        return fileinfo + recinfo
+
+    def _load_one_key(self, key):
+        """Get the reference for one key
+
+        Returns bytes, one-element list or three-element list.
+        """
+        if key in self._items:
+            return self._items[key]
+        elif key in self.zmetadata:
+            return json.dumps(self.zmetadata[key]).encode()
+        elif "/" not in key or self._is_meta(key):
+            raise KeyError(key)
+        field, _ = key.rsplit("/", 1)
+        record, ri, chunk_size = self._key_to_record(key)
+        maybe = self._items.get((field, record), {}).get(ri, False)
+        if maybe is None:
+            # explicitly deleted
+            raise KeyError
+        elif maybe:
+            return maybe
+        elif chunk_size == 0:
+            return b""
+
+        # Chunk keys can be loaded from row group and cached in LRU cache
+        try:
+            refs = self.open_refs(field, record)
+        except (ValueError, TypeError, FileNotFoundError) as exc:
+            raise KeyError(key) from exc
+        columns = ["path", "offset", "size", "raw"]
+        selection = [refs[c][ri] if c in refs else None for c in columns]
+        raw = selection[-1]
+        if raw is not None:
+            return raw
+        if selection[0] is None:
+            raise KeyError("This reference does not exist or has been deleted")
+        if selection[1:3] == [0, 0]:
+            # URL only
+            return selection[:1]
+        # URL, offset, size
+        return selection[:3]
+
+    @lru_cache(4096)
+    def _key_to_record(self, key):
+        """Details needed to construct a reference for one key"""
+        field, chunk = key.rsplit("/", 1)
+        chunk_sizes = self._get_chunk_sizes(field)
+        if len(chunk_sizes) == 0:
+            return 0, 0, 0
+        chunk_idx = [int(c) for c in chunk.split(".")]
+        chunk_number = ravel_multi_index(chunk_idx, chunk_sizes)
+        record = chunk_number // self.record_size
+        ri = chunk_number % self.record_size
+        return record, ri, len(chunk_sizes)
+
+    def _get_chunk_sizes(self, field):
+        """The number of chunks along each axis for a given field"""
+        if field not in self.chunk_sizes:
+            zarray = self.zmetadata[f"{field}/.zarray"]
+            size_ratio = [
+                math.ceil(s / c) for s, c in zip(zarray["shape"], zarray["chunks"])
+            ]
+            self.chunk_sizes[field] = size_ratio or [1]
+        return self.chunk_sizes[field]
+
+    def _generate_record(self, field, record):
+        """The references for a given parquet file of a given field"""
+        refs = self.open_refs(field, record)
+        it = iter(zip(*refs.values()))
+        if len(refs) == 3:
+            # All urls
+            return (list(t) for t in it)
+        elif len(refs) == 1:
+            # All raws
+            return refs["raw"]
+        else:
+            # Mix of urls and raws
+            return (list(t[:3]) if not t[3] else t[3] for t in it)
+
+    def _generate_all_records(self, field):
+        """Load all the references within a field by iterating over the parquet files"""
+        nrec = 1
+        for ch in self._get_chunk_sizes(field):
+            nrec *= ch
+        nrec = math.ceil(nrec / self.record_size)
+        for record in range(nrec):
+            yield from self._generate_record(field, record)
+
+    def values(self):
+        return RefsValuesView(self)
+
+    def items(self):
+        return RefsItemsView(self)
+
+    def __hash__(self):
+        return id(self)
+
+    def __getitem__(self, key):
+        return self._load_one_key(key)
+
+    def __setitem__(self, key, value):
+        if "/" in key and not self._is_meta(key):
+            field, chunk = key.rsplit("/", 1)
+            record, i, _ = self._key_to_record(key)
+            subdict = self._items.setdefault((field, record), {})
+            subdict[i] = value
+            if len(subdict) == self.record_size:
+                self.write(field, record)
+        else:
+            # metadata or top-level
+            self._items[key] = value
+            new_value = json.loads(
+                value.decode() if isinstance(value, bytes) else value
+            )
+            self.zmetadata[key] = {**self.zmetadata.get(key, {}), **new_value}
+
+    @staticmethod
+    def _is_meta(key):
+        return key.startswith(".z") or "/.z" in key
+
+    def __delitem__(self, key):
+        if key in self._items:
+            del self._items[key]
+        elif key in self.zmetadata:
+            del self.zmetadata[key]
+        else:
+            if "/" in key and not self._is_meta(key):
+                field, _ = key.rsplit("/", 1)
+                record, i, _ = self._key_to_record(key)
+                subdict = self._items.setdefault((field, record), {})
+                subdict[i] = None
+                if len(subdict) == self.record_size:
+                    self.write(field, record)
+            else:
+                # metadata or top-level
+                self._items[key] = None
+
+    def write(self, field, record, base_url=None, storage_options=None):
+        # extra requirements if writing
+        import kerchunk.df
+        import numpy as np
+        import pandas as pd
+
+        partition = self._items[(field, record)]
+        original = False
+        if len(partition) < self.record_size:
+            try:
+                original = self.open_refs(field, record)
+            except OSError:
+                pass
+
+        if original:
+            paths = original["path"]
+            offsets = original["offset"]
+            sizes = original["size"]
+            raws = original["raw"]
+        else:
+            paths = np.full(self.record_size, np.nan, dtype="O")
+            offsets = np.zeros(self.record_size, dtype="int64")
+            sizes = np.zeros(self.record_size, dtype="int64")
+            raws = np.full(self.record_size, np.nan, dtype="O")
+        for j, data in partition.items():
+            if isinstance(data, list):
+                if (
+                    str(paths.dtype) == "category"
+                    and data[0] not in paths.dtype.categories
+                ):
+                    paths = paths.add_categories(data[0])
+                paths[j] = data[0]
+                if len(data) > 1:
+                    offsets[j] = data[1]
+                    sizes[j] = data[2]
+            elif data is None:
+                # delete
+                paths[j] = None
+                offsets[j] = 0
+                sizes[j] = 0
+                raws[j] = None
+            else:
+                # this is the only call into kerchunk, could remove
+                raws[j] = kerchunk.df._proc_raw(data)
+        # TODO: only save needed columns
+        df = pd.DataFrame(
+            {
+                "path": paths,
+                "offset": offsets,
+                "size": sizes,
+                "raw": raws,
+            },
+            copy=False,
+        )
+        if df.path.count() / (df.path.nunique() or 1) > self.cat_thresh:
+            df["path"] = df["path"].astype("category")
+        object_encoding = {"raw": "bytes", "path": "utf8"}
+        has_nulls = ["path", "raw"]
+
+        fn = f"{base_url or self.out_root}/{field}/refs.{record}.parq"
+        self.fs.mkdirs(f"{base_url or self.out_root}/{field}", exist_ok=True)
+
+        if self.engine == "pyarrow":
+            df_backend_kwargs = {"write_statistics": False}
+        elif self.engine == "fastparquet":
+            df_backend_kwargs = {
+                "stats": False,
+                "object_encoding": object_encoding,
+                "has_nulls": has_nulls,
+            }
+        else:
+            raise NotImplementedError(f"{self.engine} not supported")
+
+        df.to_parquet(
+            fn,
+            engine=self.engine,
+            storage_options=storage_options
+            or getattr(self.fs, "storage_options", None),
+            compression="zstd",
+            index=False,
+            **df_backend_kwargs,
+        )
+
+        partition.clear()
+        self._items.pop((field, record))
+
+    def flush(self, base_url=None, storage_options=None):
+        """Output any modified or deleted keys
+
+        Parameters
+        ----------
+        base_url: str
+            Location of the output
+        """
+
+        # write what we have so far and clear sub chunks
+        for thing in list(self._items):
+            if isinstance(thing, tuple):
+                field, record = thing
+                self.write(
+                    field,
+                    record,
+                    base_url=base_url,
+                    storage_options=storage_options,
+                )
+
+        # gather .zmetadata from self._items and write that too
+        for k in list(self._items):
+            if k != ".zmetadata" and ".z" in k:
+                self.zmetadata[k] = json.loads(self._items.pop(k))
+        met = {"metadata": self.zmetadata, "record_size": self.record_size}
+        self._items.clear()
+        self._items[".zmetadata"] = json.dumps(met).encode()
+        self.fs.pipe(
+            "/".join([base_url or self.out_root, ".zmetadata"]),
+            self._items[".zmetadata"],
+        )
+
+        # TODO: only clear those that we wrote to?
+        self.open_refs.cache_clear()
+
+    def __len__(self):
+        # Caveat: This counts expected references, not actual - but is fast
+        count = 0
+        for field in self.listdir():
+            if field.startswith("."):
+                count += 1
+            else:
+                count += math.prod(self._get_chunk_sizes(field))
+        count += len(self.zmetadata)  # all metadata keys
+        # any other files not in reference partitions
+        count += sum(1 for _ in self._items if not isinstance(_, tuple))
+        return count
+
+    def __iter__(self):
+        # Caveat: returns only existing keys, so the number of these does not
+        #  match len(self)
+        metas = set(self.zmetadata)
+        metas.update(self._items)
+        for bit in metas:
+            if isinstance(bit, str):
+                yield bit
+        for field in self.listdir():
+            for k in self._keys_in_field(field):
+                if k in self:
+                    yield k
+
+    def __contains__(self, item):
+        try:
+            self._load_one_key(item)
+            return True
+        except KeyError:
+            return False
+
+    def _keys_in_field(self, field):
+        """List key names in given field
+
+        Produces strings like "field/x.y" appropriate from the chunking of the array
+        """
+        chunk_sizes = self._get_chunk_sizes(field)
+        if len(chunk_sizes) == 0:
+            yield field + "/0"
+            return
+        inds = itertools.product(*(range(i) for i in chunk_sizes))
+        for ind in inds:
+            yield field + "/" + ".".join([str(c) for c in ind])
+
+
+class ReferenceFileSystem(AsyncFileSystem):
+    """View byte ranges of some other file as a file system
+    Initial version: single file system target, which must support
+    async, and must allow start and end args in _cat_file. Later versions
+    may allow multiple arbitrary URLs for the targets.
+    This FileSystem is read-only. It is designed to be used with async
+    targets (for now). This FileSystem only allows whole-file access, no
+    ``open``. We do not get original file details from the target FS.
+    Configuration is by passing a dict of references at init, or a URL to
+    a JSON file containing the same; this dict
+    can also contain concrete data for some set of paths.
+    Reference dict format:
+    {path0: bytes_data, path1: (target_url, offset, size)}
+    https://github.com/fsspec/kerchunk/blob/main/README.md
+    """
+
+    protocol = "reference"
+
+    def __init__(
+        self,
+        fo,
+        target=None,
+        ref_storage_args=None,
+        target_protocol=None,
+        target_options=None,
+        remote_protocol=None,
+        remote_options=None,
+        fs=None,
+        template_overrides=None,
+        simple_templates=True,
+        max_gap=64_000,
+        max_block=256_000_000,
+        cache_size=128,
+        **kwargs,
+    ):
+        """
+        Parameters
+        ----------
+        fo : dict or str
+            The set of references to use for this instance, with a structure as above.
+            If str referencing a JSON file, will use fsspec.open, in conjunction
+            with target_options and target_protocol to open and parse JSON at this
+            location. If a directory, then assume references are a set of parquet
+            files to be loaded lazily.
+        target : str
+            For any references having target_url as None, this is the default file
+            target to use
+        ref_storage_args : dict
+            If references is a str, use these kwargs for loading the JSON file.
+            Deprecated: use target_options instead.
+        target_protocol : str
+            Used for loading the reference file, if it is a path. If None, protocol
+            will be derived from the given path
+        target_options : dict
+            Extra FS options for loading the reference file ``fo``, if given as a path
+        remote_protocol : str
+            The protocol of the filesystem on which the references will be evaluated
+            (unless fs is provided). If not given, will be derived from the first
+            URL that has a protocol in the templates or in the references, in that
+            order.
+        remote_options : dict
+            kwargs to go with remote_protocol
+        fs : AbstractFileSystem | dict(str, (AbstractFileSystem | dict))
+            Directly provide a file system(s):
+                - a single filesystem instance
+                - a dict of protocol:filesystem, where each value is either a filesystem
+                  instance, or a dict of kwargs that can be used to create in
+                  instance for the given protocol
+
+            If this is given, remote_options and remote_protocol are ignored.
+        template_overrides : dict
+            Swap out any templates in the references file with these - useful for
+            testing.
+        simple_templates: bool
+            Whether templates can be processed with simple replace (True) or if
+            jinja  is needed (False, much slower). All reference sets produced by
+            ``kerchunk`` are simple in this sense, but the spec allows for complex.
+        max_gap, max_block: int
+            For merging multiple concurrent requests to the same remote file.
+            Neighboring byte ranges will only be merged when their
+            inter-range gap is <= ``max_gap``. Default is 64KB. Set to 0
+            to only merge when it requires no extra bytes. Pass a negative
+            number to disable merging, appropriate for local target files.
+            Neighboring byte ranges will only be merged when the size of
+            the aggregated range is <= ``max_block``. Default is 256MB.
+        cache_size : int
+            Maximum size of LRU cache, where cache_size*record_size denotes
+            the total number of references that can be loaded in memory at once.
+            Only used for lazily loaded references.
+        kwargs : passed to parent class
+        """
+        super().__init__(**kwargs)
+        self.target = target
+        self.template_overrides = template_overrides
+        self.simple_templates = simple_templates
+        self.templates = {}
+        self.fss = {}
+        self._dircache = {}
+        self.max_gap = max_gap
+        self.max_block = max_block
+        if isinstance(fo, str):
+            dic = dict(
+                **(ref_storage_args or target_options or {}), protocol=target_protocol
+            )
+            ref_fs, fo2 = fsspec.core.url_to_fs(fo, **dic)
+            if ref_fs.isfile(fo2):
+                # text JSON
+                with fsspec.open(fo, "rb", **dic) as f:
+                    logger.info("Read reference from URL %s", fo)
+                    text = json.load(f)
+                self._process_references(text, template_overrides)
+            else:
+                # Lazy parquet refs
+                logger.info("Open lazy reference dict from URL %s", fo)
+                self.references = LazyReferenceMapper(
+                    fo2,
+                    fs=ref_fs,
+                    cache_size=cache_size,
+                )
+        else:
+            # dictionaries
+            self._process_references(fo, template_overrides)
+        if isinstance(fs, dict):
+            self.fss = {
+                k: (
+                    fsspec.filesystem(k.split(":", 1)[0], **opts)
+                    if isinstance(opts, dict)
+                    else opts
+                )
+                for k, opts in fs.items()
+            }
+            if None not in self.fss:
+                self.fss[None] = filesystem("file")
+            return
+        if fs is not None:
+            # single remote FS
+            remote_protocol = (
+                fs.protocol[0] if isinstance(fs.protocol, tuple) else fs.protocol
+            )
+            self.fss[remote_protocol] = fs
+
+        if remote_protocol is None:
+            # get single protocol from any templates
+            for ref in self.templates.values():
+                if callable(ref):
+                    ref = ref()
+                protocol, _ = fsspec.core.split_protocol(ref)
+                if protocol and protocol not in self.fss:
+                    fs = filesystem(protocol, **(remote_options or {}))
+                    self.fss[protocol] = fs
+        if remote_protocol is None:
+            # get single protocol from references
+            # TODO: warning here, since this can be very expensive?
+            for ref in self.references.values():
+                if callable(ref):
+                    ref = ref()
+                if isinstance(ref, list) and ref[0]:
+                    protocol, _ = fsspec.core.split_protocol(ref[0])
+                    if protocol not in self.fss:
+                        fs = filesystem(protocol, **(remote_options or {}))
+                        self.fss[protocol] = fs
+                        # only use first remote URL
+                        break
+
+        if remote_protocol and remote_protocol not in self.fss:
+            fs = filesystem(remote_protocol, **(remote_options or {}))
+            self.fss[remote_protocol] = fs
+
+        self.fss[None] = fs or filesystem("file")  # default one
+        # Wrap any non-async filesystems to ensure async methods are available below
+        for k, f in self.fss.items():
+            if not f.async_impl:
+                self.fss[k] = AsyncFileSystemWrapper(f)
+
+    def _cat_common(self, path, start=None, end=None):
+        path = self._strip_protocol(path)
+        logger.debug(f"cat: {path}")
+        try:
+            part = self.references[path]
+        except KeyError as exc:
+            raise FileNotFoundError(path) from exc
+        if isinstance(part, str):
+            part = part.encode()
+        if isinstance(part, bytes):
+            logger.debug(f"Reference: {path}, type bytes")
+            if part.startswith(b"base64:"):
+                part = base64.b64decode(part[7:])
+            return part, None, None
+
+        if len(part) == 1:
+            logger.debug(f"Reference: {path}, whole file => {part}")
+            url = part[0]
+            start1, end1 = start, end
+        else:
+            url, start0, size = part
+            logger.debug(f"Reference: {path} => {url}, offset {start0}, size {size}")
+            end0 = start0 + size
+
+            if start is not None:
+                if start >= 0:
+                    start1 = start0 + start
+                else:
+                    start1 = end0 + start
+            else:
+                start1 = start0
+            if end is not None:
+                if end >= 0:
+                    end1 = start0 + end
+                else:
+                    end1 = end0 + end
+            else:
+                end1 = end0
+        if url is None:
+            url = self.target
+        return url, start1, end1
+
+    async def _cat_file(self, path, start=None, end=None, **kwargs):
+        part_or_url, start0, end0 = self._cat_common(path, start=start, end=end)
+        if isinstance(part_or_url, bytes):
+            return part_or_url[start:end]
+        protocol, _ = split_protocol(part_or_url)
+        try:
+            return await self.fss[protocol]._cat_file(
+                part_or_url, start=start0, end=end0
+            )
+        except Exception as e:
+            raise ReferenceNotReachable(path, part_or_url) from e
+
+    def cat_file(self, path, start=None, end=None, **kwargs):
+        part_or_url, start0, end0 = self._cat_common(path, start=start, end=end)
+        if isinstance(part_or_url, bytes):
+            return part_or_url[start:end]
+        protocol, _ = split_protocol(part_or_url)
+        try:
+            return self.fss[protocol].cat_file(part_or_url, start=start0, end=end0)
+        except Exception as e:
+            raise ReferenceNotReachable(path, part_or_url) from e
+
+    def pipe_file(self, path, value, **_):
+        """Temporarily add binary data or reference as a file"""
+        self.references[path] = value
+
+    async def _get_file(self, rpath, lpath, **kwargs):
+        if self.isdir(rpath):
+            return os.makedirs(lpath, exist_ok=True)
+        data = await self._cat_file(rpath)
+        with open(lpath, "wb") as f:
+            f.write(data)
+
+    def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, **kwargs):
+        if self.isdir(rpath):
+            return os.makedirs(lpath, exist_ok=True)
+        data = self.cat_file(rpath, **kwargs)
+        callback.set_size(len(data))
+        if isfilelike(lpath):
+            lpath.write(data)
+        else:
+            with open(lpath, "wb") as f:
+                f.write(data)
+        callback.absolute_update(len(data))
+
+    def get(self, rpath, lpath, recursive=False, **kwargs):
+        if recursive:
+            # trigger directory build
+            self.ls("")
+        rpath = self.expand_path(rpath, recursive=recursive)
+        fs = fsspec.filesystem("file", auto_mkdir=True)
+        targets = other_paths(rpath, lpath)
+        if recursive:
+            data = self.cat([r for r in rpath if not self.isdir(r)])
+        else:
+            data = self.cat(rpath)
+        for remote, local in zip(rpath, targets):
+            if remote in data:
+                fs.pipe_file(local, data[remote])
+
+    def cat(self, path, recursive=False, on_error="raise", **kwargs):
+        if isinstance(path, str) and recursive:
+            raise NotImplementedError
+        if isinstance(path, list) and (recursive or any("*" in p for p in path)):
+            raise NotImplementedError
+        # TODO: if references is lazy, pre-fetch all paths in batch before access
+        proto_dict = _protocol_groups(path, self.references)
+        out = {}
+        for proto, paths in proto_dict.items():
+            fs = self.fss[proto]
+            urls, starts, ends, valid_paths = [], [], [], []
+            for p in paths:
+                # find references or label not-found. Early exit if any not
+                # found and on_error is "raise"
+                try:
+                    u, s, e = self._cat_common(p)
+                    if not isinstance(u, (bytes, str)):
+                        # nan/None from parquet
+                        continue
+                except FileNotFoundError as err:
+                    if on_error == "raise":
+                        raise
+                    if on_error != "omit":
+                        out[p] = err
+                else:
+                    urls.append(u)
+                    starts.append(s)
+                    ends.append(e)
+                    valid_paths.append(p)
+
+            # process references into form for merging
+            urls2 = []
+            starts2 = []
+            ends2 = []
+            paths2 = []
+            whole_files = set()
+            for u, s, e, p in zip(urls, starts, ends, valid_paths):
+                if isinstance(u, bytes):
+                    # data
+                    out[p] = u
+                elif s is None:
+                    # whole file - limits are None, None, but no further
+                    # entries take for this file
+                    whole_files.add(u)
+                    urls2.append(u)
+                    starts2.append(s)
+                    ends2.append(e)
+                    paths2.append(p)
+            for u, s, e, p in zip(urls, starts, ends, valid_paths):
+                # second run to account for files that are to be loaded whole
+                if s is not None and u not in whole_files:
+                    urls2.append(u)
+                    starts2.append(s)
+                    ends2.append(e)
+                    paths2.append(p)
+
+            # merge and fetch consolidated ranges
+            new_paths, new_starts, new_ends = merge_offset_ranges(
+                list(urls2),
+                list(starts2),
+                list(ends2),
+                sort=True,
+                max_gap=self.max_gap,
+                max_block=self.max_block,
+            )
+            bytes_out = fs.cat_ranges(new_paths, new_starts, new_ends)
+
+            # unbundle from merged bytes - simple approach
+            for u, s, e, p in zip(urls, starts, ends, valid_paths):
+                if p in out:
+                    continue  # was bytes, already handled
+                for np, ns, ne, b in zip(new_paths, new_starts, new_ends, bytes_out):
+                    if np == u and (ns is None or ne is None):
+                        if isinstance(b, Exception):
+                            out[p] = b
+                        else:
+                            out[p] = b[s:e]
+                    elif np == u and s >= ns and e <= ne:
+                        if isinstance(b, Exception):
+                            out[p] = b
+                        else:
+                            out[p] = b[s - ns : (e - ne) or None]
+
+        for k, v in out.copy().items():
+            # these were valid references, but fetch failed, so transform exc
+            if isinstance(v, Exception) and k in self.references:
+                ex = out[k]
+                new_ex = ReferenceNotReachable(k, self.references[k])
+                new_ex.__cause__ = ex
+                if on_error == "raise":
+                    raise new_ex
+                elif on_error != "omit":
+                    out[k] = new_ex
+
+        if len(out) == 1 and isinstance(path, str) and "*" not in path:
+            return _first(out)
+        return out
+
+    def _process_references(self, references, template_overrides=None):
+        vers = references.get("version", None)
+        if vers is None:
+            self._process_references0(references)
+        elif vers == 1:
+            self._process_references1(references, template_overrides=template_overrides)
+        else:
+            raise ValueError(f"Unknown reference spec version: {vers}")
+        # TODO: we make dircache by iterating over all entries, but for Spec >= 1,
+        #  can replace with programmatic. Is it even needed for mapper interface?
+
+    def _process_references0(self, references):
+        """Make reference dict for Spec Version 0"""
+        if isinstance(references, dict):
+            # do not do this for lazy/parquet backend, which will not make dicts,
+            # but must remain writable in the original object
+            references = {
+                key: json.dumps(val) if isinstance(val, dict) else val
+                for key, val in references.items()
+            }
+        self.references = references
+
+    def _process_references1(self, references, template_overrides=None):
+        if not self.simple_templates or self.templates:
+            import jinja2
+        self.references = {}
+        self._process_templates(references.get("templates", {}))
+
+        @lru_cache(1000)
+        def _render_jinja(u):
+            return jinja2.Template(u).render(**self.templates)
+
+        for k, v in references.get("refs", {}).items():
+            if isinstance(v, str):
+                if v.startswith("base64:"):
+                    self.references[k] = base64.b64decode(v[7:])
+                self.references[k] = v
+            elif isinstance(v, dict):
+                self.references[k] = json.dumps(v)
+            elif self.templates:
+                u = v[0]
+                if "{{" in u:
+                    if self.simple_templates:
+                        u = (
+                            u.replace("{{", "{")
+                            .replace("}}", "}")
+                            .format(**self.templates)
+                        )
+                    else:
+                        u = _render_jinja(u)
+                self.references[k] = [u] if len(v) == 1 else [u, v[1], v[2]]
+            else:
+                self.references[k] = v
+        self.references.update(self._process_gen(references.get("gen", [])))
+
+    def _process_templates(self, tmp):
+        self.templates = {}
+        if self.template_overrides is not None:
+            tmp.update(self.template_overrides)
+        for k, v in tmp.items():
+            if "{{" in v:
+                import jinja2
+
+                self.templates[k] = lambda temp=v, **kwargs: jinja2.Template(
+                    temp
+                ).render(**kwargs)
+            else:
+                self.templates[k] = v
+
+    def _process_gen(self, gens):
+        out = {}
+        for gen in gens:
+            dimension = {
+                k: (
+                    v
+                    if isinstance(v, list)
+                    else range(v.get("start", 0), v["stop"], v.get("step", 1))
+                )
+                for k, v in gen["dimensions"].items()
+            }
+            products = (
+                dict(zip(dimension.keys(), values))
+                for values in itertools.product(*dimension.values())
+            )
+            for pr in products:
+                import jinja2
+
+                key = jinja2.Template(gen["key"]).render(**pr, **self.templates)
+                url = jinja2.Template(gen["url"]).render(**pr, **self.templates)
+                if ("offset" in gen) and ("length" in gen):
+                    offset = int(
+                        jinja2.Template(gen["offset"]).render(**pr, **self.templates)
+                    )
+                    length = int(
+                        jinja2.Template(gen["length"]).render(**pr, **self.templates)
+                    )
+                    out[key] = [url, offset, length]
+                elif ("offset" in gen) ^ ("length" in gen):
+                    raise ValueError(
+                        "Both 'offset' and 'length' are required for a "
+                        "reference generator entry if either is provided."
+                    )
+                else:
+                    out[key] = [url]
+        return out
+
+    def _dircache_from_items(self):
+        self.dircache = {"": []}
+        it = self.references.items()
+        for path, part in it:
+            if isinstance(part, (bytes, str)):
+                size = len(part)
+            elif len(part) == 1:
+                size = None
+            else:
+                _, _, size = part
+            par = path.rsplit("/", 1)[0] if "/" in path else ""
+            par0 = par
+            subdirs = [par0]
+            while par0 and par0 not in self.dircache:
+                # collect parent directories
+                par0 = self._parent(par0)
+                subdirs.append(par0)
+
+            subdirs.reverse()
+            for parent, child in zip(subdirs, subdirs[1:]):
+                # register newly discovered directories
+                assert child not in self.dircache
+                assert parent in self.dircache
+                self.dircache[parent].append(
+                    {"name": child, "type": "directory", "size": 0}
+                )
+                self.dircache[child] = []
+
+            self.dircache[par].append({"name": path, "type": "file", "size": size})
+
+    def _open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs):
+        data = self.cat_file(path)  # load whole chunk into memory
+        return io.BytesIO(data)
+
+    def ls(self, path, detail=True, **kwargs):
+        path = self._strip_protocol(path)
+        if isinstance(self.references, LazyReferenceMapper):
+            try:
+                return self.references.ls(path, detail)
+            except KeyError:
+                pass
+            raise FileNotFoundError(f"'{path}' is not a known key")
+        if not self.dircache:
+            self._dircache_from_items()
+        out = self._ls_from_cache(path)
+        if out is None:
+            raise FileNotFoundError(path)
+        if detail:
+            return out
+        return [o["name"] for o in out]
+
+    def exists(self, path, **kwargs):  # overwrite auto-sync version
+        return self.isdir(path) or self.isfile(path)
+
+    def isdir(self, path):  # overwrite auto-sync version
+        if self.dircache:
+            return path in self.dircache
+        elif isinstance(self.references, LazyReferenceMapper):
+            return path in self.references.listdir()
+        else:
+            # this may be faster than building dircache for single calls, but
+            # by looping will be slow for many calls; could cache it?
+            return any(_.startswith(f"{path}/") for _ in self.references)
+
+    def isfile(self, path):  # overwrite auto-sync version
+        return path in self.references
+
+    async def _ls(self, path, detail=True, **kwargs):  # calls fast sync code
+        return self.ls(path, detail, **kwargs)
+
+    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
+        if withdirs:
+            return super().find(
+                path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs
+            )
+        if path:
+            path = self._strip_protocol(path)
+            r = sorted(k for k in self.references if k.startswith(path))
+        else:
+            r = sorted(self.references)
+        if detail:
+            if not self.dircache:
+                self._dircache_from_items()
+            return {k: self._ls_from_cache(k)[0] for k in r}
+        else:
+            return r
+
+    def info(self, path, **kwargs):
+        out = self.references.get(path)
+        if out is not None:
+            if isinstance(out, (str, bytes)):
+                # decode base64 here
+                return {"name": path, "type": "file", "size": len(out)}
+            elif len(out) > 1:
+                return {"name": path, "type": "file", "size": out[2]}
+            else:
+                out0 = [{"name": path, "type": "file", "size": None}]
+        else:
+            out = self.ls(path, True)
+            out0 = [o for o in out if o["name"] == path]
+            if not out0:
+                return {"name": path, "type": "directory", "size": 0}
+        if out0[0]["size"] is None:
+            # if this is a whole remote file, update size using remote FS
+            prot, _ = split_protocol(self.references[path][0])
+            out0[0]["size"] = self.fss[prot].size(self.references[path][0])
+        return out0[0]
+
+    async def _info(self, path, **kwargs):  # calls fast sync code
+        return self.info(path)
+
+    async def _rm_file(self, path, **kwargs):
+        self.references.pop(
+            path, None
+        )  # ignores FileNotFound, just as well for directories
+        self.dircache.clear()  # this is a bit heavy handed
+
+    async def _pipe_file(self, path, data, mode="overwrite", **kwargs):
+        if mode == "create" and self.exists(path):
+            raise FileExistsError
+        # can be str or bytes
+        self.references[path] = data
+        self.dircache.clear()  # this is a bit heavy handed
+
+    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
+        # puts binary
+        if mode == "create" and self.exists(rpath):
+            raise FileExistsError
+        with open(lpath, "rb") as f:
+            self.references[rpath] = f.read()
+        self.dircache.clear()  # this is a bit heavy handed
+
+    def save_json(self, url, **storage_options):
+        """Write modified references into new location"""
+        out = {}
+        for k, v in self.references.items():
+            if isinstance(v, bytes):
+                try:
+                    out[k] = v.decode("ascii")
+                except UnicodeDecodeError:
+                    out[k] = (b"base64:" + base64.b64encode(v)).decode()
+            else:
+                out[k] = v
+        with fsspec.open(url, "wb", **storage_options) as f:
+            f.write(json.dumps({"version": 1, "refs": out}).encode())