diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py new file mode 100644 index 00000000..bd9b5cdd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/cache_metadata.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +import os +import pickle +import time +from typing import TYPE_CHECKING + +from fsspec.utils import atomic_write + +try: + import ujson as json +except ImportError: + if not TYPE_CHECKING: + import json + +if TYPE_CHECKING: + from typing import Any, Dict, Iterator, Literal + + from typing_extensions import TypeAlias + + from .cached import CachingFileSystem + + Detail: TypeAlias = Dict[str, Any] + + +class CacheMetadata: + """Cache metadata. + + All reading and writing of cache metadata is performed by this class, + accessing the cached files and blocks is not. + + Metadata is stored in a single file per storage directory in JSON format. + For backward compatibility, also reads metadata stored in pickle format + which is converted to JSON when next saved. + """ + + def __init__(self, storage: list[str]): + """ + + Parameters + ---------- + storage: list[str] + Directories containing cached files, must be at least one. Metadata + is stored in the last of these directories by convention. + """ + if not storage: + raise ValueError("CacheMetadata expects at least one storage location") + + self._storage = storage + self.cached_files: list[Detail] = [{}] + + # Private attribute to force saving of metadata in pickle format rather than + # JSON for use in tests to confirm can read both pickle and JSON formats. + self._force_save_pickle = False + + def _load(self, fn: str) -> Detail: + """Low-level function to load metadata from specific file""" + try: + with open(fn, "r") as f: + loaded = json.load(f) + except ValueError: + with open(fn, "rb") as f: + loaded = pickle.load(f) + for c in loaded.values(): + if isinstance(c.get("blocks"), list): + c["blocks"] = set(c["blocks"]) + return loaded + + def _save(self, metadata_to_save: Detail, fn: str) -> None: + """Low-level function to save metadata to specific file""" + if self._force_save_pickle: + with atomic_write(fn) as f: + pickle.dump(metadata_to_save, f) + else: + with atomic_write(fn, mode="w") as f: + json.dump(metadata_to_save, f) + + def _scan_locations( + self, writable_only: bool = False + ) -> Iterator[tuple[str, str, bool]]: + """Yield locations (filenames) where metadata is stored, and whether + writable or not. + + Parameters + ---------- + writable: bool + Set to True to only yield writable locations. + + Returns + ------- + Yields (str, str, bool) + """ + n = len(self._storage) + for i, storage in enumerate(self._storage): + writable = i == n - 1 + if writable_only and not writable: + continue + yield os.path.join(storage, "cache"), storage, writable + + def check_file( + self, path: str, cfs: CachingFileSystem | None + ) -> Literal[False] | tuple[Detail, str]: + """If path is in cache return its details, otherwise return ``False``. + + If the optional CachingFileSystem is specified then it is used to + perform extra checks to reject possible matches, such as if they are + too old. + """ + for (fn, base, _), cache in zip(self._scan_locations(), self.cached_files): + if path not in cache: + continue + detail = cache[path].copy() + + if cfs is not None: + if cfs.check_files and detail["uid"] != cfs.fs.ukey(path): + # Wrong file as determined by hash of file properties + continue + if cfs.expiry and time.time() - detail["time"] > cfs.expiry: + # Cached file has expired + continue + + fn = os.path.join(base, detail["fn"]) + if os.path.exists(fn): + return detail, fn + return False + + def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]: + """Remove expired metadata from the cache. + + Returns names of files corresponding to expired metadata and a boolean + flag indicating whether the writable cache is empty. Caller is + responsible for deleting the expired files. + """ + expired_files = [] + for path, detail in self.cached_files[-1].copy().items(): + if time.time() - detail["time"] > expiry_time: + fn = detail.get("fn", "") + if not fn: + raise RuntimeError( + f"Cache metadata does not contain 'fn' for {path}" + ) + fn = os.path.join(self._storage[-1], fn) + expired_files.append(fn) + self.cached_files[-1].pop(path) + + if self.cached_files[-1]: + cache_path = os.path.join(self._storage[-1], "cache") + self._save(self.cached_files[-1], cache_path) + + writable_cache_empty = not self.cached_files[-1] + return expired_files, writable_cache_empty + + def load(self) -> None: + """Load all metadata from disk and store in ``self.cached_files``""" + cached_files = [] + for fn, _, _ in self._scan_locations(): + if os.path.exists(fn): + # TODO: consolidate blocks here + cached_files.append(self._load(fn)) + else: + cached_files.append({}) + self.cached_files = cached_files or [{}] + + def on_close_cached_file(self, f: Any, path: str) -> None: + """Perform side-effect actions on closing a cached file. + + The actual closing of the file is the responsibility of the caller. + """ + # File must be writeble, so in self.cached_files[-1] + c = self.cached_files[-1][path] + if c["blocks"] is not True and len(c["blocks"]) * f.blocksize >= f.size: + c["blocks"] = True + + def pop_file(self, path: str) -> str | None: + """Remove metadata of cached file. + + If path is in the cache, return the filename of the cached file, + otherwise return ``None``. Caller is responsible for deleting the + cached file. + """ + details = self.check_file(path, None) + if not details: + return None + _, fn = details + if fn.startswith(self._storage[-1]): + self.cached_files[-1].pop(path) + self.save() + else: + raise PermissionError( + "Can only delete cached file in last, writable cache location" + ) + return fn + + def save(self) -> None: + """Save metadata to disk""" + for (fn, _, writable), cache in zip(self._scan_locations(), self.cached_files): + if not writable: + continue + + if os.path.exists(fn): + cached_files = self._load(fn) + for k, c in cached_files.items(): + if k in cache: + if c["blocks"] is True or cache[k]["blocks"] is True: + c["blocks"] = True + else: + # self.cached_files[*][*]["blocks"] must continue to + # point to the same set object so that updates + # performed by MMapCache are propagated back to + # self.cached_files. + blocks = cache[k]["blocks"] + blocks.update(c["blocks"]) + c["blocks"] = blocks + c["time"] = max(c["time"], cache[k]["time"]) + c["uid"] = cache[k]["uid"] + + # Files can be added to cache after it was written once + for k, c in cache.items(): + if k not in cached_files: + cached_files[k] = c + else: + cached_files = cache + cache = {k: v.copy() for k, v in cached_files.items()} + for c in cache.values(): + if isinstance(c["blocks"], set): + c["blocks"] = list(c["blocks"]) + self._save(cache, fn) + self.cached_files[-1] = cached_files + + def update_file(self, path: str, detail: Detail) -> None: + """Update metadata for specific file in memory, do not save""" + self.cached_files[-1][path] = detail |