diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/asgiref')
11 files changed, 1727 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asgiref/__init__.py b/.venv/lib/python3.12/site-packages/asgiref/__init__.py new file mode 100644 index 00000000..e4e78c0b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/__init__.py @@ -0,0 +1 @@ +__version__ = "3.8.1" diff --git a/.venv/lib/python3.12/site-packages/asgiref/compatibility.py b/.venv/lib/python3.12/site-packages/asgiref/compatibility.py new file mode 100644 index 00000000..3a2a63e6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/compatibility.py @@ -0,0 +1,48 @@ +import inspect + +from .sync import iscoroutinefunction + + +def is_double_callable(application): + """ + Tests to see if an application is a legacy-style (double-callable) application. + """ + # Look for a hint on the object first + if getattr(application, "_asgi_single_callable", False): + return False + if getattr(application, "_asgi_double_callable", False): + return True + # Uninstanted classes are double-callable + if inspect.isclass(application): + return True + # Instanted classes depend on their __call__ + if hasattr(application, "__call__"): + # We only check to see if its __call__ is a coroutine function - + # if it's not, it still might be a coroutine function itself. + if iscoroutinefunction(application.__call__): + return False + # Non-classes we just check directly + return not iscoroutinefunction(application) + + +def double_to_single_callable(application): + """ + Transforms a double-callable ASGI application into a single-callable one. + """ + + async def new_application(scope, receive, send): + instance = application(scope) + return await instance(receive, send) + + return new_application + + +def guarantee_single_callable(application): + """ + Takes either a single- or double-callable application and always returns it + in single-callable style. Use this to add backwards compatibility for ASGI + 2.0 applications to your server/test harness/etc. + """ + if is_double_callable(application): + application = double_to_single_callable(application) + return application diff --git a/.venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py b/.venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py new file mode 100644 index 00000000..67a7926f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py @@ -0,0 +1,115 @@ +import queue +import sys +import threading +from concurrent.futures import Executor, Future +from typing import TYPE_CHECKING, Any, Callable, TypeVar, Union + +if sys.version_info >= (3, 10): + from typing import ParamSpec +else: + from typing_extensions import ParamSpec + +_T = TypeVar("_T") +_P = ParamSpec("_P") +_R = TypeVar("_R") + + +class _WorkItem: + """ + Represents an item needing to be run in the executor. + Copied from ThreadPoolExecutor (but it's private, so we're not going to rely on importing it) + """ + + def __init__( + self, + future: "Future[_R]", + fn: Callable[_P, _R], + *args: _P.args, + **kwargs: _P.kwargs, + ): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self) -> None: + __traceback_hide__ = True # noqa: F841 + if not self.future.set_running_or_notify_cancel(): + return + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException as exc: + self.future.set_exception(exc) + # Break a reference cycle with the exception 'exc' + self = None # type: ignore[assignment] + else: + self.future.set_result(result) + + +class CurrentThreadExecutor(Executor): + """ + An Executor that actually runs code in the thread it is instantiated in. + Passed to other threads running async code, so they can run sync code in + the thread they came from. + """ + + def __init__(self) -> None: + self._work_thread = threading.current_thread() + self._work_queue: queue.Queue[Union[_WorkItem, "Future[Any]"]] = queue.Queue() + self._broken = False + + def run_until_future(self, future: "Future[Any]") -> None: + """ + Runs the code in the work queue until a result is available from the future. + Should be run from the thread the executor is initialised in. + """ + # Check we're in the right thread + if threading.current_thread() != self._work_thread: + raise RuntimeError( + "You cannot run CurrentThreadExecutor from a different thread" + ) + future.add_done_callback(self._work_queue.put) + # Keep getting and running work items until we get the future we're waiting for + # back via the future's done callback. + try: + while True: + # Get a work item and run it + work_item = self._work_queue.get() + if work_item is future: + return + assert isinstance(work_item, _WorkItem) + work_item.run() + del work_item + finally: + self._broken = True + + def _submit( + self, + fn: Callable[_P, _R], + *args: _P.args, + **kwargs: _P.kwargs, + ) -> "Future[_R]": + # Check they're not submitting from the same thread + if threading.current_thread() == self._work_thread: + raise RuntimeError( + "You cannot submit onto CurrentThreadExecutor from its own thread" + ) + # Check they're not too late or the executor errored + if self._broken: + raise RuntimeError("CurrentThreadExecutor already quit or is broken") + # Add to work queue + f: "Future[_R]" = Future() + work_item = _WorkItem(f, fn, *args, **kwargs) + self._work_queue.put(work_item) + # Return the future + return f + + # Python 3.9+ has a new signature for submit with a "/" after `fn`, to enforce + # it to be a positional argument. If we ignore[override] mypy on 3.9+ will be + # happy but 3.8 will say that the ignore comment is unused, even when + # defining them differently based on sys.version_info. + # We should be able to remove this when we drop support for 3.8. + if not TYPE_CHECKING: + + def submit(self, fn, *args, **kwargs): + return self._submit(fn, *args, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/asgiref/local.py b/.venv/lib/python3.12/site-packages/asgiref/local.py new file mode 100644 index 00000000..a8b9459b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/local.py @@ -0,0 +1,128 @@ +import asyncio +import contextlib +import contextvars +import threading +from typing import Any, Dict, Union + + +class _CVar: + """Storage utility for Local.""" + + def __init__(self) -> None: + self._data: "contextvars.ContextVar[Dict[str, Any]]" = contextvars.ContextVar( + "asgiref.local" + ) + + def __getattr__(self, key): + storage_object = self._data.get({}) + try: + return storage_object[key] + except KeyError: + raise AttributeError(f"{self!r} object has no attribute {key!r}") + + def __setattr__(self, key: str, value: Any) -> None: + if key == "_data": + return super().__setattr__(key, value) + + storage_object = self._data.get({}) + storage_object[key] = value + self._data.set(storage_object) + + def __delattr__(self, key: str) -> None: + storage_object = self._data.get({}) + if key in storage_object: + del storage_object[key] + self._data.set(storage_object) + else: + raise AttributeError(f"{self!r} object has no attribute {key!r}") + + +class Local: + """Local storage for async tasks. + + This is a namespace object (similar to `threading.local`) where data is + also local to the current async task (if there is one). + + In async threads, local means in the same sense as the `contextvars` + module - i.e. a value set in an async frame will be visible: + + - to other async code `await`-ed from this frame. + - to tasks spawned using `asyncio` utilities (`create_task`, `wait_for`, + `gather` and probably others). + - to code scheduled in a sync thread using `sync_to_async` + + In "sync" threads (a thread with no async event loop running), the + data is thread-local, but additionally shared with async code executed + via the `async_to_sync` utility, which schedules async code in a new thread + and copies context across to that thread. + + If `thread_critical` is True, then the local will only be visible per-thread, + behaving exactly like `threading.local` if the thread is sync, and as + `contextvars` if the thread is async. This allows genuinely thread-sensitive + code (such as DB handles) to be kept stricly to their initial thread and + disable the sharing across `sync_to_async` and `async_to_sync` wrapped calls. + + Unlike plain `contextvars` objects, this utility is threadsafe. + """ + + def __init__(self, thread_critical: bool = False) -> None: + self._thread_critical = thread_critical + self._thread_lock = threading.RLock() + + self._storage: "Union[threading.local, _CVar]" + + if thread_critical: + # Thread-local storage + self._storage = threading.local() + else: + # Contextvar storage + self._storage = _CVar() + + @contextlib.contextmanager + def _lock_storage(self): + # Thread safe access to storage + if self._thread_critical: + try: + # this is a test for are we in a async or sync + # thread - will raise RuntimeError if there is + # no current loop + asyncio.get_running_loop() + except RuntimeError: + # We are in a sync thread, the storage is + # just the plain thread local (i.e, "global within + # this thread" - it doesn't matter where you are + # in a call stack you see the same storage) + yield self._storage + else: + # We are in an async thread - storage is still + # local to this thread, but additionally should + # behave like a context var (is only visible with + # the same async call stack) + + # Ensure context exists in the current thread + if not hasattr(self._storage, "cvar"): + self._storage.cvar = _CVar() + + # self._storage is a thread local, so the members + # can't be accessed in another thread (we don't + # need any locks) + yield self._storage.cvar + else: + # Lock for thread_critical=False as other threads + # can access the exact same storage object + with self._thread_lock: + yield self._storage + + def __getattr__(self, key): + with self._lock_storage() as storage: + return getattr(storage, key) + + def __setattr__(self, key, value): + if key in ("_local", "_storage", "_thread_critical", "_thread_lock"): + return super().__setattr__(key, value) + with self._lock_storage() as storage: + setattr(storage, key, value) + + def __delattr__(self, key): + with self._lock_storage() as storage: + delattr(storage, key) diff --git a/.venv/lib/python3.12/site-packages/asgiref/py.typed b/.venv/lib/python3.12/site-packages/asgiref/py.typed new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/py.typed diff --git a/.venv/lib/python3.12/site-packages/asgiref/server.py b/.venv/lib/python3.12/site-packages/asgiref/server.py new file mode 100644 index 00000000..43c28c6c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/server.py @@ -0,0 +1,157 @@ +import asyncio +import logging +import time +import traceback + +from .compatibility import guarantee_single_callable + +logger = logging.getLogger(__name__) + + +class StatelessServer: + """ + Base server class that handles basic concepts like application instance + creation/pooling, exception handling, and similar, for stateless protocols + (i.e. ones without actual incoming connections to the process) + + Your code should override the handle() method, doing whatever it needs to, + and calling get_or_create_application_instance with a unique `scope_id` + and `scope` for the scope it wants to get. + + If an application instance is found with the same `scope_id`, you are + given its input queue, otherwise one is made for you with the scope provided + and you are given that fresh new input queue. Either way, you should do + something like: + + input_queue = self.get_or_create_application_instance( + "user-123456", + {"type": "testprotocol", "user_id": "123456", "username": "andrew"}, + ) + input_queue.put_nowait(message) + + If you try and create an application instance and there are already + `max_application` instances, the oldest/least recently used one will be + reclaimed and shut down to make space. + + Application coroutines that error will be found periodically (every 100ms + by default) and have their exceptions printed to the console. Override + application_exception() if you want to do more when this happens. + + If you override run(), make sure you handle things like launching the + application checker. + """ + + application_checker_interval = 0.1 + + def __init__(self, application, max_applications=1000): + # Parameters + self.application = application + self.max_applications = max_applications + # Initialisation + self.application_instances = {} + + ### Mainloop and handling + + def run(self): + """ + Runs the asyncio event loop with our handler loop. + """ + event_loop = asyncio.get_event_loop() + asyncio.ensure_future(self.application_checker()) + try: + event_loop.run_until_complete(self.handle()) + except KeyboardInterrupt: + logger.info("Exiting due to Ctrl-C/interrupt") + + async def handle(self): + raise NotImplementedError("You must implement handle()") + + async def application_send(self, scope, message): + """ + Receives outbound sends from applications and handles them. + """ + raise NotImplementedError("You must implement application_send()") + + ### Application instance management + + def get_or_create_application_instance(self, scope_id, scope): + """ + Creates an application instance and returns its queue. + """ + if scope_id in self.application_instances: + self.application_instances[scope_id]["last_used"] = time.time() + return self.application_instances[scope_id]["input_queue"] + # See if we need to delete an old one + while len(self.application_instances) > self.max_applications: + self.delete_oldest_application_instance() + # Make an instance of the application + input_queue = asyncio.Queue() + application_instance = guarantee_single_callable(self.application) + # Run it, and stash the future for later checking + future = asyncio.ensure_future( + application_instance( + scope=scope, + receive=input_queue.get, + send=lambda message: self.application_send(scope, message), + ), + ) + self.application_instances[scope_id] = { + "input_queue": input_queue, + "future": future, + "scope": scope, + "last_used": time.time(), + } + return input_queue + + def delete_oldest_application_instance(self): + """ + Finds and deletes the oldest application instance + """ + oldest_time = min( + details["last_used"] for details in self.application_instances.values() + ) + for scope_id, details in self.application_instances.items(): + if details["last_used"] == oldest_time: + self.delete_application_instance(scope_id) + # Return to make sure we only delete one in case two have + # the same oldest time + return + + def delete_application_instance(self, scope_id): + """ + Removes an application instance (makes sure its task is stopped, + then removes it from the current set) + """ + details = self.application_instances[scope_id] + del self.application_instances[scope_id] + if not details["future"].done(): + details["future"].cancel() + + async def application_checker(self): + """ + Goes through the set of current application instance Futures and cleans up + any that are done/prints exceptions for any that errored. + """ + while True: + await asyncio.sleep(self.application_checker_interval) + for scope_id, details in list(self.application_instances.items()): + if details["future"].done(): + exception = details["future"].exception() + if exception: + await self.application_exception(exception, details) + try: + del self.application_instances[scope_id] + except KeyError: + # Exception handling might have already got here before us. That's fine. + pass + + async def application_exception(self, exception, application_details): + """ + Called whenever an application coroutine has an exception. + """ + logging.error( + "Exception inside application: %s\n%s%s", + exception, + "".join(traceback.format_tb(exception.__traceback__)), + f" {exception}", + ) diff --git a/.venv/lib/python3.12/site-packages/asgiref/sync.py b/.venv/lib/python3.12/site-packages/asgiref/sync.py new file mode 100644 index 00000000..4427fc2a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/sync.py @@ -0,0 +1,613 @@ +import asyncio +import asyncio.coroutines +import contextvars +import functools +import inspect +import os +import sys +import threading +import warnings +import weakref +from concurrent.futures import Future, ThreadPoolExecutor +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Coroutine, + Dict, + Generic, + List, + Optional, + TypeVar, + Union, + overload, +) + +from .current_thread_executor import CurrentThreadExecutor +from .local import Local + +if sys.version_info >= (3, 10): + from typing import ParamSpec +else: + from typing_extensions import ParamSpec + +if TYPE_CHECKING: + # This is not available to import at runtime + from _typeshed import OptExcInfo + +_F = TypeVar("_F", bound=Callable[..., Any]) +_P = ParamSpec("_P") +_R = TypeVar("_R") + + +def _restore_context(context: contextvars.Context) -> None: + # Check for changes in contextvars, and set them to the current + # context for downstream consumers + for cvar in context: + cvalue = context.get(cvar) + try: + if cvar.get() != cvalue: + cvar.set(cvalue) + except LookupError: + cvar.set(cvalue) + + +# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for +# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker. +# The latter is replaced with the inspect.markcoroutinefunction decorator. +# Until 3.12 is the minimum supported Python version, provide a shim. + +if hasattr(inspect, "markcoroutinefunction"): + iscoroutinefunction = inspect.iscoroutinefunction + markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction +else: + iscoroutinefunction = asyncio.iscoroutinefunction # type: ignore[assignment] + + def markcoroutinefunction(func: _F) -> _F: + func._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore + return func + + +class ThreadSensitiveContext: + """Async context manager to manage context for thread sensitive mode + + This context manager controls which thread pool executor is used when in + thread sensitive mode. By default, a single thread pool executor is shared + within a process. + + The ThreadSensitiveContext() context manager may be used to specify a + thread pool per context. + + This context manager is re-entrant, so only the outer-most call to + ThreadSensitiveContext will set the context. + + Usage: + + >>> import time + >>> async with ThreadSensitiveContext(): + ... await sync_to_async(time.sleep, 1)() + """ + + def __init__(self): + self.token = None + + async def __aenter__(self): + try: + SyncToAsync.thread_sensitive_context.get() + except LookupError: + self.token = SyncToAsync.thread_sensitive_context.set(self) + + return self + + async def __aexit__(self, exc, value, tb): + if not self.token: + return + + executor = SyncToAsync.context_to_thread_executor.pop(self, None) + if executor: + executor.shutdown() + SyncToAsync.thread_sensitive_context.reset(self.token) + + +class AsyncToSync(Generic[_P, _R]): + """ + Utility class which turns an awaitable that only works on the thread with + the event loop into a synchronous callable that works in a subthread. + + If the call stack contains an async loop, the code runs there. + Otherwise, the code runs in a new loop in a new thread. + + Either way, this thread then pauses and waits to run any thread_sensitive + code called from further down the call stack using SyncToAsync, before + finally exiting once the async task returns. + """ + + # Keeps a reference to the CurrentThreadExecutor in local context, so that + # any sync_to_async inside the wrapped code can find it. + executors: "Local" = Local() + + # When we can't find a CurrentThreadExecutor from the context, such as + # inside create_task, we'll look it up here from the running event loop. + loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {} + + def __init__( + self, + awaitable: Union[ + Callable[_P, Coroutine[Any, Any, _R]], + Callable[_P, Awaitable[_R]], + ], + force_new_loop: bool = False, + ): + if not callable(awaitable) or ( + not iscoroutinefunction(awaitable) + and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable)) + ): + # Python does not have very reliable detection of async functions + # (lots of false negatives) so this is just a warning. + warnings.warn( + "async_to_sync was passed a non-async-marked callable", stacklevel=2 + ) + self.awaitable = awaitable + try: + self.__self__ = self.awaitable.__self__ # type: ignore[union-attr] + except AttributeError: + pass + self.force_new_loop = force_new_loop + self.main_event_loop = None + try: + self.main_event_loop = asyncio.get_running_loop() + except RuntimeError: + # There's no event loop in this thread. + pass + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R: + __traceback_hide__ = True # noqa: F841 + + if not self.force_new_loop and not self.main_event_loop: + # There's no event loop in this thread. Look for the threadlocal if + # we're inside SyncToAsync + main_event_loop_pid = getattr( + SyncToAsync.threadlocal, "main_event_loop_pid", None + ) + # We make sure the parent loop is from the same process - if + # they've forked, this is not going to be valid any more (#194) + if main_event_loop_pid and main_event_loop_pid == os.getpid(): + self.main_event_loop = getattr( + SyncToAsync.threadlocal, "main_event_loop", None + ) + + # You can't call AsyncToSync from a thread with a running event loop + try: + event_loop = asyncio.get_running_loop() + except RuntimeError: + pass + else: + if event_loop.is_running(): + raise RuntimeError( + "You cannot use AsyncToSync in the same thread as an async event loop - " + "just await the async function directly." + ) + + # Make a future for the return information + call_result: "Future[_R]" = Future() + + # Make a CurrentThreadExecutor we'll use to idle in this thread - we + # need one for every sync frame, even if there's one above us in the + # same thread. + old_executor = getattr(self.executors, "current", None) + current_executor = CurrentThreadExecutor() + self.executors.current = current_executor + + # Wrapping context in list so it can be reassigned from within + # `main_wrap`. + context = [contextvars.copy_context()] + + # Get task context so that parent task knows which task to propagate + # an asyncio.CancelledError to. + task_context = getattr(SyncToAsync.threadlocal, "task_context", None) + + loop = None + # Use call_soon_threadsafe to schedule a synchronous callback on the + # main event loop's thread if it's there, otherwise make a new loop + # in this thread. + try: + awaitable = self.main_wrap( + call_result, + sys.exc_info(), + task_context, + context, + *args, + **kwargs, + ) + + if not (self.main_event_loop and self.main_event_loop.is_running()): + # Make our own event loop - in a new thread - and run inside that. + loop = asyncio.new_event_loop() + self.loop_thread_executors[loop] = current_executor + loop_executor = ThreadPoolExecutor(max_workers=1) + loop_future = loop_executor.submit( + self._run_event_loop, loop, awaitable + ) + if current_executor: + # Run the CurrentThreadExecutor until the future is done + current_executor.run_until_future(loop_future) + # Wait for future and/or allow for exception propagation + loop_future.result() + else: + # Call it inside the existing loop + self.main_event_loop.call_soon_threadsafe( + self.main_event_loop.create_task, awaitable + ) + if current_executor: + # Run the CurrentThreadExecutor until the future is done + current_executor.run_until_future(call_result) + finally: + # Clean up any executor we were running + if loop is not None: + del self.loop_thread_executors[loop] + _restore_context(context[0]) + # Restore old current thread executor state + self.executors.current = old_executor + + # Wait for results from the future. + return call_result.result() + + def _run_event_loop(self, loop, coro): + """ + Runs the given event loop (designed to be called in a thread). + """ + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(coro) + finally: + try: + # mimic asyncio.run() behavior + # cancel unexhausted async generators + tasks = asyncio.all_tasks(loop) + for task in tasks: + task.cancel() + + async def gather(): + await asyncio.gather(*tasks, return_exceptions=True) + + loop.run_until_complete(gather()) + for task in tasks: + if task.cancelled(): + continue + if task.exception() is not None: + loop.call_exception_handler( + { + "message": "unhandled exception during loop shutdown", + "exception": task.exception(), + "task": task, + } + ) + if hasattr(loop, "shutdown_asyncgens"): + loop.run_until_complete(loop.shutdown_asyncgens()) + finally: + loop.close() + asyncio.set_event_loop(self.main_event_loop) + + def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]: + """ + Include self for methods + """ + func = functools.partial(self.__call__, parent) + return functools.update_wrapper(func, self.awaitable) + + async def main_wrap( + self, + call_result: "Future[_R]", + exc_info: "OptExcInfo", + task_context: "Optional[List[asyncio.Task[Any]]]", + context: List[contextvars.Context], + *args: _P.args, + **kwargs: _P.kwargs, + ) -> None: + """ + Wraps the awaitable with something that puts the result into the + result/exception future. + """ + + __traceback_hide__ = True # noqa: F841 + + if context is not None: + _restore_context(context[0]) + + current_task = asyncio.current_task() + if current_task is not None and task_context is not None: + task_context.append(current_task) + + try: + # If we have an exception, run the function inside the except block + # after raising it so exc_info is correctly populated. + if exc_info[1]: + try: + raise exc_info[1] + except BaseException: + result = await self.awaitable(*args, **kwargs) + else: + result = await self.awaitable(*args, **kwargs) + except BaseException as e: + call_result.set_exception(e) + else: + call_result.set_result(result) + finally: + if current_task is not None and task_context is not None: + task_context.remove(current_task) + context[0] = contextvars.copy_context() + + +class SyncToAsync(Generic[_P, _R]): + """ + Utility class which turns a synchronous callable into an awaitable that + runs in a threadpool. It also sets a threadlocal inside the thread so + calls to AsyncToSync can escape it. + + If thread_sensitive is passed, the code will run in the same thread as any + outer code. This is needed for underlying Python code that is not + threadsafe (for example, code which handles SQLite database connections). + + If the outermost program is async (i.e. SyncToAsync is outermost), then + this will be a dedicated single sub-thread that all sync code runs in, + one after the other. If the outermost program is sync (i.e. AsyncToSync is + outermost), this will just be the main thread. This is achieved by idling + with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent, + rather than just blocking. + + If executor is passed in, that will be used instead of the loop's default executor. + In order to pass in an executor, thread_sensitive must be set to False, otherwise + a TypeError will be raised. + """ + + # Storage for main event loop references + threadlocal = threading.local() + + # Single-thread executor for thread-sensitive code + single_thread_executor = ThreadPoolExecutor(max_workers=1) + + # Maintain a contextvar for the current execution context. Optionally used + # for thread sensitive mode. + thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = ( + contextvars.ContextVar("thread_sensitive_context") + ) + + # Contextvar that is used to detect if the single thread executor + # would be awaited on while already being used in the same context + deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar( + "deadlock_context" + ) + + # Maintaining a weak reference to the context ensures that thread pools are + # erased once the context goes out of scope. This terminates the thread pool. + context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = ( + weakref.WeakKeyDictionary() + ) + + def __init__( + self, + func: Callable[_P, _R], + thread_sensitive: bool = True, + executor: Optional["ThreadPoolExecutor"] = None, + ) -> None: + if ( + not callable(func) + or iscoroutinefunction(func) + or iscoroutinefunction(getattr(func, "__call__", func)) + ): + raise TypeError("sync_to_async can only be applied to sync functions.") + self.func = func + functools.update_wrapper(self, func) + self._thread_sensitive = thread_sensitive + markcoroutinefunction(self) + if thread_sensitive and executor is not None: + raise TypeError("executor must not be set when thread_sensitive is True") + self._executor = executor + try: + self.__self__ = func.__self__ # type: ignore + except AttributeError: + pass + + async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R: + __traceback_hide__ = True # noqa: F841 + loop = asyncio.get_running_loop() + + # Work out what thread to run the code in + if self._thread_sensitive: + current_thread_executor = getattr(AsyncToSync.executors, "current", None) + if current_thread_executor: + # If we have a parent sync thread above somewhere, use that + executor = current_thread_executor + elif self.thread_sensitive_context.get(None): + # If we have a way of retrieving the current context, attempt + # to use a per-context thread pool executor + thread_sensitive_context = self.thread_sensitive_context.get() + + if thread_sensitive_context in self.context_to_thread_executor: + # Re-use thread executor in current context + executor = self.context_to_thread_executor[thread_sensitive_context] + else: + # Create new thread executor in current context + executor = ThreadPoolExecutor(max_workers=1) + self.context_to_thread_executor[thread_sensitive_context] = executor + elif loop in AsyncToSync.loop_thread_executors: + # Re-use thread executor for running loop + executor = AsyncToSync.loop_thread_executors[loop] + elif self.deadlock_context.get(False): + raise RuntimeError( + "Single thread executor already being used, would deadlock" + ) + else: + # Otherwise, we run it in a fixed single thread + executor = self.single_thread_executor + self.deadlock_context.set(True) + else: + # Use the passed in executor, or the loop's default if it is None + executor = self._executor + + context = contextvars.copy_context() + child = functools.partial(self.func, *args, **kwargs) + func = context.run + task_context: List[asyncio.Task[Any]] = [] + + # Run the code in the right thread + exec_coro = loop.run_in_executor( + executor, + functools.partial( + self.thread_handler, + loop, + sys.exc_info(), + task_context, + func, + child, + ), + ) + ret: _R + try: + ret = await asyncio.shield(exec_coro) + except asyncio.CancelledError: + cancel_parent = True + try: + task = task_context[0] + task.cancel() + try: + await task + cancel_parent = False + except asyncio.CancelledError: + pass + except IndexError: + pass + if exec_coro.done(): + raise + if cancel_parent: + exec_coro.cancel() + ret = await exec_coro + finally: + _restore_context(context) + self.deadlock_context.set(False) + + return ret + + def __get__( + self, parent: Any, objtype: Any + ) -> Callable[_P, Coroutine[Any, Any, _R]]: + """ + Include self for methods + """ + func = functools.partial(self.__call__, parent) + return functools.update_wrapper(func, self.func) + + def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs): + """ + Wraps the sync application with exception handling. + """ + + __traceback_hide__ = True # noqa: F841 + + # Set the threadlocal for AsyncToSync + self.threadlocal.main_event_loop = loop + self.threadlocal.main_event_loop_pid = os.getpid() + self.threadlocal.task_context = task_context + + # Run the function + # If we have an exception, run the function inside the except block + # after raising it so exc_info is correctly populated. + if exc_info[1]: + try: + raise exc_info[1] + except BaseException: + return func(*args, **kwargs) + else: + return func(*args, **kwargs) + + +@overload +def async_to_sync( + *, + force_new_loop: bool = False, +) -> Callable[ + [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]], + Callable[_P, _R], +]: + ... + + +@overload +def async_to_sync( + awaitable: Union[ + Callable[_P, Coroutine[Any, Any, _R]], + Callable[_P, Awaitable[_R]], + ], + *, + force_new_loop: bool = False, +) -> Callable[_P, _R]: + ... + + +def async_to_sync( + awaitable: Optional[ + Union[ + Callable[_P, Coroutine[Any, Any, _R]], + Callable[_P, Awaitable[_R]], + ] + ] = None, + *, + force_new_loop: bool = False, +) -> Union[ + Callable[ + [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]], + Callable[_P, _R], + ], + Callable[_P, _R], +]: + if awaitable is None: + return lambda f: AsyncToSync( + f, + force_new_loop=force_new_loop, + ) + return AsyncToSync( + awaitable, + force_new_loop=force_new_loop, + ) + + +@overload +def sync_to_async( + *, + thread_sensitive: bool = True, + executor: Optional["ThreadPoolExecutor"] = None, +) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]: + ... + + +@overload +def sync_to_async( + func: Callable[_P, _R], + *, + thread_sensitive: bool = True, + executor: Optional["ThreadPoolExecutor"] = None, +) -> Callable[_P, Coroutine[Any, Any, _R]]: + ... + + +def sync_to_async( + func: Optional[Callable[_P, _R]] = None, + *, + thread_sensitive: bool = True, + executor: Optional["ThreadPoolExecutor"] = None, +) -> Union[ + Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]], + Callable[_P, Coroutine[Any, Any, _R]], +]: + if func is None: + return lambda f: SyncToAsync( + f, + thread_sensitive=thread_sensitive, + executor=executor, + ) + return SyncToAsync( + func, + thread_sensitive=thread_sensitive, + executor=executor, + ) diff --git a/.venv/lib/python3.12/site-packages/asgiref/testing.py b/.venv/lib/python3.12/site-packages/asgiref/testing.py new file mode 100644 index 00000000..aa7cff1c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/testing.py @@ -0,0 +1,103 @@ +import asyncio +import contextvars +import time + +from .compatibility import guarantee_single_callable +from .timeout import timeout as async_timeout + + +class ApplicationCommunicator: + """ + Runs an ASGI application in a test mode, allowing sending of + messages to it and retrieval of messages it sends. + """ + + def __init__(self, application, scope): + self.application = guarantee_single_callable(application) + self.scope = scope + self.input_queue = asyncio.Queue() + self.output_queue = asyncio.Queue() + # Clear context - this ensures that context vars set in the testing scope + # are not "leaked" into the application which would normally begin with + # an empty context. In Python >= 3.11 this could also be written as: + # asyncio.create_task(..., context=contextvars.Context()) + self.future = contextvars.Context().run( + asyncio.create_task, + self.application(scope, self.input_queue.get, self.output_queue.put), + ) + + async def wait(self, timeout=1): + """ + Waits for the application to stop itself and returns any exceptions. + """ + try: + async with async_timeout(timeout): + try: + await self.future + self.future.result() + except asyncio.CancelledError: + pass + finally: + if not self.future.done(): + self.future.cancel() + try: + await self.future + except asyncio.CancelledError: + pass + + def stop(self, exceptions=True): + if not self.future.done(): + self.future.cancel() + elif exceptions: + # Give a chance to raise any exceptions + self.future.result() + + def __del__(self): + # Clean up on deletion + try: + self.stop(exceptions=False) + except RuntimeError: + # Event loop already stopped + pass + + async def send_input(self, message): + """ + Sends a single message to the application + """ + # Give it the message + await self.input_queue.put(message) + + async def receive_output(self, timeout=1): + """ + Receives a single message from the application, with optional timeout. + """ + # Make sure there's not an exception to raise from the task + if self.future.done(): + self.future.result() + # Wait and receive the message + try: + async with async_timeout(timeout): + return await self.output_queue.get() + except asyncio.TimeoutError as e: + # See if we have another error to raise inside + if self.future.done(): + self.future.result() + else: + self.future.cancel() + try: + await self.future + except asyncio.CancelledError: + pass + raise e + + async def receive_nothing(self, timeout=0.1, interval=0.01): + """ + Checks that there is no message to receive in the given time. + """ + # `interval` has precedence over `timeout` + start = time.monotonic() + while time.monotonic() - start < timeout: + if not self.output_queue.empty(): + return False + await asyncio.sleep(interval) + return self.output_queue.empty() diff --git a/.venv/lib/python3.12/site-packages/asgiref/timeout.py b/.venv/lib/python3.12/site-packages/asgiref/timeout.py new file mode 100644 index 00000000..fd5381d0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/timeout.py @@ -0,0 +1,118 @@ +# This code is originally sourced from the aio-libs project "async_timeout", +# under the Apache 2.0 license. You may see the original project at +# https://github.com/aio-libs/async-timeout + +# It is vendored here to reduce chain-dependencies on this library, and +# modified slightly to remove some features we don't use. + + +import asyncio +import warnings +from types import TracebackType +from typing import Any # noqa +from typing import Optional, Type + + +class timeout: + """timeout context manager. + + Useful in cases when you want to apply timeout logic around block + of code or in cases when asyncio.wait_for is not suitable. For example: + + >>> with timeout(0.001): + ... async with aiohttp.get('https://github.com') as r: + ... await r.text() + + + timeout - value in seconds or None to disable timeout logic + loop - asyncio compatible event loop + """ + + def __init__( + self, + timeout: Optional[float], + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self._timeout = timeout + if loop is None: + loop = asyncio.get_running_loop() + else: + warnings.warn( + """The loop argument to timeout() is deprecated.""", DeprecationWarning + ) + self._loop = loop + self._task = None # type: Optional[asyncio.Task[Any]] + self._cancelled = False + self._cancel_handler = None # type: Optional[asyncio.Handle] + self._cancel_at = None # type: Optional[float] + + def __enter__(self) -> "timeout": + return self._do_enter() + + def __exit__( + self, + exc_type: Type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> Optional[bool]: + self._do_exit(exc_type) + return None + + async def __aenter__(self) -> "timeout": + return self._do_enter() + + async def __aexit__( + self, + exc_type: Type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> None: + self._do_exit(exc_type) + + @property + def expired(self) -> bool: + return self._cancelled + + @property + def remaining(self) -> Optional[float]: + if self._cancel_at is not None: + return max(self._cancel_at - self._loop.time(), 0.0) + else: + return None + + def _do_enter(self) -> "timeout": + # Support Tornado 5- without timeout + # Details: https://github.com/python/asyncio/issues/392 + if self._timeout is None: + return self + + self._task = asyncio.current_task(self._loop) + if self._task is None: + raise RuntimeError( + "Timeout context manager should be used " "inside a task" + ) + + if self._timeout <= 0: + self._loop.call_soon(self._cancel_task) + return self + + self._cancel_at = self._loop.time() + self._timeout + self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task) + return self + + def _do_exit(self, exc_type: Type[BaseException]) -> None: + if exc_type is asyncio.CancelledError and self._cancelled: + self._cancel_handler = None + self._task = None + raise asyncio.TimeoutError + if self._timeout is not None and self._cancel_handler is not None: + self._cancel_handler.cancel() + self._cancel_handler = None + self._task = None + return None + + def _cancel_task(self) -> None: + if self._task is not None: + self._task.cancel() + self._cancelled = True diff --git a/.venv/lib/python3.12/site-packages/asgiref/typing.py b/.venv/lib/python3.12/site-packages/asgiref/typing.py new file mode 100644 index 00000000..71c25ed8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/typing.py @@ -0,0 +1,278 @@ +import sys +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Iterable, + Literal, + Optional, + Protocol, + Tuple, + Type, + TypedDict, + Union, +) + +if sys.version_info >= (3, 11): + from typing import NotRequired +else: + from typing_extensions import NotRequired + +__all__ = ( + "ASGIVersions", + "HTTPScope", + "WebSocketScope", + "LifespanScope", + "WWWScope", + "Scope", + "HTTPRequestEvent", + "HTTPResponseStartEvent", + "HTTPResponseBodyEvent", + "HTTPResponseTrailersEvent", + "HTTPResponsePathsendEvent", + "HTTPServerPushEvent", + "HTTPDisconnectEvent", + "WebSocketConnectEvent", + "WebSocketAcceptEvent", + "WebSocketReceiveEvent", + "WebSocketSendEvent", + "WebSocketResponseStartEvent", + "WebSocketResponseBodyEvent", + "WebSocketDisconnectEvent", + "WebSocketCloseEvent", + "LifespanStartupEvent", + "LifespanShutdownEvent", + "LifespanStartupCompleteEvent", + "LifespanStartupFailedEvent", + "LifespanShutdownCompleteEvent", + "LifespanShutdownFailedEvent", + "ASGIReceiveEvent", + "ASGISendEvent", + "ASGIReceiveCallable", + "ASGISendCallable", + "ASGI2Protocol", + "ASGI2Application", + "ASGI3Application", + "ASGIApplication", +) + + +class ASGIVersions(TypedDict): + spec_version: str + version: Union[Literal["2.0"], Literal["3.0"]] + + +class HTTPScope(TypedDict): + type: Literal["http"] + asgi: ASGIVersions + http_version: str + method: str + scheme: str + path: str + raw_path: bytes + query_string: bytes + root_path: str + headers: Iterable[Tuple[bytes, bytes]] + client: Optional[Tuple[str, int]] + server: Optional[Tuple[str, Optional[int]]] + state: NotRequired[Dict[str, Any]] + extensions: Optional[Dict[str, Dict[object, object]]] + + +class WebSocketScope(TypedDict): + type: Literal["websocket"] + asgi: ASGIVersions + http_version: str + scheme: str + path: str + raw_path: bytes + query_string: bytes + root_path: str + headers: Iterable[Tuple[bytes, bytes]] + client: Optional[Tuple[str, int]] + server: Optional[Tuple[str, Optional[int]]] + subprotocols: Iterable[str] + state: NotRequired[Dict[str, Any]] + extensions: Optional[Dict[str, Dict[object, object]]] + + +class LifespanScope(TypedDict): + type: Literal["lifespan"] + asgi: ASGIVersions + state: NotRequired[Dict[str, Any]] + + +WWWScope = Union[HTTPScope, WebSocketScope] +Scope = Union[HTTPScope, WebSocketScope, LifespanScope] + + +class HTTPRequestEvent(TypedDict): + type: Literal["http.request"] + body: bytes + more_body: bool + + +class HTTPResponseDebugEvent(TypedDict): + type: Literal["http.response.debug"] + info: Dict[str, object] + + +class HTTPResponseStartEvent(TypedDict): + type: Literal["http.response.start"] + status: int + headers: Iterable[Tuple[bytes, bytes]] + trailers: bool + + +class HTTPResponseBodyEvent(TypedDict): + type: Literal["http.response.body"] + body: bytes + more_body: bool + + +class HTTPResponseTrailersEvent(TypedDict): + type: Literal["http.response.trailers"] + headers: Iterable[Tuple[bytes, bytes]] + more_trailers: bool + + +class HTTPResponsePathsendEvent(TypedDict): + type: Literal["http.response.pathsend"] + path: str + + +class HTTPServerPushEvent(TypedDict): + type: Literal["http.response.push"] + path: str + headers: Iterable[Tuple[bytes, bytes]] + + +class HTTPDisconnectEvent(TypedDict): + type: Literal["http.disconnect"] + + +class WebSocketConnectEvent(TypedDict): + type: Literal["websocket.connect"] + + +class WebSocketAcceptEvent(TypedDict): + type: Literal["websocket.accept"] + subprotocol: Optional[str] + headers: Iterable[Tuple[bytes, bytes]] + + +class WebSocketReceiveEvent(TypedDict): + type: Literal["websocket.receive"] + bytes: Optional[bytes] + text: Optional[str] + + +class WebSocketSendEvent(TypedDict): + type: Literal["websocket.send"] + bytes: Optional[bytes] + text: Optional[str] + + +class WebSocketResponseStartEvent(TypedDict): + type: Literal["websocket.http.response.start"] + status: int + headers: Iterable[Tuple[bytes, bytes]] + + +class WebSocketResponseBodyEvent(TypedDict): + type: Literal["websocket.http.response.body"] + body: bytes + more_body: bool + + +class WebSocketDisconnectEvent(TypedDict): + type: Literal["websocket.disconnect"] + code: int + + +class WebSocketCloseEvent(TypedDict): + type: Literal["websocket.close"] + code: int + reason: Optional[str] + + +class LifespanStartupEvent(TypedDict): + type: Literal["lifespan.startup"] + + +class LifespanShutdownEvent(TypedDict): + type: Literal["lifespan.shutdown"] + + +class LifespanStartupCompleteEvent(TypedDict): + type: Literal["lifespan.startup.complete"] + + +class LifespanStartupFailedEvent(TypedDict): + type: Literal["lifespan.startup.failed"] + message: str + + +class LifespanShutdownCompleteEvent(TypedDict): + type: Literal["lifespan.shutdown.complete"] + + +class LifespanShutdownFailedEvent(TypedDict): + type: Literal["lifespan.shutdown.failed"] + message: str + + +ASGIReceiveEvent = Union[ + HTTPRequestEvent, + HTTPDisconnectEvent, + WebSocketConnectEvent, + WebSocketReceiveEvent, + WebSocketDisconnectEvent, + LifespanStartupEvent, + LifespanShutdownEvent, +] + + +ASGISendEvent = Union[ + HTTPResponseStartEvent, + HTTPResponseBodyEvent, + HTTPResponseTrailersEvent, + HTTPServerPushEvent, + HTTPDisconnectEvent, + WebSocketAcceptEvent, + WebSocketSendEvent, + WebSocketResponseStartEvent, + WebSocketResponseBodyEvent, + WebSocketCloseEvent, + LifespanStartupCompleteEvent, + LifespanStartupFailedEvent, + LifespanShutdownCompleteEvent, + LifespanShutdownFailedEvent, +] + + +ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]] +ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]] + + +class ASGI2Protocol(Protocol): + def __init__(self, scope: Scope) -> None: + ... + + async def __call__( + self, receive: ASGIReceiveCallable, send: ASGISendCallable + ) -> None: + ... + + +ASGI2Application = Type[ASGI2Protocol] +ASGI3Application = Callable[ + [ + Scope, + ASGIReceiveCallable, + ASGISendCallable, + ], + Awaitable[None], +] +ASGIApplication = Union[ASGI2Application, ASGI3Application] diff --git a/.venv/lib/python3.12/site-packages/asgiref/wsgi.py b/.venv/lib/python3.12/site-packages/asgiref/wsgi.py new file mode 100644 index 00000000..65af4279 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/wsgi.py @@ -0,0 +1,166 @@ +from io import BytesIO +from tempfile import SpooledTemporaryFile + +from asgiref.sync import AsyncToSync, sync_to_async + + +class WsgiToAsgi: + """ + Wraps a WSGI application to make it into an ASGI application. + """ + + def __init__(self, wsgi_application): + self.wsgi_application = wsgi_application + + async def __call__(self, scope, receive, send): + """ + ASGI application instantiation point. + We return a new WsgiToAsgiInstance here with the WSGI app + and the scope, ready to respond when it is __call__ed. + """ + await WsgiToAsgiInstance(self.wsgi_application)(scope, receive, send) + + +class WsgiToAsgiInstance: + """ + Per-socket instance of a wrapped WSGI application + """ + + def __init__(self, wsgi_application): + self.wsgi_application = wsgi_application + self.response_started = False + self.response_content_length = None + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + raise ValueError("WSGI wrapper received a non-HTTP scope") + self.scope = scope + with SpooledTemporaryFile(max_size=65536) as body: + # Alright, wait for the http.request messages + while True: + message = await receive() + if message["type"] != "http.request": + raise ValueError("WSGI wrapper received a non-HTTP-request message") + body.write(message.get("body", b"")) + if not message.get("more_body"): + break + body.seek(0) + # Wrap send so it can be called from the subthread + self.sync_send = AsyncToSync(send) + # Call the WSGI app + await self.run_wsgi_app(body) + + def build_environ(self, scope, body): + """ + Builds a scope and request body into a WSGI environ object. + """ + script_name = scope.get("root_path", "").encode("utf8").decode("latin1") + path_info = scope["path"].encode("utf8").decode("latin1") + if path_info.startswith(script_name): + path_info = path_info[len(script_name) :] + environ = { + "REQUEST_METHOD": scope["method"], + "SCRIPT_NAME": script_name, + "PATH_INFO": path_info, + "QUERY_STRING": scope["query_string"].decode("ascii"), + "SERVER_PROTOCOL": "HTTP/%s" % scope["http_version"], + "wsgi.version": (1, 0), + "wsgi.url_scheme": scope.get("scheme", "http"), + "wsgi.input": body, + "wsgi.errors": BytesIO(), + "wsgi.multithread": True, + "wsgi.multiprocess": True, + "wsgi.run_once": False, + } + # Get server name and port - required in WSGI, not in ASGI + if "server" in scope: + environ["SERVER_NAME"] = scope["server"][0] + environ["SERVER_PORT"] = str(scope["server"][1]) + else: + environ["SERVER_NAME"] = "localhost" + environ["SERVER_PORT"] = "80" + + if scope.get("client") is not None: + environ["REMOTE_ADDR"] = scope["client"][0] + + # Go through headers and make them into environ entries + for name, value in self.scope.get("headers", []): + name = name.decode("latin1") + if name == "content-length": + corrected_name = "CONTENT_LENGTH" + elif name == "content-type": + corrected_name = "CONTENT_TYPE" + else: + corrected_name = "HTTP_%s" % name.upper().replace("-", "_") + # HTTPbis say only ASCII chars are allowed in headers, but we latin1 just in case + value = value.decode("latin1") + if corrected_name in environ: + value = environ[corrected_name] + "," + value + environ[corrected_name] = value + return environ + + def start_response(self, status, response_headers, exc_info=None): + """ + WSGI start_response callable. + """ + # Don't allow re-calling once response has begun + if self.response_started: + raise exc_info[1].with_traceback(exc_info[2]) + # Don't allow re-calling without exc_info + if hasattr(self, "response_start") and exc_info is None: + raise ValueError( + "You cannot call start_response a second time without exc_info" + ) + # Extract status code + status_code, _ = status.split(" ", 1) + status_code = int(status_code) + # Extract headers + headers = [ + (name.lower().encode("ascii"), value.encode("ascii")) + for name, value in response_headers + ] + # Extract content-length + self.response_content_length = None + for name, value in response_headers: + if name.lower() == "content-length": + self.response_content_length = int(value) + # Build and send response start message. + self.response_start = { + "type": "http.response.start", + "status": status_code, + "headers": headers, + } + + @sync_to_async + def run_wsgi_app(self, body): + """ + Called in a subthread to run the WSGI app. We encapsulate like + this so that the start_response callable is called in the same thread. + """ + # Translate the scope and incoming request body into a WSGI environ + environ = self.build_environ(self.scope, body) + # Run the WSGI app + bytes_sent = 0 + for output in self.wsgi_application(environ, self.start_response): + # If this is the first response, include the response headers + if not self.response_started: + self.response_started = True + self.sync_send(self.response_start) + # If the application supplies a Content-Length header + if self.response_content_length is not None: + # The server should not transmit more bytes to the client than the header allows + bytes_allowed = self.response_content_length - bytes_sent + if len(output) > bytes_allowed: + output = output[:bytes_allowed] + self.sync_send( + {"type": "http.response.body", "body": output, "more_body": True} + ) + bytes_sent += len(output) + # The server should stop iterating over the response when enough data has been sent + if bytes_sent == self.response_content_length: + break + # Close connection + if not self.response_started: + self.response_started = True + self.sync_send(self.response_start) + self.sync_send({"type": "http.response.body"}) |