from __future__ import annotations import sys from collections.abc import AsyncIterable, Iterable, Mapping, Sequence from io import BytesIO from os import PathLike from subprocess import PIPE, CalledProcessError, CompletedProcess from typing import IO, Any, Union, cast from ..abc import Process from ._eventloop import get_async_backend from ._tasks import create_task_group if sys.version_info >= (3, 10): from typing import TypeAlias else: from typing_extensions import TypeAlias StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] async def run_process( command: StrOrBytesPath | Sequence[StrOrBytesPath], *, input: bytes | None = None, stdin: int | IO[Any] | None = None, stdout: int | IO[Any] | None = PIPE, stderr: int | IO[Any] | None = PIPE, check: bool = True, cwd: StrOrBytesPath | None = None, env: Mapping[str, str] | None = None, startupinfo: Any = None, creationflags: int = 0, start_new_session: bool = False, pass_fds: Sequence[int] = (), user: str | int | None = None, group: str | int | None = None, extra_groups: Iterable[str | int] | None = None, umask: int = -1, ) -> CompletedProcess[bytes]: """ Run an external command in a subprocess and wait until it completes. .. seealso:: :func:`subprocess.run` :param command: either a string to pass to the shell, or an iterable of strings containing the executable name or path and its arguments :param input: bytes passed to the standard input of the subprocess :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a file-like object, or `None`; ``input`` overrides this :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a file-like object, or `None` :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, :data:`subprocess.STDOUT`, a file-like object, or `None` :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process terminates with a return code other than 0 :param cwd: If not ``None``, change the working directory to this before running the command :param env: if not ``None``, this mapping replaces the inherited environment variables from the parent process :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used to specify process startup parameters (Windows only) :param creationflags: flags that can be used to control the creation of the subprocess (see :class:`subprocess.Popen` for the specifics) :param start_new_session: if ``true`` the setsid() system call will be made in the child process prior to the execution of the subprocess. (POSIX only) :param pass_fds: sequence of file descriptors to keep open between the parent and child processes. (POSIX only) :param user: effective user to run the process as (Python >= 3.9, POSIX only) :param group: effective group to run the process as (Python >= 3.9, POSIX only) :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9, POSIX only) :param umask: if not negative, this umask is applied in the child process before running the given command (Python >= 3.9, POSIX only) :return: an object representing the completed process :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process exits with a nonzero return code """ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None: buffer = BytesIO() async for chunk in stream: buffer.write(chunk) stream_contents[index] = buffer.getvalue() if stdin is not None and input is not None: raise ValueError("only one of stdin and input is allowed") async with await open_process( command, stdin=PIPE if input else stdin, stdout=stdout, stderr=stderr, cwd=cwd, env=env, startupinfo=startupinfo, creationflags=creationflags, start_new_session=start_new_session, pass_fds=pass_fds, user=user, group=group, extra_groups=extra_groups, umask=umask, ) as process: stream_contents: list[bytes | None] = [None, None] async with create_task_group() as tg: if process.stdout: tg.start_soon(drain_stream, process.stdout, 0) if process.stderr: tg.start_soon(drain_stream, process.stderr, 1) if process.stdin and input: await process.stdin.send(input) await process.stdin.aclose() await process.wait() output, errors = stream_contents if check and process.returncode != 0: raise CalledProcessError(cast(int, process.returncode), command, output, errors) return CompletedProcess(command, cast(int, process.returncode), output, errors) async def open_process( command: StrOrBytesPath | Sequence[StrOrBytesPath], *, stdin: int | IO[Any] | None = PIPE, stdout: int | IO[Any] | None = PIPE, stderr: int | IO[Any] | None = PIPE, cwd: StrOrBytesPath | None = None, env: Mapping[str, str] | None = None, startupinfo: Any = None, creationflags: int = 0, start_new_session: bool = False, pass_fds: Sequence[int] = (), user: str | int | None = None, group: str | int | None = None, extra_groups: Iterable[str | int] | None = None, umask: int = -1, ) -> Process: """ Start an external command in a subprocess. .. seealso:: :class:`subprocess.Popen` :param command: either a string to pass to the shell, or an iterable of strings containing the executable name or path and its arguments :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a file-like object, or ``None`` :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a file-like object, or ``None`` :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, :data:`subprocess.STDOUT`, a file-like object, or ``None`` :param cwd: If not ``None``, the working directory is changed before executing :param env: If env is not ``None``, it must be a mapping that defines the environment variables for the new process :param creationflags: flags that can be used to control the creation of the subprocess (see :class:`subprocess.Popen` for the specifics) :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used to specify process startup parameters (Windows only) :param start_new_session: if ``true`` the setsid() system call will be made in the child process prior to the execution of the subprocess. (POSIX only) :param pass_fds: sequence of file descriptors to keep open between the parent and child processes. (POSIX only) :param user: effective user to run the process as (POSIX only) :param group: effective group to run the process as (POSIX only) :param extra_groups: supplementary groups to set in the subprocess (POSIX only) :param umask: if not negative, this umask is applied in the child process before running the given command (POSIX only) :return: an asynchronous process object """ kwargs: dict[str, Any] = {} if user is not None: kwargs["user"] = user if group is not None: kwargs["group"] = group if extra_groups is not None: kwargs["extra_groups"] = group if umask >= 0: kwargs["umask"] = umask return await get_async_backend().open_process( command, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd, env=env, startupinfo=startupinfo, creationflags=creationflags, start_new_session=start_new_session, pass_fds=pass_fds, **kwargs, )