diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiofiles/tempfile')
| -rw-r--r-- | .venv/lib/python3.12/site-packages/aiofiles/tempfile/__init__.py | 357 | ||||
| -rw-r--r-- | .venv/lib/python3.12/site-packages/aiofiles/tempfile/temptypes.py | 69 |
2 files changed, 426 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiofiles/tempfile/__init__.py b/.venv/lib/python3.12/site-packages/aiofiles/tempfile/__init__.py new file mode 100644 index 00000000..ac3f8bd8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiofiles/tempfile/__init__.py @@ -0,0 +1,357 @@ +import asyncio +from functools import partial, singledispatch +from io import BufferedRandom, BufferedReader, BufferedWriter, FileIO, TextIOBase +from tempfile import NamedTemporaryFile as syncNamedTemporaryFile +from tempfile import SpooledTemporaryFile as syncSpooledTemporaryFile +from tempfile import TemporaryDirectory as syncTemporaryDirectory +from tempfile import TemporaryFile as syncTemporaryFile +from tempfile import _TemporaryFileWrapper as syncTemporaryFileWrapper + +from ..base import AiofilesContextManager +from ..threadpool.binary import AsyncBufferedIOBase, AsyncBufferedReader, AsyncFileIO +from ..threadpool.text import AsyncTextIOWrapper +from .temptypes import AsyncSpooledTemporaryFile, AsyncTemporaryDirectory +import sys + +__all__ = [ + "NamedTemporaryFile", + "TemporaryFile", + "SpooledTemporaryFile", + "TemporaryDirectory", +] + + +# ================================================================ +# Public methods for async open and return of temp file/directory +# objects with async interface +# ================================================================ +if sys.version_info >= (3, 12): + + def NamedTemporaryFile( + mode="w+b", + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + delete=True, + delete_on_close=True, + loop=None, + executor=None, + ): + """Async open a named temporary file""" + return AiofilesContextManager( + _temporary_file( + named=True, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + delete=delete, + delete_on_close=delete_on_close, + loop=loop, + executor=executor, + ) + ) + +else: + + def NamedTemporaryFile( + mode="w+b", + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + delete=True, + loop=None, + executor=None, + ): + """Async open a named temporary file""" + return AiofilesContextManager( + _temporary_file( + named=True, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + delete=delete, + loop=loop, + executor=executor, + ) + ) + + +def TemporaryFile( + mode="w+b", + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + loop=None, + executor=None, +): + """Async open an unnamed temporary file""" + return AiofilesContextManager( + _temporary_file( + named=False, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + loop=loop, + executor=executor, + ) + ) + + +def SpooledTemporaryFile( + max_size=0, + mode="w+b", + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + loop=None, + executor=None, +): + """Async open a spooled temporary file""" + return AiofilesContextManager( + _spooled_temporary_file( + max_size=max_size, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + loop=loop, + executor=executor, + ) + ) + + +def TemporaryDirectory(suffix=None, prefix=None, dir=None, loop=None, executor=None): + """Async open a temporary directory""" + return AiofilesContextManagerTempDir( + _temporary_directory( + suffix=suffix, prefix=prefix, dir=dir, loop=loop, executor=executor + ) + ) + + +# ========================================================= +# Internal coroutines to open new temp files/directories +# ========================================================= +if sys.version_info >= (3, 12): + + async def _temporary_file( + named=True, + mode="w+b", + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + delete=True, + delete_on_close=True, + loop=None, + executor=None, + max_size=0, + ): + """Async method to open a temporary file with async interface""" + if loop is None: + loop = asyncio.get_running_loop() + + if named: + cb = partial( + syncNamedTemporaryFile, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + delete=delete, + delete_on_close=delete_on_close, + ) + else: + cb = partial( + syncTemporaryFile, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + ) + + f = await loop.run_in_executor(executor, cb) + + # Wrap based on type of underlying IO object + if type(f) is syncTemporaryFileWrapper: + # _TemporaryFileWrapper was used (named files) + result = wrap(f.file, f, loop=loop, executor=executor) + result._closer = f._closer + return result + else: + # IO object was returned directly without wrapper + return wrap(f, f, loop=loop, executor=executor) + +else: + + async def _temporary_file( + named=True, + mode="w+b", + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + delete=True, + loop=None, + executor=None, + max_size=0, + ): + """Async method to open a temporary file with async interface""" + if loop is None: + loop = asyncio.get_running_loop() + + if named: + cb = partial( + syncNamedTemporaryFile, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + delete=delete, + ) + else: + cb = partial( + syncTemporaryFile, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + ) + + f = await loop.run_in_executor(executor, cb) + + # Wrap based on type of underlying IO object + if type(f) is syncTemporaryFileWrapper: + # _TemporaryFileWrapper was used (named files) + result = wrap(f.file, f, loop=loop, executor=executor) + # add delete property + result.delete = f.delete + return result + else: + # IO object was returned directly without wrapper + return wrap(f, f, loop=loop, executor=executor) + + +async def _spooled_temporary_file( + max_size=0, + mode="w+b", + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + loop=None, + executor=None, +): + """Open a spooled temporary file with async interface""" + if loop is None: + loop = asyncio.get_running_loop() + + cb = partial( + syncSpooledTemporaryFile, + max_size=max_size, + mode=mode, + buffering=buffering, + encoding=encoding, + newline=newline, + suffix=suffix, + prefix=prefix, + dir=dir, + ) + + f = await loop.run_in_executor(executor, cb) + + # Single interface provided by SpooledTemporaryFile for all modes + return AsyncSpooledTemporaryFile(f, loop=loop, executor=executor) + + +async def _temporary_directory( + suffix=None, prefix=None, dir=None, loop=None, executor=None +): + """Async method to open a temporary directory with async interface""" + if loop is None: + loop = asyncio.get_running_loop() + + cb = partial(syncTemporaryDirectory, suffix, prefix, dir) + f = await loop.run_in_executor(executor, cb) + + return AsyncTemporaryDirectory(f, loop=loop, executor=executor) + + +class AiofilesContextManagerTempDir(AiofilesContextManager): + """With returns the directory location, not the object (matching sync lib)""" + + async def __aenter__(self): + self._obj = await self._coro + return self._obj.name + + +@singledispatch +def wrap(base_io_obj, file, *, loop=None, executor=None): + """Wrap the object with interface based on type of underlying IO""" + raise TypeError("Unsupported IO type: {}".format(base_io_obj)) + + +@wrap.register(TextIOBase) +def _(base_io_obj, file, *, loop=None, executor=None): + return AsyncTextIOWrapper(file, loop=loop, executor=executor) + + +@wrap.register(BufferedWriter) +def _(base_io_obj, file, *, loop=None, executor=None): + return AsyncBufferedIOBase(file, loop=loop, executor=executor) + + +@wrap.register(BufferedReader) +@wrap.register(BufferedRandom) +def _(base_io_obj, file, *, loop=None, executor=None): + return AsyncBufferedReader(file, loop=loop, executor=executor) + + +@wrap.register(FileIO) +def _(base_io_obj, file, *, loop=None, executor=None): + return AsyncFileIO(file, loop=loop, executor=executor) diff --git a/.venv/lib/python3.12/site-packages/aiofiles/tempfile/temptypes.py b/.venv/lib/python3.12/site-packages/aiofiles/tempfile/temptypes.py new file mode 100644 index 00000000..1a1b1a88 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiofiles/tempfile/temptypes.py @@ -0,0 +1,69 @@ +"""Async wrappers for spooled temp files and temp directory objects""" +from functools import partial + +from ..base import AsyncBase +from ..threadpool.utils import ( + cond_delegate_to_executor, + delegate_to_executor, + proxy_property_directly, +) + + +@delegate_to_executor("fileno", "rollover") +@cond_delegate_to_executor( + "close", + "flush", + "isatty", + "read", + "readline", + "readlines", + "seek", + "tell", + "truncate", +) +@proxy_property_directly("closed", "encoding", "mode", "name", "newlines") +class AsyncSpooledTemporaryFile(AsyncBase): + """Async wrapper for SpooledTemporaryFile class""" + + async def _check(self): + if self._file._rolled: + return + max_size = self._file._max_size + if max_size and self._file.tell() > max_size: + await self.rollover() + + async def write(self, s): + """Implementation to anticipate rollover""" + if self._file._rolled: + cb = partial(self._file.write, s) + return await self._loop.run_in_executor(self._executor, cb) + else: + file = self._file._file # reference underlying base IO object + rv = file.write(s) + await self._check() + return rv + + async def writelines(self, iterable): + """Implementation to anticipate rollover""" + if self._file._rolled: + cb = partial(self._file.writelines, iterable) + return await self._loop.run_in_executor(self._executor, cb) + else: + file = self._file._file # reference underlying base IO object + rv = file.writelines(iterable) + await self._check() + return rv + + +@delegate_to_executor("cleanup") +@proxy_property_directly("name") +class AsyncTemporaryDirectory: + """Async wrapper for TemporaryDirectory class""" + + def __init__(self, file, loop, executor): + self._file = file + self._loop = loop + self._executor = executor + + async def close(self): + await self.cleanup() |
