diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/aioshutil')
-rw-r--r-- | .venv/lib/python3.12/site-packages/aioshutil/__init__.py | 239 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/aioshutil/py.typed | 0 |
2 files changed, 239 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aioshutil/__init__.py b/.venv/lib/python3.12/site-packages/aioshutil/__init__.py new file mode 100644 index 00000000..5b26feaf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aioshutil/__init__.py @@ -0,0 +1,239 @@ +# -*- coding: utf-8 -*- +""" +Asynchronous shutil module. +""" +from __future__ import annotations + +import asyncio +import shutil +from functools import partial, wraps +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Coroutine, + Optional, + Sequence, + TypeVar, + Union, + overload, +) + +try: + from typing import ParamSpec, TypeAlias # type: ignore +except ImportError: + # Python versions < 3.10 + from typing_extensions import ParamSpec, TypeAlias + +__all__ = [ + "copyfileobj", + "copyfile", + "copymode", + "copystat", + "copy", + "copy2", + "copytree", + "move", + "rmtree", + "Error", + "SpecialFileError", + "ExecError", + "make_archive", + "get_archive_formats", + "register_archive_format", + "unregister_archive_format", + "get_unpack_formats", + "register_unpack_format", + "unregister_unpack_format", + "unpack_archive", + "ignore_patterns", + "chown", + "which", + "get_terminal_size", + "SameFileError", +] + +P = ParamSpec("P") +R = TypeVar("R") + +if TYPE_CHECKING: # pragma: no cover + # type hints for wrapped functions with overloads (which are incompatible + # with ParamSpec). + + import sys + from os import PathLike + + StrPath: TypeAlias = Union[str, PathLike[str]] + BytesPath: TypeAlias = Union[bytes, PathLike[bytes]] + StrOrBytesPath: TypeAlias = Union[str, bytes, PathLike[str], PathLike[bytes]] + _PathReturn: TypeAlias = Any + _StrPathT = TypeVar("_StrPathT", bound=StrPath) + + @overload + async def copy( + src: StrPath, dst: StrPath, *, follow_symlinks: bool = ... + ) -> _PathReturn: + ... + + @overload + async def copy( + src: BytesPath, dst: BytesPath, *, follow_symlinks: bool = ... + ) -> _PathReturn: + ... + + async def copy(src, dst, *, follow_symlinks=...): + ... + + @overload + async def copy2( + src: StrPath, dst: StrPath, *, follow_symlinks: bool = ... + ) -> _PathReturn: + ... + + @overload + async def copy2( + src: BytesPath, dst: BytesPath, *, follow_symlinks: bool = ... + ) -> _PathReturn: + ... + + async def copy2(src, dst, *, follow_symlinks=...): + ... + + @overload + async def register_archive_format( + name: str, + function: Callable[..., object], + extra_args: Sequence[tuple[str, Any] | list[Any]], + description: str = ..., + ) -> None: + ... + + @overload + async def register_archive_format( + name: str, + function: Callable[[str, str], object], + extra_args: None = ..., + description: str = ..., + ) -> None: + ... + + async def register_archive_format(name, function, extra_args=..., description=...): + ... + + @overload + async def register_unpack_format( + name: str, + extensions: list[str], + function: Callable[..., object], + extra_args: Sequence[tuple[str, Any]], + description: str = ..., + ) -> None: + ... + + @overload + async def register_unpack_format( + name: str, + extensions: list[str], + function: Callable[[str, str], object], + extra_args: None = ..., + description: str = ..., + ) -> None: + ... + + async def register_unpack_format( + name, extensions, function, extra_args=..., description=... + ): + ... + + @overload + async def chown( + path: StrOrBytesPath, user: Union[str, int], group: None = ... + ) -> None: + ... + + @overload + async def chown( + path: StrOrBytesPath, user: None = ..., *, group: Union[str, int] + ) -> None: + ... + + @overload + async def chown(path: StrOrBytesPath, user: None, group: Union[str, int]) -> None: + ... + + @overload + async def chown( + path: StrOrBytesPath, user: Union[str, int], group: Union[str, int] + ) -> None: + ... + + async def chown(path, user=..., group=...): + ... + + if sys.version_info >= (3, 8): + + @overload + async def which( + cmd: _StrPathT, mode: int = ..., path: Optional[StrPath] = ... + ) -> Union[str, _StrPathT, None]: + ... + + @overload + async def which( + cmd: bytes, mode: int = ..., path: Optional[StrPath] = ... + ) -> Optional[bytes]: + ... + + async def which( + cmd, mode=..., path=... + ) -> Union[bytes, str, StrPath, PathLike[str], None]: + ... + + else: + + async def which( + cmd: _StrPathT, mode: int = ..., path: StrPath | None = ... + ) -> str | _StrPathT | None: + ... + + +def sync_to_async(func: Callable[P, R]) -> Callable[P, Coroutine[Any, Any, R]]: + @wraps(func) + async def run_in_executor(*args: P.args, **kwargs: P.kwargs) -> R: + loop = asyncio.get_event_loop() + pfunc = partial(func, *args, **kwargs) + return await loop.run_in_executor(None, pfunc) + + return run_in_executor + + +rmtree = sync_to_async(shutil.rmtree) +copyfile = sync_to_async(shutil.copyfile) +copyfileobj = sync_to_async(shutil.copyfileobj) +copymode = sync_to_async(shutil.copymode) +copystat = sync_to_async(shutil.copystat) +copy = sync_to_async(shutil.copy) # type: ignore # noqa: F811 +copy2 = sync_to_async(shutil.copy2) # type: ignore # noqa: F811 +copytree = sync_to_async(shutil.copytree) +move = sync_to_async(shutil.move) +Error = shutil.Error +SpecialFileError = shutil.SpecialFileError +ExecError = shutil.ExecError +make_archive = sync_to_async(shutil.make_archive) +get_archive_formats = sync_to_async(shutil.get_archive_formats) +register_archive_format = sync_to_async(shutil.register_archive_format) # type: ignore # noqa: F811 +unregister_archive_format = sync_to_async(shutil.unregister_archive_format) +get_unpack_formats = sync_to_async(shutil.get_unpack_formats) +register_unpack_format = sync_to_async(shutil.register_unpack_format) # type: ignore # noqa: F811 +unregister_unpack_format = sync_to_async(shutil.unregister_unpack_format) +unpack_archive = sync_to_async(shutil.unpack_archive) +ignore_patterns = sync_to_async(shutil.ignore_patterns) +chown = sync_to_async(shutil.chown) # type: ignore # noqa: F811 +which = sync_to_async(shutil.which) # type: ignore # noqa: F811 +get_terminal_size = sync_to_async(shutil.get_terminal_size) +SameFileError = shutil.SameFileError + + +if "disk_usage" in shutil.__all__: # pragma: no cover + __all__.append("disk_usage") + disk_usage = sync_to_async(shutil.disk_usage) diff --git a/.venv/lib/python3.12/site-packages/aioshutil/py.typed b/.venv/lib/python3.12/site-packages/aioshutil/py.typed new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aioshutil/py.typed |