about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/_asyncbackend.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/_asyncbackend.py100
1 files changed, 100 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py b/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py
new file mode 100644
index 00000000..f6760fd0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py
@@ -0,0 +1,100 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# This is a nullcontext for both sync and async.  3.7 has a nullcontext,
+# but it is only for sync use.
+
+
+class NullContext:
+    def __init__(self, enter_result=None):
+        self.enter_result = enter_result
+
+    def __enter__(self):
+        return self.enter_result
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        pass
+
+    async def __aenter__(self):
+        return self.enter_result
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        pass
+
+
+# These are declared here so backends can import them without creating
+# circular dependencies with dns.asyncbackend.
+
+
+class Socket:  # pragma: no cover
+    def __init__(self, family: int, type: int):
+        self.family = family
+        self.type = type
+
+    async def close(self):
+        pass
+
+    async def getpeername(self):
+        raise NotImplementedError
+
+    async def getsockname(self):
+        raise NotImplementedError
+
+    async def getpeercert(self, timeout):
+        raise NotImplementedError
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        await self.close()
+
+
+class DatagramSocket(Socket):  # pragma: no cover
+    async def sendto(self, what, destination, timeout):
+        raise NotImplementedError
+
+    async def recvfrom(self, size, timeout):
+        raise NotImplementedError
+
+
+class StreamSocket(Socket):  # pragma: no cover
+    async def sendall(self, what, timeout):
+        raise NotImplementedError
+
+    async def recv(self, size, timeout):
+        raise NotImplementedError
+
+
+class NullTransport:
+    async def connect_tcp(self, host, port, timeout, local_address):
+        raise NotImplementedError
+
+
+class Backend:  # pragma: no cover
+    def name(self):
+        return "unknown"
+
+    async def make_socket(
+        self,
+        af,
+        socktype,
+        proto=0,
+        source=None,
+        destination=None,
+        timeout=None,
+        ssl_context=None,
+        server_hostname=None,
+    ):
+        raise NotImplementedError
+
+    def datagram_connection_required(self):
+        return False
+
+    async def sleep(self, interval):
+        raise NotImplementedError
+
+    def get_transport_class(self):
+        raise NotImplementedError
+
+    async def wait_for(self, awaitable, timeout):
+        raise NotImplementedError