diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/asgiref/testing.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/asgiref/testing.py | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asgiref/testing.py b/.venv/lib/python3.12/site-packages/asgiref/testing.py new file mode 100644 index 00000000..aa7cff1c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/testing.py @@ -0,0 +1,103 @@ +import asyncio +import contextvars +import time + +from .compatibility import guarantee_single_callable +from .timeout import timeout as async_timeout + + +class ApplicationCommunicator: + """ + Runs an ASGI application in a test mode, allowing sending of + messages to it and retrieval of messages it sends. + """ + + def __init__(self, application, scope): + self.application = guarantee_single_callable(application) + self.scope = scope + self.input_queue = asyncio.Queue() + self.output_queue = asyncio.Queue() + # Clear context - this ensures that context vars set in the testing scope + # are not "leaked" into the application which would normally begin with + # an empty context. In Python >= 3.11 this could also be written as: + # asyncio.create_task(..., context=contextvars.Context()) + self.future = contextvars.Context().run( + asyncio.create_task, + self.application(scope, self.input_queue.get, self.output_queue.put), + ) + + async def wait(self, timeout=1): + """ + Waits for the application to stop itself and returns any exceptions. + """ + try: + async with async_timeout(timeout): + try: + await self.future + self.future.result() + except asyncio.CancelledError: + pass + finally: + if not self.future.done(): + self.future.cancel() + try: + await self.future + except asyncio.CancelledError: + pass + + def stop(self, exceptions=True): + if not self.future.done(): + self.future.cancel() + elif exceptions: + # Give a chance to raise any exceptions + self.future.result() + + def __del__(self): + # Clean up on deletion + try: + self.stop(exceptions=False) + except RuntimeError: + # Event loop already stopped + pass + + async def send_input(self, message): + """ + Sends a single message to the application + """ + # Give it the message + await self.input_queue.put(message) + + async def receive_output(self, timeout=1): + """ + Receives a single message from the application, with optional timeout. + """ + # Make sure there's not an exception to raise from the task + if self.future.done(): + self.future.result() + # Wait and receive the message + try: + async with async_timeout(timeout): + return await self.output_queue.get() + except asyncio.TimeoutError as e: + # See if we have another error to raise inside + if self.future.done(): + self.future.result() + else: + self.future.cancel() + try: + await self.future + except asyncio.CancelledError: + pass + raise e + + async def receive_nothing(self, timeout=0.1, interval=0.01): + """ + Checks that there is no message to receive in the given time. + """ + # `interval` has precedence over `timeout` + start = time.monotonic() + while time.monotonic() - start < timeout: + if not self.output_queue.empty(): + return False + await asyncio.sleep(interval) + return self.output_queue.empty() |