about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py')
-rw-r--r--.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py732
1 files changed, 732 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py b/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py
new file mode 100644
index 00000000..a6331328
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/anyio/_core/_synchronization.py
@@ -0,0 +1,732 @@
+from __future__ import annotations
+
+import math
+from collections import deque
+from dataclasses import dataclass
+from types import TracebackType
+
+from sniffio import AsyncLibraryNotFoundError
+
+from ..lowlevel import checkpoint
+from ._eventloop import get_async_backend
+from ._exceptions import BusyResourceError
+from ._tasks import CancelScope
+from ._testing import TaskInfo, get_current_task
+
+
+@dataclass(frozen=True)
+class EventStatistics:
+    """
+    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
+    """
+
+    tasks_waiting: int
+
+
+@dataclass(frozen=True)
+class CapacityLimiterStatistics:
+    """
+    :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
+    :ivar float total_tokens: total number of available tokens
+    :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from
+        this limiter
+    :ivar int tasks_waiting: number of tasks waiting on
+        :meth:`~.CapacityLimiter.acquire` or
+        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
+    """
+
+    borrowed_tokens: int
+    total_tokens: float
+    borrowers: tuple[object, ...]
+    tasks_waiting: int
+
+
+@dataclass(frozen=True)
+class LockStatistics:
+    """
+    :ivar bool locked: flag indicating if this lock is locked or not
+    :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
+        lock is not held by any task)
+    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
+    """
+
+    locked: bool
+    owner: TaskInfo | None
+    tasks_waiting: int
+
+
+@dataclass(frozen=True)
+class ConditionStatistics:
+    """
+    :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
+    :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
+        :class:`~.Lock`
+    """
+
+    tasks_waiting: int
+    lock_statistics: LockStatistics
+
+
+@dataclass(frozen=True)
+class SemaphoreStatistics:
+    """
+    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
+
+    """
+
+    tasks_waiting: int
+
+
+class Event:
+    def __new__(cls) -> Event:
+        try:
+            return get_async_backend().create_event()
+        except AsyncLibraryNotFoundError:
+            return EventAdapter()
+
+    def set(self) -> None:
+        """Set the flag, notifying all listeners."""
+        raise NotImplementedError
+
+    def is_set(self) -> bool:
+        """Return ``True`` if the flag is set, ``False`` if not."""
+        raise NotImplementedError
+
+    async def wait(self) -> None:
+        """
+        Wait until the flag has been set.
+
+        If the flag has already been set when this method is called, it returns
+        immediately.
+
+        """
+        raise NotImplementedError
+
+    def statistics(self) -> EventStatistics:
+        """Return statistics about the current state of this event."""
+        raise NotImplementedError
+
+
+class EventAdapter(Event):
+    _internal_event: Event | None = None
+    _is_set: bool = False
+
+    def __new__(cls) -> EventAdapter:
+        return object.__new__(cls)
+
+    @property
+    def _event(self) -> Event:
+        if self._internal_event is None:
+            self._internal_event = get_async_backend().create_event()
+            if self._is_set:
+                self._internal_event.set()
+
+        return self._internal_event
+
+    def set(self) -> None:
+        if self._internal_event is None:
+            self._is_set = True
+        else:
+            self._event.set()
+
+    def is_set(self) -> bool:
+        if self._internal_event is None:
+            return self._is_set
+
+        return self._internal_event.is_set()
+
+    async def wait(self) -> None:
+        await self._event.wait()
+
+    def statistics(self) -> EventStatistics:
+        if self._internal_event is None:
+            return EventStatistics(tasks_waiting=0)
+
+        return self._internal_event.statistics()
+
+
+class Lock:
+    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
+        try:
+            return get_async_backend().create_lock(fast_acquire=fast_acquire)
+        except AsyncLibraryNotFoundError:
+            return LockAdapter(fast_acquire=fast_acquire)
+
+    async def __aenter__(self) -> None:
+        await self.acquire()
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        self.release()
+
+    async def acquire(self) -> None:
+        """Acquire the lock."""
+        raise NotImplementedError
+
+    def acquire_nowait(self) -> None:
+        """
+        Acquire the lock, without blocking.
+
+        :raises ~anyio.WouldBlock: if the operation would block
+
+        """
+        raise NotImplementedError
+
+    def release(self) -> None:
+        """Release the lock."""
+        raise NotImplementedError
+
+    def locked(self) -> bool:
+        """Return True if the lock is currently held."""
+        raise NotImplementedError
+
+    def statistics(self) -> LockStatistics:
+        """
+        Return statistics about the current state of this lock.
+
+        .. versionadded:: 3.0
+        """
+        raise NotImplementedError
+
+
+class LockAdapter(Lock):
+    _internal_lock: Lock | None = None
+
+    def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
+        return object.__new__(cls)
+
+    def __init__(self, *, fast_acquire: bool = False):
+        self._fast_acquire = fast_acquire
+
+    @property
+    def _lock(self) -> Lock:
+        if self._internal_lock is None:
+            self._internal_lock = get_async_backend().create_lock(
+                fast_acquire=self._fast_acquire
+            )
+
+        return self._internal_lock
+
+    async def __aenter__(self) -> None:
+        await self._lock.acquire()
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        if self._internal_lock is not None:
+            self._internal_lock.release()
+
+    async def acquire(self) -> None:
+        """Acquire the lock."""
+        await self._lock.acquire()
+
+    def acquire_nowait(self) -> None:
+        """
+        Acquire the lock, without blocking.
+
+        :raises ~anyio.WouldBlock: if the operation would block
+
+        """
+        self._lock.acquire_nowait()
+
+    def release(self) -> None:
+        """Release the lock."""
+        self._lock.release()
+
+    def locked(self) -> bool:
+        """Return True if the lock is currently held."""
+        return self._lock.locked()
+
+    def statistics(self) -> LockStatistics:
+        """
+        Return statistics about the current state of this lock.
+
+        .. versionadded:: 3.0
+
+        """
+        if self._internal_lock is None:
+            return LockStatistics(False, None, 0)
+
+        return self._internal_lock.statistics()
+
+
+class Condition:
+    _owner_task: TaskInfo | None = None
+
+    def __init__(self, lock: Lock | None = None):
+        self._lock = lock or Lock()
+        self._waiters: deque[Event] = deque()
+
+    async def __aenter__(self) -> None:
+        await self.acquire()
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        self.release()
+
+    def _check_acquired(self) -> None:
+        if self._owner_task != get_current_task():
+            raise RuntimeError("The current task is not holding the underlying lock")
+
+    async def acquire(self) -> None:
+        """Acquire the underlying lock."""
+        await self._lock.acquire()
+        self._owner_task = get_current_task()
+
+    def acquire_nowait(self) -> None:
+        """
+        Acquire the underlying lock, without blocking.
+
+        :raises ~anyio.WouldBlock: if the operation would block
+
+        """
+        self._lock.acquire_nowait()
+        self._owner_task = get_current_task()
+
+    def release(self) -> None:
+        """Release the underlying lock."""
+        self._lock.release()
+
+    def locked(self) -> bool:
+        """Return True if the lock is set."""
+        return self._lock.locked()
+
+    def notify(self, n: int = 1) -> None:
+        """Notify exactly n listeners."""
+        self._check_acquired()
+        for _ in range(n):
+            try:
+                event = self._waiters.popleft()
+            except IndexError:
+                break
+
+            event.set()
+
+    def notify_all(self) -> None:
+        """Notify all the listeners."""
+        self._check_acquired()
+        for event in self._waiters:
+            event.set()
+
+        self._waiters.clear()
+
+    async def wait(self) -> None:
+        """Wait for a notification."""
+        await checkpoint()
+        event = Event()
+        self._waiters.append(event)
+        self.release()
+        try:
+            await event.wait()
+        except BaseException:
+            if not event.is_set():
+                self._waiters.remove(event)
+
+            raise
+        finally:
+            with CancelScope(shield=True):
+                await self.acquire()
+
+    def statistics(self) -> ConditionStatistics:
+        """
+        Return statistics about the current state of this condition.
+
+        .. versionadded:: 3.0
+        """
+        return ConditionStatistics(len(self._waiters), self._lock.statistics())
+
+
+class Semaphore:
+    def __new__(
+        cls,
+        initial_value: int,
+        *,
+        max_value: int | None = None,
+        fast_acquire: bool = False,
+    ) -> Semaphore:
+        try:
+            return get_async_backend().create_semaphore(
+                initial_value, max_value=max_value, fast_acquire=fast_acquire
+            )
+        except AsyncLibraryNotFoundError:
+            return SemaphoreAdapter(initial_value, max_value=max_value)
+
+    def __init__(
+        self,
+        initial_value: int,
+        *,
+        max_value: int | None = None,
+        fast_acquire: bool = False,
+    ):
+        if not isinstance(initial_value, int):
+            raise TypeError("initial_value must be an integer")
+        if initial_value < 0:
+            raise ValueError("initial_value must be >= 0")
+        if max_value is not None:
+            if not isinstance(max_value, int):
+                raise TypeError("max_value must be an integer or None")
+            if max_value < initial_value:
+                raise ValueError(
+                    "max_value must be equal to or higher than initial_value"
+                )
+
+        self._fast_acquire = fast_acquire
+
+    async def __aenter__(self) -> Semaphore:
+        await self.acquire()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        self.release()
+
+    async def acquire(self) -> None:
+        """Decrement the semaphore value, blocking if necessary."""
+        raise NotImplementedError
+
+    def acquire_nowait(self) -> None:
+        """
+        Acquire the underlying lock, without blocking.
+
+        :raises ~anyio.WouldBlock: if the operation would block
+
+        """
+        raise NotImplementedError
+
+    def release(self) -> None:
+        """Increment the semaphore value."""
+        raise NotImplementedError
+
+    @property
+    def value(self) -> int:
+        """The current value of the semaphore."""
+        raise NotImplementedError
+
+    @property
+    def max_value(self) -> int | None:
+        """The maximum value of the semaphore."""
+        raise NotImplementedError
+
+    def statistics(self) -> SemaphoreStatistics:
+        """
+        Return statistics about the current state of this semaphore.
+
+        .. versionadded:: 3.0
+        """
+        raise NotImplementedError
+
+
+class SemaphoreAdapter(Semaphore):
+    _internal_semaphore: Semaphore | None = None
+
+    def __new__(
+        cls,
+        initial_value: int,
+        *,
+        max_value: int | None = None,
+        fast_acquire: bool = False,
+    ) -> SemaphoreAdapter:
+        return object.__new__(cls)
+
+    def __init__(
+        self,
+        initial_value: int,
+        *,
+        max_value: int | None = None,
+        fast_acquire: bool = False,
+    ) -> None:
+        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
+        self._initial_value = initial_value
+        self._max_value = max_value
+
+    @property
+    def _semaphore(self) -> Semaphore:
+        if self._internal_semaphore is None:
+            self._internal_semaphore = get_async_backend().create_semaphore(
+                self._initial_value, max_value=self._max_value
+            )
+
+        return self._internal_semaphore
+
+    async def acquire(self) -> None:
+        await self._semaphore.acquire()
+
+    def acquire_nowait(self) -> None:
+        self._semaphore.acquire_nowait()
+
+    def release(self) -> None:
+        self._semaphore.release()
+
+    @property
+    def value(self) -> int:
+        if self._internal_semaphore is None:
+            return self._initial_value
+
+        return self._semaphore.value
+
+    @property
+    def max_value(self) -> int | None:
+        return self._max_value
+
+    def statistics(self) -> SemaphoreStatistics:
+        if self._internal_semaphore is None:
+            return SemaphoreStatistics(tasks_waiting=0)
+
+        return self._semaphore.statistics()
+
+
+class CapacityLimiter:
+    def __new__(cls, total_tokens: float) -> CapacityLimiter:
+        try:
+            return get_async_backend().create_capacity_limiter(total_tokens)
+        except AsyncLibraryNotFoundError:
+            return CapacityLimiterAdapter(total_tokens)
+
+    async def __aenter__(self) -> None:
+        raise NotImplementedError
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> bool | None:
+        raise NotImplementedError
+
+    @property
+    def total_tokens(self) -> float:
+        """
+        The total number of tokens available for borrowing.
+
+        This is a read-write property. If the total number of tokens is increased, the
+        proportionate number of tasks waiting on this limiter will be granted their
+        tokens.
+
+        .. versionchanged:: 3.0
+            The property is now writable.
+
+        """
+        raise NotImplementedError
+
+    @total_tokens.setter
+    def total_tokens(self, value: float) -> None:
+        raise NotImplementedError
+
+    @property
+    def borrowed_tokens(self) -> int:
+        """The number of tokens that have currently been borrowed."""
+        raise NotImplementedError
+
+    @property
+    def available_tokens(self) -> float:
+        """The number of tokens currently available to be borrowed"""
+        raise NotImplementedError
+
+    def acquire_nowait(self) -> None:
+        """
+        Acquire a token for the current task without waiting for one to become
+        available.
+
+        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
+
+        """
+        raise NotImplementedError
+
+    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
+        """
+        Acquire a token without waiting for one to become available.
+
+        :param borrower: the entity borrowing a token
+        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
+
+        """
+        raise NotImplementedError
+
+    async def acquire(self) -> None:
+        """
+        Acquire a token for the current task, waiting if necessary for one to become
+        available.
+
+        """
+        raise NotImplementedError
+
+    async def acquire_on_behalf_of(self, borrower: object) -> None:
+        """
+        Acquire a token, waiting if necessary for one to become available.
+
+        :param borrower: the entity borrowing a token
+
+        """
+        raise NotImplementedError
+
+    def release(self) -> None:
+        """
+        Release the token held by the current task.
+
+        :raises RuntimeError: if the current task has not borrowed a token from this
+            limiter.
+
+        """
+        raise NotImplementedError
+
+    def release_on_behalf_of(self, borrower: object) -> None:
+        """
+        Release the token held by the given borrower.
+
+        :raises RuntimeError: if the borrower has not borrowed a token from this
+            limiter.
+
+        """
+        raise NotImplementedError
+
+    def statistics(self) -> CapacityLimiterStatistics:
+        """
+        Return statistics about the current state of this limiter.
+
+        .. versionadded:: 3.0
+
+        """
+        raise NotImplementedError
+
+
+class CapacityLimiterAdapter(CapacityLimiter):
+    _internal_limiter: CapacityLimiter | None = None
+
+    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
+        return object.__new__(cls)
+
+    def __init__(self, total_tokens: float) -> None:
+        self.total_tokens = total_tokens
+
+    @property
+    def _limiter(self) -> CapacityLimiter:
+        if self._internal_limiter is None:
+            self._internal_limiter = get_async_backend().create_capacity_limiter(
+                self._total_tokens
+            )
+
+        return self._internal_limiter
+
+    async def __aenter__(self) -> None:
+        await self._limiter.__aenter__()
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> bool | None:
+        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)
+
+    @property
+    def total_tokens(self) -> float:
+        if self._internal_limiter is None:
+            return self._total_tokens
+
+        return self._internal_limiter.total_tokens
+
+    @total_tokens.setter
+    def total_tokens(self, value: float) -> None:
+        if not isinstance(value, int) and value is not math.inf:
+            raise TypeError("total_tokens must be an int or math.inf")
+        elif value < 1:
+            raise ValueError("total_tokens must be >= 1")
+
+        if self._internal_limiter is None:
+            self._total_tokens = value
+            return
+
+        self._limiter.total_tokens = value
+
+    @property
+    def borrowed_tokens(self) -> int:
+        if self._internal_limiter is None:
+            return 0
+
+        return self._internal_limiter.borrowed_tokens
+
+    @property
+    def available_tokens(self) -> float:
+        if self._internal_limiter is None:
+            return self._total_tokens
+
+        return self._internal_limiter.available_tokens
+
+    def acquire_nowait(self) -> None:
+        self._limiter.acquire_nowait()
+
+    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
+        self._limiter.acquire_on_behalf_of_nowait(borrower)
+
+    async def acquire(self) -> None:
+        await self._limiter.acquire()
+
+    async def acquire_on_behalf_of(self, borrower: object) -> None:
+        await self._limiter.acquire_on_behalf_of(borrower)
+
+    def release(self) -> None:
+        self._limiter.release()
+
+    def release_on_behalf_of(self, borrower: object) -> None:
+        self._limiter.release_on_behalf_of(borrower)
+
+    def statistics(self) -> CapacityLimiterStatistics:
+        if self._internal_limiter is None:
+            return CapacityLimiterStatistics(
+                borrowed_tokens=0,
+                total_tokens=self.total_tokens,
+                borrowers=(),
+                tasks_waiting=0,
+            )
+
+        return self._internal_limiter.statistics()
+
+
+class ResourceGuard:
+    """
+    A context manager for ensuring that a resource is only used by a single task at a
+    time.
+
+    Entering this context manager while the previous has not exited it yet will trigger
+    :exc:`BusyResourceError`.
+
+    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
+        when triggered, e.g. "Another task is already {action} this resource")
+
+    .. versionadded:: 4.1
+    """
+
+    __slots__ = "action", "_guarded"
+
+    def __init__(self, action: str = "using"):
+        self.action: str = action
+        self._guarded = False
+
+    def __enter__(self) -> None:
+        if self._guarded:
+            raise BusyResourceError(self.action)
+
+        self._guarded = True
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        self._guarded = False