aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/anyio/_core/_tempfile.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/anyio/_core/_tempfile.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/anyio/_core/_tempfile.py')
-rw-r--r--.venv/lib/python3.12/site-packages/anyio/_core/_tempfile.py616
1 files changed, 616 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/anyio/_core/_tempfile.py b/.venv/lib/python3.12/site-packages/anyio/_core/_tempfile.py
new file mode 100644
index 00000000..26d70eca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/anyio/_core/_tempfile.py
@@ -0,0 +1,616 @@
+from __future__ import annotations
+
+import os
+import sys
+import tempfile
+from collections.abc import Iterable
+from io import BytesIO, TextIOWrapper
+from types import TracebackType
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ AnyStr,
+ Generic,
+ overload,
+)
+
+from .. import to_thread
+from .._core._fileio import AsyncFile
+from ..lowlevel import checkpoint_if_cancelled
+
+if TYPE_CHECKING:
+ from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
+
+
+class TemporaryFile(Generic[AnyStr]):
+ """
+ An asynchronous temporary file that is automatically created and cleaned up.
+
+ This class provides an asynchronous context manager interface to a temporary file.
+ The file is created using Python's standard `tempfile.TemporaryFile` function in a
+ background thread, and is wrapped as an asynchronous file using `AsyncFile`.
+
+ :param mode: The mode in which the file is opened. Defaults to "w+b".
+ :param buffering: The buffering policy (-1 means the default buffering).
+ :param encoding: The encoding used to decode or encode the file. Only applicable in
+ text mode.
+ :param newline: Controls how universal newlines mode works (only applicable in text
+ mode).
+ :param suffix: The suffix for the temporary file name.
+ :param prefix: The prefix for the temporary file name.
+ :param dir: The directory in which the temporary file is created.
+ :param errors: The error handling scheme used for encoding/decoding errors.
+ """
+
+ _async_file: AsyncFile[AnyStr]
+
+ @overload
+ def __init__(
+ self: TemporaryFile[bytes],
+ mode: OpenBinaryMode = ...,
+ buffering: int = ...,
+ encoding: str | None = ...,
+ newline: str | None = ...,
+ suffix: str | None = ...,
+ prefix: str | None = ...,
+ dir: str | None = ...,
+ *,
+ errors: str | None = ...,
+ ): ...
+ @overload
+ def __init__(
+ self: TemporaryFile[str],
+ mode: OpenTextMode,
+ buffering: int = ...,
+ encoding: str | None = ...,
+ newline: str | None = ...,
+ suffix: str | None = ...,
+ prefix: str | None = ...,
+ dir: str | None = ...,
+ *,
+ errors: str | None = ...,
+ ): ...
+
+ def __init__(
+ self,
+ mode: OpenTextMode | OpenBinaryMode = "w+b",
+ buffering: int = -1,
+ encoding: str | None = None,
+ newline: str | None = None,
+ suffix: str | None = None,
+ prefix: str | None = None,
+ dir: str | None = None,
+ *,
+ errors: str | None = None,
+ ) -> None:
+ self.mode = mode
+ self.buffering = buffering
+ self.encoding = encoding
+ self.newline = newline
+ self.suffix: str | None = suffix
+ self.prefix: str | None = prefix
+ self.dir: str | None = dir
+ self.errors = errors
+
+ async def __aenter__(self) -> AsyncFile[AnyStr]:
+ fp = await to_thread.run_sync(
+ lambda: tempfile.TemporaryFile(
+ self.mode,
+ self.buffering,
+ self.encoding,
+ self.newline,
+ self.suffix,
+ self.prefix,
+ self.dir,
+ errors=self.errors,
+ )
+ )
+ self._async_file = AsyncFile(fp)
+ return self._async_file
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ await self._async_file.aclose()
+
+
+class NamedTemporaryFile(Generic[AnyStr]):
+ """
+ An asynchronous named temporary file that is automatically created and cleaned up.
+
+ This class provides an asynchronous context manager for a temporary file with a
+ visible name in the file system. It uses Python's standard
+ :func:`~tempfile.NamedTemporaryFile` function and wraps the file object with
+ :class:`AsyncFile` for asynchronous operations.
+
+ :param mode: The mode in which the file is opened. Defaults to "w+b".
+ :param buffering: The buffering policy (-1 means the default buffering).
+ :param encoding: The encoding used to decode or encode the file. Only applicable in
+ text mode.
+ :param newline: Controls how universal newlines mode works (only applicable in text
+ mode).
+ :param suffix: The suffix for the temporary file name.
+ :param prefix: The prefix for the temporary file name.
+ :param dir: The directory in which the temporary file is created.
+ :param delete: Whether to delete the file when it is closed.
+ :param errors: The error handling scheme used for encoding/decoding errors.
+ :param delete_on_close: (Python 3.12+) Whether to delete the file on close.
+ """
+
+ _async_file: AsyncFile[AnyStr]
+
+ @overload
+ def __init__(
+ self: NamedTemporaryFile[bytes],
+ mode: OpenBinaryMode = ...,
+ buffering: int = ...,
+ encoding: str | None = ...,
+ newline: str | None = ...,
+ suffix: str | None = ...,
+ prefix: str | None = ...,
+ dir: str | None = ...,
+ delete: bool = ...,
+ *,
+ errors: str | None = ...,
+ delete_on_close: bool = ...,
+ ): ...
+ @overload
+ def __init__(
+ self: NamedTemporaryFile[str],
+ mode: OpenTextMode,
+ buffering: int = ...,
+ encoding: str | None = ...,
+ newline: str | None = ...,
+ suffix: str | None = ...,
+ prefix: str | None = ...,
+ dir: str | None = ...,
+ delete: bool = ...,
+ *,
+ errors: str | None = ...,
+ delete_on_close: bool = ...,
+ ): ...
+
+ def __init__(
+ self,
+ mode: OpenBinaryMode | OpenTextMode = "w+b",
+ buffering: int = -1,
+ encoding: str | None = None,
+ newline: str | None = None,
+ suffix: str | None = None,
+ prefix: str | None = None,
+ dir: str | None = None,
+ delete: bool = True,
+ *,
+ errors: str | None = None,
+ delete_on_close: bool = True,
+ ) -> None:
+ self._params: dict[str, Any] = {
+ "mode": mode,
+ "buffering": buffering,
+ "encoding": encoding,
+ "newline": newline,
+ "suffix": suffix,
+ "prefix": prefix,
+ "dir": dir,
+ "delete": delete,
+ "errors": errors,
+ }
+ if sys.version_info >= (3, 12):
+ self._params["delete_on_close"] = delete_on_close
+
+ async def __aenter__(self) -> AsyncFile[AnyStr]:
+ fp = await to_thread.run_sync(
+ lambda: tempfile.NamedTemporaryFile(**self._params)
+ )
+ self._async_file = AsyncFile(fp)
+ return self._async_file
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ await self._async_file.aclose()
+
+
+class SpooledTemporaryFile(AsyncFile[AnyStr]):
+ """
+ An asynchronous spooled temporary file that starts in memory and is spooled to disk.
+
+ This class provides an asynchronous interface to a spooled temporary file, much like
+ Python's standard :class:`~tempfile.SpooledTemporaryFile`. It supports asynchronous
+ write operations and provides a method to force a rollover to disk.
+
+ :param max_size: Maximum size in bytes before the file is rolled over to disk.
+ :param mode: The mode in which the file is opened. Defaults to "w+b".
+ :param buffering: The buffering policy (-1 means the default buffering).
+ :param encoding: The encoding used to decode or encode the file (text mode only).
+ :param newline: Controls how universal newlines mode works (text mode only).
+ :param suffix: The suffix for the temporary file name.
+ :param prefix: The prefix for the temporary file name.
+ :param dir: The directory in which the temporary file is created.
+ :param errors: The error handling scheme used for encoding/decoding errors.
+ """
+
+ _rolled: bool = False
+
+ @overload
+ def __init__(
+ self: SpooledTemporaryFile[bytes],
+ max_size: int = ...,
+ mode: OpenBinaryMode = ...,
+ buffering: int = ...,
+ encoding: str | None = ...,
+ newline: str | None = ...,
+ suffix: str | None = ...,
+ prefix: str | None = ...,
+ dir: str | None = ...,
+ *,
+ errors: str | None = ...,
+ ): ...
+ @overload
+ def __init__(
+ self: SpooledTemporaryFile[str],
+ max_size: int = ...,
+ mode: OpenTextMode = ...,
+ buffering: int = ...,
+ encoding: str | None = ...,
+ newline: str | None = ...,
+ suffix: str | None = ...,
+ prefix: str | None = ...,
+ dir: str | None = ...,
+ *,
+ errors: str | None = ...,
+ ): ...
+
+ def __init__(
+ self,
+ max_size: int = 0,
+ mode: OpenBinaryMode | OpenTextMode = "w+b",
+ buffering: int = -1,
+ encoding: str | None = None,
+ newline: str | None = None,
+ suffix: str | None = None,
+ prefix: str | None = None,
+ dir: str | None = None,
+ *,
+ errors: str | None = None,
+ ) -> None:
+ self._tempfile_params: dict[str, Any] = {
+ "mode": mode,
+ "buffering": buffering,
+ "encoding": encoding,
+ "newline": newline,
+ "suffix": suffix,
+ "prefix": prefix,
+ "dir": dir,
+ "errors": errors,
+ }
+ self._max_size = max_size
+ if "b" in mode:
+ super().__init__(BytesIO()) # type: ignore[arg-type]
+ else:
+ super().__init__(
+ TextIOWrapper( # type: ignore[arg-type]
+ BytesIO(),
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ write_through=True,
+ )
+ )
+
+ async def aclose(self) -> None:
+ if not self._rolled:
+ self._fp.close()
+ return
+
+ await super().aclose()
+
+ async def _check(self) -> None:
+ if self._rolled or self._fp.tell() < self._max_size:
+ return
+
+ await self.rollover()
+
+ async def rollover(self) -> None:
+ if self._rolled:
+ return
+
+ self._rolled = True
+ buffer = self._fp
+ buffer.seek(0)
+ self._fp = await to_thread.run_sync(
+ lambda: tempfile.TemporaryFile(**self._tempfile_params)
+ )
+ await self.write(buffer.read())
+ buffer.close()
+
+ @property
+ def closed(self) -> bool:
+ return self._fp.closed
+
+ async def read(self, size: int = -1) -> AnyStr:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ return self._fp.read(size)
+
+ return await super().read(size) # type: ignore[return-value]
+
+ async def read1(self: SpooledTemporaryFile[bytes], size: int = -1) -> bytes:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ return self._fp.read1(size)
+
+ return await super().read1(size)
+
+ async def readline(self) -> AnyStr:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ return self._fp.readline()
+
+ return await super().readline() # type: ignore[return-value]
+
+ async def readlines(self) -> list[AnyStr]:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ return self._fp.readlines()
+
+ return await super().readlines() # type: ignore[return-value]
+
+ async def readinto(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ self._fp.readinto(b)
+
+ return await super().readinto(b)
+
+ async def readinto1(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ self._fp.readinto(b)
+
+ return await super().readinto1(b)
+
+ async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ return self._fp.seek(offset, whence)
+
+ return await super().seek(offset, whence)
+
+ async def tell(self) -> int:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ return self._fp.tell()
+
+ return await super().tell()
+
+ async def truncate(self, size: int | None = None) -> int:
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ return self._fp.truncate(size)
+
+ return await super().truncate(size)
+
+ @overload
+ async def write(self: SpooledTemporaryFile[bytes], b: ReadableBuffer) -> int: ...
+ @overload
+ async def write(self: SpooledTemporaryFile[str], b: str) -> int: ...
+
+ async def write(self, b: ReadableBuffer | str) -> int:
+ """
+ Asynchronously write data to the spooled temporary file.
+
+ If the file has not yet been rolled over, the data is written synchronously,
+ and a rollover is triggered if the size exceeds the maximum size.
+
+ :param s: The data to write.
+ :return: The number of bytes written.
+ :raises RuntimeError: If the underlying file is not initialized.
+
+ """
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ result = self._fp.write(b)
+ await self._check()
+ return result
+
+ return await super().write(b) # type: ignore[misc]
+
+ @overload
+ async def writelines(
+ self: SpooledTemporaryFile[bytes], lines: Iterable[ReadableBuffer]
+ ) -> None: ...
+ @overload
+ async def writelines(
+ self: SpooledTemporaryFile[str], lines: Iterable[str]
+ ) -> None: ...
+
+ async def writelines(self, lines: Iterable[str] | Iterable[ReadableBuffer]) -> None:
+ """
+ Asynchronously write a list of lines to the spooled temporary file.
+
+ If the file has not yet been rolled over, the lines are written synchronously,
+ and a rollover is triggered if the size exceeds the maximum size.
+
+ :param lines: An iterable of lines to write.
+ :raises RuntimeError: If the underlying file is not initialized.
+
+ """
+ if not self._rolled:
+ await checkpoint_if_cancelled()
+ result = self._fp.writelines(lines)
+ await self._check()
+ return result
+
+ return await super().writelines(lines) # type: ignore[misc]
+
+
+class TemporaryDirectory(Generic[AnyStr]):
+ """
+ An asynchronous temporary directory that is created and cleaned up automatically.
+
+ This class provides an asynchronous context manager for creating a temporary
+ directory. It wraps Python's standard :class:`~tempfile.TemporaryDirectory` to
+ perform directory creation and cleanup operations in a background thread.
+
+ :param suffix: Suffix to be added to the temporary directory name.
+ :param prefix: Prefix to be added to the temporary directory name.
+ :param dir: The parent directory where the temporary directory is created.
+ :param ignore_cleanup_errors: Whether to ignore errors during cleanup
+ (Python 3.10+).
+ :param delete: Whether to delete the directory upon closing (Python 3.12+).
+ """
+
+ def __init__(
+ self,
+ suffix: AnyStr | None = None,
+ prefix: AnyStr | None = None,
+ dir: AnyStr | None = None,
+ *,
+ ignore_cleanup_errors: bool = False,
+ delete: bool = True,
+ ) -> None:
+ self.suffix: AnyStr | None = suffix
+ self.prefix: AnyStr | None = prefix
+ self.dir: AnyStr | None = dir
+ self.ignore_cleanup_errors = ignore_cleanup_errors
+ self.delete = delete
+
+ self._tempdir: tempfile.TemporaryDirectory | None = None
+
+ async def __aenter__(self) -> str:
+ params: dict[str, Any] = {
+ "suffix": self.suffix,
+ "prefix": self.prefix,
+ "dir": self.dir,
+ }
+ if sys.version_info >= (3, 10):
+ params["ignore_cleanup_errors"] = self.ignore_cleanup_errors
+
+ if sys.version_info >= (3, 12):
+ params["delete"] = self.delete
+
+ self._tempdir = await to_thread.run_sync(
+ lambda: tempfile.TemporaryDirectory(**params)
+ )
+ return await to_thread.run_sync(self._tempdir.__enter__)
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ if self._tempdir is not None:
+ await to_thread.run_sync(
+ self._tempdir.__exit__, exc_type, exc_value, traceback
+ )
+
+ async def cleanup(self) -> None:
+ if self._tempdir is not None:
+ await to_thread.run_sync(self._tempdir.cleanup)
+
+
+@overload
+async def mkstemp(
+ suffix: str | None = None,
+ prefix: str | None = None,
+ dir: str | None = None,
+ text: bool = False,
+) -> tuple[int, str]: ...
+
+
+@overload
+async def mkstemp(
+ suffix: bytes | None = None,
+ prefix: bytes | None = None,
+ dir: bytes | None = None,
+ text: bool = False,
+) -> tuple[int, bytes]: ...
+
+
+async def mkstemp(
+ suffix: AnyStr | None = None,
+ prefix: AnyStr | None = None,
+ dir: AnyStr | None = None,
+ text: bool = False,
+) -> tuple[int, str | bytes]:
+ """
+ Asynchronously create a temporary file and return an OS-level handle and the file
+ name.
+
+ This function wraps `tempfile.mkstemp` and executes it in a background thread.
+
+ :param suffix: Suffix to be added to the file name.
+ :param prefix: Prefix to be added to the file name.
+ :param dir: Directory in which the temporary file is created.
+ :param text: Whether the file is opened in text mode.
+ :return: A tuple containing the file descriptor and the file name.
+
+ """
+ return await to_thread.run_sync(tempfile.mkstemp, suffix, prefix, dir, text)
+
+
+@overload
+async def mkdtemp(
+ suffix: str | None = None,
+ prefix: str | None = None,
+ dir: str | None = None,
+) -> str: ...
+
+
+@overload
+async def mkdtemp(
+ suffix: bytes | None = None,
+ prefix: bytes | None = None,
+ dir: bytes | None = None,
+) -> bytes: ...
+
+
+async def mkdtemp(
+ suffix: AnyStr | None = None,
+ prefix: AnyStr | None = None,
+ dir: AnyStr | None = None,
+) -> str | bytes:
+ """
+ Asynchronously create a temporary directory and return its path.
+
+ This function wraps `tempfile.mkdtemp` and executes it in a background thread.
+
+ :param suffix: Suffix to be added to the directory name.
+ :param prefix: Prefix to be added to the directory name.
+ :param dir: Parent directory where the temporary directory is created.
+ :return: The path of the created temporary directory.
+
+ """
+ return await to_thread.run_sync(tempfile.mkdtemp, suffix, prefix, dir)
+
+
+async def gettempdir() -> str:
+ """
+ Asynchronously return the name of the directory used for temporary files.
+
+ This function wraps `tempfile.gettempdir` and executes it in a background thread.
+
+ :return: The path of the temporary directory as a string.
+
+ """
+ return await to_thread.run_sync(tempfile.gettempdir)
+
+
+async def gettempdirb() -> bytes:
+ """
+ Asynchronously return the name of the directory used for temporary files in bytes.
+
+ This function wraps `tempfile.gettempdirb` and executes it in a background thread.
+
+ :return: The path of the temporary directory as bytes.
+
+ """
+ return await to_thread.run_sync(tempfile.gettempdirb)