path: root/.venv/lib/python3.12/site-packages/lark/lexer.py
author     S. Solomon Darnell   2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell   2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/lark/lexer.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two version of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/lark/lexer.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/lark/lexer.py | 506
1 file changed, 506 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/lark/lexer.py b/.venv/lib/python3.12/site-packages/lark/lexer.py
new file mode 100644
index 00000000..a82cc180
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/lark/lexer.py
@@ -0,0 +1,506 @@
+# Lexer Implementation
+
+import re
+
+from .utils import Str, classify, get_regexp_width, Py36, Serialize, suppress
+from .exceptions import UnexpectedCharacters, LexError, UnexpectedToken
+
+###{standalone
+from warnings import warn
+from copy import copy
+
+
+class Pattern(Serialize):
+    raw = None
+    type = None
+
+    def __init__(self, value, flags=(), raw=None):
+        self.value = value
+        self.flags = frozenset(flags)
+        self.raw = raw
+
+    def __repr__(self):
+        return repr(self.to_regexp())
+
+    # Pattern Hashing assumes all subclasses have a different priority!
+    def __hash__(self):
+        return hash((type(self), self.value, self.flags))
+
+    def __eq__(self, other):
+        return type(self) == type(other) and self.value == other.value and self.flags == other.flags
+
+    def to_regexp(self):
+        raise NotImplementedError()
+
+    def min_width(self):
+        raise NotImplementedError()
+
+    def max_width(self):
+        raise NotImplementedError()
+
+    if Py36:
+        # Python 3.6 changed the syntax for inline flags in regular expressions
+        def _get_flags(self, value):
+            for f in self.flags:
+                value = ('(?%s:%s)' % (f, value))
+            return value
+
+    else:
+        def _get_flags(self, value):
+            for f in self.flags:
+                value = ('(?%s)' % f) + value
+            return value
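+    # Illustrative note (not part of the original file): with flags={'i'} and
+    # value 'abc', the Python 3.6+ branch builds the scoped form '(?i:abc)',
+    # while the older branch prepends a global flag and builds '(?i)abc'.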
+
+
+
+class PatternStr(Pattern):
+    __serialize_fields__ = 'value', 'flags'
+
+    type = "str"
+
+    def to_regexp(self):
+        return self._get_flags(re.escape(self.value))
+
+    @property
+    def min_width(self):
+        return len(self.value)
+    max_width = min_width
+
+
+class PatternRE(Pattern):
+    __serialize_fields__ = 'value', 'flags', '_width'
+
+    type = "re"
+
+    def to_regexp(self):
+        return self._get_flags(self.value)
+
+    _width = None
+    def _get_width(self):
+        if self._width is None:
+            self._width = get_regexp_width(self.to_regexp())
+        return self._width
+
+    @property
+    def min_width(self):
+        return self._get_width()[0]
+
+    @property
+    def max_width(self):
+        return self._get_width()[1]
+
+
+class TerminalDef(Serialize):
+    __serialize_fields__ = 'name', 'pattern', 'priority'
+    __serialize_namespace__ = PatternStr, PatternRE
+
+    def __init__(self, name, pattern, priority=1):
+        assert isinstance(pattern, Pattern), pattern
+        self.name = name
+        self.pattern = pattern
+        self.priority = priority
+
+    def __repr__(self):
+        return '%s(%r, %r)' % (type(self).__name__, self.name, self.pattern)
+
+    def user_repr(self):
+        if self.name.startswith('__'): # We represent a generated terminal
+            return self.pattern.raw or self.name
+        else:
+            return self.name
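+    # Hedged usage sketch (illustrative values, not part of the original file):
+    #   TerminalDef('PLUS', PatternStr('+')).user_repr()                          -> 'PLUS'
+    #   TerminalDef('__ANON_0', PatternRE('[0-9]+', raw='/[0-9]+/')).user_repr()  -> '/[0-9]+/'
+    # Named terminals display by name; generated ones fall back to their raw pattern.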
+
+
+class Token(Str):
+    """A string with meta-information, that is produced by the lexer.
+
+    When parsing text, the resulting chunks of the input that haven't been discarded,
+    will end up in the tree as Token instances. The Token class inherits from Python's ``str``,
+    so normal string comparisons and operations will work as expected.
+
+    Attributes:
+        type: Name of the token (as specified in grammar)
+        value: Value of the token (redundant, as ``token.value == token`` will always be true)
+        start_pos: The index of the token in the text
+        line: The line of the token in the text (starting with 1)
+        column: The column of the token in the text (starting with 1)
+        end_line: The line where the token ends
+        end_column: The next column after the end of the token. For example,
+            if the token is a single character with a column value of 4,
+            end_column will be 5.
+        end_pos: the index where the token ends (basically ``start_pos + len(token)``)
+    """
+    __slots__ = ('type', 'start_pos', 'value', 'line', 'column', 'end_line', 'end_column', 'end_pos')
+
+    def __new__(cls, type_, value, start_pos=None, line=None, column=None, end_line=None, end_column=None, end_pos=None, pos_in_stream=None):
+        try:
+            inst = super(Token, cls).__new__(cls, value)
+        except UnicodeDecodeError:
+            value = value.decode('latin1')
+            inst = super(Token, cls).__new__(cls, value)
+
+        inst.type = type_
+        inst.start_pos = start_pos if start_pos is not None else pos_in_stream
+        inst.value = value
+        inst.line = line
+        inst.column = column
+        inst.end_line = end_line
+        inst.end_column = end_column
+        inst.end_pos = end_pos
+        return inst
+
+    @property
+    def pos_in_stream(self):
+        warn("Attribute Token.pos_in_stream was renamed to Token.start_pos", DeprecationWarning, 2)
+        return self.start_pos
+
+    def update(self, type_=None, value=None):
+        return Token.new_borrow_pos(
+            type_ if type_ is not None else self.type,
+            value if value is not None else self.value,
+            self
+        )
+
+    @classmethod
+    def new_borrow_pos(cls, type_, value, borrow_t):
+        return cls(type_, value, borrow_t.start_pos, borrow_t.line, borrow_t.column, borrow_t.end_line, borrow_t.end_column, borrow_t.end_pos)
+
+    def __reduce__(self):
+        return (self.__class__, (self.type, self.value, self.start_pos, self.line, self.column))
+
+    def __repr__(self):
+        return 'Token(%r, %r)' % (self.type, self.value)
+
+    def __deepcopy__(self, memo):
+        return Token(self.type, self.value, self.start_pos, self.line, self.column)
+
+    def __eq__(self, other):
+        if isinstance(other, Token) and self.type != other.type:
+            return False
+
+        return Str.__eq__(self, other)
+
+    __hash__ = Str.__hash__
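+    # Illustrative example (not part of the original file): Token behaves like a
+    # plain string, but equality between two Tokens also compares their types:
+    #   t = Token('NUMBER', '42', start_pos=0, line=1, column=1)
+    #   t == '42'                 # True  -- ordinary string comparison
+    #   t == Token('NAME', '42')  # False -- same text, different type
+    #   t.update(value='43')      # Token('NUMBER', '43'), positions borrowed from t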
+
+
+class LineCounter:
+    __slots__ = 'char_pos', 'line', 'column', 'line_start_pos', 'newline_char'
+
+    def __init__(self, newline_char):
+        self.newline_char = newline_char
+        self.char_pos = 0
+        self.line = 1
+        self.column = 1
+        self.line_start_pos = 0
+
+    def __eq__(self, other):
+        if not isinstance(other, LineCounter):
+            return NotImplemented
+
+        return self.char_pos == other.char_pos and self.newline_char == other.newline_char
+
+    def feed(self, token, test_newline=True):
+        """Consume a token and calculate the new line & column.
+
+        As an optional optimization, set test_newline=False if token doesn't contain a newline.
+        """
+        if test_newline:
+            newlines = token.count(self.newline_char)
+            if newlines:
+                self.line += newlines
+                self.line_start_pos = self.char_pos + token.rindex(self.newline_char) + 1
+
+        self.char_pos += len(token)
+        self.column = self.char_pos - self.line_start_pos + 1
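+    # Illustrative sketch (not part of the original file):
+    #   lc = LineCounter('\n')
+    #   lc.feed('ab\ncd')   # -> lc.line == 2, lc.column == 3, lc.char_pos == 5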
+
+
+class UnlessCallback:
+    def __init__(self, scanner):
+        self.scanner = scanner
+
+    def __call__(self, t):
+        res = self.scanner.match(t.value, 0)
+        if res:
+            _value, t.type = res
+        return t
+
+
+class CallChain:
+    def __init__(self, callback1, callback2, cond):
+        self.callback1 = callback1
+        self.callback2 = callback2
+        self.cond = cond
+
+    def __call__(self, t):
+        t2 = self.callback1(t)
+        return self.callback2(t) if self.cond(t2) else t2
+
+
+def _get_match(re_, regexp, s, flags):
+    m = re_.match(regexp, s, flags)
+    if m:
+        return m.group(0)
+
+def _create_unless(terminals, g_regex_flags, re_, use_bytes):
+    tokens_by_type = classify(terminals, lambda t: type(t.pattern))
+    assert len(tokens_by_type) <= 2, tokens_by_type.keys()
+    embedded_strs = set()
+    callback = {}
+    for retok in tokens_by_type.get(PatternRE, []):
+        unless = []
+        for strtok in tokens_by_type.get(PatternStr, []):
+            if strtok.priority > retok.priority:
+                continue
+            s = strtok.pattern.value
+            if s == _get_match(re_, retok.pattern.to_regexp(), s, g_regex_flags):
+                unless.append(strtok)
+                if strtok.pattern.flags <= retok.pattern.flags:
+                    embedded_strs.add(strtok)
+        if unless:
+            callback[retok.name] = UnlessCallback(Scanner(unless, g_regex_flags, re_, match_whole=True, use_bytes=use_bytes))
+
+    new_terminals = [t for t in terminals if t not in embedded_strs]
+    return new_terminals, callback
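+# Illustrative note (not part of the original file): if a literal terminal such as
+# PatternStr('if') is fully matched by a broader PatternRE terminal of equal or
+# higher priority, the literal is folded into an UnlessCallback on the regex
+# terminal and, when its flags are a subset, dropped from the terminal list, so
+# keywords are recognised by re-typing the regex match rather than by a separate scan.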
+
+
+
+class Scanner:
+    def __init__(self, terminals, g_regex_flags, re_, use_bytes, match_whole=False):
+        self.terminals = terminals
+        self.g_regex_flags = g_regex_flags
+        self.re_ = re_
+        self.use_bytes = use_bytes
+        self.match_whole = match_whole
+
+        self.allowed_types = {t.name for t in self.terminals}
+
+        self._mres = self._build_mres(terminals, len(terminals))
+
+    def _build_mres(self, terminals, max_size):
+        # Python sets an unreasonable group limit (currently 100) in its re module.
+        # Worse, the only way to know we've reached it is by catching an AssertionError!
+        # This function recursively tries fewer and fewer groups until it succeeds.
+        postfix = '$' if self.match_whole else ''
+        mres = []
+        while terminals:
+            pattern = u'|'.join(u'(?P<%s>%s)' % (t.name, t.pattern.to_regexp() + postfix) for t in terminals[:max_size])
+            if self.use_bytes:
+                pattern = pattern.encode('latin-1')
+            try:
+                mre = self.re_.compile(pattern, self.g_regex_flags)
+            except AssertionError:  # Yes, this is what Python provides us.. :/
+                return self._build_mres(terminals, max_size//2)
+
+            mres.append((mre, {i: n for n, i in mre.groupindex.items()}))
+            terminals = terminals[max_size:]
+        return mres
+
+    def match(self, text, pos):
+        for mre, type_from_index in self._mres:
+            m = mre.match(text, pos)
+            if m:
+                return m.group(0), type_from_index[m.lastindex]
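+    # Hedged usage sketch (illustrative, not part of the original file): all terminal
+    # regexps are joined into one alternation of named groups, so a single match call
+    # both consumes text and identifies the matching terminal:
+    #   scanner = Scanner([TerminalDef('INT', PatternRE('[0-9]+'))], 0, re, use_bytes=False)
+    #   scanner.match('42 ', 0)   # -> ('42', 'INT')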
+
+
+def _regexp_has_newline(r):
+    r"""Expressions that may indicate newlines in a regexp:
+        - newlines (\n)
+        - escaped newline (\\n)
+        - anything but ([^...])
+        - any-char (.) when the flag (?s) exists
+        - spaces (\s)
+    """
+    return '\n' in r or '\\n' in r or '\\s' in r or '[^' in r or ('(?s' in r and '.' in r)
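+# Illustrative examples (not part of the original file):
+#   _regexp_has_newline(r'[^"]+')  -> True   (a negated class may span lines)
+#   _regexp_has_newline(r'\s+')    -> True   (\s can match a newline)
+#   _regexp_has_newline(r'[0-9]+') -> False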
+
+
+class Lexer(object):
+    """Lexer interface
+
+    Method Signatures:
+        lex(self, text) -> Iterator[Token]
+    """
+    lex = NotImplemented
+
+    def make_lexer_state(self, text):
+        line_ctr = LineCounter(b'\n' if isinstance(text, bytes) else '\n')
+        return LexerState(text, line_ctr)
+
+
+class TraditionalLexer(Lexer):
+
+    def __init__(self, conf):
+        terminals = list(conf.terminals)
+        assert all(isinstance(t, TerminalDef) for t in terminals), terminals
+
+        self.re = conf.re_module
+
+        if not conf.skip_validation:
+            # Sanitization
+            for t in terminals:
+                try:
+                    self.re.compile(t.pattern.to_regexp(), conf.g_regex_flags)
+                except self.re.error:
+                    raise LexError("Cannot compile token %s: %s" % (t.name, t.pattern))
+
+                if t.pattern.min_width == 0:
+                    raise LexError("Lexer does not allow zero-width terminals. (%s: %s)" % (t.name, t.pattern))
+
+            if not (set(conf.ignore) <= {t.name for t in terminals}):
+                raise LexError("Ignore terminals are not defined: %s" % (set(conf.ignore) - {t.name for t in terminals}))
+
+        # Init
+        self.newline_types = frozenset(t.name for t in terminals if _regexp_has_newline(t.pattern.to_regexp()))
+        self.ignore_types = frozenset(conf.ignore)
+
+        terminals.sort(key=lambda x: (-x.priority, -x.pattern.max_width, -len(x.pattern.value), x.name))
+        self.terminals = terminals
+        self.user_callbacks = conf.callbacks
+        self.g_regex_flags = conf.g_regex_flags
+        self.use_bytes = conf.use_bytes
+        self.terminals_by_name = conf.terminals_by_name
+
+        self._scanner = None
+
+    def _build_scanner(self):
+        terminals, self.callback = _create_unless(self.terminals, self.g_regex_flags, self.re, self.use_bytes)
+        assert all(self.callback.values())
+
+        for type_, f in self.user_callbacks.items():
+            if type_ in self.callback:
+                # Already a callback there, probably UnlessCallback
+                self.callback[type_] = CallChain(self.callback[type_], f, lambda t: t.type == type_)
+            else:
+                self.callback[type_] = f
+
+        self._scanner = Scanner(terminals, self.g_regex_flags, self.re, self.use_bytes)
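+        # (Illustrative note, not part of the original file: when a terminal already
+        #  has an UnlessCallback above, the user callback is chained after it and only
+        #  fires if the keyword re-typing left the token's type unchanged.)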
+
+    @property
+    def scanner(self):
+        if self._scanner is None:
+            self._build_scanner()
+        return self._scanner
+
+    def match(self, text, pos):
+        return self.scanner.match(text, pos)
+
+    def lex(self, state, parser_state):
+        with suppress(EOFError):
+            while True:
+                yield self.next_token(state, parser_state)
+
+    def next_token(self, lex_state, parser_state=None):
+        line_ctr = lex_state.line_ctr
+        while line_ctr.char_pos < len(lex_state.text):
+            res = self.match(lex_state.text, line_ctr.char_pos)
+            if not res:
+                allowed = self.scanner.allowed_types - self.ignore_types
+                if not allowed:
+                    allowed = {"<END-OF-FILE>"}
+                raise UnexpectedCharacters(lex_state.text, line_ctr.char_pos, line_ctr.line, line_ctr.column,
+                                           allowed=allowed, token_history=lex_state.last_token and [lex_state.last_token],
+                                           state=parser_state, terminals_by_name=self.terminals_by_name)
+
+            value, type_ = res
+
+            if type_ not in self.ignore_types:
+                t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
+                line_ctr.feed(value, type_ in self.newline_types)
+                t.end_line = line_ctr.line
+                t.end_column = line_ctr.column
+                t.end_pos = line_ctr.char_pos
+                if t.type in self.callback:
+                    t = self.callback[t.type](t)
+                    if not isinstance(t, Token):
+                        raise LexError("Callbacks must return a token (returned %r)" % t)
+                lex_state.last_token = t
+                return t
+            else:
+                if type_ in self.callback:
+                    t2 = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
+                    self.callback[type_](t2)
+                line_ctr.feed(value, type_ in self.newline_types)
+
+        # EOF
+        raise EOFError(self)
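+    # (Illustrative note, not part of the original file: ignored terminals still run
+    #  their callbacks and advance the line counter, but they are never returned to
+    #  the parser and never become last_token.)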
+
+
+class LexerState(object):
+    __slots__ = 'text', 'line_ctr', 'last_token'
+
+    def __init__(self, text, line_ctr, last_token=None):
+        self.text = text
+        self.line_ctr = line_ctr
+        self.last_token = last_token
+
+    def __eq__(self, other):
+        if not isinstance(other, LexerState):
+            return NotImplemented
+
+        return self.text is other.text and self.line_ctr == other.line_ctr and self.last_token == other.last_token
+
+    def __copy__(self):
+        return type(self)(self.text, copy(self.line_ctr), self.last_token)
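+    # Illustrative note (not part of the original file): __copy__ shares the input
+    # text but copies the LineCounter, so callers can snapshot and restore the
+    # lexing position without duplicating the text.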
+
+
+class ContextualLexer(Lexer):
+
+    def __init__(self, conf, states, always_accept=()):
+        terminals = list(conf.terminals)
+        terminals_by_name = conf.terminals_by_name
+
+        trad_conf = copy(conf)
+        trad_conf.terminals = terminals
+
+        lexer_by_tokens = {}
+        self.lexers = {}
+        for state, accepts in states.items():
+            key = frozenset(accepts)
+            try:
+                lexer = lexer_by_tokens[key]
+            except KeyError:
+                accepts = set(accepts) | set(conf.ignore) | set(always_accept)
+                lexer_conf = copy(trad_conf)
+                lexer_conf.terminals = [terminals_by_name[n] for n in accepts if n in terminals_by_name]
+                lexer = TraditionalLexer(lexer_conf)
+                lexer_by_tokens[key] = lexer
+
+            self.lexers[state] = lexer
+
+        assert trad_conf.terminals is terminals
+        self.root_lexer = TraditionalLexer(trad_conf)
+
+    def make_lexer_state(self, text):
+        return self.root_lexer.make_lexer_state(text)
+
+    def lex(self, lexer_state, parser_state):
+        try:
+            while True:
+                lexer = self.lexers[parser_state.position]
+                yield lexer.next_token(lexer_state, parser_state)
+        except EOFError:
+            pass
+        except UnexpectedCharacters as e:
+            # In the contextual lexer, UnexpectedCharacters can mean that the terminal is defined, but not in the current context.
+            # This tests the input against the global context, to provide a nicer error.
+            try:
+                last_token = lexer_state.last_token  # Save last_token. Calling root_lexer.next_token will change this to the wrong token
+                token = self.root_lexer.next_token(lexer_state, parser_state)
+                raise UnexpectedToken(token, e.allowed, state=parser_state, token_history=[last_token], terminals_by_name=self.root_lexer.terminals_by_name)
+            except UnexpectedCharacters:
+                raise e  # Raise the original UnexpectedCharacters. The root lexer raises it with the wrong expected set.
+
+class LexerThread(object):
+    """A thread that ties a lexer instance and a lexer state, to be used by the parser"""
+
+    def __init__(self, lexer, text):
+        self.lexer = lexer
+        self.state = lexer.make_lexer_state(text)
+
+    def lex(self, parser_state):
+        return self.lexer.lex(self.state, parser_state)
+
+    def __copy__(self):
+        copied = object.__new__(LexerThread)
+        copied.lexer = self.lexer
+        copied.state = copy(self.state)
+        return copied
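+    # Hedged usage sketch (illustrative; 'lexer' and 'parser_state' come from the
+    # surrounding parser machinery and are not constructed here):
+    #   thread = LexerThread(lexer, '1 + 2')
+    #   for tok in thread.lex(parser_state):
+    #       ...   # tokens arrive with positions tracked in thread.state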
+###}