diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/web_app.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/aiohttp/web_app.py | 620 |
1 files changed, 620 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/web_app.py b/.venv/lib/python3.12/site-packages/aiohttp/web_app.py new file mode 100644 index 00000000..4bdc5403 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiohttp/web_app.py @@ -0,0 +1,620 @@ +import asyncio +import logging +import warnings +from functools import lru_cache, partial, update_wrapper +from typing import ( + TYPE_CHECKING, + Any, + AsyncIterator, + Awaitable, + Callable, + Dict, + Iterable, + Iterator, + List, + Mapping, + MutableMapping, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, + cast, + overload, +) + +from aiosignal import Signal +from frozenlist import FrozenList + +from . import hdrs +from .abc import ( + AbstractAccessLogger, + AbstractMatchInfo, + AbstractRouter, + AbstractStreamWriter, +) +from .helpers import DEBUG, AppKey +from .http_parser import RawRequestMessage +from .log import web_logger +from .streams import StreamReader +from .typedefs import Handler, Middleware +from .web_exceptions import NotAppKeyWarning +from .web_log import AccessLogger +from .web_middlewares import _fix_request_current_app +from .web_protocol import RequestHandler +from .web_request import Request +from .web_response import StreamResponse +from .web_routedef import AbstractRouteDef +from .web_server import Server +from .web_urldispatcher import ( + AbstractResource, + AbstractRoute, + Domain, + MaskDomain, + MatchedSubAppResource, + PrefixedSubAppResource, + SystemRoute, + UrlDispatcher, +) + +__all__ = ("Application", "CleanupError") + + +if TYPE_CHECKING: + _AppSignal = Signal[Callable[["Application"], Awaitable[None]]] + _RespPrepareSignal = Signal[Callable[[Request, StreamResponse], Awaitable[None]]] + _Middlewares = FrozenList[Middleware] + _MiddlewaresHandlers = Optional[Sequence[Tuple[Middleware, bool]]] + _Subapps = List["Application"] +else: + # No type checker mode, skip types + _AppSignal = Signal + _RespPrepareSignal = Signal + _Middlewares = FrozenList + _MiddlewaresHandlers = Optional[Sequence] + _Subapps = List + +_T = TypeVar("_T") +_U = TypeVar("_U") +_Resource = TypeVar("_Resource", bound=AbstractResource) + + +def _build_middlewares( + handler: Handler, apps: Tuple["Application", ...] +) -> Callable[[Request], Awaitable[StreamResponse]]: + """Apply middlewares to handler.""" + for app in apps[::-1]: + for m, _ in app._middlewares_handlers: # type: ignore[union-attr] + handler = update_wrapper(partial(m, handler=handler), handler) # type: ignore[misc] + return handler + + +_cached_build_middleware = lru_cache(maxsize=1024)(_build_middlewares) + + +class Application(MutableMapping[Union[str, AppKey[Any]], Any]): + ATTRS = frozenset( + [ + "logger", + "_debug", + "_router", + "_loop", + "_handler_args", + "_middlewares", + "_middlewares_handlers", + "_has_legacy_middlewares", + "_run_middlewares", + "_state", + "_frozen", + "_pre_frozen", + "_subapps", + "_on_response_prepare", + "_on_startup", + "_on_shutdown", + "_on_cleanup", + "_client_max_size", + "_cleanup_ctx", + ] + ) + + def __init__( + self, + *, + logger: logging.Logger = web_logger, + router: Optional[UrlDispatcher] = None, + middlewares: Iterable[Middleware] = (), + handler_args: Optional[Mapping[str, Any]] = None, + client_max_size: int = 1024**2, + loop: Optional[asyncio.AbstractEventLoop] = None, + debug: Any = ..., # mypy doesn't support ellipsis + ) -> None: + if router is None: + router = UrlDispatcher() + else: + warnings.warn( + "router argument is deprecated", DeprecationWarning, stacklevel=2 + ) + assert isinstance(router, AbstractRouter), router + + if loop is not None: + warnings.warn( + "loop argument is deprecated", DeprecationWarning, stacklevel=2 + ) + + if debug is not ...: + warnings.warn( + "debug argument is deprecated", DeprecationWarning, stacklevel=2 + ) + self._debug = debug + self._router: UrlDispatcher = router + self._loop = loop + self._handler_args = handler_args + self.logger = logger + + self._middlewares: _Middlewares = FrozenList(middlewares) + + # initialized on freezing + self._middlewares_handlers: _MiddlewaresHandlers = None + # initialized on freezing + self._run_middlewares: Optional[bool] = None + self._has_legacy_middlewares: bool = True + + self._state: Dict[Union[AppKey[Any], str], object] = {} + self._frozen = False + self._pre_frozen = False + self._subapps: _Subapps = [] + + self._on_response_prepare: _RespPrepareSignal = Signal(self) + self._on_startup: _AppSignal = Signal(self) + self._on_shutdown: _AppSignal = Signal(self) + self._on_cleanup: _AppSignal = Signal(self) + self._cleanup_ctx = CleanupContext() + self._on_startup.append(self._cleanup_ctx._on_startup) + self._on_cleanup.append(self._cleanup_ctx._on_cleanup) + self._client_max_size = client_max_size + + def __init_subclass__(cls: Type["Application"]) -> None: + warnings.warn( + "Inheritance class {} from web.Application " + "is discouraged".format(cls.__name__), + DeprecationWarning, + stacklevel=3, + ) + + if DEBUG: # pragma: no cover + + def __setattr__(self, name: str, val: Any) -> None: + if name not in self.ATTRS: + warnings.warn( + "Setting custom web.Application.{} attribute " + "is discouraged".format(name), + DeprecationWarning, + stacklevel=2, + ) + super().__setattr__(name, val) + + # MutableMapping API + + def __eq__(self, other: object) -> bool: + return self is other + + @overload # type: ignore[override] + def __getitem__(self, key: AppKey[_T]) -> _T: ... + + @overload + def __getitem__(self, key: str) -> Any: ... + + def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any: + return self._state[key] + + def _check_frozen(self) -> None: + if self._frozen: + warnings.warn( + "Changing state of started or joined application is deprecated", + DeprecationWarning, + stacklevel=3, + ) + + @overload # type: ignore[override] + def __setitem__(self, key: AppKey[_T], value: _T) -> None: ... + + @overload + def __setitem__(self, key: str, value: Any) -> None: ... + + def __setitem__(self, key: Union[str, AppKey[_T]], value: Any) -> None: + self._check_frozen() + if not isinstance(key, AppKey): + warnings.warn( + "It is recommended to use web.AppKey instances for keys.\n" + + "https://docs.aiohttp.org/en/stable/web_advanced.html" + + "#application-s-config", + category=NotAppKeyWarning, + stacklevel=2, + ) + self._state[key] = value + + def __delitem__(self, key: Union[str, AppKey[_T]]) -> None: + self._check_frozen() + del self._state[key] + + def __len__(self) -> int: + return len(self._state) + + def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]: + return iter(self._state) + + def __hash__(self) -> int: + return id(self) + + @overload # type: ignore[override] + def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: ... + + @overload + def get(self, key: AppKey[_T], default: _U) -> Union[_T, _U]: ... + + @overload + def get(self, key: str, default: Any = ...) -> Any: ... + + def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any: + return self._state.get(key, default) + + ######## + @property + def loop(self) -> asyncio.AbstractEventLoop: + # Technically the loop can be None + # but we mask it by explicit type cast + # to provide more convenient type annotation + warnings.warn("loop property is deprecated", DeprecationWarning, stacklevel=2) + return cast(asyncio.AbstractEventLoop, self._loop) + + def _set_loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None: + if loop is None: + loop = asyncio.get_event_loop() + if self._loop is not None and self._loop is not loop: + raise RuntimeError( + "web.Application instance initialized with different loop" + ) + + self._loop = loop + + # set loop debug + if self._debug is ...: + self._debug = loop.get_debug() + + # set loop to sub applications + for subapp in self._subapps: + subapp._set_loop(loop) + + @property + def pre_frozen(self) -> bool: + return self._pre_frozen + + def pre_freeze(self) -> None: + if self._pre_frozen: + return + + self._pre_frozen = True + self._middlewares.freeze() + self._router.freeze() + self._on_response_prepare.freeze() + self._cleanup_ctx.freeze() + self._on_startup.freeze() + self._on_shutdown.freeze() + self._on_cleanup.freeze() + self._middlewares_handlers = tuple(self._prepare_middleware()) + self._has_legacy_middlewares = any( + not new_style for _, new_style in self._middlewares_handlers + ) + + # If current app and any subapp do not have middlewares avoid run all + # of the code footprint that it implies, which have a middleware + # hardcoded per app that sets up the current_app attribute. If no + # middlewares are configured the handler will receive the proper + # current_app without needing all of this code. + self._run_middlewares = True if self.middlewares else False + + for subapp in self._subapps: + subapp.pre_freeze() + self._run_middlewares = self._run_middlewares or subapp._run_middlewares + + @property + def frozen(self) -> bool: + return self._frozen + + def freeze(self) -> None: + if self._frozen: + return + + self.pre_freeze() + self._frozen = True + for subapp in self._subapps: + subapp.freeze() + + @property + def debug(self) -> bool: + warnings.warn("debug property is deprecated", DeprecationWarning, stacklevel=2) + return self._debug # type: ignore[no-any-return] + + def _reg_subapp_signals(self, subapp: "Application") -> None: + def reg_handler(signame: str) -> None: + subsig = getattr(subapp, signame) + + async def handler(app: "Application") -> None: + await subsig.send(subapp) + + appsig = getattr(self, signame) + appsig.append(handler) + + reg_handler("on_startup") + reg_handler("on_shutdown") + reg_handler("on_cleanup") + + def add_subapp(self, prefix: str, subapp: "Application") -> PrefixedSubAppResource: + if not isinstance(prefix, str): + raise TypeError("Prefix must be str") + prefix = prefix.rstrip("/") + if not prefix: + raise ValueError("Prefix cannot be empty") + factory = partial(PrefixedSubAppResource, prefix, subapp) + return self._add_subapp(factory, subapp) + + def _add_subapp( + self, resource_factory: Callable[[], _Resource], subapp: "Application" + ) -> _Resource: + if self.frozen: + raise RuntimeError("Cannot add sub application to frozen application") + if subapp.frozen: + raise RuntimeError("Cannot add frozen application") + resource = resource_factory() + self.router.register_resource(resource) + self._reg_subapp_signals(subapp) + self._subapps.append(subapp) + subapp.pre_freeze() + if self._loop is not None: + subapp._set_loop(self._loop) + return resource + + def add_domain(self, domain: str, subapp: "Application") -> MatchedSubAppResource: + if not isinstance(domain, str): + raise TypeError("Domain must be str") + elif "*" in domain: + rule: Domain = MaskDomain(domain) + else: + rule = Domain(domain) + factory = partial(MatchedSubAppResource, rule, subapp) + return self._add_subapp(factory, subapp) + + def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]: + return self.router.add_routes(routes) + + @property + def on_response_prepare(self) -> _RespPrepareSignal: + return self._on_response_prepare + + @property + def on_startup(self) -> _AppSignal: + return self._on_startup + + @property + def on_shutdown(self) -> _AppSignal: + return self._on_shutdown + + @property + def on_cleanup(self) -> _AppSignal: + return self._on_cleanup + + @property + def cleanup_ctx(self) -> "CleanupContext": + return self._cleanup_ctx + + @property + def router(self) -> UrlDispatcher: + return self._router + + @property + def middlewares(self) -> _Middlewares: + return self._middlewares + + def _make_handler( + self, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + access_log_class: Type[AbstractAccessLogger] = AccessLogger, + **kwargs: Any, + ) -> Server: + + if not issubclass(access_log_class, AbstractAccessLogger): + raise TypeError( + "access_log_class must be subclass of " + "aiohttp.abc.AbstractAccessLogger, got {}".format(access_log_class) + ) + + self._set_loop(loop) + self.freeze() + + kwargs["debug"] = self._debug + kwargs["access_log_class"] = access_log_class + if self._handler_args: + for k, v in self._handler_args.items(): + kwargs[k] = v + + return Server( + self._handle, # type: ignore[arg-type] + request_factory=self._make_request, + loop=self._loop, + **kwargs, + ) + + def make_handler( + self, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + access_log_class: Type[AbstractAccessLogger] = AccessLogger, + **kwargs: Any, + ) -> Server: + + warnings.warn( + "Application.make_handler(...) is deprecated, use AppRunner API instead", + DeprecationWarning, + stacklevel=2, + ) + + return self._make_handler( + loop=loop, access_log_class=access_log_class, **kwargs + ) + + async def startup(self) -> None: + """Causes on_startup signal + + Should be called in the event loop along with the request handler. + """ + await self.on_startup.send(self) + + async def shutdown(self) -> None: + """Causes on_shutdown signal + + Should be called before cleanup() + """ + await self.on_shutdown.send(self) + + async def cleanup(self) -> None: + """Causes on_cleanup signal + + Should be called after shutdown() + """ + if self.on_cleanup.frozen: + await self.on_cleanup.send(self) + else: + # If an exception occurs in startup, ensure cleanup contexts are completed. + await self._cleanup_ctx._on_cleanup(self) + + def _make_request( + self, + message: RawRequestMessage, + payload: StreamReader, + protocol: RequestHandler, + writer: AbstractStreamWriter, + task: "asyncio.Task[None]", + _cls: Type[Request] = Request, + ) -> Request: + if TYPE_CHECKING: + assert self._loop is not None + return _cls( + message, + payload, + protocol, + writer, + task, + self._loop, + client_max_size=self._client_max_size, + ) + + def _prepare_middleware(self) -> Iterator[Tuple[Middleware, bool]]: + for m in reversed(self._middlewares): + if getattr(m, "__middleware_version__", None) == 1: + yield m, True + else: + warnings.warn( + f'old-style middleware "{m!r}" deprecated, see #2252', + DeprecationWarning, + stacklevel=2, + ) + yield m, False + + yield _fix_request_current_app(self), True + + async def _handle(self, request: Request) -> StreamResponse: + loop = asyncio.get_event_loop() + debug = loop.get_debug() + match_info = await self._router.resolve(request) + if debug: # pragma: no cover + if not isinstance(match_info, AbstractMatchInfo): + raise TypeError( + "match_info should be AbstractMatchInfo " + "instance, not {!r}".format(match_info) + ) + match_info.add_app(self) + + match_info.freeze() + + request._match_info = match_info + + if request.headers.get(hdrs.EXPECT): + resp = await match_info.expect_handler(request) + await request.writer.drain() + if resp is not None: + return resp + + handler = match_info.handler + + if self._run_middlewares: + # If its a SystemRoute, don't cache building the middlewares since + # they are constructed for every MatchInfoError as a new handler + # is made each time. + if not self._has_legacy_middlewares and not isinstance( + match_info.route, SystemRoute + ): + handler = _cached_build_middleware(handler, match_info.apps) + else: + for app in match_info.apps[::-1]: + for m, new_style in app._middlewares_handlers: # type: ignore[union-attr] + if new_style: + handler = update_wrapper( + partial(m, handler=handler), handler # type: ignore[misc] + ) + else: + handler = await m(app, handler) # type: ignore[arg-type,assignment] + + return await handler(request) + + def __call__(self) -> "Application": + """gunicorn compatibility""" + return self + + def __repr__(self) -> str: + return f"<Application 0x{id(self):x}>" + + def __bool__(self) -> bool: + return True + + +class CleanupError(RuntimeError): + @property + def exceptions(self) -> List[BaseException]: + return cast(List[BaseException], self.args[1]) + + +if TYPE_CHECKING: + _CleanupContextBase = FrozenList[Callable[[Application], AsyncIterator[None]]] +else: + _CleanupContextBase = FrozenList + + +class CleanupContext(_CleanupContextBase): + def __init__(self) -> None: + super().__init__() + self._exits: List[AsyncIterator[None]] = [] + + async def _on_startup(self, app: Application) -> None: + for cb in self: + it = cb(app).__aiter__() + await it.__anext__() + self._exits.append(it) + + async def _on_cleanup(self, app: Application) -> None: + errors = [] + for it in reversed(self._exits): + try: + await it.__anext__() + except StopAsyncIteration: + pass + except (Exception, asyncio.CancelledError) as exc: + errors.append(exc) + else: + errors.append(RuntimeError(f"{it!r} has more than one 'yield'")) + if errors: + if len(errors) == 1: + raise errors[0] + else: + raise CleanupError("Multiple errors on cleanup stage", errors) |