aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/_asyncbackend.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/_asyncbackend.py100
1 files changed, 100 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py b/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py
new file mode 100644
index 00000000..f6760fd0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/_asyncbackend.py
@@ -0,0 +1,100 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# This is a nullcontext for both sync and async. 3.7 has a nullcontext,
+# but it is only for sync use.
+
+
+class NullContext:
+ def __init__(self, enter_result=None):
+ self.enter_result = enter_result
+
+ def __enter__(self):
+ return self.enter_result
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+ async def __aenter__(self):
+ return self.enter_result
+
+ async def __aexit__(self, exc_type, exc_value, traceback):
+ pass
+
+
+# These are declared here so backends can import them without creating
+# circular dependencies with dns.asyncbackend.
+
+
+class Socket: # pragma: no cover
+ def __init__(self, family: int, type: int):
+ self.family = family
+ self.type = type
+
+ async def close(self):
+ pass
+
+ async def getpeername(self):
+ raise NotImplementedError
+
+ async def getsockname(self):
+ raise NotImplementedError
+
+ async def getpeercert(self, timeout):
+ raise NotImplementedError
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_value, traceback):
+ await self.close()
+
+
+class DatagramSocket(Socket): # pragma: no cover
+ async def sendto(self, what, destination, timeout):
+ raise NotImplementedError
+
+ async def recvfrom(self, size, timeout):
+ raise NotImplementedError
+
+
+class StreamSocket(Socket): # pragma: no cover
+ async def sendall(self, what, timeout):
+ raise NotImplementedError
+
+ async def recv(self, size, timeout):
+ raise NotImplementedError
+
+
+class NullTransport:
+ async def connect_tcp(self, host, port, timeout, local_address):
+ raise NotImplementedError
+
+
+class Backend: # pragma: no cover
+ def name(self):
+ return "unknown"
+
+ async def make_socket(
+ self,
+ af,
+ socktype,
+ proto=0,
+ source=None,
+ destination=None,
+ timeout=None,
+ ssl_context=None,
+ server_hostname=None,
+ ):
+ raise NotImplementedError
+
+ def datagram_connection_required(self):
+ return False
+
+ async def sleep(self, interval):
+ raise NotImplementedError
+
+ def get_transport_class(self):
+ raise NotImplementedError
+
+ async def wait_for(self, awaitable, timeout):
+ raise NotImplementedError