aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/dns/wire.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/dns/wire.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/wire.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/wire.py89
1 files changed, 89 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/wire.py b/.venv/lib/python3.12/site-packages/dns/wire.py
new file mode 100644
index 00000000..9f9b1573
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/wire.py
@@ -0,0 +1,89 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+import contextlib
+import struct
+from typing import Iterator, Optional, Tuple
+
+import dns.exception
+import dns.name
+
+
+class Parser:
+ def __init__(self, wire: bytes, current: int = 0):
+ self.wire = wire
+ self.current = 0
+ self.end = len(self.wire)
+ if current:
+ self.seek(current)
+ self.furthest = current
+
+ def remaining(self) -> int:
+ return self.end - self.current
+
+ def get_bytes(self, size: int) -> bytes:
+ assert size >= 0
+ if size > self.remaining():
+ raise dns.exception.FormError
+ output = self.wire[self.current : self.current + size]
+ self.current += size
+ self.furthest = max(self.furthest, self.current)
+ return output
+
+ def get_counted_bytes(self, length_size: int = 1) -> bytes:
+ length = int.from_bytes(self.get_bytes(length_size), "big")
+ return self.get_bytes(length)
+
+ def get_remaining(self) -> bytes:
+ return self.get_bytes(self.remaining())
+
+ def get_uint8(self) -> int:
+ return struct.unpack("!B", self.get_bytes(1))[0]
+
+ def get_uint16(self) -> int:
+ return struct.unpack("!H", self.get_bytes(2))[0]
+
+ def get_uint32(self) -> int:
+ return struct.unpack("!I", self.get_bytes(4))[0]
+
+ def get_uint48(self) -> int:
+ return int.from_bytes(self.get_bytes(6), "big")
+
+ def get_struct(self, format: str) -> Tuple:
+ return struct.unpack(format, self.get_bytes(struct.calcsize(format)))
+
+ def get_name(self, origin: Optional["dns.name.Name"] = None) -> "dns.name.Name":
+ name = dns.name.from_wire_parser(self)
+ if origin:
+ name = name.relativize(origin)
+ return name
+
+ def seek(self, where: int) -> None:
+ # Note that seeking to the end is OK! (If you try to read
+ # after such a seek, you'll get an exception as expected.)
+ if where < 0 or where > self.end:
+ raise dns.exception.FormError
+ self.current = where
+
+ @contextlib.contextmanager
+ def restrict_to(self, size: int) -> Iterator:
+ assert size >= 0
+ if size > self.remaining():
+ raise dns.exception.FormError
+ saved_end = self.end
+ try:
+ self.end = self.current + size
+ yield
+ # We make this check here and not in the finally as we
+ # don't want to raise if we're already raising for some
+ # other reason.
+ if self.current != self.end:
+ raise dns.exception.FormError
+ finally:
+ self.end = saved_end
+
+ @contextlib.contextmanager
+ def restore_furthest(self) -> Iterator:
+ try:
+ yield None
+ finally:
+ self.current = self.furthest