aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/asgiref/typing.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/asgiref/typing.py')
-rw-r--r--.venv/lib/python3.12/site-packages/asgiref/typing.py278
1 files changed, 278 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asgiref/typing.py b/.venv/lib/python3.12/site-packages/asgiref/typing.py
new file mode 100644
index 00000000..71c25ed8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asgiref/typing.py
@@ -0,0 +1,278 @@
+import sys
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ Iterable,
+ Literal,
+ Optional,
+ Protocol,
+ Tuple,
+ Type,
+ TypedDict,
+ Union,
+)
+
+if sys.version_info >= (3, 11):
+ from typing import NotRequired
+else:
+ from typing_extensions import NotRequired
+
+__all__ = (
+ "ASGIVersions",
+ "HTTPScope",
+ "WebSocketScope",
+ "LifespanScope",
+ "WWWScope",
+ "Scope",
+ "HTTPRequestEvent",
+ "HTTPResponseStartEvent",
+ "HTTPResponseBodyEvent",
+ "HTTPResponseTrailersEvent",
+ "HTTPResponsePathsendEvent",
+ "HTTPServerPushEvent",
+ "HTTPDisconnectEvent",
+ "WebSocketConnectEvent",
+ "WebSocketAcceptEvent",
+ "WebSocketReceiveEvent",
+ "WebSocketSendEvent",
+ "WebSocketResponseStartEvent",
+ "WebSocketResponseBodyEvent",
+ "WebSocketDisconnectEvent",
+ "WebSocketCloseEvent",
+ "LifespanStartupEvent",
+ "LifespanShutdownEvent",
+ "LifespanStartupCompleteEvent",
+ "LifespanStartupFailedEvent",
+ "LifespanShutdownCompleteEvent",
+ "LifespanShutdownFailedEvent",
+ "ASGIReceiveEvent",
+ "ASGISendEvent",
+ "ASGIReceiveCallable",
+ "ASGISendCallable",
+ "ASGI2Protocol",
+ "ASGI2Application",
+ "ASGI3Application",
+ "ASGIApplication",
+)
+
+
+class ASGIVersions(TypedDict):
+ spec_version: str
+ version: Union[Literal["2.0"], Literal["3.0"]]
+
+
+class HTTPScope(TypedDict):
+ type: Literal["http"]
+ asgi: ASGIVersions
+ http_version: str
+ method: str
+ scheme: str
+ path: str
+ raw_path: bytes
+ query_string: bytes
+ root_path: str
+ headers: Iterable[Tuple[bytes, bytes]]
+ client: Optional[Tuple[str, int]]
+ server: Optional[Tuple[str, Optional[int]]]
+ state: NotRequired[Dict[str, Any]]
+ extensions: Optional[Dict[str, Dict[object, object]]]
+
+
+class WebSocketScope(TypedDict):
+ type: Literal["websocket"]
+ asgi: ASGIVersions
+ http_version: str
+ scheme: str
+ path: str
+ raw_path: bytes
+ query_string: bytes
+ root_path: str
+ headers: Iterable[Tuple[bytes, bytes]]
+ client: Optional[Tuple[str, int]]
+ server: Optional[Tuple[str, Optional[int]]]
+ subprotocols: Iterable[str]
+ state: NotRequired[Dict[str, Any]]
+ extensions: Optional[Dict[str, Dict[object, object]]]
+
+
+class LifespanScope(TypedDict):
+ type: Literal["lifespan"]
+ asgi: ASGIVersions
+ state: NotRequired[Dict[str, Any]]
+
+
+WWWScope = Union[HTTPScope, WebSocketScope]
+Scope = Union[HTTPScope, WebSocketScope, LifespanScope]
+
+
+class HTTPRequestEvent(TypedDict):
+ type: Literal["http.request"]
+ body: bytes
+ more_body: bool
+
+
+class HTTPResponseDebugEvent(TypedDict):
+ type: Literal["http.response.debug"]
+ info: Dict[str, object]
+
+
+class HTTPResponseStartEvent(TypedDict):
+ type: Literal["http.response.start"]
+ status: int
+ headers: Iterable[Tuple[bytes, bytes]]
+ trailers: bool
+
+
+class HTTPResponseBodyEvent(TypedDict):
+ type: Literal["http.response.body"]
+ body: bytes
+ more_body: bool
+
+
+class HTTPResponseTrailersEvent(TypedDict):
+ type: Literal["http.response.trailers"]
+ headers: Iterable[Tuple[bytes, bytes]]
+ more_trailers: bool
+
+
+class HTTPResponsePathsendEvent(TypedDict):
+ type: Literal["http.response.pathsend"]
+ path: str
+
+
+class HTTPServerPushEvent(TypedDict):
+ type: Literal["http.response.push"]
+ path: str
+ headers: Iterable[Tuple[bytes, bytes]]
+
+
+class HTTPDisconnectEvent(TypedDict):
+ type: Literal["http.disconnect"]
+
+
+class WebSocketConnectEvent(TypedDict):
+ type: Literal["websocket.connect"]
+
+
+class WebSocketAcceptEvent(TypedDict):
+ type: Literal["websocket.accept"]
+ subprotocol: Optional[str]
+ headers: Iterable[Tuple[bytes, bytes]]
+
+
+class WebSocketReceiveEvent(TypedDict):
+ type: Literal["websocket.receive"]
+ bytes: Optional[bytes]
+ text: Optional[str]
+
+
+class WebSocketSendEvent(TypedDict):
+ type: Literal["websocket.send"]
+ bytes: Optional[bytes]
+ text: Optional[str]
+
+
+class WebSocketResponseStartEvent(TypedDict):
+ type: Literal["websocket.http.response.start"]
+ status: int
+ headers: Iterable[Tuple[bytes, bytes]]
+
+
+class WebSocketResponseBodyEvent(TypedDict):
+ type: Literal["websocket.http.response.body"]
+ body: bytes
+ more_body: bool
+
+
+class WebSocketDisconnectEvent(TypedDict):
+ type: Literal["websocket.disconnect"]
+ code: int
+
+
+class WebSocketCloseEvent(TypedDict):
+ type: Literal["websocket.close"]
+ code: int
+ reason: Optional[str]
+
+
+class LifespanStartupEvent(TypedDict):
+ type: Literal["lifespan.startup"]
+
+
+class LifespanShutdownEvent(TypedDict):
+ type: Literal["lifespan.shutdown"]
+
+
+class LifespanStartupCompleteEvent(TypedDict):
+ type: Literal["lifespan.startup.complete"]
+
+
+class LifespanStartupFailedEvent(TypedDict):
+ type: Literal["lifespan.startup.failed"]
+ message: str
+
+
+class LifespanShutdownCompleteEvent(TypedDict):
+ type: Literal["lifespan.shutdown.complete"]
+
+
+class LifespanShutdownFailedEvent(TypedDict):
+ type: Literal["lifespan.shutdown.failed"]
+ message: str
+
+
+ASGIReceiveEvent = Union[
+ HTTPRequestEvent,
+ HTTPDisconnectEvent,
+ WebSocketConnectEvent,
+ WebSocketReceiveEvent,
+ WebSocketDisconnectEvent,
+ LifespanStartupEvent,
+ LifespanShutdownEvent,
+]
+
+
+ASGISendEvent = Union[
+ HTTPResponseStartEvent,
+ HTTPResponseBodyEvent,
+ HTTPResponseTrailersEvent,
+ HTTPServerPushEvent,
+ HTTPDisconnectEvent,
+ WebSocketAcceptEvent,
+ WebSocketSendEvent,
+ WebSocketResponseStartEvent,
+ WebSocketResponseBodyEvent,
+ WebSocketCloseEvent,
+ LifespanStartupCompleteEvent,
+ LifespanStartupFailedEvent,
+ LifespanShutdownCompleteEvent,
+ LifespanShutdownFailedEvent,
+]
+
+
+ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]]
+ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]]
+
+
+class ASGI2Protocol(Protocol):
+ def __init__(self, scope: Scope) -> None:
+ ...
+
+ async def __call__(
+ self, receive: ASGIReceiveCallable, send: ASGISendCallable
+ ) -> None:
+ ...
+
+
+ASGI2Application = Type[ASGI2Protocol]
+ASGI3Application = Callable[
+ [
+ Scope,
+ ASGIReceiveCallable,
+ ASGISendCallable,
+ ],
+ Awaitable[None],
+]
+ASGIApplication = Union[ASGI2Application, ASGI3Application]