aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/anyio/_core/_subprocesses.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/anyio/_core/_subprocesses.py')
-rw-r--r--.venv/lib/python3.12/site-packages/anyio/_core/_subprocesses.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/anyio/_core/_subprocesses.py b/.venv/lib/python3.12/site-packages/anyio/_core/_subprocesses.py
new file mode 100644
index 00000000..36d9b306
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/anyio/_core/_subprocesses.py
@@ -0,0 +1,202 @@
+from __future__ import annotations
+
+import sys
+from collections.abc import AsyncIterable, Iterable, Mapping, Sequence
+from io import BytesIO
+from os import PathLike
+from subprocess import PIPE, CalledProcessError, CompletedProcess
+from typing import IO, Any, Union, cast
+
+from ..abc import Process
+from ._eventloop import get_async_backend
+from ._tasks import create_task_group
+
+if sys.version_info >= (3, 10):
+ from typing import TypeAlias
+else:
+ from typing_extensions import TypeAlias
+
+StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
+
+
+async def run_process(
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
+ *,
+ input: bytes | None = None,
+ stdin: int | IO[Any] | None = None,
+ stdout: int | IO[Any] | None = PIPE,
+ stderr: int | IO[Any] | None = PIPE,
+ check: bool = True,
+ cwd: StrOrBytesPath | None = None,
+ env: Mapping[str, str] | None = None,
+ startupinfo: Any = None,
+ creationflags: int = 0,
+ start_new_session: bool = False,
+ pass_fds: Sequence[int] = (),
+ user: str | int | None = None,
+ group: str | int | None = None,
+ extra_groups: Iterable[str | int] | None = None,
+ umask: int = -1,
+) -> CompletedProcess[bytes]:
+ """
+ Run an external command in a subprocess and wait until it completes.
+
+ .. seealso:: :func:`subprocess.run`
+
+ :param command: either a string to pass to the shell, or an iterable of strings
+ containing the executable name or path and its arguments
+ :param input: bytes passed to the standard input of the subprocess
+ :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
+ a file-like object, or `None`; ``input`` overrides this
+ :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
+ a file-like object, or `None`
+ :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
+ :data:`subprocess.STDOUT`, a file-like object, or `None`
+ :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the
+ process terminates with a return code other than 0
+ :param cwd: If not ``None``, change the working directory to this before running the
+ command
+ :param env: if not ``None``, this mapping replaces the inherited environment
+ variables from the parent process
+ :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
+ to specify process startup parameters (Windows only)
+ :param creationflags: flags that can be used to control the creation of the
+ subprocess (see :class:`subprocess.Popen` for the specifics)
+ :param start_new_session: if ``true`` the setsid() system call will be made in the
+ child process prior to the execution of the subprocess. (POSIX only)
+ :param pass_fds: sequence of file descriptors to keep open between the parent and
+ child processes. (POSIX only)
+ :param user: effective user to run the process as (Python >= 3.9, POSIX only)
+ :param group: effective group to run the process as (Python >= 3.9, POSIX only)
+ :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9,
+ POSIX only)
+ :param umask: if not negative, this umask is applied in the child process before
+ running the given command (Python >= 3.9, POSIX only)
+ :return: an object representing the completed process
+ :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
+ exits with a nonzero return code
+
+ """
+
+ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
+ buffer = BytesIO()
+ async for chunk in stream:
+ buffer.write(chunk)
+
+ stream_contents[index] = buffer.getvalue()
+
+ if stdin is not None and input is not None:
+ raise ValueError("only one of stdin and input is allowed")
+
+ async with await open_process(
+ command,
+ stdin=PIPE if input else stdin,
+ stdout=stdout,
+ stderr=stderr,
+ cwd=cwd,
+ env=env,
+ startupinfo=startupinfo,
+ creationflags=creationflags,
+ start_new_session=start_new_session,
+ pass_fds=pass_fds,
+ user=user,
+ group=group,
+ extra_groups=extra_groups,
+ umask=umask,
+ ) as process:
+ stream_contents: list[bytes | None] = [None, None]
+ async with create_task_group() as tg:
+ if process.stdout:
+ tg.start_soon(drain_stream, process.stdout, 0)
+
+ if process.stderr:
+ tg.start_soon(drain_stream, process.stderr, 1)
+
+ if process.stdin and input:
+ await process.stdin.send(input)
+ await process.stdin.aclose()
+
+ await process.wait()
+
+ output, errors = stream_contents
+ if check and process.returncode != 0:
+ raise CalledProcessError(cast(int, process.returncode), command, output, errors)
+
+ return CompletedProcess(command, cast(int, process.returncode), output, errors)
+
+
+async def open_process(
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
+ *,
+ stdin: int | IO[Any] | None = PIPE,
+ stdout: int | IO[Any] | None = PIPE,
+ stderr: int | IO[Any] | None = PIPE,
+ cwd: StrOrBytesPath | None = None,
+ env: Mapping[str, str] | None = None,
+ startupinfo: Any = None,
+ creationflags: int = 0,
+ start_new_session: bool = False,
+ pass_fds: Sequence[int] = (),
+ user: str | int | None = None,
+ group: str | int | None = None,
+ extra_groups: Iterable[str | int] | None = None,
+ umask: int = -1,
+) -> Process:
+ """
+ Start an external command in a subprocess.
+
+ .. seealso:: :class:`subprocess.Popen`
+
+ :param command: either a string to pass to the shell, or an iterable of strings
+ containing the executable name or path and its arguments
+ :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
+ file-like object, or ``None``
+ :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
+ a file-like object, or ``None``
+ :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
+ :data:`subprocess.STDOUT`, a file-like object, or ``None``
+ :param cwd: If not ``None``, the working directory is changed before executing
+ :param env: If env is not ``None``, it must be a mapping that defines the
+ environment variables for the new process
+ :param creationflags: flags that can be used to control the creation of the
+ subprocess (see :class:`subprocess.Popen` for the specifics)
+ :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
+ to specify process startup parameters (Windows only)
+ :param start_new_session: if ``true`` the setsid() system call will be made in the
+ child process prior to the execution of the subprocess. (POSIX only)
+ :param pass_fds: sequence of file descriptors to keep open between the parent and
+ child processes. (POSIX only)
+ :param user: effective user to run the process as (POSIX only)
+ :param group: effective group to run the process as (POSIX only)
+ :param extra_groups: supplementary groups to set in the subprocess (POSIX only)
+ :param umask: if not negative, this umask is applied in the child process before
+ running the given command (POSIX only)
+ :return: an asynchronous process object
+
+ """
+ kwargs: dict[str, Any] = {}
+ if user is not None:
+ kwargs["user"] = user
+
+ if group is not None:
+ kwargs["group"] = group
+
+ if extra_groups is not None:
+ kwargs["extra_groups"] = group
+
+ if umask >= 0:
+ kwargs["umask"] = umask
+
+ return await get_async_backend().open_process(
+ command,
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ cwd=cwd,
+ env=env,
+ startupinfo=startupinfo,
+ creationflags=creationflags,
+ start_new_session=start_new_session,
+ pass_fds=pass_fds,
+ **kwargs,
+ )