aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/asgiref/testing.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/asgiref/testing.py')
-rw-r--r--.venv/lib/python3.12/site-packages/asgiref/testing.py103
1 files changed, 103 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asgiref/testing.py b/.venv/lib/python3.12/site-packages/asgiref/testing.py
new file mode 100644
index 00000000..aa7cff1c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asgiref/testing.py
@@ -0,0 +1,103 @@
+import asyncio
+import contextvars
+import time
+
+from .compatibility import guarantee_single_callable
+from .timeout import timeout as async_timeout
+
+
+class ApplicationCommunicator:
+ """
+ Runs an ASGI application in a test mode, allowing sending of
+ messages to it and retrieval of messages it sends.
+ """
+
+ def __init__(self, application, scope):
+ self.application = guarantee_single_callable(application)
+ self.scope = scope
+ self.input_queue = asyncio.Queue()
+ self.output_queue = asyncio.Queue()
+ # Clear context - this ensures that context vars set in the testing scope
+ # are not "leaked" into the application which would normally begin with
+ # an empty context. In Python >= 3.11 this could also be written as:
+ # asyncio.create_task(..., context=contextvars.Context())
+ self.future = contextvars.Context().run(
+ asyncio.create_task,
+ self.application(scope, self.input_queue.get, self.output_queue.put),
+ )
+
+ async def wait(self, timeout=1):
+ """
+ Waits for the application to stop itself and returns any exceptions.
+ """
+ try:
+ async with async_timeout(timeout):
+ try:
+ await self.future
+ self.future.result()
+ except asyncio.CancelledError:
+ pass
+ finally:
+ if not self.future.done():
+ self.future.cancel()
+ try:
+ await self.future
+ except asyncio.CancelledError:
+ pass
+
+ def stop(self, exceptions=True):
+ if not self.future.done():
+ self.future.cancel()
+ elif exceptions:
+ # Give a chance to raise any exceptions
+ self.future.result()
+
+ def __del__(self):
+ # Clean up on deletion
+ try:
+ self.stop(exceptions=False)
+ except RuntimeError:
+ # Event loop already stopped
+ pass
+
+ async def send_input(self, message):
+ """
+ Sends a single message to the application
+ """
+ # Give it the message
+ await self.input_queue.put(message)
+
+ async def receive_output(self, timeout=1):
+ """
+ Receives a single message from the application, with optional timeout.
+ """
+ # Make sure there's not an exception to raise from the task
+ if self.future.done():
+ self.future.result()
+ # Wait and receive the message
+ try:
+ async with async_timeout(timeout):
+ return await self.output_queue.get()
+ except asyncio.TimeoutError as e:
+ # See if we have another error to raise inside
+ if self.future.done():
+ self.future.result()
+ else:
+ self.future.cancel()
+ try:
+ await self.future
+ except asyncio.CancelledError:
+ pass
+ raise e
+
+ async def receive_nothing(self, timeout=0.1, interval=0.01):
+ """
+ Checks that there is no message to receive in the given time.
+ """
+ # `interval` has precedence over `timeout`
+ start = time.monotonic()
+ while time.monotonic() - start < timeout:
+ if not self.output_queue.empty():
+ return False
+ await asyncio.sleep(interval)
+ return self.output_queue.empty()