about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiohttp/web_app.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/web_app.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohttp/web_app.py620
1 files changed, 620 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/web_app.py b/.venv/lib/python3.12/site-packages/aiohttp/web_app.py
new file mode 100644
index 00000000..4bdc5403
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/web_app.py
@@ -0,0 +1,620 @@
+import asyncio
+import logging
+import warnings
+from functools import lru_cache, partial, update_wrapper
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    AsyncIterator,
+    Awaitable,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    TypeVar,
+    Union,
+    cast,
+    overload,
+)
+
+from aiosignal import Signal
+from frozenlist import FrozenList
+
+from . import hdrs
+from .abc import (
+    AbstractAccessLogger,
+    AbstractMatchInfo,
+    AbstractRouter,
+    AbstractStreamWriter,
+)
+from .helpers import DEBUG, AppKey
+from .http_parser import RawRequestMessage
+from .log import web_logger
+from .streams import StreamReader
+from .typedefs import Handler, Middleware
+from .web_exceptions import NotAppKeyWarning
+from .web_log import AccessLogger
+from .web_middlewares import _fix_request_current_app
+from .web_protocol import RequestHandler
+from .web_request import Request
+from .web_response import StreamResponse
+from .web_routedef import AbstractRouteDef
+from .web_server import Server
+from .web_urldispatcher import (
+    AbstractResource,
+    AbstractRoute,
+    Domain,
+    MaskDomain,
+    MatchedSubAppResource,
+    PrefixedSubAppResource,
+    SystemRoute,
+    UrlDispatcher,
+)
+
+__all__ = ("Application", "CleanupError")
+
+
+if TYPE_CHECKING:
+    _AppSignal = Signal[Callable[["Application"], Awaitable[None]]]
+    _RespPrepareSignal = Signal[Callable[[Request, StreamResponse], Awaitable[None]]]
+    _Middlewares = FrozenList[Middleware]
+    _MiddlewaresHandlers = Optional[Sequence[Tuple[Middleware, bool]]]
+    _Subapps = List["Application"]
+else:
+    # No type checker mode, skip types
+    _AppSignal = Signal
+    _RespPrepareSignal = Signal
+    _Middlewares = FrozenList
+    _MiddlewaresHandlers = Optional[Sequence]
+    _Subapps = List
+
+_T = TypeVar("_T")
+_U = TypeVar("_U")
+_Resource = TypeVar("_Resource", bound=AbstractResource)
+
+
+def _build_middlewares(
+    handler: Handler, apps: Tuple["Application", ...]
+) -> Callable[[Request], Awaitable[StreamResponse]]:
+    """Apply middlewares to handler."""
+    for app in apps[::-1]:
+        for m, _ in app._middlewares_handlers:  # type: ignore[union-attr]
+            handler = update_wrapper(partial(m, handler=handler), handler)  # type: ignore[misc]
+    return handler
+
+
+_cached_build_middleware = lru_cache(maxsize=1024)(_build_middlewares)
+
+
+class Application(MutableMapping[Union[str, AppKey[Any]], Any]):
+    ATTRS = frozenset(
+        [
+            "logger",
+            "_debug",
+            "_router",
+            "_loop",
+            "_handler_args",
+            "_middlewares",
+            "_middlewares_handlers",
+            "_has_legacy_middlewares",
+            "_run_middlewares",
+            "_state",
+            "_frozen",
+            "_pre_frozen",
+            "_subapps",
+            "_on_response_prepare",
+            "_on_startup",
+            "_on_shutdown",
+            "_on_cleanup",
+            "_client_max_size",
+            "_cleanup_ctx",
+        ]
+    )
+
+    def __init__(
+        self,
+        *,
+        logger: logging.Logger = web_logger,
+        router: Optional[UrlDispatcher] = None,
+        middlewares: Iterable[Middleware] = (),
+        handler_args: Optional[Mapping[str, Any]] = None,
+        client_max_size: int = 1024**2,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+        debug: Any = ...,  # mypy doesn't support ellipsis
+    ) -> None:
+        if router is None:
+            router = UrlDispatcher()
+        else:
+            warnings.warn(
+                "router argument is deprecated", DeprecationWarning, stacklevel=2
+            )
+        assert isinstance(router, AbstractRouter), router
+
+        if loop is not None:
+            warnings.warn(
+                "loop argument is deprecated", DeprecationWarning, stacklevel=2
+            )
+
+        if debug is not ...:
+            warnings.warn(
+                "debug argument is deprecated", DeprecationWarning, stacklevel=2
+            )
+        self._debug = debug
+        self._router: UrlDispatcher = router
+        self._loop = loop
+        self._handler_args = handler_args
+        self.logger = logger
+
+        self._middlewares: _Middlewares = FrozenList(middlewares)
+
+        # initialized on freezing
+        self._middlewares_handlers: _MiddlewaresHandlers = None
+        # initialized on freezing
+        self._run_middlewares: Optional[bool] = None
+        self._has_legacy_middlewares: bool = True
+
+        self._state: Dict[Union[AppKey[Any], str], object] = {}
+        self._frozen = False
+        self._pre_frozen = False
+        self._subapps: _Subapps = []
+
+        self._on_response_prepare: _RespPrepareSignal = Signal(self)
+        self._on_startup: _AppSignal = Signal(self)
+        self._on_shutdown: _AppSignal = Signal(self)
+        self._on_cleanup: _AppSignal = Signal(self)
+        self._cleanup_ctx = CleanupContext()
+        self._on_startup.append(self._cleanup_ctx._on_startup)
+        self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
+        self._client_max_size = client_max_size
+
+    def __init_subclass__(cls: Type["Application"]) -> None:
+        warnings.warn(
+            "Inheritance class {} from web.Application "
+            "is discouraged".format(cls.__name__),
+            DeprecationWarning,
+            stacklevel=3,
+        )
+
+    if DEBUG:  # pragma: no cover
+
+        def __setattr__(self, name: str, val: Any) -> None:
+            if name not in self.ATTRS:
+                warnings.warn(
+                    "Setting custom web.Application.{} attribute "
+                    "is discouraged".format(name),
+                    DeprecationWarning,
+                    stacklevel=2,
+                )
+            super().__setattr__(name, val)
+
+    # MutableMapping API
+
+    def __eq__(self, other: object) -> bool:
+        return self is other
+
+    @overload  # type: ignore[override]
+    def __getitem__(self, key: AppKey[_T]) -> _T: ...
+
+    @overload
+    def __getitem__(self, key: str) -> Any: ...
+
+    def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
+        return self._state[key]
+
+    def _check_frozen(self) -> None:
+        if self._frozen:
+            warnings.warn(
+                "Changing state of started or joined application is deprecated",
+                DeprecationWarning,
+                stacklevel=3,
+            )
+
+    @overload  # type: ignore[override]
+    def __setitem__(self, key: AppKey[_T], value: _T) -> None: ...
+
+    @overload
+    def __setitem__(self, key: str, value: Any) -> None: ...
+
+    def __setitem__(self, key: Union[str, AppKey[_T]], value: Any) -> None:
+        self._check_frozen()
+        if not isinstance(key, AppKey):
+            warnings.warn(
+                "It is recommended to use web.AppKey instances for keys.\n"
+                + "https://docs.aiohttp.org/en/stable/web_advanced.html"
+                + "#application-s-config",
+                category=NotAppKeyWarning,
+                stacklevel=2,
+            )
+        self._state[key] = value
+
+    def __delitem__(self, key: Union[str, AppKey[_T]]) -> None:
+        self._check_frozen()
+        del self._state[key]
+
+    def __len__(self) -> int:
+        return len(self._state)
+
+    def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
+        return iter(self._state)
+
+    def __hash__(self) -> int:
+        return id(self)
+
+    @overload  # type: ignore[override]
+    def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: ...
+
+    @overload
+    def get(self, key: AppKey[_T], default: _U) -> Union[_T, _U]: ...
+
+    @overload
+    def get(self, key: str, default: Any = ...) -> Any: ...
+
+    def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
+        return self._state.get(key, default)
+
+    ########
+    @property
+    def loop(self) -> asyncio.AbstractEventLoop:
+        # Technically the loop can be None
+        # but we mask it by explicit type cast
+        # to provide more convenient type annotation
+        warnings.warn("loop property is deprecated", DeprecationWarning, stacklevel=2)
+        return cast(asyncio.AbstractEventLoop, self._loop)
+
+    def _set_loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
+        if loop is None:
+            loop = asyncio.get_event_loop()
+        if self._loop is not None and self._loop is not loop:
+            raise RuntimeError(
+                "web.Application instance initialized with different loop"
+            )
+
+        self._loop = loop
+
+        # set loop debug
+        if self._debug is ...:
+            self._debug = loop.get_debug()
+
+        # set loop to sub applications
+        for subapp in self._subapps:
+            subapp._set_loop(loop)
+
+    @property
+    def pre_frozen(self) -> bool:
+        return self._pre_frozen
+
+    def pre_freeze(self) -> None:
+        if self._pre_frozen:
+            return
+
+        self._pre_frozen = True
+        self._middlewares.freeze()
+        self._router.freeze()
+        self._on_response_prepare.freeze()
+        self._cleanup_ctx.freeze()
+        self._on_startup.freeze()
+        self._on_shutdown.freeze()
+        self._on_cleanup.freeze()
+        self._middlewares_handlers = tuple(self._prepare_middleware())
+        self._has_legacy_middlewares = any(
+            not new_style for _, new_style in self._middlewares_handlers
+        )
+
+        # If current app and any subapp do not have middlewares avoid run all
+        # of the code footprint that it implies, which have a middleware
+        # hardcoded per app that sets up the current_app attribute. If no
+        # middlewares are configured the handler will receive the proper
+        # current_app without needing all of this code.
+        self._run_middlewares = True if self.middlewares else False
+
+        for subapp in self._subapps:
+            subapp.pre_freeze()
+            self._run_middlewares = self._run_middlewares or subapp._run_middlewares
+
+    @property
+    def frozen(self) -> bool:
+        return self._frozen
+
+    def freeze(self) -> None:
+        if self._frozen:
+            return
+
+        self.pre_freeze()
+        self._frozen = True
+        for subapp in self._subapps:
+            subapp.freeze()
+
+    @property
+    def debug(self) -> bool:
+        warnings.warn("debug property is deprecated", DeprecationWarning, stacklevel=2)
+        return self._debug  # type: ignore[no-any-return]
+
+    def _reg_subapp_signals(self, subapp: "Application") -> None:
+        def reg_handler(signame: str) -> None:
+            subsig = getattr(subapp, signame)
+
+            async def handler(app: "Application") -> None:
+                await subsig.send(subapp)
+
+            appsig = getattr(self, signame)
+            appsig.append(handler)
+
+        reg_handler("on_startup")
+        reg_handler("on_shutdown")
+        reg_handler("on_cleanup")
+
+    def add_subapp(self, prefix: str, subapp: "Application") -> PrefixedSubAppResource:
+        if not isinstance(prefix, str):
+            raise TypeError("Prefix must be str")
+        prefix = prefix.rstrip("/")
+        if not prefix:
+            raise ValueError("Prefix cannot be empty")
+        factory = partial(PrefixedSubAppResource, prefix, subapp)
+        return self._add_subapp(factory, subapp)
+
+    def _add_subapp(
+        self, resource_factory: Callable[[], _Resource], subapp: "Application"
+    ) -> _Resource:
+        if self.frozen:
+            raise RuntimeError("Cannot add sub application to frozen application")
+        if subapp.frozen:
+            raise RuntimeError("Cannot add frozen application")
+        resource = resource_factory()
+        self.router.register_resource(resource)
+        self._reg_subapp_signals(subapp)
+        self._subapps.append(subapp)
+        subapp.pre_freeze()
+        if self._loop is not None:
+            subapp._set_loop(self._loop)
+        return resource
+
+    def add_domain(self, domain: str, subapp: "Application") -> MatchedSubAppResource:
+        if not isinstance(domain, str):
+            raise TypeError("Domain must be str")
+        elif "*" in domain:
+            rule: Domain = MaskDomain(domain)
+        else:
+            rule = Domain(domain)
+        factory = partial(MatchedSubAppResource, rule, subapp)
+        return self._add_subapp(factory, subapp)
+
+    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
+        return self.router.add_routes(routes)
+
+    @property
+    def on_response_prepare(self) -> _RespPrepareSignal:
+        return self._on_response_prepare
+
+    @property
+    def on_startup(self) -> _AppSignal:
+        return self._on_startup
+
+    @property
+    def on_shutdown(self) -> _AppSignal:
+        return self._on_shutdown
+
+    @property
+    def on_cleanup(self) -> _AppSignal:
+        return self._on_cleanup
+
+    @property
+    def cleanup_ctx(self) -> "CleanupContext":
+        return self._cleanup_ctx
+
+    @property
+    def router(self) -> UrlDispatcher:
+        return self._router
+
+    @property
+    def middlewares(self) -> _Middlewares:
+        return self._middlewares
+
+    def _make_handler(
+        self,
+        *,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+        access_log_class: Type[AbstractAccessLogger] = AccessLogger,
+        **kwargs: Any,
+    ) -> Server:
+
+        if not issubclass(access_log_class, AbstractAccessLogger):
+            raise TypeError(
+                "access_log_class must be subclass of "
+                "aiohttp.abc.AbstractAccessLogger, got {}".format(access_log_class)
+            )
+
+        self._set_loop(loop)
+        self.freeze()
+
+        kwargs["debug"] = self._debug
+        kwargs["access_log_class"] = access_log_class
+        if self._handler_args:
+            for k, v in self._handler_args.items():
+                kwargs[k] = v
+
+        return Server(
+            self._handle,  # type: ignore[arg-type]
+            request_factory=self._make_request,
+            loop=self._loop,
+            **kwargs,
+        )
+
+    def make_handler(
+        self,
+        *,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+        access_log_class: Type[AbstractAccessLogger] = AccessLogger,
+        **kwargs: Any,
+    ) -> Server:
+
+        warnings.warn(
+            "Application.make_handler(...) is deprecated, use AppRunner API instead",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
+        return self._make_handler(
+            loop=loop, access_log_class=access_log_class, **kwargs
+        )
+
+    async def startup(self) -> None:
+        """Causes on_startup signal
+
+        Should be called in the event loop along with the request handler.
+        """
+        await self.on_startup.send(self)
+
+    async def shutdown(self) -> None:
+        """Causes on_shutdown signal
+
+        Should be called before cleanup()
+        """
+        await self.on_shutdown.send(self)
+
+    async def cleanup(self) -> None:
+        """Causes on_cleanup signal
+
+        Should be called after shutdown()
+        """
+        if self.on_cleanup.frozen:
+            await self.on_cleanup.send(self)
+        else:
+            # If an exception occurs in startup, ensure cleanup contexts are completed.
+            await self._cleanup_ctx._on_cleanup(self)
+
+    def _make_request(
+        self,
+        message: RawRequestMessage,
+        payload: StreamReader,
+        protocol: RequestHandler,
+        writer: AbstractStreamWriter,
+        task: "asyncio.Task[None]",
+        _cls: Type[Request] = Request,
+    ) -> Request:
+        if TYPE_CHECKING:
+            assert self._loop is not None
+        return _cls(
+            message,
+            payload,
+            protocol,
+            writer,
+            task,
+            self._loop,
+            client_max_size=self._client_max_size,
+        )
+
+    def _prepare_middleware(self) -> Iterator[Tuple[Middleware, bool]]:
+        for m in reversed(self._middlewares):
+            if getattr(m, "__middleware_version__", None) == 1:
+                yield m, True
+            else:
+                warnings.warn(
+                    f'old-style middleware "{m!r}" deprecated, see #2252',
+                    DeprecationWarning,
+                    stacklevel=2,
+                )
+                yield m, False
+
+        yield _fix_request_current_app(self), True
+
+    async def _handle(self, request: Request) -> StreamResponse:
+        loop = asyncio.get_event_loop()
+        debug = loop.get_debug()
+        match_info = await self._router.resolve(request)
+        if debug:  # pragma: no cover
+            if not isinstance(match_info, AbstractMatchInfo):
+                raise TypeError(
+                    "match_info should be AbstractMatchInfo "
+                    "instance, not {!r}".format(match_info)
+                )
+        match_info.add_app(self)
+
+        match_info.freeze()
+
+        request._match_info = match_info
+
+        if request.headers.get(hdrs.EXPECT):
+            resp = await match_info.expect_handler(request)
+            await request.writer.drain()
+            if resp is not None:
+                return resp
+
+        handler = match_info.handler
+
+        if self._run_middlewares:
+            # If its a SystemRoute, don't cache building the middlewares since
+            # they are constructed for every MatchInfoError as a new handler
+            # is made each time.
+            if not self._has_legacy_middlewares and not isinstance(
+                match_info.route, SystemRoute
+            ):
+                handler = _cached_build_middleware(handler, match_info.apps)
+            else:
+                for app in match_info.apps[::-1]:
+                    for m, new_style in app._middlewares_handlers:  # type: ignore[union-attr]
+                        if new_style:
+                            handler = update_wrapper(
+                                partial(m, handler=handler), handler  # type: ignore[misc]
+                            )
+                        else:
+                            handler = await m(app, handler)  # type: ignore[arg-type,assignment]
+
+        return await handler(request)
+
+    def __call__(self) -> "Application":
+        """gunicorn compatibility"""
+        return self
+
+    def __repr__(self) -> str:
+        return f"<Application 0x{id(self):x}>"
+
+    def __bool__(self) -> bool:
+        return True
+
+
+class CleanupError(RuntimeError):
+    @property
+    def exceptions(self) -> List[BaseException]:
+        return cast(List[BaseException], self.args[1])
+
+
+if TYPE_CHECKING:
+    _CleanupContextBase = FrozenList[Callable[[Application], AsyncIterator[None]]]
+else:
+    _CleanupContextBase = FrozenList
+
+
+class CleanupContext(_CleanupContextBase):
+    def __init__(self) -> None:
+        super().__init__()
+        self._exits: List[AsyncIterator[None]] = []
+
+    async def _on_startup(self, app: Application) -> None:
+        for cb in self:
+            it = cb(app).__aiter__()
+            await it.__anext__()
+            self._exits.append(it)
+
+    async def _on_cleanup(self, app: Application) -> None:
+        errors = []
+        for it in reversed(self._exits):
+            try:
+                await it.__anext__()
+            except StopAsyncIteration:
+                pass
+            except (Exception, asyncio.CancelledError) as exc:
+                errors.append(exc)
+            else:
+                errors.append(RuntimeError(f"{it!r} has more than one 'yield'"))
+        if errors:
+            if len(errors) == 1:
+                raise errors[0]
+            else:
+                raise CleanupError("Multiple errors on cleanup stage", errors)