diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/core.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/core.py | 743 |
1 files changed, 743 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/core.py b/.venv/lib/python3.12/site-packages/fsspec/core.py new file mode 100644 index 00000000..1d3c04c6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/core.py @@ -0,0 +1,743 @@ +from __future__ import annotations + +import io +import logging +import os +import re +from glob import has_magic +from pathlib import Path + +# for backwards compat, we export cache things from here too +from fsspec.caching import ( # noqa: F401 + BaseCache, + BlockCache, + BytesCache, + MMapCache, + ReadAheadCache, + caches, +) +from fsspec.compression import compr +from fsspec.config import conf +from fsspec.registry import filesystem, get_filesystem_class +from fsspec.utils import ( + _unstrip_protocol, + build_name_function, + infer_compression, + stringify_path, +) + +logger = logging.getLogger("fsspec") + + +class OpenFile: + """ + File-like object to be used in a context + + Can layer (buffered) text-mode and compression over any file-system, which + are typically binary-only. + + These instances are safe to serialize, as the low-level file object + is not created until invoked using ``with``. + + Parameters + ---------- + fs: FileSystem + The file system to use for opening the file. Should be a subclass or duck-type + with ``fsspec.spec.AbstractFileSystem`` + path: str + Location to open + mode: str like 'rb', optional + Mode of the opened file + compression: str or None, optional + Compression to apply + encoding: str or None, optional + The encoding to use if opened in text mode. + errors: str or None, optional + How to handle encoding errors if opened in text mode. + newline: None or str + Passed to TextIOWrapper in text mode, how to handle line endings. + autoopen: bool + If True, calls open() immediately. Mostly used by pickle + pos: int + If given and autoopen is True, seek to this location immediately + """ + + def __init__( + self, + fs, + path, + mode="rb", + compression=None, + encoding=None, + errors=None, + newline=None, + ): + self.fs = fs + self.path = path + self.mode = mode + self.compression = get_compression(path, compression) + self.encoding = encoding + self.errors = errors + self.newline = newline + self.fobjects = [] + + def __reduce__(self): + return ( + OpenFile, + ( + self.fs, + self.path, + self.mode, + self.compression, + self.encoding, + self.errors, + self.newline, + ), + ) + + def __repr__(self): + return f"<OpenFile '{self.path}'>" + + def __enter__(self): + mode = self.mode.replace("t", "").replace("b", "") + "b" + + try: + f = self.fs.open(self.path, mode=mode) + except FileNotFoundError as e: + if has_magic(self.path): + raise FileNotFoundError( + "%s not found. The URL contains glob characters: you maybe needed\n" + "to pass expand=True in fsspec.open() or the storage_options of \n" + "your library. You can also set the config value 'open_expand'\n" + "before import, or fsspec.core.DEFAULT_EXPAND at runtime, to True.", + self.path, + ) from e + raise + + self.fobjects = [f] + + if self.compression is not None: + compress = compr[self.compression] + f = compress(f, mode=mode[0]) + self.fobjects.append(f) + + if "b" not in self.mode: + # assume, for example, that 'r' is equivalent to 'rt' as in builtin + f = PickleableTextIOWrapper( + f, encoding=self.encoding, errors=self.errors, newline=self.newline + ) + self.fobjects.append(f) + + return self.fobjects[-1] + + def __exit__(self, *args): + self.close() + + @property + def full_name(self): + return _unstrip_protocol(self.path, self.fs) + + def open(self): + """Materialise this as a real open file without context + + The OpenFile object should be explicitly closed to avoid enclosed file + instances persisting. You must, therefore, keep a reference to the OpenFile + during the life of the file-like it generates. + """ + return self.__enter__() + + def close(self): + """Close all encapsulated file objects""" + for f in reversed(self.fobjects): + if "r" not in self.mode and not f.closed: + f.flush() + f.close() + self.fobjects.clear() + + +class OpenFiles(list): + """List of OpenFile instances + + Can be used in a single context, which opens and closes all of the + contained files. Normal list access to get the elements works as + normal. + + A special case is made for caching filesystems - the files will + be down/uploaded together at the start or end of the context, and + this may happen concurrently, if the target filesystem supports it. + """ + + def __init__(self, *args, mode="rb", fs=None): + self.mode = mode + self.fs = fs + self.files = [] + super().__init__(*args) + + def __enter__(self): + if self.fs is None: + raise ValueError("Context has already been used") + + fs = self.fs + while True: + if hasattr(fs, "open_many"): + # check for concurrent cache download; or set up for upload + self.files = fs.open_many(self) + return self.files + if hasattr(fs, "fs") and fs.fs is not None: + fs = fs.fs + else: + break + return [s.__enter__() for s in self] + + def __exit__(self, *args): + fs = self.fs + [s.__exit__(*args) for s in self] + if "r" not in self.mode: + while True: + if hasattr(fs, "open_many"): + # check for concurrent cache upload + fs.commit_many(self.files) + return + if hasattr(fs, "fs") and fs.fs is not None: + fs = fs.fs + else: + break + + def __getitem__(self, item): + out = super().__getitem__(item) + if isinstance(item, slice): + return OpenFiles(out, mode=self.mode, fs=self.fs) + return out + + def __repr__(self): + return f"<List of {len(self)} OpenFile instances>" + + +def open_files( + urlpath, + mode="rb", + compression=None, + encoding="utf8", + errors=None, + name_function=None, + num=1, + protocol=None, + newline=None, + auto_mkdir=True, + expand=True, + **kwargs, +): + """Given a path or paths, return a list of ``OpenFile`` objects. + + For writing, a str path must contain the "*" character, which will be filled + in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2. + + For either reading or writing, can instead provide explicit list of paths. + + Parameters + ---------- + urlpath: string or list + Absolute or relative filepath(s). Prefix with a protocol like ``s3://`` + to read from alternative filesystems. To read from multiple files you + can pass a globstring or a list of paths, with the caveat that they + must all have the same protocol. + mode: 'rb', 'wt', etc. + compression: string or None + If given, open file using compression codec. Can either be a compression + name (a key in ``fsspec.compression.compr``) or "infer" to guess the + compression from the filename suffix. + encoding: str + For text mode only + errors: None or str + Passed to TextIOWrapper in text mode + name_function: function or None + if opening a set of files for writing, those files do not yet exist, + so we need to generate their names by formatting the urlpath for + each sequence number + num: int [1] + if writing mode, number of files we expect to create (passed to + name+function) + protocol: str or None + If given, overrides the protocol found in the URL. + newline: bytes or None + Used for line terminator in text mode. If None, uses system default; + if blank, uses no translation. + auto_mkdir: bool (True) + If in write mode, this will ensure the target directory exists before + writing, by calling ``fs.mkdirs(exist_ok=True)``. + expand: bool + **kwargs: dict + Extra options that make sense to a particular storage connection, e.g. + host, port, username, password, etc. + + Examples + -------- + >>> files = open_files('2015-*-*.csv') # doctest: +SKIP + >>> files = open_files( + ... 's3://bucket/2015-*-*.csv.gz', compression='gzip' + ... ) # doctest: +SKIP + + Returns + ------- + An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can + be used as a single context + + Notes + ----- + For a full list of the available protocols and the implementations that + they map across to see the latest online documentation: + + - For implementations built into ``fsspec`` see + https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations + - For implementations in separate packages see + https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations + """ + fs, fs_token, paths = get_fs_token_paths( + urlpath, + mode, + num=num, + name_function=name_function, + storage_options=kwargs, + protocol=protocol, + expand=expand, + ) + if fs.protocol == "file": + fs.auto_mkdir = auto_mkdir + elif "r" not in mode and auto_mkdir: + parents = {fs._parent(path) for path in paths} + for parent in parents: + try: + fs.makedirs(parent, exist_ok=True) + except PermissionError: + pass + return OpenFiles( + [ + OpenFile( + fs, + path, + mode=mode, + compression=compression, + encoding=encoding, + errors=errors, + newline=newline, + ) + for path in paths + ], + mode=mode, + fs=fs, + ) + + +def _un_chain(path, kwargs): + # Avoid a circular import + from fsspec.implementations.cached import CachingFileSystem + + if "::" in path: + x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word + bits = [] + for p in path.split("::"): + if "://" in p or x.match(p): + bits.append(p) + else: + bits.append(p + "://") + else: + bits = [path] + # [[url, protocol, kwargs], ...] + out = [] + previous_bit = None + kwargs = kwargs.copy() + for bit in reversed(bits): + protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file" + cls = get_filesystem_class(protocol) + extra_kwargs = cls._get_kwargs_from_urls(bit) + kws = kwargs.pop(protocol, {}) + if bit is bits[0]: + kws.update(kwargs) + kw = dict( + **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]}, + **kws, + ) + bit = cls._strip_protocol(bit) + if "target_protocol" not in kw and issubclass(cls, CachingFileSystem): + bit = previous_bit + out.append((bit, protocol, kw)) + previous_bit = bit + out.reverse() + return out + + +def url_to_fs(url, **kwargs): + """ + Turn fully-qualified and potentially chained URL into filesystem instance + + Parameters + ---------- + url : str + The fsspec-compatible URL + **kwargs: dict + Extra options that make sense to a particular storage connection, e.g. + host, port, username, password, etc. + + Returns + ------- + filesystem : FileSystem + The new filesystem discovered from ``url`` and created with + ``**kwargs``. + urlpath : str + The file-systems-specific URL for ``url``. + """ + url = stringify_path(url) + # non-FS arguments that appear in fsspec.open() + # inspect could keep this in sync with open()'s signature + known_kwargs = { + "compression", + "encoding", + "errors", + "expand", + "mode", + "name_function", + "newline", + "num", + } + kwargs = {k: v for k, v in kwargs.items() if k not in known_kwargs} + chain = _un_chain(url, kwargs) + inkwargs = {} + # Reverse iterate the chain, creating a nested target_* structure + for i, ch in enumerate(reversed(chain)): + urls, protocol, kw = ch + if i == len(chain) - 1: + inkwargs = dict(**kw, **inkwargs) + continue + inkwargs["target_options"] = dict(**kw, **inkwargs) + inkwargs["target_protocol"] = protocol + inkwargs["fo"] = urls + urlpath, protocol, _ = chain[0] + fs = filesystem(protocol, **inkwargs) + return fs, urlpath + + +DEFAULT_EXPAND = conf.get("open_expand", False) + + +def open( + urlpath, + mode="rb", + compression=None, + encoding="utf8", + errors=None, + protocol=None, + newline=None, + expand=None, + **kwargs, +): + """Given a path or paths, return one ``OpenFile`` object. + + Parameters + ---------- + urlpath: string or list + Absolute or relative filepath. Prefix with a protocol like ``s3://`` + to read from alternative filesystems. Should not include glob + character(s). + mode: 'rb', 'wt', etc. + compression: string or None + If given, open file using compression codec. Can either be a compression + name (a key in ``fsspec.compression.compr``) or "infer" to guess the + compression from the filename suffix. + encoding: str + For text mode only + errors: None or str + Passed to TextIOWrapper in text mode + protocol: str or None + If given, overrides the protocol found in the URL. + newline: bytes or None + Used for line terminator in text mode. If None, uses system default; + if blank, uses no translation. + expand: bool or Nonw + Whether to regard file paths containing special glob characters as needing + expansion (finding the first match) or absolute. Setting False allows using + paths which do embed such characters. If None (default), this argument + takes its value from the DEFAULT_EXPAND module variable, which takes + its initial value from the "open_expand" config value at startup, which will + be False if not set. + **kwargs: dict + Extra options that make sense to a particular storage connection, e.g. + host, port, username, password, etc. + + Examples + -------- + >>> openfile = open('2015-01-01.csv') # doctest: +SKIP + >>> openfile = open( + ... 's3://bucket/2015-01-01.csv.gz', compression='gzip' + ... ) # doctest: +SKIP + >>> with openfile as f: + ... df = pd.read_csv(f) # doctest: +SKIP + ... + + Returns + ------- + ``OpenFile`` object. + + Notes + ----- + For a full list of the available protocols and the implementations that + they map across to see the latest online documentation: + + - For implementations built into ``fsspec`` see + https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations + - For implementations in separate packages see + https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations + """ + expand = DEFAULT_EXPAND if expand is None else expand + out = open_files( + urlpath=[urlpath], + mode=mode, + compression=compression, + encoding=encoding, + errors=errors, + protocol=protocol, + newline=newline, + expand=expand, + **kwargs, + ) + if not out: + raise FileNotFoundError(urlpath) + return out[0] + + +def open_local( + url: str | list[str] | Path | list[Path], + mode: str = "rb", + **storage_options: dict, +) -> str | list[str]: + """Open file(s) which can be resolved to local + + For files which either are local, or get downloaded upon open + (e.g., by file caching) + + Parameters + ---------- + url: str or list(str) + mode: str + Must be read mode + storage_options: + passed on to FS for or used by open_files (e.g., compression) + """ + if "r" not in mode: + raise ValueError("Can only ensure local files when reading") + of = open_files(url, mode=mode, **storage_options) + if not getattr(of[0].fs, "local_file", False): + raise ValueError( + "open_local can only be used on a filesystem which" + " has attribute local_file=True" + ) + with of as files: + paths = [f.name for f in files] + if (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path): + return paths[0] + return paths + + +def get_compression(urlpath, compression): + if compression == "infer": + compression = infer_compression(urlpath) + if compression is not None and compression not in compr: + raise ValueError(f"Compression type {compression} not supported") + return compression + + +def split_protocol(urlpath): + """Return protocol, path pair""" + urlpath = stringify_path(urlpath) + if "://" in urlpath: + protocol, path = urlpath.split("://", 1) + if len(protocol) > 1: + # excludes Windows paths + return protocol, path + if urlpath.startswith("data:"): + return urlpath.split(":", 1) + return None, urlpath + + +def strip_protocol(urlpath): + """Return only path part of full URL, according to appropriate backend""" + protocol, _ = split_protocol(urlpath) + cls = get_filesystem_class(protocol) + return cls._strip_protocol(urlpath) + + +def expand_paths_if_needed(paths, mode, num, fs, name_function): + """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]`` + in them (read mode). + + :param paths: list of paths + mode: str + Mode in which to open files. + num: int + If opening in writing mode, number of files we expect to create. + fs: filesystem object + name_function: callable + If opening in writing mode, this callable is used to generate path + names. Names are generated for each partition by + ``urlpath.replace('*', name_function(partition_index))``. + :return: list of paths + """ + expanded_paths = [] + paths = list(paths) + + if "w" in mode: # read mode + if sum(1 for p in paths if "*" in p) > 1: + raise ValueError( + "When writing data, only one filename mask can be specified." + ) + num = max(num, len(paths)) + + for curr_path in paths: + if "*" in curr_path: + # expand using name_function + expanded_paths.extend(_expand_paths(curr_path, name_function, num)) + else: + expanded_paths.append(curr_path) + # if we generated more paths that asked for, trim the list + if len(expanded_paths) > num: + expanded_paths = expanded_paths[:num] + + else: # read mode + for curr_path in paths: + if has_magic(curr_path): + # expand using glob + expanded_paths.extend(fs.glob(curr_path)) + else: + expanded_paths.append(curr_path) + + return expanded_paths + + +def get_fs_token_paths( + urlpath, + mode="rb", + num=1, + name_function=None, + storage_options=None, + protocol=None, + expand=True, +): + """Filesystem, deterministic token, and paths from a urlpath and options. + + Parameters + ---------- + urlpath: string or iterable + Absolute or relative filepath, URL (may include protocols like + ``s3://``), or globstring pointing to data. + mode: str, optional + Mode in which to open files. + num: int, optional + If opening in writing mode, number of files we expect to create. + name_function: callable, optional + If opening in writing mode, this callable is used to generate path + names. Names are generated for each partition by + ``urlpath.replace('*', name_function(partition_index))``. + storage_options: dict, optional + Additional keywords to pass to the filesystem class. + protocol: str or None + To override the protocol specifier in the URL + expand: bool + Expand string paths for writing, assuming the path is a directory + """ + if isinstance(urlpath, (list, tuple, set)): + if not urlpath: + raise ValueError("empty urlpath sequence") + urlpath0 = stringify_path(next(iter(urlpath))) + else: + urlpath0 = stringify_path(urlpath) + storage_options = storage_options or {} + if protocol: + storage_options["protocol"] = protocol + chain = _un_chain(urlpath0, storage_options or {}) + inkwargs = {} + # Reverse iterate the chain, creating a nested target_* structure + for i, ch in enumerate(reversed(chain)): + urls, nested_protocol, kw = ch + if i == len(chain) - 1: + inkwargs = dict(**kw, **inkwargs) + continue + inkwargs["target_options"] = dict(**kw, **inkwargs) + inkwargs["target_protocol"] = nested_protocol + inkwargs["fo"] = urls + paths, protocol, _ = chain[0] + fs = filesystem(protocol, **inkwargs) + if isinstance(urlpath, (list, tuple, set)): + pchains = [ + _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath + ] + if len({pc[1] for pc in pchains}) > 1: + raise ValueError("Protocol mismatch getting fs from %s", urlpath) + paths = [pc[0] for pc in pchains] + else: + paths = fs._strip_protocol(paths) + if isinstance(paths, (list, tuple, set)): + if expand: + paths = expand_paths_if_needed(paths, mode, num, fs, name_function) + elif not isinstance(paths, list): + paths = list(paths) + else: + if ("w" in mode or "x" in mode) and expand: + paths = _expand_paths(paths, name_function, num) + elif "*" in paths: + paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)] + else: + paths = [paths] + + return fs, fs._fs_token, paths + + +def _expand_paths(path, name_function, num): + if isinstance(path, str): + if path.count("*") > 1: + raise ValueError("Output path spec must contain exactly one '*'.") + elif "*" not in path: + path = os.path.join(path, "*.part") + + if name_function is None: + name_function = build_name_function(num - 1) + + paths = [path.replace("*", name_function(i)) for i in range(num)] + if paths != sorted(paths): + logger.warning( + "In order to preserve order between partitions" + " paths created with ``name_function`` should " + "sort to partition order" + ) + elif isinstance(path, (tuple, list)): + assert len(path) == num + paths = list(path) + else: + raise ValueError( + "Path should be either\n" + "1. A list of paths: ['foo.json', 'bar.json', ...]\n" + "2. A directory: 'foo/\n" + "3. A path with a '*' in it: 'foo.*.json'" + ) + return paths + + +class PickleableTextIOWrapper(io.TextIOWrapper): + """TextIOWrapper cannot be pickled. This solves it. + + Requires that ``buffer`` be pickleable, which all instances of + AbstractBufferedFile are. + """ + + def __init__( + self, + buffer, + encoding=None, + errors=None, + newline=None, + line_buffering=False, + write_through=False, + ): + self.args = buffer, encoding, errors, newline, line_buffering, write_through + super().__init__(*self.args) + + def __reduce__(self): + return PickleableTextIOWrapper, self.args |