about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py232
1 files changed, 232 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py
new file mode 100644
index 00000000..bd9b5cdd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py
@@ -0,0 +1,232 @@
+from __future__ import annotations
+
+import os
+import pickle
+import time
+from typing import TYPE_CHECKING
+
+from fsspec.utils import atomic_write
+
+try:
+    import ujson as json
+except ImportError:
+    if not TYPE_CHECKING:
+        import json
+
+if TYPE_CHECKING:
+    from typing import Any, Dict, Iterator, Literal
+
+    from typing_extensions import TypeAlias
+
+    from .cached import CachingFileSystem
+
+    Detail: TypeAlias = Dict[str, Any]
+
+
+class CacheMetadata:
+    """Cache metadata.
+
+    All reading and writing of cache metadata is performed by this class,
+    accessing the cached files and blocks is not.
+
+    Metadata is stored in a single file per storage directory in JSON format.
+    For backward compatibility, also reads metadata stored in pickle format
+    which is converted to JSON when next saved.
+    """
+
+    def __init__(self, storage: list[str]):
+        """
+
+        Parameters
+        ----------
+        storage: list[str]
+            Directories containing cached files, must be at least one. Metadata
+            is stored in the last of these directories by convention.
+        """
+        if not storage:
+            raise ValueError("CacheMetadata expects at least one storage location")
+
+        self._storage = storage
+        self.cached_files: list[Detail] = [{}]
+
+        # Private attribute to force saving of metadata in pickle format rather than
+        # JSON for use in tests to confirm can read both pickle and JSON formats.
+        self._force_save_pickle = False
+
+    def _load(self, fn: str) -> Detail:
+        """Low-level function to load metadata from specific file"""
+        try:
+            with open(fn, "r") as f:
+                loaded = json.load(f)
+        except ValueError:
+            with open(fn, "rb") as f:
+                loaded = pickle.load(f)
+        for c in loaded.values():
+            if isinstance(c.get("blocks"), list):
+                c["blocks"] = set(c["blocks"])
+        return loaded
+
+    def _save(self, metadata_to_save: Detail, fn: str) -> None:
+        """Low-level function to save metadata to specific file"""
+        if self._force_save_pickle:
+            with atomic_write(fn) as f:
+                pickle.dump(metadata_to_save, f)
+        else:
+            with atomic_write(fn, mode="w") as f:
+                json.dump(metadata_to_save, f)
+
+    def _scan_locations(
+        self, writable_only: bool = False
+    ) -> Iterator[tuple[str, str, bool]]:
+        """Yield locations (filenames) where metadata is stored, and whether
+        writable or not.
+
+        Parameters
+        ----------
+        writable: bool
+            Set to True to only yield writable locations.
+
+        Returns
+        -------
+        Yields (str, str, bool)
+        """
+        n = len(self._storage)
+        for i, storage in enumerate(self._storage):
+            writable = i == n - 1
+            if writable_only and not writable:
+                continue
+            yield os.path.join(storage, "cache"), storage, writable
+
+    def check_file(
+        self, path: str, cfs: CachingFileSystem | None
+    ) -> Literal[False] | tuple[Detail, str]:
+        """If path is in cache return its details, otherwise return ``False``.
+
+        If the optional CachingFileSystem is specified then it is used to
+        perform extra checks to reject possible matches, such as if they are
+        too old.
+        """
+        for (fn, base, _), cache in zip(self._scan_locations(), self.cached_files):
+            if path not in cache:
+                continue
+            detail = cache[path].copy()
+
+            if cfs is not None:
+                if cfs.check_files and detail["uid"] != cfs.fs.ukey(path):
+                    # Wrong file as determined by hash of file properties
+                    continue
+                if cfs.expiry and time.time() - detail["time"] > cfs.expiry:
+                    # Cached file has expired
+                    continue
+
+            fn = os.path.join(base, detail["fn"])
+            if os.path.exists(fn):
+                return detail, fn
+        return False
+
+    def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:
+        """Remove expired metadata from the cache.
+
+        Returns names of files corresponding to expired metadata and a boolean
+        flag indicating whether the writable cache is empty. Caller is
+        responsible for deleting the expired files.
+        """
+        expired_files = []
+        for path, detail in self.cached_files[-1].copy().items():
+            if time.time() - detail["time"] > expiry_time:
+                fn = detail.get("fn", "")
+                if not fn:
+                    raise RuntimeError(
+                        f"Cache metadata does not contain 'fn' for {path}"
+                    )
+                fn = os.path.join(self._storage[-1], fn)
+                expired_files.append(fn)
+                self.cached_files[-1].pop(path)
+
+        if self.cached_files[-1]:
+            cache_path = os.path.join(self._storage[-1], "cache")
+            self._save(self.cached_files[-1], cache_path)
+
+        writable_cache_empty = not self.cached_files[-1]
+        return expired_files, writable_cache_empty
+
+    def load(self) -> None:
+        """Load all metadata from disk and store in ``self.cached_files``"""
+        cached_files = []
+        for fn, _, _ in self._scan_locations():
+            if os.path.exists(fn):
+                # TODO: consolidate blocks here
+                cached_files.append(self._load(fn))
+            else:
+                cached_files.append({})
+        self.cached_files = cached_files or [{}]
+
+    def on_close_cached_file(self, f: Any, path: str) -> None:
+        """Perform side-effect actions on closing a cached file.
+
+        The actual closing of the file is the responsibility of the caller.
+        """
+        # File must be writeble, so in self.cached_files[-1]
+        c = self.cached_files[-1][path]
+        if c["blocks"] is not True and len(c["blocks"]) * f.blocksize >= f.size:
+            c["blocks"] = True
+
+    def pop_file(self, path: str) -> str | None:
+        """Remove metadata of cached file.
+
+        If path is in the cache, return the filename of the cached file,
+        otherwise return ``None``.  Caller is responsible for deleting the
+        cached file.
+        """
+        details = self.check_file(path, None)
+        if not details:
+            return None
+        _, fn = details
+        if fn.startswith(self._storage[-1]):
+            self.cached_files[-1].pop(path)
+            self.save()
+        else:
+            raise PermissionError(
+                "Can only delete cached file in last, writable cache location"
+            )
+        return fn
+
+    def save(self) -> None:
+        """Save metadata to disk"""
+        for (fn, _, writable), cache in zip(self._scan_locations(), self.cached_files):
+            if not writable:
+                continue
+
+            if os.path.exists(fn):
+                cached_files = self._load(fn)
+                for k, c in cached_files.items():
+                    if k in cache:
+                        if c["blocks"] is True or cache[k]["blocks"] is True:
+                            c["blocks"] = True
+                        else:
+                            # self.cached_files[*][*]["blocks"] must continue to
+                            # point to the same set object so that updates
+                            # performed by MMapCache are propagated back to
+                            # self.cached_files.
+                            blocks = cache[k]["blocks"]
+                            blocks.update(c["blocks"])
+                            c["blocks"] = blocks
+                        c["time"] = max(c["time"], cache[k]["time"])
+                        c["uid"] = cache[k]["uid"]
+
+                # Files can be added to cache after it was written once
+                for k, c in cache.items():
+                    if k not in cached_files:
+                        cached_files[k] = c
+            else:
+                cached_files = cache
+            cache = {k: v.copy() for k, v in cached_files.items()}
+            for c in cache.values():
+                if isinstance(c["blocks"], set):
+                    c["blocks"] = list(c["blocks"])
+            self._save(cache, fn)
+            self.cached_files[-1] = cached_files
+
+    def update_file(self, path: str, detail: Detail) -> None:
+        """Update metadata for specific file in memory, do not save"""
+        self.cached_files[-1][path] = detail