author     S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/dns/tokenizer.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
message    two version of R2R are here  (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/tokenizer.py')
-rw-r--r--    .venv/lib/python3.12/site-packages/dns/tokenizer.py    708
1 file changed, 708 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/tokenizer.py b/.venv/lib/python3.12/site-packages/dns/tokenizer.py
new file mode 100644
index 00000000..ab205bc3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/tokenizer.py
@@ -0,0 +1,708 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2003-2017 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Tokenize DNS zone file format"""
+
+import io
+import sys
+from typing import Any, List, Optional, Tuple
+
+import dns.exception
+import dns.name
+import dns.ttl
+
+_DELIMITERS = {" ", "\t", "\n", ";", "(", ")", '"'}
+_QUOTING_DELIMITERS = {'"'}
+
+EOF = 0
+EOL = 1
+WHITESPACE = 2
+IDENTIFIER = 3
+QUOTED_STRING = 4
+COMMENT = 5
+DELIMITER = 6
+
+
+class UngetBufferFull(dns.exception.DNSException):
+ """An attempt was made to unget a token when the unget buffer was full."""
+
+
+class Token:
+ """A DNS zone file format token.
+
+ ttype: The token type
+ value: The token value
+ has_escape: Does the token value contain escapes?
+ """
+
+ def __init__(
+ self,
+ ttype: int,
+ value: Any = "",
+ has_escape: bool = False,
+ comment: Optional[str] = None,
+ ):
+ """Initialize a token instance."""
+
+ self.ttype = ttype
+ self.value = value
+ self.has_escape = has_escape
+ self.comment = comment
+
+ def is_eof(self) -> bool:
+ return self.ttype == EOF
+
+ def is_eol(self) -> bool:
+ return self.ttype == EOL
+
+ def is_whitespace(self) -> bool:
+ return self.ttype == WHITESPACE
+
+ def is_identifier(self) -> bool:
+ return self.ttype == IDENTIFIER
+
+ def is_quoted_string(self) -> bool:
+ return self.ttype == QUOTED_STRING
+
+ def is_comment(self) -> bool:
+ return self.ttype == COMMENT
+
+ def is_delimiter(self) -> bool: # pragma: no cover (we don't return delimiters yet)
+ return self.ttype == DELIMITER
+
+ def is_eol_or_eof(self) -> bool:
+ return self.ttype == EOL or self.ttype == EOF
+
+ def __eq__(self, other):
+ if not isinstance(other, Token):
+ return False
+ return self.ttype == other.ttype and self.value == other.value
+
+ def __ne__(self, other):
+ if not isinstance(other, Token):
+ return True
+ return self.ttype != other.ttype or self.value != other.value
+
+ def __str__(self):
+ return '%d "%s"' % (self.ttype, self.value)
+
+ def unescape(self) -> "Token":
+ if not self.has_escape:
+ return self
+ unescaped = ""
+ l = len(self.value)
+ i = 0
+ while i < l:
+ c = self.value[i]
+ i += 1
+ if c == "\\":
+ if i >= l: # pragma: no cover (can't happen via get())
+ raise dns.exception.UnexpectedEnd
+ c = self.value[i]
+ i += 1
+ if c.isdigit():
+ if i >= l:
+ raise dns.exception.UnexpectedEnd
+ c2 = self.value[i]
+ i += 1
+ if i >= l:
+ raise dns.exception.UnexpectedEnd
+ c3 = self.value[i]
+ i += 1
+ if not (c2.isdigit() and c3.isdigit()):
+ raise dns.exception.SyntaxError
+ codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
+ if codepoint > 255:
+ raise dns.exception.SyntaxError
+ c = chr(codepoint)
+ unescaped += c
+ return Token(self.ttype, unescaped)
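+    # Illustrative sketch (not part of the original module): DNS decimal
+    # escapes are three digits, so ``\046`` decodes to byte 46, i.e. ".":
+    #
+    #     Token(IDENTIFIER, r"a\046b", has_escape=True).unescape().value
+    #     # -> "a.b"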
+
+ def unescape_to_bytes(self) -> "Token":
+ # We used to use unescape() for TXT-like records, but this
+ # caused problems as we'd process DNS escapes into Unicode code
+ # points instead of byte values, and then a to_text() of the
+ # processed data would not equal the original input. For
+ # example, \226 in the TXT record would have a to_text() of
+ # \195\162 because we applied UTF-8 encoding to Unicode code
+ # point 226.
+ #
+ # We now apply escapes while converting directly to bytes,
+ # avoiding this double encoding.
+ #
+ # This code also handles cases where the unicode input has
+ # non-ASCII code-points in it by converting it to UTF-8. TXT
+ # records aren't defined for Unicode, but this is the best we
+ # can do to preserve meaning. For example,
+ #
+ # foo\u200bbar
+ #
+ # (where \u200b is Unicode code point 0x200b) will be treated
+ # as if the input had been the UTF-8 encoding of that string,
+ # namely:
+ #
+ # foo\226\128\139bar
+ #
+ unescaped = b""
+ l = len(self.value)
+ i = 0
+ while i < l:
+ c = self.value[i]
+ i += 1
+ if c == "\\":
+ if i >= l: # pragma: no cover (can't happen via get())
+ raise dns.exception.UnexpectedEnd
+ c = self.value[i]
+ i += 1
+ if c.isdigit():
+ if i >= l:
+ raise dns.exception.UnexpectedEnd
+ c2 = self.value[i]
+ i += 1
+ if i >= l:
+ raise dns.exception.UnexpectedEnd
+ c3 = self.value[i]
+ i += 1
+ if not (c2.isdigit() and c3.isdigit()):
+ raise dns.exception.SyntaxError
+ codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
+ if codepoint > 255:
+ raise dns.exception.SyntaxError
+ unescaped += b"%c" % (codepoint)
+ else:
+ # Note that as mentioned above, if c is a Unicode
+ # code point outside of the ASCII range, then this
+ # += is converting that code point to its UTF-8
+ # encoding and appending multiple bytes to
+ # unescaped.
+ unescaped += c.encode()
+ else:
+ unescaped += c.encode()
+ return Token(self.ttype, bytes(unescaped))
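+    # Illustrative sketch (not part of the original module): per the comment
+    # above, ``\226`` becomes the single byte 0xE2 here, whereas unescape()
+    # would yield code point U+00E2 (UTF-8 bytes \195\162):
+    #
+    #     Token(QUOTED_STRING, r"\226", has_escape=True).unescape_to_bytes().value
+    #     # -> b"\xe2"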
+
+
+class Tokenizer:
+ """A DNS zone file format tokenizer.
+
+ A token object is basically a (type, value) tuple. The valid
+ types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING,
+ COMMENT, and DELIMITER.
+
+ file: The file to tokenize
+
+ ungotten_char: The most recently ungotten character, or None.
+
+ ungotten_token: The most recently ungotten token, or None.
+
+ multiline: The current multiline level. This value is increased
+ by one every time a '(' delimiter is read, and decreased by one every time
+ a ')' delimiter is read.
+
+ quoting: This variable is true if the tokenizer is currently
+ reading a quoted string.
+
+ eof: This variable is true if the tokenizer has encountered EOF.
+
+ delimiters: The current delimiter dictionary.
+
+ line_number: The current line number
+
+ filename: A filename that will be returned by the where() method.
+
+ idna_codec: A dns.name.IDNACodec, specifies the IDNA
+ encoder/decoder. If None, the default IDNA 2003
+ encoder/decoder is used.
+ """
+
+ def __init__(
+ self,
+ f: Any = sys.stdin,
+ filename: Optional[str] = None,
+ idna_codec: Optional[dns.name.IDNACodec] = None,
+ ):
+ """Initialize a tokenizer instance.
+
+ f: The file to tokenize. The default is sys.stdin.
+ This parameter may also be a string, in which case the tokenizer
+ will take its input from the contents of the string.
+
+        filename: the filename that the where() method will return.
+
+ idna_codec: A dns.name.IDNACodec, specifies the IDNA
+ encoder/decoder. If None, the default IDNA 2003
+ encoder/decoder is used.
+ """
+
+ if isinstance(f, str):
+ f = io.StringIO(f)
+ if filename is None:
+ filename = "<string>"
+ elif isinstance(f, bytes):
+ f = io.StringIO(f.decode())
+ if filename is None:
+ filename = "<string>"
+ else:
+ if filename is None:
+ if f is sys.stdin:
+ filename = "<stdin>"
+ else:
+ filename = "<file>"
+ self.file = f
+ self.ungotten_char: Optional[str] = None
+ self.ungotten_token: Optional[Token] = None
+ self.multiline = 0
+ self.quoting = False
+ self.eof = False
+ self.delimiters = _DELIMITERS
+ self.line_number = 1
+ assert filename is not None
+ self.filename = filename
+ if idna_codec is None:
+ self.idna_codec: dns.name.IDNACodec = dns.name.IDNA_2003
+ else:
+ self.idna_codec = idna_codec
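+    # Usage sketch (illustrative; not part of the original module): reading a
+    # hypothetical MX-style rdata line with the typed helpers defined below.
+    #
+    #     tok = Tokenizer("10 mail.example.com.\n")
+    #     tok.get_uint16()   # -> 10
+    #     tok.get_name()     # -> the dns.name.Name for mail.example.com.
+    #     tok.get_eol()      # -> "\n"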
+
+ def _get_char(self) -> str:
+ """Read a character from input."""
+
+ if self.ungotten_char is None:
+ if self.eof:
+ c = ""
+ else:
+ c = self.file.read(1)
+ if c == "":
+ self.eof = True
+ elif c == "\n":
+ self.line_number += 1
+ else:
+ c = self.ungotten_char
+ self.ungotten_char = None
+ return c
+
+ def where(self) -> Tuple[str, int]:
+ """Return the current location in the input.
+
+ Returns a (string, int) tuple. The first item is the filename of
+ the input, the second is the current line number.
+ """
+
+ return (self.filename, self.line_number)
+
+ def _unget_char(self, c: str) -> None:
+ """Unget a character.
+
+ The unget buffer for characters is only one character large; it is
+ an error to try to unget a character when the unget buffer is not
+ empty.
+
+ c: the character to unget
+ raises UngetBufferFull: there is already an ungotten char
+ """
+
+ if self.ungotten_char is not None:
+ # this should never happen!
+ raise UngetBufferFull # pragma: no cover
+ self.ungotten_char = c
+
+ def skip_whitespace(self) -> int:
+ """Consume input until a non-whitespace character is encountered.
+
+ The non-whitespace character is then ungotten, and the number of
+ whitespace characters consumed is returned.
+
+ If the tokenizer is in multiline mode, then newlines are whitespace.
+
+ Returns the number of characters skipped.
+ """
+
+ skipped = 0
+ while True:
+ c = self._get_char()
+ if c != " " and c != "\t":
+ if (c != "\n") or not self.multiline:
+ self._unget_char(c)
+ return skipped
+ skipped += 1
+
+ def get(self, want_leading: bool = False, want_comment: bool = False) -> Token:
+ """Get the next token.
+
+ want_leading: If True, return a WHITESPACE token if the
+ first character read is whitespace. The default is False.
+
+ want_comment: If True, return a COMMENT token if the
+ first token read is a comment. The default is False.
+
+ Raises dns.exception.UnexpectedEnd: input ended prematurely
+
+ Raises dns.exception.SyntaxError: input was badly formed
+
+ Returns a Token.
+ """
+
+ if self.ungotten_token is not None:
+ utoken = self.ungotten_token
+ self.ungotten_token = None
+ if utoken.is_whitespace():
+ if want_leading:
+ return utoken
+ elif utoken.is_comment():
+ if want_comment:
+ return utoken
+ else:
+ return utoken
+ skipped = self.skip_whitespace()
+ if want_leading and skipped > 0:
+ return Token(WHITESPACE, " ")
+ token = ""
+ ttype = IDENTIFIER
+ has_escape = False
+ while True:
+ c = self._get_char()
+ if c == "" or c in self.delimiters:
+ if c == "" and self.quoting:
+ raise dns.exception.UnexpectedEnd
+ if token == "" and ttype != QUOTED_STRING:
+ if c == "(":
+ self.multiline += 1
+ self.skip_whitespace()
+ continue
+ elif c == ")":
+ if self.multiline <= 0:
+ raise dns.exception.SyntaxError
+ self.multiline -= 1
+ self.skip_whitespace()
+ continue
+ elif c == '"':
+ if not self.quoting:
+ self.quoting = True
+ self.delimiters = _QUOTING_DELIMITERS
+ ttype = QUOTED_STRING
+ continue
+ else:
+ self.quoting = False
+ self.delimiters = _DELIMITERS
+ self.skip_whitespace()
+ continue
+ elif c == "\n":
+ return Token(EOL, "\n")
+ elif c == ";":
+ while 1:
+ c = self._get_char()
+ if c == "\n" or c == "":
+ break
+ token += c
+ if want_comment:
+ self._unget_char(c)
+ return Token(COMMENT, token)
+ elif c == "":
+ if self.multiline:
+ raise dns.exception.SyntaxError(
+ "unbalanced parentheses"
+ )
+ return Token(EOF, comment=token)
+ elif self.multiline:
+ self.skip_whitespace()
+ token = ""
+ continue
+ else:
+ return Token(EOL, "\n", comment=token)
+ else:
+ # This code exists in case we ever want a
+ # delimiter to be returned. It never produces
+ # a token currently.
+ token = c
+ ttype = DELIMITER
+ else:
+ self._unget_char(c)
+ break
+ elif self.quoting and c == "\n":
+ raise dns.exception.SyntaxError("newline in quoted string")
+ elif c == "\\":
+ #
+ # It's an escape. Put it and the next character into
+ # the token; it will be checked later for goodness.
+ #
+ token += c
+ has_escape = True
+ c = self._get_char()
+ if c == "" or (c == "\n" and not self.quoting):
+ raise dns.exception.UnexpectedEnd
+ token += c
+ if token == "" and ttype != QUOTED_STRING:
+ if self.multiline:
+ raise dns.exception.SyntaxError("unbalanced parentheses")
+ ttype = EOF
+ return Token(ttype, token, has_escape)
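+    # Behaviour sketch (illustrative; not part of the original module):
+    # parentheses only group lines, so the newline inside "(...)" is treated
+    # as whitespace, and a trailing ";" comment is skipped unless
+    # want_comment=True is passed:
+    #
+    #     tok = Tokenizer("( 1\n2 ) ; note\n")
+    #     [tok.get().value for _ in range(3)]   # -> ["1", "2", "\n"]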
+
+ def unget(self, token: Token) -> None:
+ """Unget a token.
+
+ The unget buffer for tokens is only one token large; it is
+ an error to try to unget a token when the unget buffer is not
+ empty.
+
+ token: the token to unget
+
+ Raises UngetBufferFull: there is already an ungotten token
+ """
+
+ if self.ungotten_token is not None:
+ raise UngetBufferFull
+ self.ungotten_token = token
+
+ def next(self):
+ """Return the next item in an iteration.
+
+ Returns a Token.
+ """
+
+ token = self.get()
+ if token.is_eof():
+ raise StopIteration
+ return token
+
+ __next__ = next
+
+ def __iter__(self):
+ return self
+
+ # Helpers
+
+ def get_int(self, base: int = 10) -> int:
+ """Read the next token and interpret it as an unsigned integer.
+
+ Raises dns.exception.SyntaxError if not an unsigned integer.
+
+ Returns an int.
+ """
+
+ token = self.get().unescape()
+ if not token.is_identifier():
+ raise dns.exception.SyntaxError("expecting an identifier")
+ if not token.value.isdigit():
+ raise dns.exception.SyntaxError("expecting an integer")
+ return int(token.value, base)
+
+ def get_uint8(self) -> int:
+ """Read the next token and interpret it as an 8-bit unsigned
+ integer.
+
+ Raises dns.exception.SyntaxError if not an 8-bit unsigned integer.
+
+ Returns an int.
+ """
+
+ value = self.get_int()
+ if value < 0 or value > 255:
+ raise dns.exception.SyntaxError(
+ "%d is not an unsigned 8-bit integer" % value
+ )
+ return value
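+    # Illustrative sketch (not part of the original module): the width-checked
+    # helpers reject out-of-range values, e.g.
+    #
+    #     Tokenizer("256\n").get_uint8()   # raises dns.exception.SyntaxError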
+
+ def get_uint16(self, base: int = 10) -> int:
+ """Read the next token and interpret it as a 16-bit unsigned
+ integer.
+
+ Raises dns.exception.SyntaxError if not a 16-bit unsigned integer.
+
+ Returns an int.
+ """
+
+ value = self.get_int(base=base)
+ if value < 0 or value > 65535:
+ if base == 8:
+ raise dns.exception.SyntaxError(
+ f"{value:o} is not an octal unsigned 16-bit integer"
+ )
+ else:
+ raise dns.exception.SyntaxError(
+ "%d is not an unsigned 16-bit integer" % value
+ )
+ return value
+
+ def get_uint32(self, base: int = 10) -> int:
+ """Read the next token and interpret it as a 32-bit unsigned
+ integer.
+
+ Raises dns.exception.SyntaxError if not a 32-bit unsigned integer.
+
+ Returns an int.
+ """
+
+ value = self.get_int(base=base)
+ if value < 0 or value > 4294967295:
+ raise dns.exception.SyntaxError(
+ "%d is not an unsigned 32-bit integer" % value
+ )
+ return value
+
+ def get_uint48(self, base: int = 10) -> int:
+ """Read the next token and interpret it as a 48-bit unsigned
+ integer.
+
+ Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.
+
+ Returns an int.
+ """
+
+ value = self.get_int(base=base)
+ if value < 0 or value > 281474976710655:
+ raise dns.exception.SyntaxError(
+ "%d is not an unsigned 48-bit integer" % value
+ )
+ return value
+
+ def get_string(self, max_length: Optional[int] = None) -> str:
+ """Read the next token and interpret it as a string.
+
+ Raises dns.exception.SyntaxError if not a string.
+ Raises dns.exception.SyntaxError if token value length
+ exceeds max_length (if specified).
+
+ Returns a string.
+ """
+
+ token = self.get().unescape()
+ if not (token.is_identifier() or token.is_quoted_string()):
+ raise dns.exception.SyntaxError("expecting a string")
+ if max_length and len(token.value) > max_length:
+ raise dns.exception.SyntaxError("string too long")
+ return token.value
+
+ def get_identifier(self) -> str:
+ """Read the next token, which should be an identifier.
+
+ Raises dns.exception.SyntaxError if not an identifier.
+
+ Returns a string.
+ """
+
+ token = self.get().unescape()
+ if not token.is_identifier():
+ raise dns.exception.SyntaxError("expecting an identifier")
+ return token.value
+
+ def get_remaining(self, max_tokens: Optional[int] = None) -> List[Token]:
+ """Return the remaining tokens on the line, until an EOL or EOF is seen.
+
+ max_tokens: If not None, stop after this number of tokens.
+
+ Returns a list of tokens.
+ """
+
+ tokens = []
+ while True:
+ token = self.get()
+ if token.is_eol_or_eof():
+ self.unget(token)
+ break
+ tokens.append(token)
+ if len(tokens) == max_tokens:
+ break
+ return tokens
+
+ def concatenate_remaining_identifiers(self, allow_empty: bool = False) -> str:
+ """Read the remaining tokens on the line, which should be identifiers.
+
+ Raises dns.exception.SyntaxError if there are no remaining tokens,
+ unless `allow_empty=True` is given.
+
+ Raises dns.exception.SyntaxError if a token is seen that is not an
+ identifier.
+
+ Returns a string containing a concatenation of the remaining
+ identifiers.
+ """
+ s = ""
+ while True:
+ token = self.get().unescape()
+ if token.is_eol_or_eof():
+ self.unget(token)
+ break
+ if not token.is_identifier():
+ raise dns.exception.SyntaxError
+ s += token.value
+ if not (allow_empty or s):
+ raise dns.exception.SyntaxError("expecting another identifier")
+ return s
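+    # Usage sketch (illustrative; not part of the original module): joining
+    # the whitespace-separated chunks of a key or signature field.
+    #
+    #     Tokenizer("AAAA BBBB CCCC\n").concatenate_remaining_identifiers()
+    #     # -> "AAAABBBBCCCC"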
+
+ def as_name(
+ self,
+ token: Token,
+ origin: Optional[dns.name.Name] = None,
+ relativize: bool = False,
+ relativize_to: Optional[dns.name.Name] = None,
+ ) -> dns.name.Name:
+ """Try to interpret the token as a DNS name.
+
+ Raises dns.exception.SyntaxError if not a name.
+
+ Returns a dns.name.Name.
+ """
+ if not token.is_identifier():
+ raise dns.exception.SyntaxError("expecting an identifier")
+ name = dns.name.from_text(token.value, origin, self.idna_codec)
+ return name.choose_relativity(relativize_to or origin, relativize)
+
+ def get_name(
+ self,
+ origin: Optional[dns.name.Name] = None,
+ relativize: bool = False,
+ relativize_to: Optional[dns.name.Name] = None,
+ ) -> dns.name.Name:
+ """Read the next token and interpret it as a DNS name.
+
+ Raises dns.exception.SyntaxError if not a name.
+
+ Returns a dns.name.Name.
+ """
+
+ token = self.get()
+ return self.as_name(token, origin, relativize, relativize_to)
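+    # Usage sketch (illustrative; not part of the original module): with an
+    # origin and relativize=False, a relative name is made absolute.
+    #
+    #     tok = Tokenizer("www\n")
+    #     tok.get_name(origin=dns.name.from_text("example.com."))
+    #     # -> the dns.name.Name for www.example.com.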
+
+ def get_eol_as_token(self) -> Token:
+ """Read the next token and raise an exception if it isn't EOL or
+ EOF.
+
+        Returns a Token.
+ """
+
+ token = self.get()
+ if not token.is_eol_or_eof():
+ raise dns.exception.SyntaxError(
+ 'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value)
+ )
+ return token
+
+ def get_eol(self) -> str:
+ return self.get_eol_as_token().value
+
+ def get_ttl(self) -> int:
+ """Read the next token and interpret it as a DNS TTL.
+
+ Raises dns.exception.SyntaxError or dns.ttl.BadTTL if not an
+ identifier or badly formed.
+
+ Returns an int.
+ """
+
+ token = self.get().unescape()
+ if not token.is_identifier():
+ raise dns.exception.SyntaxError("expecting an identifier")
+ return dns.ttl.from_text(token.value)
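+    # Usage sketch (illustrative; not part of the original module):
+    # dns.ttl.from_text() accepts plain seconds or unit suffixes, so
+    #
+    #     Tokenizer("1h30m\n").get_ttl()   # -> 5400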